Skip to content

perf: Add performance tracing capability #1706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
46 changes: 23 additions & 23 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_profiling_enabled: jboolean,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
trace_begin("create_plan");
let _ = TraceGuard::new("createPlan");

// Init JVM classes
JVMClasses::init(&mut env);
Expand Down Expand Up @@ -245,8 +245,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_profiling_enabled: memory_profiling_enabled != JNI_FALSE,
});

trace_end("create_plan");

Ok(Box::into_raw(exec_context) as i64)
})
}
Expand Down Expand Up @@ -370,7 +368,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
schema_addrs: jlongArray,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
trace_begin("execute_plan");
let _ = TraceGuard::new("executePlan");

// Retrieve the query
let exec_context = get_execution_context(exec_context);
Expand Down Expand Up @@ -466,9 +464,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
match poll_output {
Poll::Ready(Some(output)) => {
// prepare output for FFI transfer

trace_end("execute_plan");

return prepare_output(
&mut env,
array_addrs,
Expand All @@ -492,9 +487,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
);
}
}

trace_end("execute_plan");

return Ok(-1);
}
// A poll pending means there are more than one blocking operators,
Expand Down Expand Up @@ -595,7 +587,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
compression_level: jint,
) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
trace_begin("writeSortedFileNative");
let _ = TraceGuard::new("writeSortedFileNative");

let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;

Expand Down Expand Up @@ -659,8 +651,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
let long_array = env.new_long_array(2)?;
env.set_long_array_region(&long_array, 0, &[written_bytes, checksum])?;

trace_end("writeSortedFileNative");

Ok(long_array.into_raw())
})
}
Expand All @@ -674,12 +664,10 @@ pub extern "system" fn Java_org_apache_comet_Native_sortRowPartitionsNative(
size: jlong,
) {
try_unwrap_or_throw(&e, |_| {
trace_begin("sortRowPartitionsNative");
let _ = TraceGuard::new("sortRowPartitionsNative");
// SAFETY: JVM unsafe memory allocation is aligned with long.
let array = unsafe { std::slice::from_raw_parts_mut(address as *mut i64, size as usize) };
array.rdxsort();
trace_end("sortRowPartitionsNative");

Ok(())
})
}
Expand All @@ -697,17 +685,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_decodeShuffleBlock(
schema_addrs: jlongArray,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
trace_begin("decodeShuffleBlock");

let _ = TraceGuard::new("decodeShuffleBlock");
let raw_pointer = env.get_direct_buffer_address(&byte_buffer)?;
let length = length as usize;
let slice: &[u8] = unsafe { std::slice::from_raw_parts(raw_pointer, length) };
let batch = read_ipc_compressed(slice)?;
let return_value = prepare_output(&mut env, array_addrs, schema_addrs, batch, false);

trace_end("decodeShuffleBlock");

return_value
prepare_output(&mut env, array_addrs, schema_addrs, batch, false)
})
}

Expand Down Expand Up @@ -767,3 +750,20 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_traceEnd(
) {
// no implementation
}

struct TraceGuard<'a> {
Copy link
Contributor

@comphead comphead May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we can have it as generic function?

pub fn trace_time<T, F>(label: &str, mut f: F) -> T
where
    F: FnMut() -> T,
{
    let _ = TraceGuard::new("writeSortedFileNative");
    let result = f();
    result
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I like that idea. I will try and implement this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran into some issues around lifetimes for captured variables with this approach, so perhaps it is simpler to stick with the TraceGuard.

label: &'a str,
}

impl<'a> TraceGuard<'a> {
fn new(label: &'a str) -> Self {
trace_begin(label);
Self { label }
}
}

impl<'a> Drop for TraceGuard<'a> {
fn drop(&mut self) {
trace_end(self.label);
}
}
Loading