diff --git a/Cargo.lock b/Cargo.lock
index 204cf51f46752..5b62e17074924 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2672,6 +2672,7 @@ dependencies = [
  "percent-encoding",
  "rand 0.9.2",
  "sha1",
+ "twox-hash",
  "url",
 ]
 
@@ -6578,6 +6579,9 @@ name = "twox-hash"
 version = "2.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c"
+dependencies = [
+ "rand 0.9.2",
+]
 
 [[package]]
 name = "typed-arena"
diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml
index 09959db41fe60..a1da056e1289d 100644
--- a/datafusion/spark/Cargo.toml
+++ b/datafusion/spark/Cargo.toml
@@ -53,6 +53,7 @@ log = { workspace = true }
 percent-encoding = "2.3.2"
 rand = { workspace = true }
 sha1 = "0.10"
+twox-hash = "2.1"
 url = { workspace = true }
 
 [dev-dependencies]
diff --git a/datafusion/spark/src/function/hash/mod.rs b/datafusion/spark/src/function/hash/mod.rs
index 5860596ac70a3..7a5e9898f5041 100644
--- a/datafusion/spark/src/function/hash/mod.rs
+++ b/datafusion/spark/src/function/hash/mod.rs
@@ -16,26 +16,32 @@
 // under the License.
 
 pub mod crc32;
+pub mod murmur3_hash;
 pub mod sha1;
 pub mod sha2;
+pub mod xxhash64;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
 make_udf_function!(crc32::SparkCrc32, crc32);
+make_udf_function!(murmur3_hash::SparkMurmur3Hash, murmur3_hash);
 make_udf_function!(sha1::SparkSha1, sha1);
 make_udf_function!(sha2::SparkSha2, sha2);
+make_udf_function!(xxhash64::SparkXxhash64, xxhash64);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
 
     export_functions!(
         (crc32, "crc32(expr) - Returns a cyclic redundancy check value of the expr as a bigint.", arg1),
+        (murmur3_hash, "murmur3_hash(expr1, expr2, ...) - Returns a murmur3 hash value of the arguments as an int. Also available as `hash`.", args),
         (sha1, "sha1(expr) - Returns a SHA-1 hash value of the expr as a hex string.", arg1),
-        (sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2)
+        (sha2, "sha2(expr, bitLength) - Returns a checksum of SHA-2 family as a hex string of expr. SHA-224, SHA-256, SHA-384, and SHA-512 are supported. Bit length of 0 is equivalent to 256.", arg1 arg2),
+        (xxhash64, "xxhash64(expr1, expr2, ...) - Returns a 64-bit hash value of the arguments using xxHash.", args)
     );
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![crc32(), sha1(), sha2()]
+    vec![crc32(), murmur3_hash(), sha1(), sha2(), xxhash64()]
 }
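With the registry changes above, both UDFs are returned by the crate's `functions()` list. A minimal sketch of standalone registration and use follows; the `datafusion_spark` module paths and the tokio harness are assumptions for illustration, not part of this patch:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_expr::ScalarUDF;
// Assumed public re-export paths for the new modules (hypothetical).
use datafusion_spark::function::hash::murmur3_hash::SparkMurmur3Hash;
use datafusion_spark::function::hash::xxhash64::SparkXxhash64;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the UDFs; `hash` also resolves to murmur3_hash via its alias.
    ctx.register_udf(ScalarUDF::new_from_impl(SparkMurmur3Hash::new()));
    ctx.register_udf(ScalarUDF::new_from_impl(SparkXxhash64::new()));

    ctx.sql("SELECT murmur3_hash(1, 'hello'), xxhash64(1, 'hello')")
        .await?
        .show()
        .await?;
    Ok(())
}
```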
diff --git a/datafusion/spark/src/function/hash/murmur3_hash.rs b/datafusion/spark/src/function/hash/murmur3_hash.rs
new file mode 100644
index 0000000000000..4a8787182122d
--- /dev/null
+++ b/datafusion/spark/src/function/hash/murmur3_hash.rs
@@ -0,0 +1,692 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, ArrowNativeTypeOp, AsArray, BinaryArray, BooleanArray, Date32Array,
+    Date64Array, Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array,
+    Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray,
+    LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    TimestampNanosecondArray, TimestampSecondArray, types::ArrowDictionaryKeyType,
+};
+use arrow::compute::take;
+use arrow::datatypes::{
+    ArrowNativeType, DataType, Int8Type, Int16Type, Int32Type, Int64Type, TimeUnit,
+    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
+};
+use datafusion_common::{Result, ScalarValue, exec_err, internal_err};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+
+const DEFAULT_SEED: i32 = 42;
+
+/// Spark-compatible murmur3 hash function.
+///
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#hash>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkMurmur3Hash {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for SparkMurmur3Hash {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkMurmur3Hash {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+            aliases: vec!["hash".to_string()],
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkMurmur3Hash {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "murmur3_hash"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        if args.args.is_empty() {
+            return exec_err!("murmur3_hash requires at least one argument");
+        }
+
+        // Determine the number of rows from the first array argument; if all
+        // arguments are scalars, compute a single row and return a scalar.
+        let num_rows = args
+            .args
+            .iter()
+            .find_map(|arg| match arg {
+                ColumnarValue::Array(array) => Some(array.len()),
+                ColumnarValue::Scalar(_) => None,
+            })
+            .unwrap_or(1);
+        let all_scalar = args
+            .args
+            .iter()
+            .all(|arg| matches!(arg, ColumnarValue::Scalar(_)));
+
+        // Initialize hashes with the default Spark seed
+        let mut hashes: Vec<u32> = vec![DEFAULT_SEED as u32; num_rows];
+
+        // Convert all arguments to arrays
+        let arrays: Vec<ArrayRef> = args
+            .args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+                ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
+            })
+            .collect::<Result<_>>()?;
+
+        // Hash each column, chaining each column's hash into the next seed
+        for (i, col) in arrays.iter().enumerate() {
+            hash_column_murmur3(col, &mut hashes, i == 0)?;
+        }
+
+        // Convert to Int32
+        let result: Vec<i32> = hashes.into_iter().map(|h| h as i32).collect();
+        let result_array = Int32Array::from(result);
+
+        if all_scalar {
+            Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(
+                result_array.value(0),
+            ))))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(result_array)))
+        }
+    }
+}
+
+/// Spark-compatible murmur3 hash algorithm
+#[inline]
+pub fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
+    #[inline]
+    fn mix_k1(mut k1: i32) -> i32 {
+        k1 = k1.mul_wrapping(0xcc9e2d51u32 as i32);
+        k1 = k1.rotate_left(15);
+        k1.mul_wrapping(0x1b873593u32 as i32)
+    }
+
+    #[inline]
+    fn mix_h1(mut h1: i32, k1: i32) -> i32 {
+        h1 ^= k1;
+        h1 = h1.rotate_left(13);
+        h1.mul_wrapping(5).add_wrapping(0xe6546b64u32 as i32)
+    }
+
+    #[inline]
+    fn fmix(mut h1: i32, len: i32) -> i32 {
+        h1 ^= len;
+        h1 ^= (h1 as u32 >> 16) as i32;
+        h1 = h1.mul_wrapping(0x85ebca6bu32 as i32);
+        h1 ^= (h1 as u32 >> 13) as i32;
+        h1 = h1.mul_wrapping(0xc2b2ae35u32 as i32);
+        h1 ^= (h1 as u32 >> 16) as i32;
+        h1
+    }
+
+    #[inline]
+    unsafe fn hash_bytes_by_int(data: &[u8], seed: u32) -> i32 {
+        // SAFETY: caller guarantees data length is aligned to 4 bytes
+        unsafe {
+            let mut h1 = seed as i32;
+            for i in (0..data.len()).step_by(4) {
+                let ints = data.as_ptr().add(i) as *const i32;
+                let mut half_word = ints.read_unaligned();
+                if cfg!(target_endian = "big") {
+                    // Spark reads each word little-endian, so byte-swap
+                    // (not bit-reverse) on big-endian targets
+                    half_word = half_word.swap_bytes();
+                }
+                h1 = mix_h1(h1, mix_k1(half_word));
+            }
+            h1
+        }
+    }
+
+    let data = data.as_ref();
+    let len = data.len();
+    let len_aligned = len - len % 4;
+
+    // SAFETY:
+    // Avoid boundary checking in performance critical code.
+    // All operations are guaranteed to be safe.
+    // data is &[u8] so we do not need to check for proper alignment.
+    unsafe {
+        let mut h1 = if len_aligned > 0 {
+            hash_bytes_by_int(&data[0..len_aligned], seed)
+        } else {
+            seed as i32
+        };
+
+        for i in len_aligned..len {
+            let half_word = *data.get_unchecked(i) as i8 as i32;
+            h1 = mix_h1(h1, mix_k1(half_word));
+        }
+        fmix(h1, len as i32) as u32
+    }
+}
+
+/// Hash the values in a dictionary array
+fn hash_column_dictionary<K: ArrowDictionaryKeyType>(
+    array: &ArrayRef,
+    hashes: &mut [u32],
+    first_col: bool,
+) -> Result<()> {
+    let dict_array = array
+        .as_any()
+        .downcast_ref::<DictionaryArray<K>>()
+        .unwrap();
+    if !first_col {
+        // Unpack the dictionary array as each row may have a different hash input
+        let unpacked = take(dict_array.values().as_ref(), dict_array.keys(), None)?;
+        hash_column_murmur3(&unpacked, hashes, false)?;
+    } else {
+        // For the first column, hash each dictionary value once, and then use
+        // that computed hash for each key value to avoid a potentially
+        // expensive redundant hashing for large dictionary elements (e.g. strings)
+        let dict_values = Arc::clone(dict_array.values());
+        // Same initial seed as Spark
+        let mut dict_hashes = vec![DEFAULT_SEED as u32; dict_values.len()];
+        hash_column_murmur3(&dict_values, &mut dict_hashes, true)?;
+        for (hash, key) in hashes.iter_mut().zip(dict_array.keys().iter()) {
+            if let Some(key) = key {
+                let idx = key.to_usize().ok_or_else(|| {
+                    datafusion_common::DataFusionError::Internal(format!(
+                        "Can not convert key value {:?} to usize in dictionary of type {:?}",
+                        key,
+                        dict_array.data_type()
+                    ))
+                })?;
+                *hash = dict_hashes[idx]
+            }
+            // No update for Null keys, consistent with other types
+        }
+    }
+    Ok(())
+}
+
+fn hash_column_murmur3(
+    col: &ArrayRef,
+    hashes: &mut [u32],
+    first_col: bool,
+) -> Result<()> {
+    match col.data_type() {
+        DataType::Boolean => {
+            let array = col.as_any().downcast_ref::<BooleanArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = i32::from(array.value(i));
+                    *hash = spark_compatible_murmur3_hash(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Int8 => {
+            let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i) as i32;
+                    *hash = spark_compatible_murmur3_hash(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Int16 => {
+            let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i) as i32;
+                    *hash = spark_compatible_murmur3_hash(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Int32 => {
+            let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Int64 => {
+            let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Float32 => {
+            let array = col.as_any().downcast_ref::<Float32Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i);
+                    // Spark uses 0 as hash for -0.0
+                    let bytes = if val == 0.0 && val.is_sign_negative() {
+                        0i32.to_le_bytes()
+                    } else {
+                        val.to_le_bytes()
+                    };
+                    *hash = spark_compatible_murmur3_hash(bytes, *hash);
+                }
+            }
+        }
+        DataType::Float64 => {
+            let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i);
+                    // Spark uses 0 as hash for -0.0
+                    let bytes = if val == 0.0 && val.is_sign_negative() {
+                        0i64.to_le_bytes()
+                    } else {
+                        val.to_le_bytes()
+                    };
+                    *hash = spark_compatible_murmur3_hash(bytes, *hash);
+                }
+            }
+        }
+        DataType::Date32 => {
+            let array = col.as_any().downcast_ref::<Date32Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Date64 => {
+            let array = col.as_any().downcast_ref::<Date64Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            let array = col.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            let array = col
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let array = col
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let array = col
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Utf8 => {
+            let array = col.as_any().downcast_ref::<StringArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::LargeUtf8 => {
+            let array = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::Utf8View => {
+            let array = col.as_string_view();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::Binary => {
+            let array = col.as_any().downcast_ref::<BinaryArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::LargeBinary => {
+            let array = col.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::BinaryView => {
+            let array = col.as_binary_view();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::FixedSizeBinary(_) => {
+            let array = col.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::Decimal128(precision, _) if *precision <= 18 => {
+            let array = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    // For small decimals, hash the unscaled value as i64
+                    let val = array.value(i) as i64;
+                    *hash = spark_compatible_murmur3_hash(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Decimal128(_, _) => {
+            let array = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_murmur3_hash(
+                        array.value(i).to_le_bytes(),
+                        *hash,
+                    );
+                }
+            }
+        }
+        DataType::Null => {
+            // Nulls don't update the hash
+        }
+        DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+            DataType::Int8 => {
+                hash_column_dictionary::<Int8Type>(col, hashes, first_col)?
+            }
+            DataType::Int16 => {
+                hash_column_dictionary::<Int16Type>(col, hashes, first_col)?
+            }
+            DataType::Int32 => {
+                hash_column_dictionary::<Int32Type>(col, hashes, first_col)?
+            }
+            DataType::Int64 => {
+                hash_column_dictionary::<Int64Type>(col, hashes, first_col)?
+            }
+            DataType::UInt8 => {
+                hash_column_dictionary::<UInt8Type>(col, hashes, first_col)?
+            }
+            DataType::UInt16 => {
+                hash_column_dictionary::<UInt16Type>(col, hashes, first_col)?
+            }
+            DataType::UInt32 => {
+                hash_column_dictionary::<UInt32Type>(col, hashes, first_col)?
+            }
+            DataType::UInt64 => {
+                hash_column_dictionary::<UInt64Type>(col, hashes, first_col)?
+            }
+            dt => {
+                return internal_err!(
+                    "Unsupported dictionary key type for murmur3_hash: {dt}"
+                );
+            }
+        },
+        dt => {
+            return internal_err!("Unsupported data type for murmur3_hash: {dt}");
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_murmur3_i32() {
+        let seed = 42u32;
+        assert_eq!(
+            spark_compatible_murmur3_hash(1i32.to_le_bytes(), seed),
+            0xdea578e3
+        );
+        assert_eq!(
+            spark_compatible_murmur3_hash(0i32.to_le_bytes(), seed),
+            0x379fae8f
+        );
+        assert_eq!(
+            spark_compatible_murmur3_hash((-1i32).to_le_bytes(), seed),
+            0xa0590e3d
+        );
+    }
+
+    #[test]
+    fn test_murmur3_i64() {
+        let seed = 42u32;
+        assert_eq!(
+            spark_compatible_murmur3_hash(1i64.to_le_bytes(), seed),
+            0x99f0149d
+        );
+        assert_eq!(
+            spark_compatible_murmur3_hash(0i64.to_le_bytes(), seed),
+            0x9c67b85d
+        );
+        assert_eq!(
+            spark_compatible_murmur3_hash((-1i64).to_le_bytes(), seed),
+            0xc8008529
+        );
+    }
+
+    #[test]
+    fn test_murmur3_string() {
+        let seed = 42u32;
+        assert_eq!(spark_compatible_murmur3_hash("hello", seed), 3286402344);
+        assert_eq!(spark_compatible_murmur3_hash("", seed), 142593372);
+        assert_eq!(spark_compatible_murmur3_hash("abc", seed), 1322437556);
+    }
+
+    #[test]
+    fn test_murmur3_dictionary_string() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array with string values
+        // Dictionary: ["hello", "world", "abc"]
+        // Keys: [0, 1, 2, 0, 1] -> ["hello", "world", "abc", "hello", "world"]
+        let dict_array: DictionaryArray<Int32Type> =
+            vec!["hello", "world", "abc", "hello", "world"]
+                .into_iter()
+                .collect();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        let mut hashes = vec![DEFAULT_SEED as u32; 5];
+        hash_column_murmur3(&array_ref, &mut hashes, true).unwrap();
+
+        // Verify hashes match the expected values for strings
+        // "hello" -> 3286402344, "world" -> ?, "abc" -> 1322437556
+        assert_eq!(hashes[0], spark_compatible_murmur3_hash("hello", 42));
+        assert_eq!(hashes[1], spark_compatible_murmur3_hash("world", 42));
+        assert_eq!(hashes[2], spark_compatible_murmur3_hash("abc", 42));
+        // Repeated values should have the same hash
+        assert_eq!(hashes[3], hashes[0]); // "hello" again
+        assert_eq!(hashes[4], hashes[1]); // "world" again
+    }
+
+    #[test]
+    fn test_murmur3_dictionary_int() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array with int values
+        let keys = Int32Array::from(vec![0, 1, 2, 0, 1]);
+        let values = Int32Array::from(vec![100, 200, 300]);
+        let dict_array =
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        let mut hashes = vec![DEFAULT_SEED as u32; 5];
+        hash_column_murmur3(&array_ref, &mut hashes, true).unwrap();
+
+        // Verify hashes match the expected values for i32
+        assert_eq!(
+            hashes[0],
+            spark_compatible_murmur3_hash(100i32.to_le_bytes(), 42)
+        );
+        assert_eq!(
+            hashes[1],
+            spark_compatible_murmur3_hash(200i32.to_le_bytes(), 42)
+        );
+        assert_eq!(
+            hashes[2],
+            spark_compatible_murmur3_hash(300i32.to_le_bytes(), 42)
+        );
+        // Repeated values should have the same hash
+        assert_eq!(hashes[3], hashes[0]);
+        assert_eq!(hashes[4], hashes[1]);
+    }
+
+    #[test]
+    fn test_murmur3_dictionary_with_nulls() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array with null keys
+        let keys = Int32Array::from(vec![Some(0), None, Some(1), Some(0), None]);
+        let values = StringArray::from(vec!["hello", "world"]);
+        let dict_array =
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        let mut hashes = vec![DEFAULT_SEED as u32; 5];
+        hash_column_murmur3(&array_ref, &mut hashes, true).unwrap();
+
+        // Non-null keys should have correct hashes
+        assert_eq!(hashes[0], spark_compatible_murmur3_hash("hello", 42));
+        assert_eq!(hashes[2], spark_compatible_murmur3_hash("world", 42));
+        assert_eq!(hashes[3], spark_compatible_murmur3_hash("hello", 42));
+        // Null keys should keep the initial seed value (unchanged)
+        assert_eq!(hashes[1], DEFAULT_SEED as u32);
+        assert_eq!(hashes[4], DEFAULT_SEED as u32);
+    }
+
+    #[test]
+    fn test_murmur3_dictionary_non_first_column() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Test dictionary as non-first column (uses unpacking via take)
+        let dict_array: DictionaryArray<Int32Type> =
+            vec!["hello", "world", "abc"].into_iter().collect();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        // Start with non-seed hash values (simulating previous column hashing)
+        let mut hashes = vec![123u32, 456u32, 789u32];
+        hash_column_murmur3(&array_ref, &mut hashes, false).unwrap();
+
+        // The hashes should be updated from the previous values
+        assert_eq!(hashes[0], spark_compatible_murmur3_hash("hello", 123));
+        assert_eq!(hashes[1], spark_compatible_murmur3_hash("world", 456));
+        assert_eq!(hashes[2], spark_compatible_murmur3_hash("abc", 789));
+    }
+
+    #[test]
+    fn test_murmur3_fixed_size_binary() {
+        // Create a FixedSizeBinary array with 4-byte values
+        let array = FixedSizeBinaryArray::from(vec![
+            Some(&[0x01, 0x02, 0x03, 0x04][..]),
+            Some(&[0x05, 0x06, 0x07, 0x08][..]),
+            None,
+            Some(&[0x00, 0x00, 0x00, 0x00][..]),
+        ]);
+        let array_ref: ArrayRef = Arc::new(array);
+
+        let mut hashes = vec![DEFAULT_SEED as u32; 4];
+        hash_column_murmur3(&array_ref, &mut hashes, true).unwrap();
+
+        // Verify hashes match expected values
+        assert_eq!(
+            hashes[0],
+            spark_compatible_murmur3_hash(&[0x01, 0x02, 0x03, 0x04], 42)
+        );
+        assert_eq!(
+            hashes[1],
+            spark_compatible_murmur3_hash(&[0x05, 0x06, 0x07, 0x08], 42)
+        );
+        // Null value should keep the seed
+        assert_eq!(hashes[2], DEFAULT_SEED as u32);
+        assert_eq!(
+            hashes[3],
+            spark_compatible_murmur3_hash(&[0x00, 0x00, 0x00, 0x00], 42)
+        );
+    }
+}
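The multi-column behavior above mirrors Spark: the first column is hashed with the fixed seed 42, and each column's hash then becomes the seed for the next column. A short sketch of that chaining (not part of the patch), using values from the `.slt` tests added later in this diff; it assumes SQL integer literals plan as `Int64`:

```rust
// Chain column hashes the way invoke_with_args does: seed 42 for the first
// column, then feed each result in as the seed for the next column.
#[test]
fn murmur3_multi_column_chaining() {
    let h1 = spark_compatible_murmur3_hash(1i64.to_le_bytes(), 42);
    let h2 = spark_compatible_murmur3_hash(2i64.to_le_bytes(), h1);
    assert_eq!(h1 as i32, -1712319331); // SELECT murmur3_hash(1)
    assert_eq!(h2 as i32, -1181176833); // SELECT murmur3_hash(1, 2)
}
```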
diff --git a/datafusion/spark/src/function/hash/xxhash64.rs b/datafusion/spark/src/function/hash/xxhash64.rs
new file mode 100644
index 0000000000000..62eb60d4b4457
--- /dev/null
+++ b/datafusion/spark/src/function/hash/xxhash64.rs
@@ -0,0 +1,604 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, AsArray, BinaryArray, BooleanArray, Date32Array, Date64Array,
+    Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array, Float64Array,
+    Int8Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray,
+    StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    TimestampNanosecondArray, TimestampSecondArray, types::ArrowDictionaryKeyType,
+};
+use arrow::compute::take;
+use arrow::datatypes::{
+    ArrowNativeType, DataType, Int8Type, Int16Type, Int32Type, Int64Type, TimeUnit,
+    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
+};
+use datafusion_common::{Result, ScalarValue, exec_err, internal_err};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use twox_hash::XxHash64;
+
+const DEFAULT_SEED: i64 = 42;
+
+/// Spark-compatible xxhash64 function.
+///
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#xxhash64>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkXxhash64 {
+    signature: Signature,
+}
+
+impl Default for SparkXxhash64 {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkXxhash64 {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkXxhash64 {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "xxhash64"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int64)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        if args.args.is_empty() {
+            return exec_err!("xxhash64 requires at least one argument");
+        }
+
+        // Determine the number of rows from the first array argument; if all
+        // arguments are scalars, compute a single row and return a scalar.
+        let num_rows = args
+            .args
+            .iter()
+            .find_map(|arg| match arg {
+                ColumnarValue::Array(array) => Some(array.len()),
+                ColumnarValue::Scalar(_) => None,
+            })
+            .unwrap_or(1);
+        let all_scalar = args
+            .args
+            .iter()
+            .all(|arg| matches!(arg, ColumnarValue::Scalar(_)));
+
+        // Initialize hashes with the default Spark seed
+        let mut hashes: Vec<u64> = vec![DEFAULT_SEED as u64; num_rows];
+
+        // Convert all arguments to arrays
+        let arrays: Vec<ArrayRef> = args
+            .args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+                ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
+            })
+            .collect::<Result<_>>()?;
+
+        // Hash each column, chaining each column's hash into the next seed
+        for (i, col) in arrays.iter().enumerate() {
+            hash_column_xxhash64(col, &mut hashes, i == 0)?;
+        }
+
+        // Convert to Int64
+        let result: Vec<i64> = hashes.into_iter().map(|h| h as i64).collect();
+        let result_array = Int64Array::from(result);
+
+        if all_scalar {
+            Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(
+                result_array.value(0),
+            ))))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(result_array)))
+        }
+    }
+}
+
+#[inline]
+fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
+    XxHash64::oneshot(seed, data.as_ref())
+}
+
+/// Hash the values in a dictionary array
+fn hash_column_dictionary<K: ArrowDictionaryKeyType>(
+    array: &ArrayRef,
+    hashes: &mut [u64],
+    first_col: bool,
+) -> Result<()> {
+    let dict_array = array
+        .as_any()
+        .downcast_ref::<DictionaryArray<K>>()
+        .unwrap();
+    if !first_col {
+        // Unpack the dictionary array as each row may have a different hash input
+        let unpacked = take(dict_array.values().as_ref(), dict_array.keys(), None)?;
+        hash_column_xxhash64(&unpacked, hashes, false)?;
+    } else {
+        // For the first column, hash each dictionary value once, and then use
+        // that computed hash for each key value to avoid a potentially
+        // expensive redundant hashing for large dictionary elements (e.g. strings)
+        let dict_values = Arc::clone(dict_array.values());
+        // Same initial seed as Spark
+        let mut dict_hashes = vec![DEFAULT_SEED as u64; dict_values.len()];
+        hash_column_xxhash64(&dict_values, &mut dict_hashes, true)?;
+        for (hash, key) in hashes.iter_mut().zip(dict_array.keys().iter()) {
+            if let Some(key) = key {
+                let idx = key.to_usize().ok_or_else(|| {
+                    datafusion_common::DataFusionError::Internal(format!(
+                        "Can not convert key value {:?} to usize in dictionary of type {:?}",
+                        key,
+                        dict_array.data_type()
+                    ))
+                })?;
+                *hash = dict_hashes[idx]
+            }
+            // No update for Null keys, consistent with other types
+        }
+    }
+    Ok(())
+}
+
+fn hash_column_xxhash64(
+    col: &ArrayRef,
+    hashes: &mut [u64],
+    first_col: bool,
+) -> Result<()> {
+    match col.data_type() {
+        DataType::Boolean => {
+            let array = col.as_any().downcast_ref::<BooleanArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = i32::from(array.value(i));
+                    *hash = spark_compatible_xxhash64(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Int8 => {
+            let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i) as i32;
+                    *hash = spark_compatible_xxhash64(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Int16 => {
+            let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i) as i32;
+                    *hash = spark_compatible_xxhash64(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Int32 => {
+            let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Int64 => {
+            let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Float32 => {
+            let array = col.as_any().downcast_ref::<Float32Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i);
+                    // Spark uses 0 as hash for -0.0
+                    let bytes = if val == 0.0 && val.is_sign_negative() {
+                        0i32.to_le_bytes()
+                    } else {
+                        val.to_le_bytes()
+                    };
+                    *hash = spark_compatible_xxhash64(bytes, *hash);
+                }
+            }
+        }
+        DataType::Float64 => {
+            let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    let val = array.value(i);
+                    // Spark uses 0 as hash for -0.0
+                    let bytes = if val == 0.0 && val.is_sign_negative() {
+                        0i64.to_le_bytes()
+                    } else {
+                        val.to_le_bytes()
+                    };
+                    *hash = spark_compatible_xxhash64(bytes, *hash);
+                }
+            }
+        }
+        DataType::Date32 => {
+            let array = col.as_any().downcast_ref::<Date32Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Date64 => {
+            let array = col.as_any().downcast_ref::<Date64Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            let array = col.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            let array = col
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let array = col
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let array = col
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Utf8 => {
+            let array = col.as_any().downcast_ref::<StringArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_xxhash64(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::LargeUtf8 => {
+            let array = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_xxhash64(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::Utf8View => {
+            let array = col.as_string_view();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_xxhash64(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::Binary => {
+            let array = col.as_any().downcast_ref::<BinaryArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_xxhash64(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::LargeBinary => {
+            let array = col.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_xxhash64(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::BinaryView => {
+            let array = col.as_binary_view();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_xxhash64(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::FixedSizeBinary(_) => {
+            let array = col.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash = spark_compatible_xxhash64(array.value(i), *hash);
+                }
+            }
+        }
+        DataType::Decimal128(precision, _) if *precision <= 18 => {
+            let array = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    // For small decimals, hash the unscaled value as i64
+                    let val = array.value(i) as i64;
+                    *hash = spark_compatible_xxhash64(val.to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Decimal128(_, _) => {
+            let array = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
+            for (i, hash) in hashes.iter_mut().enumerate() {
+                if !array.is_null(i) {
+                    *hash =
+                        spark_compatible_xxhash64(array.value(i).to_le_bytes(), *hash);
+                }
+            }
+        }
+        DataType::Null => {
+            // Nulls don't update the hash
+        }
+        DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+            DataType::Int8 => {
+                hash_column_dictionary::<Int8Type>(col, hashes, first_col)?
+            }
+            DataType::Int16 => {
+                hash_column_dictionary::<Int16Type>(col, hashes, first_col)?
+            }
+            DataType::Int32 => {
+                hash_column_dictionary::<Int32Type>(col, hashes, first_col)?
+            }
+            DataType::Int64 => {
+                hash_column_dictionary::<Int64Type>(col, hashes, first_col)?
+            }
+            DataType::UInt8 => {
+                hash_column_dictionary::<UInt8Type>(col, hashes, first_col)?
+            }
+            DataType::UInt16 => {
+                hash_column_dictionary::<UInt16Type>(col, hashes, first_col)?
+            }
+            DataType::UInt32 => {
+                hash_column_dictionary::<UInt32Type>(col, hashes, first_col)?
+            }
+            DataType::UInt64 => {
+                hash_column_dictionary::<UInt64Type>(col, hashes, first_col)?
+            }
+            dt => {
+                return internal_err!(
+                    "Unsupported dictionary key type for xxhash64: {dt}"
+                );
+            }
+        },
+        dt => {
+            return internal_err!("Unsupported data type for xxhash64: {dt}");
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_xxhash64_i32() {
+        let seed = 42u64;
+        assert_eq!(
+            spark_compatible_xxhash64(1i32.to_le_bytes(), seed),
+            0xa309b38455455929
+        );
+        assert_eq!(
+            spark_compatible_xxhash64(0i32.to_le_bytes(), seed),
+            0x3229fbc4681e48f3
+        );
+        assert_eq!(
+            spark_compatible_xxhash64((-1i32).to_le_bytes(), seed),
+            0x1bfdda8861c06e45
+        );
+    }
+
+    #[test]
+    fn test_xxhash64_i64() {
+        let seed = 42u64;
+        assert_eq!(
+            spark_compatible_xxhash64(1i64.to_le_bytes(), seed),
+            0x9ed50fd59358d232
+        );
+        assert_eq!(
+            spark_compatible_xxhash64(0i64.to_le_bytes(), seed),
+            0xb71b47ebda15746c
+        );
+        assert_eq!(
+            spark_compatible_xxhash64((-1i64).to_le_bytes(), seed),
+            0x358ae035bfb46fd2
+        );
+    }
+
+    #[test]
+    fn test_xxhash64_string() {
+        let seed = 42u64;
+        assert_eq!(spark_compatible_xxhash64("hello", seed), 0xc3629e6318d53932);
+        assert_eq!(spark_compatible_xxhash64("", seed), 0x98b1582b0977e704);
+        assert_eq!(spark_compatible_xxhash64("abc", seed), 0x13c1d910702770e6);
+    }
+
+    #[test]
+    fn test_xxhash64_dictionary_string() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array with string values
+        // Keys: [0, 1, 2, 0, 1] -> ["hello", "world", "abc", "hello", "world"]
+        let dict_array: DictionaryArray<Int32Type> =
+            vec!["hello", "world", "abc", "hello", "world"]
+                .into_iter()
+                .collect();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        let mut hashes = vec![DEFAULT_SEED as u64; 5];
+        hash_column_xxhash64(&array_ref, &mut hashes, true).unwrap();
+
+        // Verify hashes match the expected values for strings
+        assert_eq!(hashes[0], spark_compatible_xxhash64("hello", 42));
+        assert_eq!(hashes[1], spark_compatible_xxhash64("world", 42));
+        assert_eq!(hashes[2], spark_compatible_xxhash64("abc", 42));
+        // Repeated values should have the same hash
+        assert_eq!(hashes[3], hashes[0]); // "hello" again
+        assert_eq!(hashes[4], hashes[1]); // "world" again
+    }
+
+    #[test]
+    fn test_xxhash64_dictionary_int() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array with int values
+        let keys = Int32Array::from(vec![0, 1, 2, 0, 1]);
+        let values = Int32Array::from(vec![100, 200, 300]);
+        let dict_array =
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        let mut hashes = vec![DEFAULT_SEED as u64; 5];
+        hash_column_xxhash64(&array_ref, &mut hashes, true).unwrap();
+
+        // Verify hashes match the expected values for i32
+        assert_eq!(
+            hashes[0],
+            spark_compatible_xxhash64(100i32.to_le_bytes(), 42)
+        );
+        assert_eq!(
+            hashes[1],
+            spark_compatible_xxhash64(200i32.to_le_bytes(), 42)
+        );
+        assert_eq!(
+            hashes[2],
+            spark_compatible_xxhash64(300i32.to_le_bytes(), 42)
+        );
+        // Repeated values should have the same hash
+        assert_eq!(hashes[3], hashes[0]);
+        assert_eq!(hashes[4], hashes[1]);
+    }
+
+    #[test]
+    fn test_xxhash64_dictionary_with_nulls() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array with null keys
+        let keys = Int32Array::from(vec![Some(0), None, Some(1), Some(0), None]);
+        let values = StringArray::from(vec!["hello", "world"]);
+        let dict_array =
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        let mut hashes = vec![DEFAULT_SEED as u64; 5];
+        hash_column_xxhash64(&array_ref, &mut hashes, true).unwrap();
+
+        // Non-null keys should have correct hashes
+        assert_eq!(hashes[0], spark_compatible_xxhash64("hello", 42));
+        assert_eq!(hashes[2], spark_compatible_xxhash64("world", 42));
+        assert_eq!(hashes[3], spark_compatible_xxhash64("hello", 42));
+        // Null keys should keep the initial seed value (unchanged)
+        assert_eq!(hashes[1], DEFAULT_SEED as u64);
+        assert_eq!(hashes[4], DEFAULT_SEED as u64);
+    }
+
+    #[test]
+    fn test_xxhash64_dictionary_non_first_column() {
+        use arrow::array::DictionaryArray;
+        use arrow::datatypes::Int32Type;
+
+        // Test dictionary as non-first column (uses unpacking via take)
+        let dict_array: DictionaryArray<Int32Type> =
+            vec!["hello", "world", "abc"].into_iter().collect();
+        let array_ref: ArrayRef = Arc::new(dict_array);
+
+        // Start with non-seed hash values (simulating previous column hashing)
+        let mut hashes = vec![123u64, 456u64, 789u64];
+        hash_column_xxhash64(&array_ref, &mut hashes, false).unwrap();
+
+        // The hashes should be updated from the previous values
+        assert_eq!(hashes[0], spark_compatible_xxhash64("hello", 123));
+        assert_eq!(hashes[1], spark_compatible_xxhash64("world", 456));
+        assert_eq!(hashes[2], spark_compatible_xxhash64("abc", 789));
+    }
+
+    #[test]
+    fn test_xxhash64_fixed_size_binary() {
+        // Create a FixedSizeBinary array with 4-byte values
+        let array = FixedSizeBinaryArray::from(vec![
+            Some(&[0x01, 0x02, 0x03, 0x04][..]),
+            Some(&[0x05, 0x06, 0x07, 0x08][..]),
+            None,
+            Some(&[0x00, 0x00, 0x00, 0x00][..]),
+        ]);
+        let array_ref: ArrayRef = Arc::new(array);
+
+        let mut hashes = vec![DEFAULT_SEED as u64; 4];
+        hash_column_xxhash64(&array_ref, &mut hashes, true).unwrap();
+
+        // Verify hashes match expected values
+        assert_eq!(
+            hashes[0],
+            spark_compatible_xxhash64(&[0x01, 0x02, 0x03, 0x04], 42)
+        );
+        assert_eq!(
+            hashes[1],
+            spark_compatible_xxhash64(&[0x05, 0x06, 0x07, 0x08], 42)
+        );
+        // Null value should keep the seed
+        assert_eq!(hashes[2], DEFAULT_SEED as u64);
+        assert_eq!(
+            hashes[3],
+            spark_compatible_xxhash64(&[0x00, 0x00, 0x00, 0x00], 42)
+        );
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/spark/hash/murmur3_hash.slt b/datafusion/sqllogictest/test_files/spark/hash/murmur3_hash.slt
new file mode 100644
index 0000000000000..f42c274fb4017
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/hash/murmur3_hash.slt
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for Spark-compatible murmur3_hash (also available as `hash`)
+# Reference: https://spark.apache.org/docs/latest/api/sql/index.html#hash
+
+# Basic integer tests
+query I
+SELECT murmur3_hash(1);
+----
+-1712319331
+
+query I
+SELECT murmur3_hash(0);
+----
+-1670924195
+
+query I
+SELECT murmur3_hash(-1);
+----
+-939490007
+
+# Test with `hash` alias
+query I
+SELECT hash(1);
+----
+-1712319331
+
+query I
+SELECT hash('hello');
+----
+-1008564952
+
+# String tests
+query I
+SELECT murmur3_hash('Spark');
+----
+228093765
+
+query I
+SELECT murmur3_hash('');
+----
+142593372
+
+query I
+SELECT murmur3_hash('abc');
+----
+1322437556
+
+# NULL handling - nulls don't change the hash (keeps seed value 42)
+query I
+SELECT murmur3_hash(NULL);
+----
+42
+
+# Multiple column hashing
+query I
+SELECT murmur3_hash(1, 2);
+----
+-1181176833
+
+query I
+SELECT murmur3_hash('a', 'b', 'c');
+----
+-1512497118
+
+query I
+SELECT murmur3_hash(1, 'hello');
+----
+-389038858
+
+# Int64 tests
+query I
+SELECT murmur3_hash(arrow_cast(1, 'Int64'));
+----
+-1712319331
+
+query I
+SELECT murmur3_hash(arrow_cast(0, 'Int64'));
+----
+-1670924195
+
+# Float tests - note: -0.0 and 0.0 hash to the same value in Spark
+query I
+SELECT murmur3_hash(1.0);
+----
+-460888942
+
+query I
+SELECT murmur3_hash(0.0);
+----
+-1670924195
+
+query I
+SELECT murmur3_hash(-0.0);
+----
+-1670924195
+
+# Boolean tests
+query I
+SELECT murmur3_hash(true);
+----
+-559580957
+
+query I
+SELECT murmur3_hash(false);
+----
+933211791
+
+# Binary tests
+query I
+SELECT murmur3_hash(arrow_cast('hello', 'Binary'));
+----
+-1008564952
+
+# Date tests
+query I
+SELECT murmur3_hash(arrow_cast('2023-01-15', 'Date32'));
+----
+1059174970
+
+# Test with table data
+statement ok
+CREATE TABLE hash_test AS SELECT * FROM (VALUES
+  (1, 'a'),
+  (2, 'b'),
+  (3, 'c')
+) AS t(id, name);
+
+query II
+SELECT id, murmur3_hash(id, name) FROM hash_test ORDER BY id;
+----
+1 573093329
+2 -436607593
+3 -1127961587
+
+statement ok
+DROP TABLE hash_test;
diff --git a/datafusion/sqllogictest/test_files/spark/hash/xxhash64.slt b/datafusion/sqllogictest/test_files/spark/hash/xxhash64.slt
new file mode 100644
index 0000000000000..f78ed2e757fb5
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/hash/xxhash64.slt
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for Spark-compatible xxhash64
+# Reference: https://spark.apache.org/docs/latest/api/sql/index.html#xxhash64
+
+# Basic integer tests (SQL integer literals plan as Int64, hashed as 8 bytes)
+query I
+SELECT xxhash64(1);
+----
+-7001672635703045582
+
+query I
+SELECT xxhash64(0);
+----
+-5252525462095825812
+
+query I
+SELECT xxhash64(-1);
+----
+3858142552250413010
+
+# String tests
+query I
+SELECT xxhash64('hello');
+----
+-4367754540140381902
+
+query I
+SELECT xxhash64('');
+----
+-7444071767201028348
+
+query I
+SELECT xxhash64('abc');
+----
+1423657621850124518
+
+# NULL handling - nulls don't change the hash (keeps seed value 42)
+query I
+SELECT xxhash64(NULL);
+----
+42
+
+# Multiple column hashing
+query I
+SELECT xxhash64(1, 2);
+----
+-8198029865082835910
+
+query I
+SELECT xxhash64('a', 'b', 'c');
+----
+-8775012835737190202
+
+query I
+SELECT xxhash64(1, 'hello');
+----
+-7340167327466467369
+
+# Int64 tests
+query I
+SELECT xxhash64(arrow_cast(1, 'Int64'));
+----
+-7001672635703045582
+
+query I
+SELECT xxhash64(arrow_cast(0, 'Int64'));
+----
+-5252525462095825812
+
+# Float tests - note: -0.0 and 0.0 hash to the same value in Spark
+query I
+SELECT xxhash64(1.0);
+----
+-2162451265447482029
+
+query I
+SELECT xxhash64(0.0);
+----
+-5252525462095825812
+
+query I
+SELECT xxhash64(-0.0);
+----
+-5252525462095825812
+
+# Boolean tests
+query I
+SELECT xxhash64(true);
+----
+-6698625589789238999
+
+query I
+SELECT xxhash64(false);
+----
+3614696996920510707
+
+# Binary tests
+query I
+SELECT xxhash64(arrow_cast('hello', 'Binary'));
+----
+-4367754540140381902
+
+# Date tests
+query I
+SELECT xxhash64(arrow_cast('2023-01-15', 'Date32'));
+----
+290591839883576215
+
+# Test with table data
+statement ok
+CREATE TABLE xxhash_test AS SELECT * FROM (VALUES
+  (1, 'a'),
+  (2, 'b'),
+  (3, 'c')
+) AS t(id, name);
+
+query II
+SELECT id, xxhash64(id, name) FROM xxhash_test ORDER BY id;
+----
+1 -8527884376001292090
+2 -7687314219880041104
+3 -7773406846575579340
+
+statement ok
+DROP TABLE xxhash_test;
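The same seed chaining applies to xxhash64, whose per-value hash is simply `XxHash64::oneshot` from the twox-hash crate. A self-contained sketch (not part of the patch) reproducing the multi-column values from the .slt file above, again assuming SQL integer literals plan as `Int64`:

```rust
use twox_hash::XxHash64; // twox-hash 2.x

// Mirrors spark_compatible_xxhash64 from the patch.
fn xxh64(data: &[u8], seed: u64) -> u64 {
    XxHash64::oneshot(seed, data)
}

fn main() {
    // Column 1 is hashed with the default seed 42; its hash seeds column 2.
    let h1 = xxh64(&1i64.to_le_bytes(), 42);
    let h2 = xxh64(&2i64.to_le_bytes(), h1);
    assert_eq!(h1 as i64, -7001672635703045582); // SELECT xxhash64(1)
    assert_eq!(h2 as i64, -8198029865082835910); // SELECT xxhash64(1, 2)
    println!("xxhash64 seed chaining verified");
}
```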