From 1f5fe9b6b242fd075c42581af22f0a596084323c Mon Sep 17 00:00:00 2001
From: peterxcli
Date: Tue, 3 Feb 2026 10:09:17 +0800
Subject: [PATCH 1/4] Add `map_contains_key` expression support

---
 .../apache/comet/serde/QueryPlanSerde.scala   |  3 +-
 .../scala/org/apache/comet/serde/maps.scala   | 17 +++++++
 .../comet/CometMapExpressionSuite.scala       | 46 ++++++++++++++++++-
 3 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 066680456e..df1e8a63f2 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -126,7 +126,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[MapKeys] -> CometMapKeys,
     classOf[MapEntries] -> CometMapEntries,
     classOf[MapValues] -> CometMapValues,
-    classOf[MapFromArrays] -> CometMapFromArrays)
+    classOf[MapFromArrays] -> CometMapFromArrays,
+    classOf[MapContainsKey] -> CometMapContainsKey)
 
   private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[CreateNamedStruct] -> CometCreateNamedStruct,
diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala
index 2e217f6af0..a4a59fc3ff 100644
--- a/spark/src/main/scala/org/apache/comet/serde/maps.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala
@@ -89,3 +89,20 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] {
     optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*)
   }
 }
+
+object CometMapContainsKey extends CometExpressionSerde[MapContainsKey] {
+
+  override def convert(
+      expr: MapContainsKey,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    // Rewrite map_contains_key(map, key) as array_has(map_keys(map), key)
+    val mapExpr = exprToProtoInternal(expr.left, inputs, binding)
+    val keyExpr = exprToProtoInternal(expr.right, inputs, binding)
+
+    val mapKeysExpr = scalarFunctionExprToProto("map_keys", mapExpr)
+
+    val mapContainsKeyExpr = scalarFunctionExprToProto("array_has", mapKeysExpr, keyExpr)
+    optExprWithInfo(mapContainsKeyExpr, expr, expr.children: _*)
+  }
+}
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index ee77bb80f5..09fdf4efb6 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -22,7 +22,7 @@ package org.apache.comet
 import scala.util.Random
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.{CometTestBase, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
@@ -157,4 +157,48 @@ class CometMapExpressionSuite extends CometTestBase {
     }
   }
 
+  test("map_contains_key") {
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, "test.parquet")
+      val filename = path.toString
+      val random = new Random(42)
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        val schemaGenOptions =
+          SchemaGenOptions(generateArray = true, generateStruct = false, generateMap = true)
+        val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false)
+        ParquetGenerator.makeParquetFile(
+          random,
+          spark,
+          filename,
+          100,
+          schemaGenOptions,
+          dataGenOptions)
+      }
+      spark.read.parquet(filename).createOrReplaceTempView("t1")
+
+      checkSparkAnswer(
+        spark.sql("SELECT map_contains_key(c14, element_at(map_keys(c14), 1)) FROM t1"))
+      checkSparkAnswer(spark.sql("SELECT map_contains_key(c14, 999999) FROM t1"))
+
+      checkAnswer(
+        spark.sql("SELECT map_contains_key(c14, element_at(map_keys(c14), 1)) FROM t1 LIMIT 1"),
+        Row(true))
+      checkAnswer(spark.sql("SELECT map_contains_key(c14, 999999) FROM t1 LIMIT 1"), Row(false))
+
+      // Empty map
+      checkSparkAnswerAndOperator(spark.sql("""SELECT map_contains_key(
+          | map_from_arrays(CAST(array() AS array<string>), CAST(array() AS array<string>)),
+          | 'any_key'
+          |) FROM t1 LIMIT 1""".stripMargin))
+
+      // Empty map with int keys
+      checkSparkAnswerAndOperator(spark.sql(
+        "SELECT map_contains_key(map_from_arrays(CAST(array() AS array<int>), CAST(array() AS array<int>)), 0) FROM t1"))
+
+      // Empty map with string keys
+      checkSparkAnswerAndOperator(spark.sql(
+        "SELECT map_contains_key(map_from_arrays(CAST(array() AS array<string>), CAST(array() AS array<string>)), 'key') FROM t1"))
+    }
+  }
+
 }

From 105cadd91e23e335b505b8e66e1fe468a2fdaf89 Mon Sep 17 00:00:00 2001
From: peterxcli
Date: Tue, 3 Feb 2026 10:09:31 +0800
Subject: [PATCH 2/4] Update Spark expressions support list

---
 docs/spark_expressions_support.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md
index 27b6ad3b59..2c18cbd08d 100644
--- a/docs/spark_expressions_support.md
+++ b/docs/spark_expressions_support.md
@@ -272,11 +272,11 @@
 - [ ] element_at
 - [ ] map
 - [ ] map_concat
-- [ ] map_contains_key
+- [x] map_contains_key
 - [ ] map_entries
 - [ ] map_from_arrays
 - [ ] map_from_entries
-- [ ] map_keys
+- [x] map_keys
 - [ ] map_values
 - [ ] str_to_map
 - [ ] try_element_at

From 298d837541a559ef715a5e09ec10005a206ec2cb Mon Sep 17 00:00:00 2001
From: peterxcli
Date: Wed, 4 Feb 2026 19:43:09 +0800
Subject: [PATCH 3/4] move map_contains_key tests from old scala test suite to slt

---
 .../expressions/map/map_contains_key.sql      | 75 +++++++++++++++++++
 .../comet/CometMapExpressionSuite.scala       | 46 +-----------------
 2 files changed, 76 insertions(+), 45 deletions(-)
 create mode 100644 spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql

