`dev/benchmarks/README.md` (76 additions, 0 deletions)
Generating charts:
```shell
python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
```

## Iceberg Benchmarking

Comet includes native Iceberg support via [iceberg-rust](https://github.com/apache/iceberg-rust) integration. This enables benchmarking TPC-H queries
against Iceberg tables with native scan acceleration.

### Prerequisites

Download the Iceberg Spark runtime JAR (required for running the benchmark):

```shell
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
```

Note: the table-creation step below uses `--packages`, which downloads this dependency automatically, so the JAR is only needed when running the benchmark itself.

### Create Iceberg TPC-H tables

Convert existing Parquet TPC-H data to Iceberg format:

```shell
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}

$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
--conf spark.driver.memory=8G \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=8 \
--conf spark.cores.max=8 \
--conf spark.executor.memory=16g \
--conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
--conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
create-iceberg-tpch.py \
--parquet-path $TPCH_DATA \
--catalog $ICEBERG_CATALOG \
--database tpch
```

> **Reviewer note (Contributor):** The data conversion uses Spark's own Iceberg integration. Perhaps
> `--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` and
> `--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog` are also needed? I remember
> needing these parameters when doing the same exercise for TPC-DS Iceberg.

### Run Iceberg benchmark

```shell
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
sudo ./drop-caches.sh
./comet-tpch-iceberg.sh
```

The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the
physical plan output.
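
For example, from a PySpark session launched with the same configuration as `comet-tpch-iceberg.sh` (a sketch; the table name `local.tpch.lineitem` assumes the default catalog and database used above):

```python
# Run a simple query against an Iceberg table and print its physical plan.
df = spark.sql("SELECT count(*) FROM local.tpch.lineitem")
df.explain()  # the printed plan should contain CometIcebergNativeScanExec
```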

### Iceberg-specific options

| Environment Variable | Default | Description |
| -------------------- | ---------- | ----------------------------------- |
| `ICEBERG_CATALOG` | `local` | Iceberg catalog name |
| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables |
| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory |
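
For illustration, the default resolution behaves like this short Python sketch (the benchmark script itself resolves these via shell `${VAR:-default}` expansion):

```python
import os

# Mirrors the defaults used by comet-tpch-iceberg.sh.
catalog = os.environ.get("ICEBERG_CATALOG", "local")
database = os.environ.get("ICEBERG_DATABASE", "tpch")
warehouse = os.environ["ICEBERG_WAREHOUSE"]  # required; raises KeyError if unset
```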

### Comparing Parquet vs Iceberg performance

Run both benchmarks and compare:

```shell
python3 generate-comparison.py --benchmark tpch \
--labels "Comet (Parquet)" "Comet (Iceberg)" \
--title "TPC-H @ 100 GB: Parquet vs Iceberg" \
comet-tpch-*.json comet-iceberg-tpch-*.json
```
`dev/benchmarks/comet-tpch-iceberg.sh` (114 additions, 0 deletions)
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration.
#
# Required environment variables:
# SPARK_HOME - Path to Spark installation
# SPARK_MASTER - Spark master URL (e.g., spark://localhost:7077)
# COMET_JAR - Path to Comet JAR
# ICEBERG_JAR - Path to Iceberg Spark runtime JAR
# ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory
# TPCH_QUERIES - Path to TPC-H query files
#
# Optional:
# ICEBERG_CATALOG - Catalog name (default: local)
# ICEBERG_DATABASE - Database name (default: tpch)
#
# Setup (run once to create Iceberg tables from Parquet):
# $SPARK_HOME/bin/spark-submit \
# --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
# --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
# --conf spark.sql.catalog.local.type=hadoop \
# --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
# create-iceberg-tpch.py \
# --parquet-path $TPCH_DATA \
# --catalog local \
# --database tpch

set -e

# Defaults
ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch}

# Validate required variables
if [ -z "$SPARK_HOME" ]; then
echo "Error: SPARK_HOME is not set"
exit 1
fi
if [ -z "$COMET_JAR" ]; then
echo "Error: COMET_JAR is not set"
exit 1
fi
if [ -z "$ICEBERG_JAR" ]; then
echo "Error: ICEBERG_JAR is not set"
echo "Download from: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/"
exit 1
fi
if [ -z "$ICEBERG_WAREHOUSE" ]; then
echo "Error: ICEBERG_WAREHOUSE is not set"
exit 1
fi
if [ -z "$TPCH_QUERIES" ]; then
echo "Error: TPCH_QUERIES is not set"
exit 1
fi

# Restart the local standalone Spark cluster; stopping is best-effort in case
# nothing is running yet.
$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true
$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true

$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER

$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--jars $COMET_JAR,$ICEBERG_JAR \
--driver-class-path $COMET_JAR:$ICEBERG_JAR \
--conf spark.driver.memory=8G \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=8 \
--conf spark.cores.max=8 \
--conf spark.executor.memory=16g \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=16g \
--conf spark.eventLog.enabled=true \
--conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
--conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.comet.exec.replaceSortMergeJoin=true \
--conf spark.comet.expression.Cast.allowIncompatible=true \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.scan.icebergNative.enabled=true \
--conf spark.comet.explainFallback.enabled=true \
--conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
--conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
--conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \
tpcbench.py \
--name comet-iceberg \
--benchmark tpch \
--catalog $ICEBERG_CATALOG \
--database $ICEBERG_DATABASE \
--queries $TPCH_QUERIES \
--output . \
--iterations 1
`dev/benchmarks/create-iceberg-tpch.py` (88 additions, 0 deletions)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Convert TPC-H Parquet data to Iceberg tables.

Usage:
spark-submit \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \
create-iceberg-tpch.py \
--parquet-path /path/to/tpch/parquet \
--catalog local \
--database tpch
"""

import argparse
from pyspark.sql import SparkSession
import time


def main(parquet_path: str, catalog: str, database: str):
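    # Iceberg catalog configuration (spark.sql.catalog.<name>.*) is expected to
    # be supplied via spark-submit --conf flags, as shown in the module docstring.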
    spark = SparkSession.builder \
        .appName("Create Iceberg TPC-H Tables") \
        .getOrCreate()

    table_names = [
        "customer",
        "lineitem",
        "nation",
        "orders",
        "part",
        "partsupp",
        "region",
        "supplier"
    ]

    # Create database if it doesn't exist
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}")

    for table in table_names:
        parquet_table_path = f"{parquet_path}/{table}.parquet"
        iceberg_table = f"{catalog}.{database}.{table}"

        print(f"Converting {parquet_table_path} -> {iceberg_table}")
        start_time = time.time()

        # Drop table if exists to allow re-running
        spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")

        # Read parquet and write as Iceberg
        df = spark.read.parquet(parquet_table_path)
        df.writeTo(iceberg_table).using("iceberg").create()

        row_count = spark.table(iceberg_table).count()
        elapsed = time.time() - start_time
        print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s")

    print("\nAll TPC-H tables created successfully!")
    print(f"Tables available at: {catalog}.{database}.*")

    spark.stop()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert TPC-H Parquet data to Iceberg tables")
    parser.add_argument("--parquet-path", required=True, help="Path to TPC-H Parquet data directory")
    parser.add_argument("--catalog", required=True, help="Iceberg catalog name (e.g., 'local')")
    parser.add_argument("--database", default="tpch", help="Database name to create tables in")
    args = parser.parse_args()

    main(args.parquet_path, args.catalog, args.database)