diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs
index a040b25eb0..b59c188da0 100644
--- a/native/core/benches/row_columnar.rs
+++ b/native/core/benches/row_columnar.rs
@@ -15,90 +15,375 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::DataType as ArrowDataType;
+//! Benchmarks for JVM shuffle row-to-columnar conversion.
+//!
+//! Measures `process_sorted_row_partition()` performance for converting Spark
+//! UnsafeRow data to Arrow arrays, covering primitive, struct (flat/nested),
+//! list, and map types.
+
+use arrow::datatypes::{DataType as ArrowDataType, Field, Fields};
 use comet::execution::shuffle::spark_unsafe::row::{
     process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
 };
 use comet::execution::shuffle::CompressionCodec;
-use criterion::{criterion_group, criterion_main, Criterion};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use std::sync::Arc;
 use tempfile::Builder;
 
-const NUM_ROWS: usize = 10000;
 const BATCH_SIZE: usize = 5000;
-const NUM_COLS: usize = 100;
-const ROW_SIZE: usize = SparkUnsafeRow::get_row_bitset_width(NUM_COLS) + NUM_COLS * 8;
 
-fn benchmark(c: &mut Criterion) {
-    let mut group = c.benchmark_group("row_array_conversion");
+/// Size of an Int64 value in bytes.
+const INT64_SIZE: usize = 8;
 
-    group.bench_function("row_to_array", |b| {
-        let spark_rows = (0..NUM_ROWS)
-            .map(|_| {
-                let mut spark_row = SparkUnsafeRow::new_with_num_fields(NUM_COLS);
-                let mut row = Row::new();
+/// Size of a pointer in Spark's UnsafeRow format. Encodes a 32-bit offset
+/// (upper bits) and 32-bit size (lower bits) — always 8 bytes regardless of
+/// hardware architecture.
+const UNSAFE_ROW_POINTER_SIZE: usize = 8;
 
-                for i in SparkUnsafeRow::get_row_bitset_width(NUM_COLS)..ROW_SIZE {
-                    row.data[i] = i as u8;
-                }
+/// Size of the element-count field in UnsafeRow array/map headers.
+const ARRAY_HEADER_SIZE: usize = 8;
 
-                row.to_spark_row(&mut spark_row);
+// ─── UnsafeRow helpers ──────────────────────────────────────────────────────
 
-                for i in 0..NUM_COLS {
-                    spark_row.set_not_null_at(i);
-                }
+/// Write an UnsafeRow offset+size pointer at `pos` in `data`.
+fn write_pointer(data: &mut [u8], pos: usize, offset: usize, size: usize) {
+    let value = ((offset as i64) << 32) | (size as i64);
+    data[pos..pos + UNSAFE_ROW_POINTER_SIZE].copy_from_slice(&value.to_le_bytes());
+}
 
-                spark_row
-            })
-            .collect::<Vec<_>>();
-
-        let mut row_addresses = spark_rows
-            .iter()
-            .map(|row| row.get_row_addr())
-            .collect::<Vec<_>>();
-        let mut row_sizes = spark_rows
-            .iter()
-            .map(|row| row.get_row_size())
-            .collect::<Vec<_>>();
-
-        let row_address_ptr = row_addresses.as_mut_ptr();
-        let row_size_ptr = row_sizes.as_mut_ptr();
+/// Byte size of a null-bitset for `n` elements (64-bit words, rounded up).
+fn null_bitset_size(n: usize) -> usize {
+    n.div_ceil(64) * 8
+}
+
+// ─── Schema builders ────────────────────────────────────────────────────────
+
+/// Create a struct schema with `depth` nesting levels and `num_leaf_fields`
+/// Int64 leaf fields.
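+///
+/// For example, `make_struct_schema(2, 1)` builds (in informal notation)
+/// `Struct<nested: Struct<f0: Int64>>`: one wrapper level around a
+/// single-field leaf struct.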
+///
+/// - depth=1: `Struct<Int64, ...>`
+/// - depth=2: `Struct<Struct<Int64, ...>>`
+/// - depth=3: `Struct<Struct<Struct<Int64, ...>>>`
+fn make_struct_schema(depth: usize, num_leaf_fields: usize) -> ArrowDataType {
+    let leaf_fields: Vec<Field> = (0..num_leaf_fields)
+        .map(|i| Field::new(format!("f{i}"), ArrowDataType::Int64, true))
+        .collect();
+    let mut dt = ArrowDataType::Struct(Fields::from(leaf_fields));
+    for _ in 0..depth - 1 {
+        dt = ArrowDataType::Struct(Fields::from(vec![Field::new("nested", dt, true)]));
+    }
+    dt
+}
+
+fn make_list_schema() -> ArrowDataType {
+    ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Int64, true)))
+}
+
+fn make_map_schema() -> ArrowDataType {
+    let entries = Field::new(
+        "entries",
+        ArrowDataType::Struct(Fields::from(vec![
+            Field::new("key", ArrowDataType::Int64, false),
+            Field::new("value", ArrowDataType::Int64, true),
+        ])),
+        false,
+    );
+    ArrowDataType::Map(Arc::new(entries), false)
+}
+
+// ─── Row data builders ──────────────────────────────────────────────────────
+
+/// Build a binary UnsafeRow containing a struct column with `depth` nesting
+/// levels and `num_leaf_fields` Int64 fields at the innermost level.
+fn build_struct_row(depth: usize, num_leaf_fields: usize) -> Vec<u8> {
+    let top_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let inter_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let leaf_bitset = SparkUnsafeRow::get_row_bitset_width(num_leaf_fields);
+
+    let inter_level_size = inter_bitset + UNSAFE_ROW_POINTER_SIZE;
+    let leaf_level_size = leaf_bitset + num_leaf_fields * INT64_SIZE;
+
+    let total = top_bitset
+        + UNSAFE_ROW_POINTER_SIZE
+        + (depth - 1) * inter_level_size
+        + leaf_level_size;
+    let mut data = vec![0u8; total];
+
+    // Absolute start position of each struct level in the buffer
+    let mut struct_starts = Vec::with_capacity(depth);
+    let mut pos = top_bitset + UNSAFE_ROW_POINTER_SIZE;
+    for level in 0..depth {
+        struct_starts.push(pos);
+        if level < depth - 1 {
+            pos += inter_level_size;
+        }
+    }
+
+    // Top-level pointer → first struct (absolute offset from row start)
+    let first_size = if depth == 1 {
+        leaf_level_size
+    } else {
+        inter_level_size
+    };
+    write_pointer(&mut data, top_bitset, struct_starts[0], first_size);
+
+    // Intermediate struct pointers (offsets relative to their own struct start)
+    for level in 0..depth - 1 {
+        let next_size = if level + 1 == depth - 1 {
+            leaf_level_size
+        } else {
+            inter_level_size
+        };
+        write_pointer(
+            &mut data,
+            struct_starts[level] + inter_bitset,
+            struct_starts[level + 1] - struct_starts[level],
+            next_size,
+        );
+    }
+
+    // Fill leaf struct with sample data
+    let leaf_start = *struct_starts.last().unwrap();
+    for i in 0..num_leaf_fields {
+        let off = leaf_start + leaf_bitset + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes());
+    }
+
+    data
+}
+
+/// Build a binary UnsafeRow containing a `List<Int64>` column.
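+///
+/// Layout of the returned buffer, matching the writes below (assuming an
+/// 8-byte row bitset for the single top-level field):
+/// `[row bitset][pointer][8-byte count][element bitset][values]`, so a
+/// 10-element list occupies 8 + 8 + 8 + 8 + 80 = 112 bytes in total.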
+fn build_list_row(num_elements: usize) -> Vec<u8> {
+    let top_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let elem_null_bitset = null_bitset_size(num_elements);
+    let list_size = ARRAY_HEADER_SIZE + elem_null_bitset + num_elements * INT64_SIZE;
+    let total = top_bitset + UNSAFE_ROW_POINTER_SIZE + list_size;
+    let mut data = vec![0u8; total];
+
+    let list_offset = top_bitset + UNSAFE_ROW_POINTER_SIZE;
+    write_pointer(&mut data, top_bitset, list_offset, list_size);
+
+    // Element count
+    data[list_offset..list_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(num_elements as i64).to_le_bytes());
+
+    // Element values
+    let data_start = list_offset + ARRAY_HEADER_SIZE + elem_null_bitset;
+    for i in 0..num_elements {
+        let off = data_start + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes());
+    }
+
+    data
+}
+
+/// Build a binary UnsafeRow containing a `Map<Int64, Int64>` column.
+fn build_map_row(num_entries: usize) -> Vec<u8> {
+    let top_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let entry_null_bitset = null_bitset_size(num_entries);
+    let array_size = ARRAY_HEADER_SIZE + entry_null_bitset + num_entries * INT64_SIZE;
+    // Map layout: [key_array_size header] [key_array] [value_array]
+    let map_size = ARRAY_HEADER_SIZE + 2 * array_size;
+    let total = top_bitset + UNSAFE_ROW_POINTER_SIZE + map_size;
+    let mut data = vec![0u8; total];
+
+    let map_offset = top_bitset + UNSAFE_ROW_POINTER_SIZE;
+    write_pointer(&mut data, top_bitset, map_offset, map_size);
+
+    // Key array size header
+    data[map_offset..map_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(array_size as i64).to_le_bytes());
+
+    // Key array: [element count] [null bitset] [data]
+    let key_offset = map_offset + ARRAY_HEADER_SIZE;
+    data[key_offset..key_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(num_entries as i64).to_le_bytes());
+    let key_data = key_offset + ARRAY_HEADER_SIZE + entry_null_bitset;
+    for i in 0..num_entries {
+        let off = key_data + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&(i as i64).to_le_bytes());
+    }
+
+    // Value array: [element count] [null bitset] [data]
+    let val_offset = key_offset + array_size;
+    data[val_offset..val_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(num_entries as i64).to_le_bytes());
+    let val_data = val_offset + ARRAY_HEADER_SIZE + entry_null_bitset;
+    for i in 0..num_entries {
+        let off = val_data + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes());
+    }
+
+    data
+}
+
+// ─── Benchmark runner ───────────────────────────────────────────────────────
+
+/// Common benchmark harness: wraps raw row bytes in SparkUnsafeRow and runs
+/// `process_sorted_row_partition` under Criterion.
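+///
+/// Each call produces one Criterion measurement; e.g. the struct benchmarks
+/// below call `run_benchmark(&mut group, "flat_fields_5", "rows_1000",
+/// &schema, &rows, 1)`.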
+fn run_benchmark(
+    group: &mut criterion::BenchmarkGroup<criterion::measurement::WallTime>,
+    name: &str,
+    param: &str,
+    schema: &[ArrowDataType],
+    rows: &[Vec<u8>],
+    num_top_level_fields: usize,
+) {
+    let num_rows = rows.len();
+
+    let spark_rows: Vec<SparkUnsafeRow> = rows
+        .iter()
+        .map(|data| {
+            let mut row = SparkUnsafeRow::new_with_num_fields(num_top_level_fields);
+            row.point_to_slice(data);
+            for i in 0..num_top_level_fields {
+                row.set_not_null_at(i);
+            }
+            row
+        })
+        .collect();
+
+    let mut addrs: Vec<i64> = spark_rows.iter().map(|r| r.get_row_addr()).collect();
+    let mut sizes: Vec<i32> = spark_rows.iter().map(|r| r.get_row_size()).collect();
+    let addr_ptr = addrs.as_mut_ptr();
+    let size_ptr = sizes.as_mut_ptr();
+
+    group.bench_with_input(
+        BenchmarkId::new(name, param),
+        &num_rows,
+        |b, &n| {
+            b.iter(|| {
+                let tmp = Builder::new().tempfile().unwrap();
+                process_sorted_row_partition(
+                    n,
+                    BATCH_SIZE,
+                    addr_ptr,
+                    size_ptr,
+                    schema,
+                    tmp.path().to_str().unwrap().to_string(),
+                    1.0,
+                    false,
+                    0,
+                    None,
+                    &CompressionCodec::Zstd(1),
+                )
+                .unwrap();
+            });
+        },
+    );
+
+    drop(spark_rows);
+}
+
+// ─── Benchmarks ─────────────────────────────────────────────────────────────
+
+/// 100 primitive Int64 columns — baseline without complex-type overhead.
+fn benchmark_primitive_columns(c: &mut Criterion) {
+    let mut group = c.benchmark_group("primitive_columns");
+    const NUM_COLS: usize = 100;
+    let bitset = SparkUnsafeRow::get_row_bitset_width(NUM_COLS);
+    let row_size = bitset + NUM_COLS * INT64_SIZE;
+
+    for num_rows in [1000, 10000] {
         let schema = vec![ArrowDataType::Int64; NUM_COLS];
+        let rows: Vec<Vec<u8>> = (0..num_rows)
+            .map(|_| {
+                let mut data = vec![0u8; row_size];
+                for (i, byte) in data.iter_mut().enumerate().take(row_size).skip(bitset) {
+                    *byte = i as u8;
+                }
+                data
+            })
+            .collect();
 
-        b.iter(|| {
-            let tempfile = Builder::new().tempfile().unwrap();
+        run_benchmark(
+            &mut group,
+            "cols_100",
+            &format!("rows_{num_rows}"),
+            &schema,
+            &rows,
+            NUM_COLS,
+        );
+    }
 
-            process_sorted_row_partition(
-                NUM_ROWS,
-                BATCH_SIZE,
-                row_address_ptr,
-                row_size_ptr,
-                &schema,
-                tempfile.path().to_str().unwrap().to_string(),
-                1.0,
-                false,
-                0,
-                None,
-                &CompressionCodec::Zstd(1),
-            )
-            .unwrap();
-        });
-    });
+    group.finish();
 }
 
-struct Row {
-    data: Box<[u8; ROW_SIZE]>,
+/// Struct columns at varying nesting depths (1 = flat, 2 = nested, 3 = deeply nested).
+fn benchmark_struct_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("struct_conversion");
+
+    for (depth, label) in [(1, "flat"), (2, "nested"), (3, "deeply_nested")] {
+        for num_fields in [5, 10, 20] {
+            for num_rows in [1000, 10000] {
+                let schema = vec![make_struct_schema(depth, num_fields)];
+                let rows: Vec<Vec<u8>> = (0..num_rows)
+                    .map(|_| build_struct_row(depth, num_fields))
+                    .collect();
+
+                run_benchmark(
+                    &mut group,
+                    &format!("{label}_fields_{num_fields}"),
+                    &format!("rows_{num_rows}"),
+                    &schema,
+                    &rows,
+                    1,
+                );
+            }
+        }
+    }
+
+    group.finish();
 }
 
-impl Row {
-    pub fn new() -> Self {
-        Row {
-            data: Box::new([0u8; ROW_SIZE]),
+/// List columns with varying element counts.
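+///
+/// Reported IDs follow Criterion's `group/function/parameter` form, e.g.
+/// `list_conversion/elements_10/rows_1000`.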
+fn benchmark_list_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("list_conversion");
+
+    for num_elements in [10, 100] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_list_schema()];
+            let rows: Vec<Vec<u8>> = (0..num_rows)
+                .map(|_| build_list_row(num_elements))
+                .collect();
+
+            run_benchmark(
+                &mut group,
+                &format!("elements_{num_elements}"),
+                &format!("rows_{num_rows}"),
+                &schema,
+                &rows,
+                1,
+            );
         }
     }
 
-    pub fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
-        spark_row.point_to_slice(self.data.as_ref());
+    group.finish();
+}
+
+/// Map columns with varying entry counts.
+fn benchmark_map_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("map_conversion");
+
+    for num_entries in [10, 100] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_map_schema()];
+            let rows: Vec<Vec<u8>> = (0..num_rows)
+                .map(|_| build_map_row(num_entries))
+                .collect();
+
+            run_benchmark(
+                &mut group,
+                &format!("entries_{num_entries}"),
+                &format!("rows_{num_rows}"),
+                &schema,
+                &rows,
+                1,
+            );
+        }
     }
+
+    group.finish();
 }
 
 fn config() -> Criterion {
@@ -108,6 +393,9 @@ fn config() -> Criterion {
 criterion_group! {
     name = benches;
     config = config();
-    targets = benchmark
+    targets = benchmark_primitive_columns,
+    benchmark_struct_conversion,
+    benchmark_list_conversion,
+    benchmark_map_conversion
 }
 criterion_main!(benches);