diff --git a/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
new file mode 100644
index 0000000000..7dc3ce436d
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
@@ -0,0 +1,75 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+-- TODO: replace map_from_arrays with map whenever map is supported in Comet
+
+-- Basic integer key tests with map literals
+query
+select map_contains_key(map_from_arrays(array(1, 2), array('a', 'b')), 5)
+
+query
+select map_contains_key(map_from_arrays(array(1, 2), array('a', 'b')), 1)
+
+-- Decimal type coercion tests
+-- TODO: requires map cast to be supported in Comet
+query spark_answer_only
+select map_contains_key(map_from_arrays(array(1, 2), array('a', 'b')), 5.0)
+
+query spark_answer_only
+select map_contains_key(map_from_arrays(array(1, 2), array('a', 'b')), 1.0)
+
+query spark_answer_only
+select map_contains_key(map_from_arrays(array(1.0, 2), array('a', 'b')), 5)
+
+query spark_answer_only
+select map_contains_key(map_from_arrays(array(1.0, 2), array('a', 'b')), 1)
+
+-- Empty map tests
+-- TODO: requires casting from NullType to be supported in Comet
+query spark_answer_only
+select map_contains_key(map_from_arrays(array(), array()), 0)
+
+-- Test with table data
+statement
+CREATE TABLE test_map_contains_key(m map<string, int>) USING parquet
+
+statement
+INSERT INTO test_map_contains_key VALUES (map_from_arrays(array('a', 'b', 'c'), array(1, 2, 3))), (map_from_arrays(array('x'), array(10))), (map_from_arrays(array(), array())), (NULL)
+
+query
+SELECT map_contains_key(m, 'a') FROM test_map_contains_key
+
+query
+SELECT map_contains_key(m, 'x') FROM test_map_contains_key
+
+query
+SELECT map_contains_key(m, 'missing') FROM test_map_contains_key
+
+-- Test with integer key map
+statement
+CREATE TABLE test_map_int_key(m map<int, string>) USING parquet
+
+statement
+INSERT INTO test_map_int_key VALUES (map_from_arrays(array(1, 2), array('a', 'b'))), (map_from_arrays(array(), array())), (NULL)
+
+query
+SELECT map_contains_key(m, 1) FROM test_map_int_key
+
+query
+SELECT map_contains_key(m, 5) FROM test_map_int_key
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 09fdf4efb6..ee77bb80f5 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -22,7 +22,7 @@ package org.apache.comet
 import scala.util.Random
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CometTestBase, Row}
+import org.apache.spark.sql.CometTestBase
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
@@ -157,48 +157,4 @@ class CometMapExpressionSuite extends CometTestBase {
     }
   }
 
-  test("map_contains_key") {
-    withTempDir { dir =>
-      val path = new Path(dir.toURI.toString, "test.parquet")
-      val filename = path.toString
-      val random = new Random(42)
-      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
-        val schemaGenOptions =
-          SchemaGenOptions(generateArray = true, generateStruct = false, generateMap = true)
-        val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false)
-        ParquetGenerator.makeParquetFile(
-          random,
-          spark,
-          filename,
-          100,
-          schemaGenOptions,
-          dataGenOptions)
-      }
-      spark.read.parquet(filename).createOrReplaceTempView("t1")
-
-      checkSparkAnswer(
-        spark.sql("SELECT map_contains_key(c14, element_at(map_keys(c14), 1)) FROM t1"))
-      checkSparkAnswer(spark.sql("SELECT map_contains_key(c14, 999999) FROM t1"))
-
-      checkAnswer(
-        spark.sql("SELECT map_contains_key(c14, element_at(map_keys(c14), 1)) FROM t1 LIMIT 1"),
-        Row(true))
-      checkAnswer(spark.sql("SELECT map_contains_key(c14, 999999) FROM t1 LIMIT 1"), Row(false))
-
-      // Empty map
-      checkSparkAnswerAndOperator(spark.sql("""SELECT map_contains_key(
-          | map_from_arrays(CAST(array() AS array<string>), CAST(array() AS array<string>)),
-          | 'any_key'
-          |) FROM t1 LIMIT 1""".stripMargin))
-
-      // Empty map with int keys
-      checkSparkAnswerAndOperator(spark.sql(
-        "SELECT map_contains_key(map_from_arrays(CAST(array() AS array<int>), CAST(array() AS array<int>)), 0) FROM t1"))
-
-      // Empty map with string keys
-      checkSparkAnswerAndOperator(spark.sql(
-        "SELECT map_contains_key(map_from_arrays(CAST(array() AS array<string>), CAST(array() AS array<string>)), 'key') FROM t1"))
-    }
-  }
-
 }

From ca46346fbdad692df2e48832b2a8c6dc7de38fdd Mon Sep 17 00:00:00 2001
From: peterxcli
Date: Sat, 7 Feb 2026 02:49:14 +0800
Subject: [PATCH 4/4] Add null map key and different key type tests

---
 .../sql-tests/expressions/map/map_contains_key.sql | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
index 7dc3ce436d..d3d45fc7d5 100644
--- a/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
+++ b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql
@@ -23,9 +23,15 @@
 query
 select map_contains_key(map_from_arrays(array(1, 2), array('a', 'b')), 5)
 
+query
+select map_contains_key(map_from_arrays(array(1, NULL), array('a', 'b')), 5)
+
 query
 select map_contains_key(map_from_arrays(array(1, 2), array('a', 'b')), 1)
 
+query
+select map_contains_key(map_from_arrays(array('1', '2'), array('a', 'b')), 1)
+
 -- Decimal type coercion tests
 -- TODO: requires map cast to be supported in Comet
 query spark_answer_only