From ed58cc4cffd53e1aa441eef5b716e9ce2d4df00f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Jan 2026 17:58:01 -0700 Subject: [PATCH 1/5] [WIP] Add Iceberg TPC-H benchmarking scripts Add scripts to benchmark TPC-H queries against Iceberg tables using Comet's native iceberg-rust integration: - create-iceberg-tpch.py: Convert Parquet TPC-H data to Iceberg tables - tpcbench-iceberg.py: Run TPC-H queries against Iceberg catalog tables - comet-tpch-iceberg.sh: Shell script to run the benchmark with Comet Also updates README.md with Iceberg benchmarking documentation. Co-Authored-By: Claude Opus 4.5 --- dev/benchmarks/README.md | 67 ++++++++ dev/benchmarks/comet-tpch-iceberg.sh | 114 +++++++++++++ dev/benchmarks/create-iceberg-tpch.py | 88 ++++++++++ dev/benchmarks/tpcbench-iceberg.py | 224 ++++++++++++++++++++++++++ 4 files changed, 493 insertions(+) create mode 100755 dev/benchmarks/comet-tpch-iceberg.sh create mode 100644 dev/benchmarks/create-iceberg-tpch.py create mode 100644 dev/benchmarks/tpcbench-iceberg.py diff --git a/dev/benchmarks/README.md b/dev/benchmarks/README.md index 2ef7a9a260..8e74fad227 100644 --- a/dev/benchmarks/README.md +++ b/dev/benchmarks/README.md @@ -73,3 +73,70 @@ Generating charts: ```shell python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json ``` + +## Iceberg Benchmarking + +Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries +against Iceberg tables with native scan acceleration. + +### Prerequisites + +Download the Iceberg Spark runtime JAR: + +```shell +wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/iceberg-spark-runtime-3.5_2.12-1.8.1.jar +export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar +``` + +### Create Iceberg TPC-H tables + +Convert existing Parquet TPC-H data to Iceberg format: + +```shell +export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse + +$SPARK_HOME/bin/spark-submit \ + --jars $ICEBERG_JAR \ + --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.local.type=hadoop \ + --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \ + create-iceberg-tpch.py \ + --parquet-path $TPCH_DATA \ + --catalog local \ + --database tpch +``` + +### Run Iceberg benchmark + +```shell +export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 +export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar +export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar +export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse +export TPCH_QUERIES=/mnt/bigdata/tpch/queries/ +sudo ./drop-caches.sh +./comet-tpch-iceberg.sh +``` + +The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust +integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the +physical plan output. 
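+
+For example, one quick way to inspect a plan outside the benchmark run is via `spark-sql` (a hedged
+sketch that reuses the catalog settings above; adjust JAR paths and the table name to your environment,
+and look for `CometIcebergNativeScanExec` in the printed plan):
+
+```shell
+$SPARK_HOME/bin/spark-sql \
+  --jars $COMET_JAR,$ICEBERG_JAR \
+  --driver-class-path $COMET_JAR \
+  --conf spark.plugins=org.apache.spark.CometPlugin \
+  --conf spark.comet.enabled=true \
+  --conf spark.comet.exec.enabled=true \
+  --conf spark.comet.scan.icebergNative.enabled=true \
+  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+  --conf spark.sql.catalog.local.type=hadoop \
+  --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
+  -e "EXPLAIN FORMATTED SELECT count(*) FROM local.tpch.lineitem"
+```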
+ +### Iceberg-specific options + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `ICEBERG_CATALOG` | `local` | Iceberg catalog name | +| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables | +| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory | + +### Comparing Parquet vs Iceberg performance + +Run both benchmarks and compare: + +```shell +python3 generate-comparison.py --benchmark tpch \ + --labels "Comet (Parquet)" "Comet (Iceberg)" \ + --title "TPC-H @ 100 GB: Parquet vs Iceberg" \ + comet-tpch-*.json comet-iceberg-tpch-*.json +``` diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh new file mode 100755 index 0000000000..6c21798d62 --- /dev/null +++ b/dev/benchmarks/comet-tpch-iceberg.sh @@ -0,0 +1,114 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration. +# +# Required environment variables: +# SPARK_HOME - Path to Spark installation +# SPARK_MASTER - Spark master URL (e.g., spark://localhost:7077) +# COMET_JAR - Path to Comet JAR +# ICEBERG_JAR - Path to Iceberg Spark runtime JAR +# ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory +# TPCH_QUERIES - Path to TPC-H query files +# +# Optional: +# ICEBERG_CATALOG - Catalog name (default: local) +# ICEBERG_DATABASE - Database name (default: tpch) +# +# Setup (run once to create Iceberg tables from Parquet): +# $SPARK_HOME/bin/spark-submit \ +# --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ +# --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ +# --conf spark.sql.catalog.local.type=hadoop \ +# --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \ +# create-iceberg-tpch.py \ +# --parquet-path $TPCH_DATA \ +# --catalog local \ +# --database tpch + +set -e + +# Defaults +ICEBERG_CATALOG=${ICEBERG_CATALOG:-local} +ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch} + +# Validate required variables +if [ -z "$SPARK_HOME" ]; then + echo "Error: SPARK_HOME is not set" + exit 1 +fi +if [ -z "$COMET_JAR" ]; then + echo "Error: COMET_JAR is not set" + exit 1 +fi +if [ -z "$ICEBERG_JAR" ]; then + echo "Error: ICEBERG_JAR is not set" + echo "Download from: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/" + exit 1 +fi +if [ -z "$ICEBERG_WAREHOUSE" ]; then + echo "Error: ICEBERG_WAREHOUSE is not set" + exit 1 +fi +if [ -z "$TPCH_QUERIES" ]; then + echo "Error: TPCH_QUERIES is not set" + exit 1 +fi + +$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true +$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true + +$SPARK_HOME/sbin/start-master.sh +$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER + 
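+# Submit the benchmark run. Key settings below:
+#  - spark.plugins / spark.shuffle.manager: enable Comet execution and Comet shuffle
+#  - spark.comet.scan.icebergNative.enabled=true: route Iceberg scans through Comet's native iceberg-rust integration
+#  - spark.sql.catalog.${ICEBERG_CATALOG}.*: configure a Hadoop-type Iceberg catalog backed by $ICEBERG_WAREHOUSE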
+$SPARK_HOME/bin/spark-submit \ + --master $SPARK_MASTER \ + --jars $COMET_JAR,$ICEBERG_JAR \ + --driver-class-path $COMET_JAR \ + --conf spark.driver.memory=8G \ + --conf spark.executor.instances=1 \ + --conf spark.executor.cores=8 \ + --conf spark.cores.max=8 \ + --conf spark.executor.memory=16g \ + --conf spark.memory.offHeap.enabled=true \ + --conf spark.memory.offHeap.size=16g \ + --conf spark.eventLog.enabled=true \ + --conf spark.driver.extraClassPath=$COMET_JAR \ + --conf spark.executor.extraClassPath=$COMET_JAR \ + --conf spark.plugins=org.apache.spark.CometPlugin \ + --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ + --conf spark.comet.exec.replaceSortMergeJoin=true \ + --conf spark.comet.expression.Cast.allowIncompatible=true \ + --conf spark.comet.enabled=true \ + --conf spark.comet.exec.enabled=true \ + --conf spark.comet.scan.icebergNative.enabled=true \ + --conf spark.comet.explainFallback.enabled=true \ + --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ + --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ + --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \ + tpcbench-iceberg.py \ + --name comet-iceberg \ + --benchmark tpch \ + --catalog $ICEBERG_CATALOG \ + --database $ICEBERG_DATABASE \ + --queries $TPCH_QUERIES \ + --output . \ + --iterations 1 diff --git a/dev/benchmarks/create-iceberg-tpch.py b/dev/benchmarks/create-iceberg-tpch.py new file mode 100644 index 0000000000..44f0f63a2e --- /dev/null +++ b/dev/benchmarks/create-iceberg-tpch.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Convert TPC-H Parquet data to Iceberg tables. 
+ +Usage: + spark-submit \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ + --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.local.type=hadoop \ + --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \ + create-iceberg-tpch.py \ + --parquet-path /path/to/tpch/parquet \ + --catalog local \ + --database tpch +""" + +import argparse +from pyspark.sql import SparkSession +import time + + +def main(parquet_path: str, catalog: str, database: str): + spark = SparkSession.builder \ + .appName("Create Iceberg TPC-H Tables") \ + .getOrCreate() + + table_names = [ + "customer", + "lineitem", + "nation", + "orders", + "part", + "partsupp", + "region", + "supplier" + ] + + # Create database if it doesn't exist + spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}") + + for table in table_names: + parquet_table_path = f"{parquet_path}/{table}.parquet" + iceberg_table = f"{catalog}.{database}.{table}" + + print(f"Converting {parquet_table_path} -> {iceberg_table}") + start_time = time.time() + + # Drop table if exists to allow re-running + spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}") + + # Read parquet and write as Iceberg + df = spark.read.parquet(parquet_table_path) + df.writeTo(iceberg_table).using("iceberg").create() + + row_count = spark.table(iceberg_table).count() + elapsed = time.time() - start_time + print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s") + + print("\nAll TPC-H tables created successfully!") + print(f"Tables available at: {catalog}.{database}.*") + + spark.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Convert TPC-H Parquet data to Iceberg tables") + parser.add_argument("--parquet-path", required=True, help="Path to TPC-H Parquet data directory") + parser.add_argument("--catalog", required=True, help="Iceberg catalog name (e.g., 'local')") + parser.add_argument("--database", default="tpch", help="Database name to create tables in") + args = parser.parse_args() + + main(args.parquet_path, args.catalog, args.database) diff --git a/dev/benchmarks/tpcbench-iceberg.py b/dev/benchmarks/tpcbench-iceberg.py new file mode 100644 index 0000000000..1a0c15da9d --- /dev/null +++ b/dev/benchmarks/tpcbench-iceberg.py @@ -0,0 +1,224 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +TPC-H / TPC-DS benchmark runner for Iceberg tables. + +This is similar to tpcbench.py but reads from Iceberg catalog tables +instead of Parquet files. 
+""" + +import argparse +from datetime import datetime +import json +from pyspark.sql import SparkSession +import time + + +def dedup_columns(df): + """Rename duplicate column aliases: a, a, b, b -> a, a_1, b, b_1""" + counts = {} + new_cols = [] + for c in df.columns: + if c not in counts: + counts[c] = 0 + new_cols.append(c) + else: + counts[c] += 1 + new_cols.append(f"{c}_{counts[c]}") + return df.toDF(*new_cols) + + +def main( + benchmark: str, + catalog: str, + database: str, + query_path: str, + iterations: int, + output: str, + name: str, + query_num: int = None, + write_path: str = None +): + spark = SparkSession.builder \ + .appName(f"{name} benchmark derived from {benchmark}") \ + .getOrCreate() + + # Define tables for each benchmark + if benchmark == "tpch": + num_queries = 22 + table_names = [ + "customer", "lineitem", "nation", "orders", + "part", "partsupp", "region", "supplier" + ] + elif benchmark == "tpcds": + num_queries = 99 + table_names = [ + "call_center", "catalog_page", "catalog_returns", "catalog_sales", + "customer", "customer_address", "customer_demographics", "date_dim", + "time_dim", "household_demographics", "income_band", "inventory", + "item", "promotion", "reason", "ship_mode", "store", "store_returns", + "store_sales", "warehouse", "web_page", "web_returns", "web_sales", + "web_site" + ] + else: + raise ValueError(f"Invalid benchmark: {benchmark}") + + # Register Iceberg tables as temp views for SQL queries + for table in table_names: + iceberg_table = f"{catalog}.{database}.{table}" + print(f"Registering table {table} from {iceberg_table}") + df = spark.table(iceberg_table) + df.createOrReplaceTempView(table) + + # Show that we're using native Iceberg scan + print(f" Schema: {df.schema.simpleString()}") + + conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()} + + results = { + 'engine': 'datafusion-comet', + 'benchmark': benchmark, + 'catalog': catalog, + 'database': database, + 'query_path': query_path, + 'spark_conf': conf_dict, + } + + for iteration in range(iterations): + print(f"\n{'='*60}") + print(f"Starting iteration {iteration + 1} of {iterations}") + print(f"{'='*60}") + iter_start_time = time.time() + + # Determine which queries to run + if query_num is not None: + if query_num < 1 or query_num > num_queries: + raise ValueError( + f"Query number {query_num} out of range. 
" + f"Valid: 1-{num_queries} for {benchmark}" + ) + queries_to_run = [query_num] + else: + queries_to_run = range(1, num_queries + 1) + + for query in queries_to_run: + spark.sparkContext.setJobDescription(f"{benchmark} q{query}") + + path = f"{query_path}/q{query}.sql" + print(f"\nRunning query {query} from {path}") + + with open(path, "r") as f: + text = f.read() + queries = text.split(";") + + start_time = time.time() + for sql in queries: + sql = sql.strip().replace("create view", "create temp view") + if len(sql) > 0: + print(f"Executing: {sql[:100]}...") + df = spark.sql(sql) + + # Show physical plan to verify native Iceberg scan + if query == queries_to_run[0] if hasattr(queries_to_run, '__iter__') else query == query_num: + print("\nPhysical Plan (verify CometIcebergNativeScanExec):") + df.explain("formatted") + + if write_path is not None: + if len(df.columns) > 0: + output_path = f"{write_path}/q{query}" + deduped = dedup_columns(df) + deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path) + print(f"Results written to {output_path}") + else: + rows = df.collect() + print(f"Query {query} returned {len(rows)} rows") + + end_time = time.time() + elapsed = end_time - start_time + print(f"Query {query} took {elapsed:.2f} seconds") + + query_timings = results.setdefault(query, []) + query_timings.append(elapsed) + + iter_end_time = time.time() + print(f"\nIteration {iteration + 1} took {iter_end_time - iter_start_time:.2f} seconds") + + # Write results + result_str = json.dumps(results, indent=4) + current_time_millis = int(datetime.now().timestamp() * 1000) + results_path = f"{output}/{name}-{benchmark}-{current_time_millis}.json" + print(f"\nWriting results to {results_path}") + with open(results_path, "w") as f: + f.write(result_str) + + spark.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="TPC-H/TPC-DS benchmark for Iceberg tables" + ) + parser.add_argument( + "--benchmark", required=True, + help="Benchmark to run (tpch or tpcds)" + ) + parser.add_argument( + "--catalog", required=True, + help="Iceberg catalog name" + ) + parser.add_argument( + "--database", default="tpch", + help="Database containing TPC tables" + ) + parser.add_argument( + "--queries", required=True, + help="Path to query SQL files" + ) + parser.add_argument( + "--iterations", type=int, default=1, + help="Number of iterations" + ) + parser.add_argument( + "--output", required=True, + help="Path to write results JSON" + ) + parser.add_argument( + "--name", required=True, + help="Prefix for result file" + ) + parser.add_argument( + "--query", type=int, + help="Specific query number (1-based). If omitted, run all." 
+ ) + parser.add_argument( + "--write", + help="Path to save query results as Parquet" + ) + args = parser.parse_args() + + main( + args.benchmark, + args.catalog, + args.database, + args.queries, + args.iterations, + args.output, + args.name, + args.query, + args.write + ) From c7ddf7074538c4dd56d449fd0f38d3e9144f1440 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 13:32:32 -0700 Subject: [PATCH 2/5] fix --- dev/benchmarks/README.md | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/dev/benchmarks/README.md b/dev/benchmarks/README.md index 8e74fad227..3acd5a114a 100644 --- a/dev/benchmarks/README.md +++ b/dev/benchmarks/README.md @@ -96,7 +96,13 @@ Convert existing Parquet TPC-H data to Iceberg format: export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse $SPARK_HOME/bin/spark-submit \ + --master $SPARK_MASTER \ --jars $ICEBERG_JAR \ + --conf spark.driver.memory=8G \ + --conf spark.executor.instances=1 \ + --conf spark.executor.cores=8 \ + --conf spark.cores.max=8 \ + --conf spark.executor.memory=16g \ --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.local.type=hadoop \ --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \ @@ -124,11 +130,11 @@ physical plan output. ### Iceberg-specific options -| Environment Variable | Default | Description | -|---------------------|---------|-------------| -| `ICEBERG_CATALOG` | `local` | Iceberg catalog name | -| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables | -| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory | +| Environment Variable | Default | Description | +| -------------------- | ---------- | ----------------------------------- | +| `ICEBERG_CATALOG` | `local` | Iceberg catalog name | +| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables | +| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory | ### Comparing Parquet vs Iceberg performance From 538006f40642bad37d1dc5136ca794cd82c07e17 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 13:37:57 -0700 Subject: [PATCH 3/5] fix --- dev/benchmarks/comet-tpch-iceberg.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh index 6c21798d62..db029c889a 100755 --- a/dev/benchmarks/comet-tpch-iceberg.sh +++ b/dev/benchmarks/comet-tpch-iceberg.sh @@ -81,7 +81,7 @@ $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER $SPARK_HOME/bin/spark-submit \ --master $SPARK_MASTER \ --jars $COMET_JAR,$ICEBERG_JAR \ - --driver-class-path $COMET_JAR \ + --driver-class-path $COMET_JAR:$ICEBERG_JAR \ --conf spark.driver.memory=8G \ --conf spark.executor.instances=1 \ --conf spark.executor.cores=8 \ @@ -90,8 +90,8 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.memory.offHeap.enabled=true \ --conf spark.memory.offHeap.size=16g \ --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$COMET_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR \ + --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \ + --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \ --conf spark.plugins=org.apache.spark.CometPlugin \ --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ --conf spark.comet.exec.replaceSortMergeJoin=true \ From 8f3039cd65ebe42a39c609fb2033455d0924ea56 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jan 2026 13:44:42 -0700 Subject: [PATCH 4/5] Consolidate 
Parquet and Iceberg benchmark scripts into single tpcbench.py Merge tpcbench-iceberg.py into tpcbench.py using mutually exclusive args: - --data for Parquet files - --catalog/--database for Iceberg tables Co-Authored-By: Claude Opus 4.5 --- dev/benchmarks/comet-tpch-iceberg.sh | 2 +- dev/benchmarks/tpcbench-iceberg.py | 224 --------------------------- dev/benchmarks/tpcbench.py | 185 +++++++++++++++------- 3 files changed, 132 insertions(+), 279 deletions(-) delete mode 100644 dev/benchmarks/tpcbench-iceberg.py diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh index db029c889a..7907125c82 100755 --- a/dev/benchmarks/comet-tpch-iceberg.sh +++ b/dev/benchmarks/comet-tpch-iceberg.sh @@ -104,7 +104,7 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \ - tpcbench-iceberg.py \ + tpcbench.py \ --name comet-iceberg \ --benchmark tpch \ --catalog $ICEBERG_CATALOG \ diff --git a/dev/benchmarks/tpcbench-iceberg.py b/dev/benchmarks/tpcbench-iceberg.py deleted file mode 100644 index 1a0c15da9d..0000000000 --- a/dev/benchmarks/tpcbench-iceberg.py +++ /dev/null @@ -1,224 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -TPC-H / TPC-DS benchmark runner for Iceberg tables. - -This is similar to tpcbench.py but reads from Iceberg catalog tables -instead of Parquet files. 
-""" - -import argparse -from datetime import datetime -import json -from pyspark.sql import SparkSession -import time - - -def dedup_columns(df): - """Rename duplicate column aliases: a, a, b, b -> a, a_1, b, b_1""" - counts = {} - new_cols = [] - for c in df.columns: - if c not in counts: - counts[c] = 0 - new_cols.append(c) - else: - counts[c] += 1 - new_cols.append(f"{c}_{counts[c]}") - return df.toDF(*new_cols) - - -def main( - benchmark: str, - catalog: str, - database: str, - query_path: str, - iterations: int, - output: str, - name: str, - query_num: int = None, - write_path: str = None -): - spark = SparkSession.builder \ - .appName(f"{name} benchmark derived from {benchmark}") \ - .getOrCreate() - - # Define tables for each benchmark - if benchmark == "tpch": - num_queries = 22 - table_names = [ - "customer", "lineitem", "nation", "orders", - "part", "partsupp", "region", "supplier" - ] - elif benchmark == "tpcds": - num_queries = 99 - table_names = [ - "call_center", "catalog_page", "catalog_returns", "catalog_sales", - "customer", "customer_address", "customer_demographics", "date_dim", - "time_dim", "household_demographics", "income_band", "inventory", - "item", "promotion", "reason", "ship_mode", "store", "store_returns", - "store_sales", "warehouse", "web_page", "web_returns", "web_sales", - "web_site" - ] - else: - raise ValueError(f"Invalid benchmark: {benchmark}") - - # Register Iceberg tables as temp views for SQL queries - for table in table_names: - iceberg_table = f"{catalog}.{database}.{table}" - print(f"Registering table {table} from {iceberg_table}") - df = spark.table(iceberg_table) - df.createOrReplaceTempView(table) - - # Show that we're using native Iceberg scan - print(f" Schema: {df.schema.simpleString()}") - - conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()} - - results = { - 'engine': 'datafusion-comet', - 'benchmark': benchmark, - 'catalog': catalog, - 'database': database, - 'query_path': query_path, - 'spark_conf': conf_dict, - } - - for iteration in range(iterations): - print(f"\n{'='*60}") - print(f"Starting iteration {iteration + 1} of {iterations}") - print(f"{'='*60}") - iter_start_time = time.time() - - # Determine which queries to run - if query_num is not None: - if query_num < 1 or query_num > num_queries: - raise ValueError( - f"Query number {query_num} out of range. 
" - f"Valid: 1-{num_queries} for {benchmark}" - ) - queries_to_run = [query_num] - else: - queries_to_run = range(1, num_queries + 1) - - for query in queries_to_run: - spark.sparkContext.setJobDescription(f"{benchmark} q{query}") - - path = f"{query_path}/q{query}.sql" - print(f"\nRunning query {query} from {path}") - - with open(path, "r") as f: - text = f.read() - queries = text.split(";") - - start_time = time.time() - for sql in queries: - sql = sql.strip().replace("create view", "create temp view") - if len(sql) > 0: - print(f"Executing: {sql[:100]}...") - df = spark.sql(sql) - - # Show physical plan to verify native Iceberg scan - if query == queries_to_run[0] if hasattr(queries_to_run, '__iter__') else query == query_num: - print("\nPhysical Plan (verify CometIcebergNativeScanExec):") - df.explain("formatted") - - if write_path is not None: - if len(df.columns) > 0: - output_path = f"{write_path}/q{query}" - deduped = dedup_columns(df) - deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path) - print(f"Results written to {output_path}") - else: - rows = df.collect() - print(f"Query {query} returned {len(rows)} rows") - - end_time = time.time() - elapsed = end_time - start_time - print(f"Query {query} took {elapsed:.2f} seconds") - - query_timings = results.setdefault(query, []) - query_timings.append(elapsed) - - iter_end_time = time.time() - print(f"\nIteration {iteration + 1} took {iter_end_time - iter_start_time:.2f} seconds") - - # Write results - result_str = json.dumps(results, indent=4) - current_time_millis = int(datetime.now().timestamp() * 1000) - results_path = f"{output}/{name}-{benchmark}-{current_time_millis}.json" - print(f"\nWriting results to {results_path}") - with open(results_path, "w") as f: - f.write(result_str) - - spark.stop() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="TPC-H/TPC-DS benchmark for Iceberg tables" - ) - parser.add_argument( - "--benchmark", required=True, - help="Benchmark to run (tpch or tpcds)" - ) - parser.add_argument( - "--catalog", required=True, - help="Iceberg catalog name" - ) - parser.add_argument( - "--database", default="tpch", - help="Database containing TPC tables" - ) - parser.add_argument( - "--queries", required=True, - help="Path to query SQL files" - ) - parser.add_argument( - "--iterations", type=int, default=1, - help="Number of iterations" - ) - parser.add_argument( - "--output", required=True, - help="Path to write results JSON" - ) - parser.add_argument( - "--name", required=True, - help="Prefix for result file" - ) - parser.add_argument( - "--query", type=int, - help="Specific query number (1-based). If omitted, run all." - ) - parser.add_argument( - "--write", - help="Path to save query results as Parquet" - ) - args = parser.parse_args() - - main( - args.benchmark, - args.catalog, - args.database, - args.queries, - args.iterations, - args.output, - args.name, - args.query, - args.write - ) diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py index 75944883df..a02d250a57 100644 --- a/dev/benchmarks/tpcbench.py +++ b/dev/benchmarks/tpcbench.py @@ -15,17 +15,23 @@ # specific language governing permissions and limitations # under the License. +""" +TPC-H / TPC-DS benchmark runner. 
+ +Supports two data sources: + - Parquet files: use --data to specify the path + - Iceberg tables: use --catalog and --database to specify the catalog location +""" + import argparse from datetime import datetime import json from pyspark.sql import SparkSession import time -# rename same columns aliases -# a, a, b, b -> a, a_1, b, b_1 -# -# Important for writing data where column name uniqueness is required + def dedup_columns(df): + """Rename duplicate column aliases: a, a, b, b -> a, a_1, b, b_1""" counts = {} new_cols = [] for c in df.columns: @@ -37,30 +43,54 @@ def dedup_columns(df): new_cols.append(f"{c}_{counts[c]}") return df.toDF(*new_cols) -def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None): - # Initialize a SparkSession +def main( + benchmark: str, + data_path: str, + catalog: str, + database: str, + query_path: str, + iterations: int, + output: str, + name: str, + query_num: int = None, + write_path: str = None +): spark = SparkSession.builder \ .appName(f"{name} benchmark derived from {benchmark}") \ .getOrCreate() - # Register the tables + # Define tables for each benchmark if benchmark == "tpch": num_queries = 22 - table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"] + table_names = [ + "customer", "lineitem", "nation", "orders", + "part", "partsupp", "region", "supplier" + ] elif benchmark == "tpcds": num_queries = 99 - table_names = ["call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", - "customer_address", "customer_demographics", "date_dim", "time_dim", "household_demographics", - "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", - "store_sales", "warehouse", "web_page", "web_returns", "web_sales", "web_site"] + table_names = [ + "call_center", "catalog_page", "catalog_returns", "catalog_sales", + "customer", "customer_address", "customer_demographics", "date_dim", + "time_dim", "household_demographics", "income_band", "inventory", + "item", "promotion", "reason", "ship_mode", "store", "store_returns", + "store_sales", "warehouse", "web_page", "web_returns", "web_sales", + "web_site" + ] else: - raise "invalid benchmark" + raise ValueError(f"Invalid benchmark: {benchmark}") + # Register tables from either Parquet files or Iceberg catalog + using_iceberg = catalog is not None for table in table_names: - path = f"{data_path}/{table}.parquet" - print(f"Registering table {table} using path {path}") - df = spark.read.parquet(path) + if using_iceberg: + source = f"{catalog}.{database}.{table}" + print(f"Registering table {table} from {source}") + df = spark.table(source) + else: + source = f"{data_path}/{table}.parquet" + print(f"Registering table {table} from {source}") + df = spark.read.parquet(source) df.createOrReplaceTempView(table) conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()} @@ -68,93 +98,140 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu results = { 'engine': 'datafusion-comet', 'benchmark': benchmark, - 'data_path': data_path, 'query_path': query_path, 'spark_conf': conf_dict, } + if using_iceberg: + results['catalog'] = catalog + results['database'] = database + else: + results['data_path'] = data_path - for iteration in range(0, iterations): - print(f"Starting iteration {iteration} of {iterations}") + for iteration in range(iterations): + print(f"\n{'='*60}") + print(f"Starting iteration 
{iteration + 1} of {iterations}") + print(f"{'='*60}") iter_start_time = time.time() # Determine which queries to run if query_num is not None: - # Validate query number if query_num < 1 or query_num > num_queries: - raise ValueError(f"Query number {query_num} is out of range. Valid range is 1-{num_queries} for {benchmark}") + raise ValueError( + f"Query number {query_num} out of range. " + f"Valid: 1-{num_queries} for {benchmark}" + ) queries_to_run = [query_num] else: - queries_to_run = range(1, num_queries+1) + queries_to_run = range(1, num_queries + 1) for query in queries_to_run: spark.sparkContext.setJobDescription(f"{benchmark} q{query}") - # read text file path = f"{query_path}/q{query}.sql" + print(f"\nRunning query {query} from {path}") - print(f"Reading query {query} using path {path}") with open(path, "r") as f: text = f.read() - # each file can contain multiple queries queries = text.split(";") start_time = time.time() for sql in queries: sql = sql.strip().replace("create view", "create temp view") if len(sql) > 0: - print(f"Executing: {sql}") + print(f"Executing: {sql[:100]}...") df = spark.sql(sql) df.explain("formatted") if write_path is not None: - # skip results with empty schema - # coming across for running DDL stmt if len(df.columns) > 0: output_path = f"{write_path}/q{query}" - # rename same column names for output - # a, a, b, b => a, a_1, b, b_1 - # output doesn't allow non unique column names deduped = dedup_columns(df) - # sort by all columns to have predictable output dataset for comparison deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path) - print(f"Query {query} results written to {output_path}") - else: - print(f"Skipping write: DataFrame has no schema for {output_path}") + print(f"Results written to {output_path}") else: rows = df.collect() print(f"Query {query} returned {len(rows)} rows") end_time = time.time() - print(f"Query {query} took {end_time - start_time} seconds") + elapsed = end_time - start_time + print(f"Query {query} took {elapsed:.2f} seconds") - # store timings in list and later add option to run > 1 iterations query_timings = results.setdefault(query, []) - query_timings.append(end_time - start_time) + query_timings.append(elapsed) iter_end_time = time.time() - print(f"Iteration {iteration} took {round(iter_end_time - iter_start_time,2)} seconds") + print(f"\nIteration {iteration + 1} took {iter_end_time - iter_start_time:.2f} seconds") - str = json.dumps(results, indent=4) + # Write results + result_str = json.dumps(results, indent=4) current_time_millis = int(datetime.now().timestamp() * 1000) results_path = f"{output}/{name}-{benchmark}-{current_time_millis}.json" - print(f"Writing results to {results_path}") + print(f"\nWriting results to {results_path}") with open(results_path, "w") as f: - f.write(str) + f.write(result_str) - # Stop the SparkSession spark.stop() + if __name__ == "__main__": - parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS") - parser.add_argument("--benchmark", required=True, help="Benchmark to run (tpch or tpcds)") - parser.add_argument("--data", required=True, help="Path to data files") - parser.add_argument("--queries", required=True, help="Path to query files") - parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run") - parser.add_argument("--output", required=True, help="Path to write output") - parser.add_argument("--name", required=True, help="Prefix for result file e.g. 
spark/comet/gluten") - parser.add_argument("--query", required=False, type=int, help="Specific query number to run (1-based). If not specified, all queries will be run.") - parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.") + parser = argparse.ArgumentParser( + description="TPC-H/TPC-DS benchmark runner for Parquet or Iceberg tables" + ) + parser.add_argument( + "--benchmark", required=True, + help="Benchmark to run (tpch or tpcds)" + ) + + # Data source - mutually exclusive: either Parquet path or Iceberg catalog + source_group = parser.add_mutually_exclusive_group(required=True) + source_group.add_argument( + "--data", + help="Path to Parquet data files" + ) + source_group.add_argument( + "--catalog", + help="Iceberg catalog name" + ) + parser.add_argument( + "--database", default="tpch", + help="Database containing TPC tables (only used with --catalog)" + ) + + parser.add_argument( + "--queries", required=True, + help="Path to query SQL files" + ) + parser.add_argument( + "--iterations", type=int, default=1, + help="Number of iterations" + ) + parser.add_argument( + "--output", required=True, + help="Path to write results JSON" + ) + parser.add_argument( + "--name", required=True, + help="Prefix for result file" + ) + parser.add_argument( + "--query", type=int, + help="Specific query number (1-based). If omitted, run all." + ) + parser.add_argument( + "--write", + help="Path to save query results as Parquet" + ) args = parser.parse_args() - main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write) - + main( + args.benchmark, + args.data, + args.catalog, + args.database, + args.queries, + args.iterations, + args.output, + args.name, + args.query, + args.write + ) From 07e1fa39d7d6750d01953fa52f46816f7113d08b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 28 Jan 2026 09:18:29 -0700 Subject: [PATCH 5/5] fix: address review comments on README consistency - Use --packages instead of --jars for table creation to match create-iceberg-tpch.py usage - Use $ICEBERG_CATALOG variable instead of hardcoding 'local' in spark.sql.catalog config to be consistent with comet-tpch-iceberg.sh - Clarify that JAR download is only needed for benchmark execution Co-Authored-By: Claude Opus 4.5 --- dev/benchmarks/README.md | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/dev/benchmarks/README.md b/dev/benchmarks/README.md index 3acd5a114a..b3ea674199 100644 --- a/dev/benchmarks/README.md +++ b/dev/benchmarks/README.md @@ -81,34 +81,37 @@ against Iceberg tables with native scan acceleration. ### Prerequisites -Download the Iceberg Spark runtime JAR: +Download the Iceberg Spark runtime JAR (required for running the benchmark): ```shell wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/iceberg-spark-runtime-3.5_2.12-1.8.1.jar export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar ``` +Note: Table creation uses `--packages` which auto-downloads the dependency. 
+ ### Create Iceberg TPC-H tables Convert existing Parquet TPC-H data to Iceberg format: ```shell export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse +export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local} $SPARK_HOME/bin/spark-submit \ --master $SPARK_MASTER \ - --jars $ICEBERG_JAR \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ --conf spark.driver.memory=8G \ --conf spark.executor.instances=1 \ --conf spark.executor.cores=8 \ --conf spark.cores.max=8 \ --conf spark.executor.memory=16g \ - --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.local.type=hadoop \ - --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \ + --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ + --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ create-iceberg-tpch.py \ --parquet-path $TPCH_DATA \ - --catalog local \ + --catalog $ICEBERG_CATALOG \ --database tpch ```