fix: add explicit sort for window aggregates to fix correctness issues#3397
Open
andygrove wants to merge 2 commits intoapache:mainfrom
Open
fix: add explicit sort for window aggregates to fix correctness issues#3397andygrove wants to merge 2 commits intoapache:mainfrom
andygrove wants to merge 2 commits intoapache:mainfrom
Conversation
The core issue was that BoundedWindowAggExec requires InputOrderMode::Sorted but the input wasn't always properly sorted when ORDER BY was present. Changes: - Add explicit SortExec before BoundedWindowAggExec when ORDER BY is present - Change getSupportLevel from blanket Incompatible to Compatible for valid cases - Properly detect unsupported case: partition exprs must be subset of order exprs - Disable window by default (spark.comet.exec.window.enabled=false) to avoid breaking changes; users can opt-in to test the fix What now works natively (when enabled): - COUNT, SUM, MIN, MAX window aggregates - OVER() - no partition, no order - OVER(ORDER BY x) - order only - OVER(PARTITION BY x) - partition only - OVER(PARTITION BY x ORDER BY x, y) - partition is subset of order Tracking issue: apache#2721 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
3c6710a to
0787235
Compare
Member
Author
|
I need to regenerate the golden files |
comphead
reviewed
Feb 6, 2026
| createExecEnabledConfig("explode", defaultValue = true) | ||
| val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] = | ||
| createExecEnabledConfig("window", defaultValue = true) | ||
| createExecEnabledConfig("window", defaultValue = false) |
Contributor
There was a problem hiding this comment.
ouch I thought window was disabled
comphead
reviewed
Feb 6, 2026
|
|
||
| // Ensure input is properly sorted when ORDER BY is present | ||
| // BoundedWindowAggExec requires InputOrderMode::Sorted | ||
| let needs_explicit_sort = !sort_exprs.is_empty(); |
Contributor
There was a problem hiding this comment.
needs_explicit_sort confusing IMO.
can we just make a simple
if sort_exprs.is_empty() {plan } else { sort plan }
comphead
reviewed
Feb 6, 2026
|
|
||
| override def getSupportLevel(op: WindowExec): SupportLevel = { | ||
| Incompatible(Some("Native WindowExec has known correctness issues")) | ||
| // DataFusion requires that partition expressions must be part of the sort ordering. |
Contributor
There was a problem hiding this comment.
I'm not sure if its true, need to check
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR fixes core correctness issues with windowed aggregate queries by adding an explicit
SortExecbeforeBoundedWindowAggExecwhen ORDER BY is present.Tracking Issue: #2721
Changes
Add explicit SortExec (
planner.rs) - Insert sort beforeBoundedWindowAggExecwhen ORDER BY is present, ensuringInputOrderMode::Sortedrequirement is satisfiedImprove support level detection (
CometWindowExec.scala) - Change from blanketIncompatibletoCompatiblefor valid cases, with proper validation that partition expressions must be a subset of order expressionsDisable by default (
CometConf.scala) - Setspark.comet.exec.window.enabled=falseto avoid breaking changes; users can opt-in to testWhat's Now Supported (when enabled)
COUNT,SUM,MIN,MAXOVER()- no partition, no orderOVER(ORDER BY x)- order onlyOVER(PARTITION BY x)- partition onlyOVER(PARTITION BY x ORDER BY x, y)- partition is subset of orderWhat's NOT Supported (falls back to Spark)
PARTITION BY a ORDER BY bwhere partition columns differ from order columnsAVGwindow aggregate (native implementation has known issues)ROW_NUMBER,RANK,DENSE_RANK, etc.LAG,LEADFIRST_VALUE,LAST_VALUE,NTH_VALUERANGE BETWEENwith numeric/temporal expressions (Invalid argument error: Invalid arithmetic operation: Int32 - Int64 #1246)Test Plan
🤖 Generated with Claude Code