diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 5d1d0c5718..2a81316c95 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DecimalType
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
 import org.apache.comet.CometSparkSessionExtensions
 
 trait CometBenchmarkBase
@@ -164,6 +165,32 @@ trait CometBenchmarkBase
     benchmark.run()
   }
 
+  protected def addParquetScanCases(
+      benchmark: Benchmark,
+      query: String,
+      caseSuffix: String = "",
+      extraConf: Map[String, String] = Map.empty): Unit = {
+    val suffix = if (caseSuffix.nonEmpty) s" ($caseSuffix)" else ""
+
+    benchmark.addCase(s"SQL Parquet - Spark$suffix") { _ =>
+      withSQLConf(extraConf.toSeq: _*) {
+        spark.sql(query).noop()
+      }
+    }
+
+    for (scanImpl <- Seq(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT)) {
+      benchmark.addCase(s"SQL Parquet - Comet ($scanImpl)$suffix") { _ =>
+        withSQLConf(
+          (extraConf ++ Map(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl)).toSeq: _*) {
+          spark.sql(query).noop()
+        }
+      }
+    }
+  }
+
   protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
     val testDf = if (partition.isDefined) {
       df.write.partitionBy(partition.get)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala
new file mode 100644
index 0000000000..b90b893712
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.types._
+
+import org.apache.comet.CometConf
+
+/**
+ * Benchmark to measure Comet Iceberg read performance. To run this benchmark:
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make
+ * benchmark-org.apache.spark.sql.benchmark.CometIcebergReadBenchmark` Results will be written to
+ * "spark/benchmarks/CometIcebergReadBenchmark-**results.txt".
+ */
+object CometIcebergReadBenchmark extends CometBenchmarkBase {
+
+  def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
+    val sqlBenchmark =
+      new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
+
+    withTempPath { dir =>
+      withTempTable("icebergTable") {
+        prepareIcebergTable(
+          dir,
+          spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
+          "icebergTable")
+
+        val query = dataType match {
+          case BooleanType => "sum(cast(id as bigint))"
+          case _ => "sum(id)"
+        }
+
+        sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
+          withSQLConf(
+            "spark.memory.offHeap.enabled" -> "true",
+            "spark.memory.offHeap.size" -> "10g") {
+            spark.sql(s"select $query from icebergTable").noop()
+          }
+        }
+
+        sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
+          withSQLConf(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            "spark.memory.offHeap.enabled" -> "true",
+            "spark.memory.offHeap.size" -> "10g",
+            CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+            spark.sql(s"select $query from icebergTable").noop()
+          }
+        }
+
+        sqlBenchmark.run()
+      }
+    }
+  }
+
+  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
+      Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
+        .foreach(icebergScanBenchmark(v, _))
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala
new file mode 100644
index 0000000000..a7d170057f
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+
+/**
+ * Benchmark to measure partition column scan performance. This exercises the CometConstantVector
+ * path where constant columns are exported as 1-element Arrow arrays and expanded on the native
+ * side.
+ *
+ * To run this benchmark:
+ * {{{
+ *   SPARK_GENERATE_BENCHMARK_FILES=1 make \
+ *     benchmark-org.apache.spark.sql.benchmark.CometPartitionColumnBenchmark
+ * }}}
+ *
+ * Results will be written to "spark/benchmarks/CometPartitionColumnBenchmark-**results.txt".
+ */
+object CometPartitionColumnBenchmark extends CometBenchmarkBase {
+
+  def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = {
+    val sqlBenchmark = new Benchmark(
+      s"Partitioned Scan with $numPartitionCols partition column(s)",
+      values,
+      output = output)
+
+    withTempPath { dir =>
+      withTempTable("parquetV1Table") {
+        val partCols =
+          (1 to numPartitionCols).map(i => s"'part$i' as p$i").mkString(", ")
+        val partNames = (1 to numPartitionCols).map(i => s"p$i")
+        val df = spark.sql(s"SELECT value as id, $partCols FROM $tbl")
+        val parquetDir = dir.getCanonicalPath + "/parquetV1"
+        df.write
+          .partitionBy(partNames: _*)
+          .mode("overwrite")
+          .option("compression", "snappy")
+          .parquet(parquetDir)
+        spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table")
+
+        addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
+
+        // Also benchmark reading partition columns themselves
+        val partSumExpr =
+          (1 to numPartitionCols).map(i => s"sum(length(p$i))").mkString(", ")
+
+        addParquetScanCases(
+          sqlBenchmark,
+          s"select $partSumExpr from parquetV1Table",
+          caseSuffix = "partition cols")
+
+        sqlBenchmark.run()
+      }
+    }
+  }
+
+  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v =>
+      for (numPartCols <- List(1, 5)) {
+        partitionColumnScanBenchmark(v, numPartCols)
+      }
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index 3bfbdee91a..a2f196a4fc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
 
 import org.apache.comet.{CometConf, WithHdfsCluster}
-import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
 import org.apache.comet.parquet.BatchReader
 
 /**
@@ -50,7 +49,6 @@ import org.apache.comet.parquet.BatchReader
 class CometReadBaseBenchmark extends CometBenchmarkBase {
 
   def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
-    // Benchmarks running through spark sql.
     val sqlBenchmark =
       new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values, output = output)
 
@@ -63,76 +61,13 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
           case _ => "sum(id)"
         }
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(s"select $query from parquetV1Table").noop()
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.run()
-      }
-    }
-  }
-
-  def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
-    // Benchmarks running through spark sql.
-    val sqlBenchmark =
-      new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
-
-    withTempPath { dir =>
-      withTempTable("icebergTable") {
-        prepareIcebergTable(
-          dir,
-          spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
-          "icebergTable")
-
-        val query = dataType match {
-          case BooleanType => "sum(cast(id as bigint))"
-          case _ => "sum(id)"
-        }
-
-        sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g") {
-            spark.sql(s"select $query from icebergTable").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
-            spark.sql(s"select $query from icebergTable").noop()
-          }
-        }
-
+        addParquetScanCases(sqlBenchmark, s"select $query from parquetV1Table")
         sqlBenchmark.run()
       }
     }
   }
 
   def encryptedScanBenchmark(values: Int, dataType: DataType): Unit = {
-    // Benchmarks running through spark sql.
     val sqlBenchmark =
       new Benchmark(s"SQL Single ${dataType.sql} Encrypted Column Scan", values, output = output)
 
@@ -143,6 +78,15 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
     val cryptoFactoryClass =
       "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"
 
+    val cryptoConf = Map(
+      "spark.memory.offHeap.enabled" -> "true",
+      "spark.memory.offHeap.size" -> "10g",
+      DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+      KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+        "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+      InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+        s"footerKey: ${footerKey}, key1: ${key1}")
+
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareEncryptedTable(
@@ -154,51 +98,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
           case _ => "sum(id)"
         }
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
-            KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-              s"footerKey: ${footerKey}, key1: ${key1}") {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION,
-            DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
-            KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-              s"footerKey: ${footerKey}, key1: ${key1}") {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT,
-            DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
-            KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-              s"footerKey: ${footerKey}, key1: ${key1}") {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(
+          sqlBenchmark,
+          s"select $query from parquetV1Table",
+          extraConf = cryptoConf)
         sqlBenchmark.run()
       }
     }
@@ -218,28 +121,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
             s"SELECT CAST(value / 10000000.0 as DECIMAL($precision, $scale)) " +
               s"id FROM $tbl"))
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("select sum(id) from parquetV1Table").noop()
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("select sum(id) from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("select sum(id) from parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
         sqlBenchmark.run()
       }
     }
@@ -338,28 +220,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
             s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, value AS c2 FROM " +
               s"$tbl"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, "select sum(c2) from parquetV1Table where c1 + 1 > 0")
         benchmark.run()
       }
     }
@@ -388,28 +249,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
             |FROM tmp
             |""".stripMargin))
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("select sum(length(id)) from parquetV1Table").noop()
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("select sum(length(id)) from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("select sum(length(id)) from parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(sqlBenchmark, "select sum(length(id)) from parquetV1Table")
         sqlBenchmark.run()
       }
     }
@@ -428,37 +268,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
             s"SELECT IF(RAND(1) < $fractionOfNulls, NULL, CAST(value as STRING)) AS c1, " +
               s"IF(RAND(2) < $fractionOfNulls, NULL, CAST(value as STRING)) AS c2 FROM $tbl"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark
-            .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
-              "not NULL and c2 is not NULL")
-            .noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark
-              .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
-                "not NULL and c2 is not NULL")
-              .noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark
-              .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
-                "not NULL and c2 is not NULL")
-              .noop()
-          }
-        }
-
+        addParquetScanCases(
+          benchmark,
+          "select sum(length(c2)) from parquetV1Table where c1 is " +
+            "not NULL and c2 is not NULL")
         benchmark.run()
       }
     }
@@ -476,28 +289,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
 
         prepareTable(dir, spark.sql("SELECT * FROM t1"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, s"SELECT sum(c$middle) FROM parquetV1Table")
         benchmark.run()
       }
     }
@@ -519,28 +311,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
             s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " +
               s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0")
         benchmark.run()
       }
     }
@@ -562,28 +333,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
            s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " +
              s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl ORDER BY c1, c2"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0")
         benchmark.run()
       }
     }
@@ -611,13 +361,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
       }
     }
 
-    runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
-      Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
-        .foreach { dataType =>
-          icebergScanBenchmark(v, dataType)
-        }
-    }
-
     runBenchmarkWithTable("SQL Single Numeric Encrypted Column Scan", 1024 * 1024 * 128) { v =>
       Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
         .foreach { dataType =>