Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("explode", defaultValue = true)
val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("window", defaultValue = true)
createExecEnabledConfig("window", defaultValue = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch, I thought window was disabled.

val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
Expand Down
15 changes: 14 additions & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1583,9 +1583,22 @@ impl PhysicalPlanner {
})
.collect();

// Ensure input is properly sorted when ORDER BY is present
// BoundedWindowAggExec requires InputOrderMode::Sorted
let needs_explicit_sort = !sort_exprs.is_empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`needs_explicit_sort` is confusing, IMO.

Can we just make it a simple

if sort_exprs.is_empty() { plan } else { sort plan }

let sorted_child: Arc<dyn ExecutionPlan> = if needs_explicit_sort {
// Insert explicit sort to ensure data ordering
Arc::new(SortExec::new(
LexOrdering::new(sort_exprs.to_vec()).unwrap(),
Arc::clone(&child.native_plan),
))
} else {
Arc::clone(&child.native_plan)
};

let window_agg = Arc::new(BoundedWindowAggExec::try_new(
window_expr?,
Arc::clone(&child.native_plan),
sorted_child,
InputOrderMode::Sorted,
!partition_exprs.is_empty(),
)?);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.spark.sql.comet

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CurrentRow, Expression, NamedExpression, RangeFrame, RowFrame, SortOrder, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, WindowExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, CurrentRow, Expression, NamedExpression, RangeFrame, RowFrame, SortOrder, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, WindowExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max, Min, Sum}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
Expand All @@ -34,7 +34,7 @@ import com.google.common.base.Objects

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{AggSerde, CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel}
import org.apache.comet.serde.{AggSerde, CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto}

Expand All @@ -44,7 +44,17 @@ object CometWindowExec extends CometOperatorSerde[WindowExec] {
CometConf.COMET_EXEC_WINDOW_ENABLED)

override def getSupportLevel(op: WindowExec): SupportLevel = {
Incompatible(Some("Native WindowExec has known correctness issues"))
// DataFusion requires that partition expressions must be part of the sort ordering.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's true; we need to check.

// If partition spec and order spec use different columns, we need to fall back to Spark.
if (op.partitionSpec.nonEmpty && op.orderSpec.nonEmpty) {
val orderExprs = op.orderSpec.map(_.child).toSet
val partitionExprsInOrder = op.partitionSpec.forall(orderExprs.contains)
if (!partitionExprsInOrder) {
return Unsupported(
Some("Partition expressions must be a subset of order expressions for native window"))
}
}
Compatible()
}

override def convert(
Expand Down Expand Up @@ -72,11 +82,6 @@ object CometWindowExec extends CometOperatorSerde[WindowExec] {
return None
}

if (op.partitionSpec.nonEmpty && op.orderSpec.nonEmpty &&
!validatePartitionAndSortSpecsForWindowFunc(op.partitionSpec, op.orderSpec, op)) {
return None
}

val windowExprProto = winExprs.map(windowExprToProto(_, output, op.conf))
val partitionExprs = op.partitionSpec.map(exprToProto(_, op.child.output))

Expand Down Expand Up @@ -279,40 +284,6 @@ object CometWindowExec extends CometOperatorSerde[WindowExec] {
SerializedPlan(None))
}

/**
 * Checks whether a window's partition spec and order spec reference the same columns, by name
 * and in the same position.
 *
 * The check passes only when all of the following hold:
 *   - `partitionSpec` and `orderSpec` have the same length;
 *   - every partition expression is an `AttributeReference`;
 *   - every sort order's child expression is an `AttributeReference`;
 *   - the i-th partition column name equals the i-th order column name.
 *
 * @param partitionSpec
 *   partition expressions of the window
 * @param orderSpec
 *   sort orders of the window
 * @param op
 *   plan node passed to `withInfo` together with the rejection message
 * @return
 *   `true` if the specs match as described above, `false` otherwise
 */
private def validatePartitionAndSortSpecsForWindowFunc(
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
op: SparkPlan): Boolean = {
// A length mismatch is rejected immediately; note that no withInfo message is recorded here.
if (partitionSpec.length != orderSpec.length) {
return false
}

// Collect partition column names. The `return` inside the partial function is a non-local
// return: the first non-AttributeReference expression aborts the whole method with `false`.
val partitionColumnNames = partitionSpec.collect {
case a: AttributeReference => a.name
case other =>
withInfo(op, s"Unsupported partition expression: ${other.getClass.getSimpleName}")
return false
}

// Same extraction for the sort keys, looking at each SortOrder's child expression.
val orderColumnNames = orderSpec.collect { case s: SortOrder =>
s.child match {
case a: AttributeReference => a.name
case other =>
withInfo(op, s"Unsupported sort expression: ${other.getClass.getSimpleName}")
return false
}
}

// Positional comparison by attribute name only (not by expression id): any pair that differs
// causes the specs to be rejected.
if (partitionColumnNames.zip(orderColumnNames).exists { case (partCol, orderCol) =>
partCol != orderCol
}) {
withInfo(op, "Partitioning and sorting specifications must be the same.")
return false
}

true
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometColumnarExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometColumnarExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometColumnarExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ TakeOrderedAndProject
: : +- Filter
: : +- Window
: : +- Filter
: : +- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
: : +- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
: : +- CometColumnarToRow
: : +- CometSort
: : +- CometColumnarExchange
Expand Down Expand Up @@ -44,7 +44,7 @@ TakeOrderedAndProject
: : +- CometNativeScan parquet spark_catalog.default.store
: +- BroadcastExchange
: +- Project
: +- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
: +- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
: +- CometColumnarToRow
: +- CometSort
: +- CometColumnarExchange
Expand Down Expand Up @@ -81,7 +81,7 @@ TakeOrderedAndProject
: +- CometNativeScan parquet spark_catalog.default.store
+- BroadcastExchange
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometColumnarExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ TakeOrderedAndProject
: : +- Filter
: : +- Window
: : +- Filter
: : +- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
: : +- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
: : +- CometColumnarToRow
: : +- CometSort
: : +- CometExchange
Expand Down Expand Up @@ -39,7 +39,7 @@ TakeOrderedAndProject
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
: +- BroadcastExchange
: +- Project
: +- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
: +- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
: +- CometColumnarToRow
: +- CometSort
: +- CometExchange
Expand Down Expand Up @@ -71,7 +71,7 @@ TakeOrderedAndProject
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
+- BroadcastExchange
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ TakeOrderedAndProject
: : +- Filter
: : +- Window
: : +- Filter
: : +- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
: : +- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
: : +- CometColumnarToRow
: : +- CometSort
: : +- CometExchange
Expand Down Expand Up @@ -39,7 +39,7 @@ TakeOrderedAndProject
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
: +- BroadcastExchange
: +- Project
: +- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
: +- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
: +- CometColumnarToRow
: +- CometSort
: +- CometExchange
Expand Down Expand Up @@ -71,7 +71,7 @@ TakeOrderedAndProject
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
+- BroadcastExchange
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- Window [COMET: Native support for operator WindowExec is disabled. Set spark.comet.exec.window.enabled=true to enable it.]
+- CometColumnarToRow
+- CometSort
+- CometExchange
Expand Down
Loading
Loading