-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add BigQueryTableStreamingBufferEmptySensor for safe DML operations #59736
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add BigQueryTableStreamingBufferEmptySensor for safe DML operations #59736
Conversation
This sensor checks if a BigQuery table's streaming buffer is empty before allowing DML operations (UPDATE, DELETE, MERGE) to proceed. Solves the issue where DML statements fail with 'would affect rows in the streaming buffer' error when tables receive continuous streaming inserts. Added comprehensive tests and example DAG demonstrating usage.
|
I have solved this issue , For any misconduct do tell me so that i can make the following changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds BigQueryTableStreamingBufferEmptySensor to address a critical issue where DML operations (UPDATE, DELETE, MERGE) fail on BigQuery tables that receive continuous streaming inserts. The sensor checks if the streaming buffer is empty before allowing downstream DML operations to proceed, preventing the "would affect rows in the streaming buffer" error.
Key changes:
- New sensor class that monitors BigQuery table streaming buffer state via the BigQuery Tables API
- Comprehensive unit tests covering success scenarios (no buffer, buffer with data)
- Example DAG demonstrating sensor usage with DML operations
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py |
Implements BigQueryTableStreamingBufferEmptySensor with poke method that checks streaming buffer status using BigQueryHook |
providers/google/tests/unit/google/cloud/sensors/test_bigquery.py |
Adds test class with three unit tests covering sensor behavior when buffer is empty, when buffer has rows, and table reference construction |
example_bigquery_streaming_buffer.py |
Provides example DAG showing sensor usage before DML operations with recommended settings (reschedule mode, appropriate timeouts) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
shahar1
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please address failed tests and my comment - thanks!
| self.log.info("Table %s has no streaming buffer - ready for DML operations", table_uri) | ||
| return True | ||
|
|
||
| # Streaming buffer exists - check if it's empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be a specific if statement for that check, or does it only log it? Comment is a bit confusing, I'd remove it if it only logs.
|
@Arunodoy18 I am going to close your PRs -- Please review and test your changes with correct PR description. Using LLMs without those increase maintenance burdens and CI run time. Feel free to recreate focussed PRs following those guidelines. |
This sensor checks if a BigQuery table's streaming buffer is empty before allowing DML operations (UPDATE, DELETE, MERGE) to proceed.
Solves the issue where DML statements fail with 'would affect rows in the streaming buffer' error when tables receive continuous streaming inserts.
Added comprehensive tests and example DAG demonstrating usage.
Description
This PR adds
BigQueryTableStreamingBufferEmptySensorto solve a critical issue affecting users who run DML operations on BigQuery tables populated via streaming inserts.User Impact
When Apache Airflow DAGs execute DML statements (UPDATE, DELETE, MERGE) on BigQuery tables that receive continuous streaming inserts (via Dataflow, CDC pipelines, BigQuery Storage Write API, or
tabledata.insertAll), tasks fail with:This causes:
Root Cause
BigQuery's documented limitation: DML operations cannot modify rows still residing in the streaming buffer. The buffer typically flushes within minutes but can take up to 90 minutes under certain conditions.
Airflow's current behavior: Operators like
BigQueryInsertJobOperatorimmediately execute DML statements without checking buffer state, causing:Why This Matters
This issue is increasingly common because:
Solution
What This PR Adds
A new sensor
BigQueryTableStreamingBufferEmptySensorthat:mode='reschedule'(frees worker slots)Implementation Details