fix(fair-queue): prevent unbounded cooloff states growth #2818
Conversation
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds an optional

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~28 minutes

Additional notes

Review should confirm metric reclassification correctness, that cooloff increments on concurrency failures are applied only where intended, and that clearing the cooloff cache under the size cap does not cause race conditions or lost state in concurrent processing.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Review Complete: Your review story is ready! Comment !reviewfast on this PR to re-generate the story.
```ts
// The cooloff states size should be capped (test that it doesn't grow unbounded)
const cacheSizes = queue.getCacheSizes();
expect(cacheSizes.cooloffStatesSize).toBeLessThanOrEqual(10); // Some buffer for race conditions
```
Test passes vacuously due to disabled cooloff
The test comment says "Handler that always fails to trigger cooloff" but ctx.fail() never triggered cooloff - cooloff was only triggered by dequeue failures (empty queue or concurrency blocked). Since #incrementCooloff is now dead code, cooloffStatesSize will always be 0, making the assertion toBeLessThanOrEqual(10) pass trivially without actually testing the size cap behavior. The test provides false confidence that the cap works when cooloff is actually non-functional.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/redis-worker/src/fair-queue/index.ts (1)
1727-1761: The `#incrementCooloff` method is dead code and should be removed or its usage restored.

The method is defined at line 1727 but never called anywhere in the codebase. After the removal of calls from `#processMasterQueueShard` and `#processDirectIteration`, no remaining call sites exist. As a result:

- The cap logic will never execute
- Cooloff states will only be deleted (via `#resetCooloff`), never added
- The cooloff increment mechanism is effectively disabled

Either remove the method if cooloff increment is no longer needed, or restore the call sites if this was unintentional.
🧹 Nitpick comments (2)
packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts (1)
791-792: Consider tightening the assertion or adding a more direct verification.

The assertion allows up to 10 entries when `maxStatesSize` is 5. While the comment mentions race conditions, if the cap logic works correctly, the size should reset to near 0 after clearing and then grow at most to 5 before the next clear.

Consider adding a log check or a more direct assertion that verifies the cache was actually cleared at some point, rather than just checking the final size.
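A rough sketch of a more direct check, assuming the test keeps the `queue` instance with `maxStatesSize: 5` and the `getCacheSizes()` helper from the snippet above (the `processAllTestMessages` helper is hypothetical):

```ts
// Sample the cooloff cache size while messages are being processed and
// record every observed value, so the test can assert on the trajectory
// rather than only on the final size.
const observedSizes: number[] = [];
const poll = setInterval(() => {
  observedSizes.push(queue.getCacheSizes().cooloffStatesSize);
}, 10);

await processAllTestMessages(); // hypothetical helper that drives the queue

clearInterval(poll);

// The cache should never exceed the configured cap of 5...
expect(Math.max(...observedSizes)).toBeLessThanOrEqual(5);

// ...and if the cap was ever reached, a later sample should show the
// cache was actually cleared back below the cap.
if (observedSizes.some((size) => size === 5)) {
  const firstHit = observedSizes.indexOf(5);
  expect(Math.min(...observedSizes.slice(firstHit + 1))).toBeLessThan(5);
}
```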
packages/redis-worker/src/fair-queue/index.ts (1)
1728-1735: Cap logic works but clearing the entire cache is aggressive.

The safeguard correctly prevents unbounded growth. However, clearing the entire cache means all queues immediately lose their cooloff state, which could cause a "thundering herd" effect where previously-cooled-off queues all get polled simultaneously.

Consider a more gradual eviction strategy for a future improvement (see the sketch after this list):

- Evict only expired entries first
- Use LRU eviction to remove oldest entries
- Clear a percentage of entries rather than all

For now, this is acceptable as a safety valve since hitting the cap indicates an abnormal state.
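As a rough illustration of the "evict expired entries first" idea, a standalone sketch follows; the state shape (a `cooloffUntil` timestamp on entries in the cooloff state) is an assumption, not taken from the PR:

```ts
type CooloffState =
  | { tag: "normal"; consecutiveFailures: number }
  | { tag: "cooloff"; cooloffUntil: number };

function evictCooloffStates(
  states: Map<string, CooloffState>,
  cap: number,
  now = Date.now()
): void {
  if (states.size < cap) return;

  // 1. Drop entries whose cooloff window has already elapsed.
  for (const [queueId, state] of states) {
    if (state.tag === "cooloff" && state.cooloffUntil <= now) {
      states.delete(queueId);
    }
  }

  // 2. If still at the cap, drop the oldest ~20% by Map insertion order
  //    instead of clearing everything.
  if (states.size >= cap) {
    const toDrop = Math.max(1, Math.floor(states.size * 0.2));
    let dropped = 0;
    for (const queueId of states.keys()) {
      states.delete(queueId);
      if (++dropped >= toDrop) break;
    }
  }
}
```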
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
- packages/redis-worker/src/fair-queue/types.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
- packages/redis-worker/src/fair-queue/index.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
- packages/redis-worker/src/fair-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
- packages/redis-worker/src/fair-queue/index.ts
**/*.{test,spec}.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use vitest for all tests in the Trigger.dev repository
Files:
packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
**/*.test.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive `describe` and `it` blocks
Avoid mocks or stubs in tests; use helpers from `@internal/testcontainers` when Redis or Postgres are needed
Use vitest for unit tests
Files:
packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
🧠 Learnings (1)
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option
Applied to files:
packages/redis-worker/src/fair-queue/index.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
- GitHub Check: Cursor Bugbot
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (6)
packages/redis-worker/src/fair-queue/types.ts (1)
312-313: LGTM! The new `maxStatesSize` optional field is well-documented and correctly typed. The default value of 1000 is documented in the comment and matches the implementation.
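For reference, the described field would look roughly like the sketch below; the surrounding type name and sibling options are assumptions, not the actual `types.ts` contents:

```ts
type FairQueueCooloffOptions = {
  // ...other cooloff tuning options...

  /**
   * Maximum number of per-queue cooloff states kept in memory before the
   * cache is capped. Defaults to 1000 when not provided.
   */
  maxStatesSize?: number;
};
```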
packages/redis-worker/src/fair-queue/index.ts (4)

92-92: LGTM! Field correctly declared to store the cap configuration.
146-146: LGTM! Initialization correctly uses nullish coalescing with the documented default of 1000.
1222-1226: Consistent with the master queue shard change.

Same reasoning applies here. The change is internally consistent, but together with the previous change, this removes all calls to `#incrementCooloff` from the processing paths.
883-887: The cooloff increment mechanism is not being invoked anywhere, effectively disabling cooloff escalation.

The `#incrementCooloff` method is defined but never called in the codebase. Both `#processMasterQueueShard` and `#processDirectIteration` explicitly avoid incrementing cooloff, and searches confirm no other code paths invoke it either. This means:
- Queues can only enter cooloff if already in that state (impossible for initial failures)
- Failed processing never escalates to cooloff
- The cooloff feature exists but cannot be triggered
The comments correctly explain why empty/blocked scenarios shouldn't escalate, but there's no mechanism for actual failure scenarios to trigger cooloff either. Confirm whether this is intentional or if failure cases should still increment cooloff.
packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts (1)
774-778: Test does not exercise cooloff cap behavior as intended.

The handler calls `ctx.fail()`, which routes through `#handleMessageFailure`, but that method doesn't call `#incrementCooloff`. The `#incrementCooloff` method exists but is never invoked anywhere in the codebase. This means cooloff states never grow in the test, making the assertion `expect(cacheSizes.cooloffStatesSize).toBeLessThanOrEqual(10)` trivially pass with a size of zero.

The code comments in `#processMasterQueueShard` and `#processDirectIteration` explicitly state cooloff should not increment for empty queues or concurrency-blocked messages, suggesting this design is intentional. However, the test setup claims to trigger cooloff via handler failures, which won't actually occur given the current implementation. Clarify the test intent: if cooloff should increment on failures, wire it into `#handleMessageFailure`; if not, update the test to match the actual behavior.
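If the first option is chosen, the wiring might look roughly like this standalone sketch; the class and method names mirror the review's references, but the bodies are illustrative, not code from the PR:

```ts
// Sketch of the "increment cooloff on handler failure" option. The real class
// keeps its private #handleMessageFailure / #incrementCooloff methods; only
// the added increment call is the point being illustrated here.
type CooloffState = { tag: "normal"; consecutiveFailures: number };

class FairQueueLike {
  private queueCooloffStates = new Map<string, CooloffState>();

  handleMessageFailure(queueId: string, _error: unknown): void {
    // ...existing retry / dead-letter handling would run here...

    // New: track the failure against the queue so repeated handler failures
    // can eventually escalate the queue into cooloff.
    this.incrementCooloff(queueId);
  }

  private incrementCooloff(queueId: string): void {
    const state =
      this.queueCooloffStates.get(queueId) ?? { tag: "normal" as const, consecutiveFailures: 0 };
    this.queueCooloffStates.set(queueId, {
      ...state,
      consecutiveFailures: state.consecutiveFailures + 1,
    });
  }
}
```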
Actionable comments posted: 1
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
packages/redis-worker/src/fair-queue/index.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
packages/redis-worker/src/fair-queue/index.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
packages/redis-worker/src/fair-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier
Files:
packages/redis-worker/src/fair-queue/index.ts
🧠 Learnings (1)
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option
Applied to files:
packages/redis-worker/src/fair-queue/index.ts
🧬 Code graph analysis (1)
packages/redis-worker/src/fair-queue/index.ts (2)
packages/redis-worker/src/fair-queue/visibility.ts (1)
- `queueId` (366-368)

packages/redis-worker/src/fair-queue/masterQueue.ts (1)

- `queueId` (195-197)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
- GitHub Check: Cursor Bugbot
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: typecheck / typecheck
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (7)
packages/redis-worker/src/fair-queue/index.ts (7)
92-92: LGTM: Size cap configuration added.

The `maxCooloffStatesSize` field with a default of 1000 provides a reasonable safeguard against unbounded memory growth.

Also applies to: 146-146
912-913: LGTM: Appropriate cooloff on concurrency limits.

Incrementing cooloff when a queue is at maximum concurrency is correct: it prevents repeated attempts on a queue that cannot accept work, implementing proper backoff behavior.
963-964: LGTM: Cooloff on reservation failure.

When concurrency reservation fails after claiming, incrementing cooloff prevents the queue from being immediately retried. This is appropriate backoff behavior.
1260-1261: LGTM: Consistent cooloff in direct processing.

These cooloff increments mirror the two-stage processing path (lines 912-913, 963-964), ensuring consistent backoff behavior across both processing modes.
Also applies to: 1311-1312
913-913: Note: Past review comment is outdated.

The previous review claimed `#incrementCooloff` is never called (dead code), but this is incorrect for the current version. The method is actively called in 4 locations:

- Line 913: Concurrency check failure in `#claimAndPushToWorkerQueue`
- Line 964: Reservation failure in `#claimAndPushToWorkerQueue`
- Line 1261: Concurrency check failure in `#processOneMessage`
- Line 1312: Reservation failure in `#processOneMessage`

The cooloff mechanism is working as intended: it now triggers specifically on concurrency-related issues rather than general failures.
Also applies to: 964-964, 1261-1261, 1312-1312
1226-1230: Consistent handling in direct processing path.

This mirrors the logic in the master queue path (lines 883-887). The `process_skipped` metric appropriately distinguishes the direct processing path from the claim path and follows the same dynamic metric pattern used throughout the codebase.
883-887: The comment clearly explains why cooloff is not incremented here.

The metric name change to `claim_skipped` is properly integrated through the `BatchedSpanManager.incrementStat()` method, which accumulates stats dynamically, consistent with how all other loop counters (`cooloff_skipped`, `messages_claimed`, etc.) are tracked in the same system. No explicit metric pre-registration is required; stats are accumulated and emitted as span attributes.
```ts
#incrementCooloff(queueId: string): void {
  // Safety check: if the cache is too large, just clear it
  if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
    this.logger.warn("Cooloff states cache hit size cap, clearing all entries", {
      size: this.queueCooloffStates.size,
      cap: this.maxCooloffStatesSize,
    });
    this.queueCooloffStates.clear();
  }
```
Disruptive cache clearing strategy.
Clearing the entire cooloff cache when the size cap is reached has significant drawbacks:
- All queues lose their cooloff state, including those legitimately in cooloff
- Queues that were backing off will be retried immediately, potentially causing a thundering herd
- Failure counts for queues tracking consecutive failures are lost
Consider these alternatives:
- LRU eviction: Remove only the least recently used entries
- Selective removal: Remove only `normal`-state entries (tracking failures), keep `cooloff` entries until they expire
- Partial clearing: Remove oldest 10-20% of entries rather than all
🔎 Suggested LRU-based approach
Track access times and evict oldest entries:
```diff
+ private queueCooloffStatesAccessOrder: string[] = [];
+
#incrementCooloff(queueId: string): void {
- // Safety check: if the cache is too large, just clear it
- if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
- this.logger.warn("Cooloff states cache hit size cap, clearing all entries", {
- size: this.queueCooloffStates.size,
- cap: this.maxCooloffStatesSize,
- });
- this.queueCooloffStates.clear();
- }
+ // Safety check: if at capacity, remove oldest entry (LRU)
+ if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
+ const oldestQueue = this.queueCooloffStatesAccessOrder.shift();
+ if (oldestQueue) {
+ this.queueCooloffStates.delete(oldestQueue);
+ this.logger.debug("Evicted oldest cooloff state", {
+ queueId: oldestQueue,
+ remainingSize: this.queueCooloffStates.size,
+ });
+ }
+ }
const state = this.queueCooloffStates.get(queueId) ?? {
tag: "normal" as const,
consecutiveFailures: 0,
};
+
+ // Update access order
+ const existingIndex = this.queueCooloffStatesAccessOrder.indexOf(queueId);
+ if (existingIndex !== -1) {
+ this.queueCooloffStatesAccessOrder.splice(existingIndex, 1);
+ }
+ this.queueCooloffStatesAccessOrder.push(queueId);
  // ... rest of function
  }
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In packages/redis-worker/src/fair-queue/index.ts around lines 1735 to 1743, the
current behavior clears the entire queueCooloffStates map when the size cap is
hit, which drops all cooloff and failure-tracking state; instead implement a
targeted eviction strategy: maintain access order (e.g., use the existing Map
insertion order by re-setting an entry on access to mark it recent or add an
access timestamp) and when maxCooloffStatesSize is exceeded evict entries until
under the cap by first removing entries in "normal" (failure-tracking) state,
then the oldest entries (LRU), or evict the oldest 10-20% as a fallback; update
any access patterns to refresh order/timestamps so LRU works correctly and
replace the single clear() call with code that removes only selected entries
until size < cap.
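A note on the Map-insertion-order idea in the prompt: `Map.prototype.set` on an existing key keeps the key's original position, so marking an entry as recently used requires a delete followed by a re-insert. A small self-contained sketch of that pattern (names are illustrative):

```ts
// Minimal LRU-style helpers built on Map insertion order.
function touch<K, V>(map: Map<K, V>, key: K, value: V): void {
  map.delete(key); // remove first so the re-insert lands at the end of iteration order
  map.set(key, value);
}

function evictOldest<K, V>(map: Map<K, V>, cap: number): void {
  while (map.size >= cap) {
    // Iteration order is insertion order, so the first key yielded is the
    // least recently touched entry.
    const oldest = map.keys().next().value as K | undefined;
    if (oldest === undefined) break;
    map.delete(oldest);
  }
}

// Usage sketch (names assumed from the review context):
// touch(queueCooloffStates, queueId, nextState);
// evictOldest(queueCooloffStates, maxCooloffStatesSize);
```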
No description provided.