

@ericallam ericallam commented Jan 6, 2026

This PR fixes some issues with the new BatchQueue by implementing the full two-phase dequeue process in the FairQueue and moving responsibility for consuming the worker queue into the BatchQueue, which can be enabled independently via the BATCH_QUEUE_WORKER_QUEUE_ENABLED env var. We've also introduced the BATCH_QUEUE_SHARD_COUNT env var to control the number of master queue shards in the FairQueue, and the BATCH_QUEUE_MASTER_QUEUE_LIMIT env var to control how many queues are considered in each iteration of the master queue consumer.

This PR also skips dequeue attempts for tenants that are at concurrency capacity, which should reduce cases where low-concurrency tenants block higher-concurrency tenants from processing.
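For reference, a minimal sketch of how these knobs could be parsed with zod; the real schemas live in apps/webapp/app/env.server.ts and may differ, and the BATCH_QUEUE_WORKER_QUEUE_ENABLED shape below is an assumption:

```ts
import { z } from "zod";

// Sketch only: names match the PR description, but the actual definitions and
// defaults live in apps/webapp/app/env.server.ts and may not match these.
const BatchQueueEnv = z.object({
  BATCH_QUEUE_WORKER_QUEUE_ENABLED: z.string().default("0"), // assumed shape
  BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
  BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
});

const batchQueueEnv = BatchQueueEnv.parse(process.env);
console.log(batchQueueEnv.BATCH_QUEUE_SHARD_COUNT);
```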


changeset-bot bot commented Jan 6, 2026

⚠️ No Changeset found

Latest commit: 6221282

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types



coderabbitai bot commented Jan 6, 2026

Walkthrough

Adds four environment variables for batch-queue tuning and exposes them in RunEngine. Reworks fair-queue to a master-driven, batch-oriented flow with mandatory worker-queue routing, new batch claim/release Redis Lua commands, external message lifecycle APIs (heartbeatMessage, getMessageData, completeMessage, releaseMessage, failMessage), and WorkerQueueManager always present. DRR scheduler gains masterQueueLimit and a batched deficit-decrement path. BatchQueue implements worker-consumer loops, batched span tracing, shardCount and workerQueueBlockingTimeoutSeconds options, ConcurrencyManager exposes getAvailableCapacity, multiple type updates, and many tests and test helpers added/updated.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

🚥 Pre-merge checks: 1 passed, 2 warnings

❌ Failed checks (2 warnings)
  • Description check ⚠️ Warning: The PR description clearly explains the changes and objectives, but it lacks the required sections (Checklist, Testing, Changelog, Screenshots) from the template. Resolution: add all required template sections, including a completed checklist, testing steps, and a changelog entry, to meet repository standards.
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 50.00%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (1 passed)
  • Title check ✅ Passed: The title 'fix(batch): optimize processing batch trigger v2' directly relates to the main changes: optimizing the BatchQueue's two-phase processing with FairQueue integration and configuration tuning.



claude bot commented Jan 6, 2026

PR Review: fix(batch): optimize processing batch trigger v2

Overview

This PR introduces significant performance optimizations to the batch queue processing system by adding:

  1. Batch message claiming (claim multiple messages atomically)
  2. Shared worker queue architecture (all consumers pop from a single queue)
  3. Configurable master queue sharding and limits
  4. Early tenant capacity checks to skip overloaded tenants

Code Quality and Best Practices

Strengths:

  • Good use of atomic Lua scripts for batch operations (claimMessageBatch) which maintains data consistency
  • Configuration options are well-documented with JSDoc comments
  • Backward compatibility is maintained with sensible defaults
  • The shared worker queue pattern simplifies load distribution

Suggestions:

  1. Magic number in batch size cap (packages/redis-worker/src/fair-queue/index.ts:937):
let maxClaimCount = 10; // Default batch size cap

Consider making this configurable via options rather than a hardcoded value.

  2. Potential inefficiency in DRR deficit tracking (packages/redis-worker/src/fair-queue/index.ts:893-906):
for (let i = 0; i < processedFromQueue; i++) {
  await this.telemetry.trace(
    "recordProcessed",
    async (span) => {
      // ...
      await this.scheduler.recordProcessed!(tenantId, queueId);
    },
    { kind: SpanKind.INTERNAL }
  );
}

This calls recordProcessed in a loop, creating N spans and N Redis calls for N messages. Consider adding a batch variant recordProcessedBatch(tenantId, queueId, count) to the scheduler interface.
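For illustration, a rough sketch of what such a batch hook could look like; the type and function names below are hypothetical and may not match the repo's actual FairScheduler interface:

```ts
// Sketch only: a possible batch-aware scheduler hook along the lines suggested above.
type SchedulerLike = {
  recordProcessed?: (tenantId: string, queueId: string) => Promise<void>;
  // Decrements the DRR deficit once for `count` messages instead of N round trips.
  recordProcessedBatch?: (tenantId: string, queueId: string, count: number) => Promise<void>;
};

async function recordBatch(
  scheduler: SchedulerLike,
  tenantId: string,
  queueId: string,
  count: number
): Promise<void> {
  if (scheduler.recordProcessedBatch) {
    await scheduler.recordProcessedBatch(tenantId, queueId, count);
    return;
  }
  // Fallback: one call per processed message when no batch variant exists.
  for (let i = 0; i < count; i++) {
    await scheduler.recordProcessed?.(tenantId, queueId);
  }
}
```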

Potential Bugs or Issues

  1. Worker queue not null-checked (packages/redis-worker/src/fair-queue/index.ts:1000):
await this.workerQueueManager!.push(workerQueueId, messageKey);

The non-null assertion is used, but this code path is inside #claimAndPushToWorkerQueue which is only called from #processMasterQueueShard. While currently safe due to the flow, it would be more robust to add an early guard or use optional chaining with error handling.

  2. Unused consumerId parameter in Lua script (packages/redis-worker/src/fair-queue/visibility.ts:537):
local consumerId = ARGV[2]

The consumerId is passed to claimMessageBatch but never used in the Lua script. Either remove it from the arguments or use it for logging/tracking purposes.

  3. Change in workerQueue assignment behavior (packages/redis-worker/src/fair-queue/index.ts:305-307):
     The fallback changed from : options.queueId, to : undefined,. This changes the behavior when workerQueueResolver is not provided. It may be an intentional change, but ensure it doesn't break existing behavior for consumers relying on the previous default.

Performance Considerations

Positive:

  • Batch claiming reduces Redis round trips significantly
  • Early tenant capacity check (isAtCapacity) with break prevents wasted work iterating over queues for overloaded tenants
  • The "only sleep when no work" pattern avoids unnecessary latency when there's work to do
  • Configurable masterQueueLimit allows tuning for different workload sizes

Considerations:

  1. Sequential capacity checks in getAvailableCapacity (packages/redis-worker/src/fair-queue/concurrency.ts:121-134):
for (const group of this.groups) {
  const groupId = group.extractGroupId(queue);
  const key = this.keys.concurrencyKey(group.name, groupId);
  const current = await this.redis.scard(key);
  const limit = (await group.getLimit(groupId)) || group.defaultLimit;

Each iteration makes 2 Redis calls (SCARD + getLimit). For multiple concurrency groups, consider parallelizing these calls or using a Lua script.

  2. Potential hot key with single worker queue (packages/redis-worker/src/fair-queue/index.ts:976):
const workerQueueId = "worker-queue";

All consumers now share a single worker queue. While this simplifies distribution, it could become a contention point under very high load. Consider documenting this trade-off.

Security Concerns

No significant security concerns identified. The changes are internal queue processing optimizations that don't introduce new attack surfaces.

Test Coverage

Gaps identified:

  • No tests for the new claimBatch method in VisibilityManager
  • No tests for the new getAvailableCapacity method in ConcurrencyManager
  • No tests for the shared worker queue behavior
  • No tests for the new configuration options (shardCount, workerQueueBlockingTimeoutSeconds, masterQueueLimit)

Recommendations:

  1. Add unit tests for claimBatch covering:

    • Claiming multiple messages successfully
    • Claiming when queue has fewer messages than maxCount
    • Handling corrupted message data in batch
    • Behavior when queue is empty
  2. Add integration tests for the new worker queue pattern with multiple consumers

Summary

This is a solid performance optimization PR that introduces batch claiming and shared worker queues. The main areas to address:

  1. Required: Add tests for the new claimBatch and getAvailableCapacity methods
  2. Recommended: Make the batch size cap (10) configurable
  3. Recommended: Consider batch variant for recordProcessed to reduce Redis calls
  4. Minor: Remove unused consumerId from the Lua script or document its purpose

🤖 Generated with Claude Code

@ericallam
Member Author

@claude review this PR please


claude bot commented Jan 6, 2026

Claude finished @ericallam's task in 1m 29s.


PR Review: fix(batch): optimize processing batch trigger v2

  • Analyze the changed files
  • Review code quality and patterns
  • Check for potential bugs or issues
  • Assess performance implications
  • Provide comprehensive feedback

Overview

This PR introduces significant performance optimizations to the batch queue processing system:

  1. Batch message claiming - New claimBatch method atomically claims up to N messages
  2. Shared worker queue architecture - All consumers pop from a single queue ("worker-queue")
  3. Configurable master queue limit for DRR - New masterQueueLimit option in DRR scheduler
  4. Early tenant capacity checks - Skip remaining queues for a tenant when at capacity
  5. Improved loop behavior - Only sleep when no work, reducing latency under load

Strengths

  • Atomic Lua scripts for batch operations (claimMessageBatch) maintain data consistency
  • Early break on tenant capacity prevents wasted iterations over queues for overloaded tenants
  • "Only sleep when no work" pattern (lines 787-801) avoids unnecessary latency when there's active work
  • Well-documented configuration options with JSDoc comments
  • Backward compatibility maintained with sensible defaults

Issues & Suggestions

1. Unused consumerId in Lua script (Minor)

File: packages/redis-worker/src/fair-queue/visibility.ts:537

local consumerId = ARGV[2]

The consumerId is passed to claimMessageBatch but never used in the Lua script. Either remove it from the arguments or use it for logging/tracking purposes.


2. Hardcoded batch size cap (Consider making configurable)

File: packages/redis-worker/src/fair-queue/index.ts:937

let maxClaimCount = 10; // Default batch size cap

This is a hardcoded magic number. Consider making it configurable via options to allow tuning for different workloads.

3. Sequential Redis calls in getAvailableCapacity (Performance consideration)

File: packages/redis-worker/src/fair-queue/concurrency.ts:121-134

for (const group of this.groups) {
  const groupId = group.extractGroupId(queue);
  const key = this.keys.concurrencyKey(group.name, groupId);
  const current = await this.redis.scard(key);
  const limit = (await group.getLimit(groupId)) || group.defaultLimit;

Each iteration makes 2 Redis calls (SCARD + getLimit). For multiple concurrency groups, consider:

  • Parallelizing these calls with Promise.all
  • Or using a Lua script for atomic batch checking

4. Sequential recordProcessed calls in loop

File: packages/redis-worker/src/fair-queue/index.ts:893-906

for (let i = 0; i < processedFromQueue; i++) {
  await this.telemetry.trace(
    "recordProcessed",
    async (span) => {
      await this.scheduler.recordProcessed!(tenantId, queueId);
    },
    { kind: SpanKind.INTERNAL }
  );
}

This calls recordProcessed N times for N messages, creating N spans and N Redis calls. Consider adding a batch variant like recordProcessedBatch(tenantId, queueId, count) to the scheduler interface to reduce overhead.

5. Potential hot key with single worker queue

File: packages/redis-worker/src/fair-queue/index.ts:976

const workerQueueId = "worker-queue";

All consumers now share a single worker queue. While this simplifies distribution, it could become a contention point under very high load. Consider documenting this trade-off or adding sharding options for the worker queue.
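If sharding were ever added, a hypothetical sketch of picking a worker queue per message could look like this; the current PR deliberately uses a single queue, and the helper below is illustrative only:

```ts
// Hypothetical sketch: map a message to one of N worker queues with a stable hash,
// falling back to the single shared queue when shardCount is 1.
function workerQueueIdFor(messageId: string, shardCount: number): string {
  let hash = 0;
  for (const ch of messageId) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;
  }
  return shardCount > 1 ? `worker-queue:${hash % shardCount}` : "worker-queue";
}
```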


Test Coverage Gaps

The following new functionality lacks test coverage:

  • claimBatch method in VisibilityManager
  • getAvailableCapacity method in ConcurrencyManager
  • Shared worker queue behavior
  • New configuration options (masterQueueLimit, workerQueueBlockingTimeoutSeconds)

Recommendations:

  1. Add unit tests for claimBatch covering:

    • Claiming multiple messages successfully
    • Claiming when queue has fewer messages than maxCount
    • Handling corrupted message data in batch
    • Behavior when queue is empty
  2. Add integration tests for the new worker queue pattern with multiple consumers


Summary

This is a solid performance optimization PR. The main areas to address:

  • 🟡 Recommended: add tests for the new claimBatch and getAvailableCapacity methods
  • 🟡 Recommended: make the batch size cap (10) configurable
  • 🟢 Minor: remove the unused consumerId from the Lua script
  • 🟢 Consider: add a batch variant of recordProcessed to reduce Redis calls


@vibe-kanban-cloud

Review Complete

Your review story is ready!


Comment !reviewfast on this PR to re-generate the story.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/redis-worker/src/fair-queue/index.ts (1)

759-813: Fix abort signal handling inconsistency in master queue consumer loop

The master queue consumer loop has inconsistent abort error handling compared to the rest of the codebase:

  1. Incorrect error detection: Line 804 checks error.message === "AbortError" while the worker queue consumer (line 1083), direct consumer loop (line 1160), and reclaim loop (line 1681) all check error.name === "AbortError". The name property is the standard way to identify error types in JavaScript.

  2. Non-standard error creation: Line 796 creates a plain new Error("AbortError") instead of relying on the standard AbortError thrown by async abort signals. This only works because it's checked against the message property, making it fragile and inconsistent.

  3. Inconsistent patterns: The direct consumer loop (line 1131) uses for await (const _ of setInterval(..., { signal: this.abortController.signal })), which properly integrates with the abort signal and throws a standard AbortError. The master queue loop should follow the same pattern or at minimum align its error checking with the codebase convention.

Use error.name === "AbortError" to match the pattern used throughout the codebase, or refactor the idle wait to use AbortSignal.timeout() or similar standard async patterns:

Proposed fix
       }
     } catch (error) {
-      if (error instanceof Error && error.message === "AbortError") {
+      if (error instanceof Error && error.name === "AbortError") {
         this.logger.debug("Master queue consumer aborted", { loopId });
         this.batchedSpanManager.cleanup(loopId);
         return;
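As a sketch of the "standard async patterns" alternative mentioned above, an idle wait built on Node's promisified timers rejects with an error named "AbortError" when the signal fires; the helper below is illustrative, not the repo's code:

```ts
import { setTimeout as sleep } from "node:timers/promises";

// Sketch only: abort-aware idle wait for the master queue consumer loop.
// Returns true if the full interval elapsed, false if the signal aborted the wait.
async function idleWait(ms: number, signal: AbortSignal): Promise<boolean> {
  try {
    await sleep(ms, undefined, { signal });
    return true;
  } catch (error) {
    if (error instanceof Error && error.name === "AbortError") {
      return false; // shutting down
    }
    throw error;
  }
}
```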
🤖 Fix all issues with AI Agents
In @packages/redis-worker/src/fair-queue/index.ts:
- Around line 980-1002: When concurrencyManager.reserve(descriptor,
message.messageId) fails, you must release not only the current message but also
all remaining entries in claimedMessages back to the master queue instead of
breaking immediately; iterate the remaining claimedMessages (starting after the
current index) and call visibilityManager.release(message.messageId, queueId,
queueKey, queueItemsKey, masterQueueKey) for each to clear their inflight state
before stopping, then break so workerQueueManager.push continues only for
successfully reserved messages.
🧹 Nitpick comments (6)
packages/redis-worker/src/fair-queue/concurrency.ts (1)

117-135: Consider parallelizing Redis calls for better performance.

The method logic is correct, but using await in a loop serializes the Redis calls. For better performance, especially when there are multiple concurrency groups, consider using Promise.all to fetch all group states in parallel.

🔎 Proposed optimization using Promise.all
  async getAvailableCapacity(queue: QueueDescriptor): Promise<number> {
-   let minCapacity = Infinity;
-
-   for (const group of this.groups) {
-     const groupId = group.extractGroupId(queue);
-     const key = this.keys.concurrencyKey(group.name, groupId);
-     const current = await this.redis.scard(key);
-     const limit = (await group.getLimit(groupId)) || group.defaultLimit;
-     const available = Math.max(0, limit - current);
-
-     minCapacity = Math.min(minCapacity, available);
-   }
-
-   return minCapacity === Infinity ? 0 : minCapacity;
+   const capacities = await Promise.all(
+     this.groups.map(async (group) => {
+       const groupId = group.extractGroupId(queue);
+       const key = this.keys.concurrencyKey(group.name, groupId);
+       const current = await this.redis.scard(key);
+       const limit = (await group.getLimit(groupId)) || group.defaultLimit;
+       return Math.max(0, limit - current);
+     })
+   );
+
+   return capacities.length > 0 ? Math.min(...capacities) : 0;
  }
apps/webapp/app/env.server.ts (1)

954-959: Add validation for minimum values.

These configuration values should be constrained to prevent invalid runtime behavior. Consider adding .positive() or .min(1) to ensure:

  • BATCH_QUEUE_SHARD_COUNT is at least 1 (to avoid division by zero in shard mapping)
  • BATCH_QUEUE_MASTER_QUEUE_LIMIT is at least 1 (to ensure queues can be fetched)
  • BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS is at least 1 (to avoid potential blocking issues)
🔎 Proposed validation constraints
-    BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
+    BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().positive().default(1),
     // Maximum queues to fetch from master queue per iteration
-    BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
+    BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().positive().default(1000),
     // Worker queue blocking timeout in seconds (for two-stage processing)
-    BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
+    BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().positive().default(10),
packages/redis-worker/src/fair-queue/index.ts (4)

976-977: Consider making worker queue ID configurable

The worker queue ID is hardcoded as "worker-queue". If this system needs to support multiple independent worker queue pools in the future (e.g., for different priority levels or isolation), this would need refactoring.

Consider extracting this as a configuration parameter:

// In constructor or options
this.sharedWorkerQueueId = options.workerQueue?.queueId ?? "worker-queue";

1023-1027: Add abort signal check at loop start

The worker queue consumer loop doesn't check the abort signal at the start of the loop iteration (unlike the master queue loop at line 761-763). While the blockingPop call includes the abort signal, adding an early check would prevent unnecessary handler checks when already aborted.

Suggested addition
     try {
       while (this.isRunning) {
+        // Check abort signal
+        if (this.abortController.signal.aborted) {
+          break;
+        }
+        
         if (!this.messageHandler) {
           await new Promise((resolve) => setTimeout(resolve, this.consumerIntervalMs));
           continue;
         }

759-918: Architectural observation: Two-stage vs direct processing paths

The codebase now has two distinct processing paths:

  1. Two-stage (lines 759-918, 1015-1092): Master queue claims batches, pushes to shared worker queue, workers pop and process
  2. Direct (lines 1124-1271): Consumers claim and process messages directly, one at a time

The two-stage path uses batch claiming and numeric return values, while the direct path uses single-message claiming and boolean returns. This divergence is intentional based on the workerQueueEnabled flag, but creates maintenance complexity.

Consider documenting the trade-offs and decision criteria for when to use each mode, as this significantly impacts processing behavior, concurrency patterns, and fairness guarantees.

Also applies to: 1015-1092, 1124-1271


893-906: Move telemetry tracing outside the loop to reduce span overhead

The loop at lines 893-906 creates a separate trace span for each of the up to 10 recordProcessed calls. Consider moving the telemetry.trace wrapper outside the loop to wrap all records in a single span, which would reduce telemetry overhead during batch processing.

For a more complete solution, the FairScheduler interface could also be extended with a recordProcessedBatch(tenantId: string, queueId: string, count: number) method to allow schedulers to optimize their deficit tracking for batch operations.

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 47bed15 and 08f635d.

📒 Files selected for processing (11)
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • internal-packages/run-engine/src/engine/index.ts
  • internal-packages/run-engine/src/engine/types.ts
  • packages/redis-worker/src/fair-queue/concurrency.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
🧰 Additional context used
📓 Path-based instructions (6)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

Files:

  • packages/redis-worker/src/fair-queue/concurrency.ts
  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • apps/webapp/app/env.server.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • packages/redis-worker/src/fair-queue/concurrency.ts
  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • apps/webapp/app/env.server.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier

Files:

  • packages/redis-worker/src/fair-queue/concurrency.ts
  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • apps/webapp/app/env.server.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use zod for validation in packages/core and apps/webapp

Files:

  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
apps/webapp/app/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Access all environment variables through the env export of env.server.ts instead of directly accessing process.env in the Trigger.dev webapp

Files:

  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: When importing from @trigger.dev/core in the webapp, use subpath exports from the package.json instead of importing from the root path
Follow the Remix 2.1.0 and Express server conventions when updating the main trigger.dev webapp

Files:

  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
🧠 Learnings (3)
📓 Common learnings
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.trigger()` to trigger multiple different tasks at once from backend code
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.triggerByTask()` to batch trigger tasks by passing task instances for static task sets
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.triggerAndWait()` to batch trigger multiple different tasks and wait for results
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.triggerByTaskAndWait()` to batch trigger tasks by passing task instances and wait for results
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `tasks.batchTrigger()` to trigger multiple runs of a single task with different payloads
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `yourTask.batchTrigger()` to trigger multiple runs of a task from inside another task
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `yourTask.batchTriggerAndWait()` to batch trigger tasks and wait for all results from a parent task
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • packages/redis-worker/src/fair-queue/concurrency.ts
  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Applies to apps/webapp/app/**/*.{ts,tsx} : Access all environment variables through the `env` export of `env.server.ts` instead of directly accessing `process.env` in the Trigger.dev webapp

Applied to files:

  • apps/webapp/app/env.server.ts
🧬 Code graph analysis (5)
packages/redis-worker/src/fair-queue/concurrency.ts (1)
packages/redis-worker/src/fair-queue/types.ts (1)
  • QueueDescriptor (31-38)
internal-packages/run-engine/src/engine/index.ts (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
  • options (1126-1142)
apps/webapp/app/v3/runEngine.server.ts (2)
apps/webapp/app/env.server.ts (1)
  • env (1291-1291)
apps/supervisor/src/env.ts (1)
  • env (119-119)
packages/redis-worker/src/fair-queue/visibility.ts (2)
packages/redis-worker/src/fair-queue/keyProducer.ts (4)
  • queueKey (37-39)
  • queueItemsKey (41-43)
  • inflightKey (57-59)
  • inflightDataKey (61-63)
packages/redis-worker/src/fair-queue/types.ts (1)
  • InFlightMessage (207-218)
packages/redis-worker/src/fair-queue/index.ts (6)
packages/redis-worker/src/fair-queue/visibility.ts (2)
  • shardId (472-481)
  • queueId (453-455)
packages/redis-worker/src/fair-queue/concurrency.ts (1)
  • isAtCapacity (151-157)
packages/redis-worker/src/fair-queue/telemetry.ts (1)
  • FairQueueAttributes (18-29)
packages/redis-worker/src/fair-queue/masterQueue.ts (1)
  • queueId (195-197)
packages/redis-worker/src/fair-queue/types.ts (1)
  • StoredMessage (62-79)
packages/redis-worker/src/fair-queue/keyProducer.ts (3)
  • queueKey (37-39)
  • queueItemsKey (41-43)
  • masterQueueKey (29-31)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: claude-review
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (17)
packages/redis-worker/src/fair-queue/types.ts (1)

514-515: LGTM!

The optional masterQueueLimit field is well-documented and maintains backwards compatibility. The comment clearly indicates the default value.

internal-packages/run-engine/src/engine/types.ts (1)

79-82: LGTM!

The new optional configuration fields are well-documented and maintain backwards compatibility. The comments clearly describe their purpose.

internal-packages/run-engine/src/batch-queue/types.ts (2)

126-127: LGTM!

The optional masterQueueLimit field is well-documented and maintains backwards compatibility.


201-204: LGTM!

The new optional configuration fields are well-documented and maintain backwards compatibility. They align properly with the type definitions in other layers of the codebase.

apps/webapp/app/v3/runEngine.server.ts (1)

174-177: All three environment variables are properly defined in env.server.ts with zod validation and appropriate defaults. They are correctly imported and accessed via the env export in runEngine.server.ts. No action needed.

packages/redis-worker/src/fair-queue/schedulers/drr.ts (1)

30-30: LGTM: Clean parameterization of master queue limit.

The masterQueueLimit field is correctly introduced and used to replace the hardcoded value of 1000. The fallback default maintains backward compatibility.

Also applies to: 39-39, 200-200

internal-packages/run-engine/src/engine/index.ts (1)

343-346: LGTM: Configuration propagation is clean and correct.

The three new batch queue configuration options are properly threaded through to the BatchQueue initialization using optional chaining to handle undefined values gracefully.

internal-packages/run-engine/src/batch-queue/index.ts (3)

111-111: LGTM: DRR masterQueueLimit correctly forwarded.

The masterQueueLimit is properly passed through to the DRRScheduler configuration.


125-125: LGTM: Safe shard count default.

The shardCount configuration correctly uses the provided value with a safe fallback to 1, maintaining backward compatibility.


137-142: LGTM: Opt-in worker queue configuration.

The conditional worker queue configuration is a clean approach for enabling two-stage processing only when explicitly configured. The blockingTimeoutSeconds parameter correctly gates the feature.

packages/redis-worker/src/fair-queue/visibility.ts (3)

142-215: LGTM: Well-structured batch claim implementation.

The claimBatch method is correctly implemented:

  • Mirrors the single-message claim method structure for consistency
  • Properly handles the paired result array from the Lua script
  • Includes appropriate error handling for corrupted payloads
  • Logs batch operations for observability

The error handling correctly removes corrupted messages from in-flight tracking to prevent leaks.


529-577: LGTM: Atomic batch claim Lua script is correct.

The claimMessageBatch Lua script correctly implements atomic batch message claiming:

  • Uses ZRANGE with correct 0-based indexing to fetch up to maxCount messages
  • Atomically removes messages from the queue and adds them to in-flight tracking
  • Gracefully handles missing payloads by removing orphaned queue entries
  • Returns results in the expected flat array format

The atomicity guarantees prevent race conditions between concurrent consumers.


658-667: LGTM: TypeScript type definition is correct.

The TypeScript type definition for claimMessageBatch accurately reflects the Lua script's signature and return type.

packages/redis-worker/src/fair-queue/index.ts (4)

298-308: LGTM: Explicit undefined assignment

The explicit undefined assignment for workerQueue when the resolver is not provided makes the intent clearer without changing behavior.

Also applies to: 394-404


866-874: LGTM: Tenant-level capacity check

The tenant capacity check before processing queues prevents unnecessary work when a tenant is at capacity, and correctly skips all remaining queues for that tenant. This is an effective optimization.

Based on learnings, batch processing should respect tenant-level concurrency limits.


961-963: Verified: claimBatch method properly implemented with comprehensive edge case handling

The claimBatch method exists in VisibilityManager and handles all relevant edge cases. The implementation uses a Lua script for atomic operations and properly handles scenarios where fewer messages are claimed than requested, missing data, JSON parsing errors, and empty results. The method includes error logging and cleanup for corrupted messages.


937-946: Verified: All method implementations are correct

The code properly calls getAvailableCapacity() to determine batch size limits based on available concurrency capacity. All referenced methods—getAvailableCapacity(), claimBatch(), reserve(), and release()—are correctly implemented in their respective manager classes. The logic for handling partial batch processing when concurrency limits are reached is sound.


claude bot commented Jan 6, 2026

PR Review: fix(batch): optimize processing batch trigger v2

This PR introduces significant performance optimizations to the batch queue processing system. Here's my detailed review:


✅ Strengths

1. Batch Claiming Architecture
The transition from single-message claiming to batch claiming (claimBatch) is well-designed. The atomic Lua scripts in visibility.ts:580-626 correctly handle the batch operations, ensuring messages are atomically moved from the queue to the in-flight set.

2. Deficit Round Robin Batch Support
The new recordProcessedBatch method (drr.ts:143-152) and drrDecrementDeficitBatch Lua script (drr.ts:318-340) efficiently update scheduler state in a single Redis call instead of N calls.

3. Shared Worker Queue Pattern
The change to a single shared "worker-queue" (index.ts:992) is a pragmatic simplification. The code comment correctly notes Redis can handle 100k+ ops/sec, making this appropriate for most workloads.

4. Adaptive Loop Timing
The consumer loop refactoring (index.ts:763-809) to only wait when idle (if (!hadWork)) is a solid optimization that improves throughput under load while avoiding CPU spinning when idle.

5. Comprehensive Test Coverage
Good test additions:

  • concurrency.test.ts: Tests for getAvailableCapacity across multiple groups
  • drr.test.ts: Tests for batch deficit decrement including edge cases
  • visibility.test.ts: Tests for claimBatch and releaseBatch
  • fairQueue.test.ts: Integration test for two-stage processing with concurrency limits

⚠️ Issues and Concerns

1. Race Condition in Batch Reserve Loop (index.ts:973-1003)

for (let i = 0; i < claimedMessages.length; i++) {
  const message = claimedMessages[i]!;
  if (this.concurrencyManager) {
    const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId);
    if (!reserved) {
      await this.visibilityManager.releaseBatch(claimedMessages.slice(i), ...);
      break;
    }
  }
  await this.workerQueueManager!.push(workerQueueId, messageKey);
  processedCount++;
}

There's a potential issue: if workerQueueManager.push() fails after reserve() succeeds, the concurrency slot is consumed but the message is never pushed to the worker queue. Consider wrapping in try/catch and releasing the reservation on push failure.
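A sketch of the suggested guard, assuming the ConcurrencyManager exposes a release(descriptor, messageId) counterpart to reserve(); the exact API may differ:

```ts
// Sketch only: guard the push so a failed push doesn't leak the reserved slot.
const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId);
if (reserved) {
  try {
    await this.workerQueueManager!.push(workerQueueId, messageKey);
    processedCount++;
  } catch (pushError) {
    // Assumed counterpart to reserve(); release the slot before rethrowing.
    await this.concurrencyManager.release(descriptor, message.messageId);
    throw pushError;
  }
}
```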

2. Hardcoded Worker Queue ID (index.ts:992)
The workerQueueId = "worker-queue" is hardcoded in multiple places. This should be a constant or configurable option for maintainability:

const workerQueueId = "worker-queue"; // index.ts:992
const workerQueueId = "worker-queue"; // index.ts:1043

3. workerQueue Field Changed to undefined (index.ts:309, 405)
The change from options.queueId to undefined for the workerQueue field in stored messages removes backwards compatibility. If there are existing messages in Redis with this field, they might behave differently. Consider whether a migration or fallback is needed.

4. Event Listener Memory Leak Risk (index.ts:793-803)
The abort handler pattern is correct, but ensure the listener is always removed:

this.abortController.signal.addEventListener("abort", abortHandler, { once: true });

The { once: true } only removes on abort. The comment at line 799 mentions this, but the fix at line 797 only removes on timeout. If the promise is garbage collected without either event, this could leak. The current implementation looks correct, but this is worth double-checking.

5. Surprising zero-capacity default when no groups are configured (concurrency.ts:155)

return minCapacity === Infinity ? 0 : minCapacity;

When no groups are configured, this returns 0 (correct!), but consider adding a comment explaining this intentional behavior since "no groups" returning "no capacity" might be surprising.


🔧 Minor Suggestions

1. Consider Pipeline for Batch Operations
In #claimAndPushToWorkerQueue, the loop pushes messages one at a time to the worker queue. Consider batching these push calls using a Redis pipeline for better performance.
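For illustration, batched pushes could use an ioredis pipeline, assuming the worker queue is backed by a Redis list; the WorkerQueueManager internals may differ:

```ts
import Redis from "ioredis";

// Sketch only: push all reserved message keys to the worker queue in one round trip.
async function pushBatch(redis: Redis, workerQueueKey: string, messageKeys: string[]) {
  const pipeline = redis.pipeline();
  for (const messageKey of messageKeys) {
    pipeline.lpush(workerQueueKey, messageKey);
  }
  await pipeline.exec();
}
```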

2. Configuration Naming Consistency
New configs use inconsistent naming:

  • shardCount vs BATCH_QUEUE_SHARD_COUNT
  • workerQueueBlockingTimeoutSeconds vs BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS

Consider using the same casing pattern throughout.

3. Type Annotation for Lua Script Return
In drr.ts:356-361, the type declaration for drrDecrementDeficitBatch returns Promise<string>. Consider documenting that this represents a float as a string.


📊 Performance Implications

Positive:

  • Batch claiming reduces Redis round trips from N to 1 per batch
  • Batch deficit updates reduce Redis calls
  • Adaptive waiting prevents idle spinning
  • Pre-checking tenant capacity (isAtCapacity at line 875) avoids wasted work

Considerations:

  • The single shared worker queue could become a contention point at very high scale (>10k msg/sec as noted in the comment)
  • batchClaimSize default of 10 should be tuned based on typical workload

🔒 Security

No security concerns identified. The changes are internal to the queue processing system and don't expose new attack surfaces.


Summary

This is a well-designed optimization PR that should significantly improve batch queue throughput. The main concern is the potential issue in the batch reserve loop where a push failure after successful reservation could leak a concurrency slot. I recommend addressing that edge case before merging.

🤖 Generated with Claude Code

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI Agents
In @internal-packages/run-engine/src/batch-queue/index.ts:
- Around line 135-142: The current truthiness check on
options.workerQueueBlockingTimeoutSeconds disables workerQueue when the value is
0; change the condition to explicitly check for undefined (e.g.,
options.workerQueueBlockingTimeoutSeconds !== undefined) so 0 is treated as a
valid blockingTimeoutSeconds; update the object creation for workerQueue
(referencing workerQueue, options.workerQueueBlockingTimeoutSeconds, and
blockingTimeoutSeconds) to use this explicit undefined check.

In @packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts:
- Around line 880-883: The test title is misleading: the test named "should
release remaining claimed messages when concurrency reservation fails" does not
simulate a reservation failure or check release behavior but instead verifies
successful completion under concurrency limits; rename the test title in the
describe/redisTest invocation to accurately reflect the behavior being asserted
(e.g., "should process all messages respecting concurrency limits in two-stage
mode" or "should complete batch processing without leaking in-flight messages")
so the name matches the assertions in this test block and update any related
comments to match.
🧹 Nitpick comments (4)
packages/redis-worker/src/fair-queue/index.ts (2)

809-814: Inconsistent abort error detection pattern.

Line 810 checks error.message === "AbortError", but line 1109 in #runWorkerQueueConsumerLoop checks error.name === "AbortError". The timeout rejection on line 797 creates an error with message "AbortError", so this works, but the inconsistency could cause confusion.

🔎 Consider using consistent abort error detection
     } catch (error) {
-      if (error instanceof Error && error.message === "AbortError") {
+      if (error instanceof Error && (error.name === "AbortError" || error.message === "AbortError")) {
         this.logger.debug("Master queue consumer aborted", { loopId });

Or create a helper function for consistent abort detection across all loops.


912-924: Telemetry fallback loop is inefficient for large batches.

When recordProcessedBatch is not available, the fallback iterates and traces each message individually. For large batch sizes, this creates significant tracing overhead.

🔎 Consider batching the fallback telemetry
           } else if (this.scheduler.recordProcessed) {
+            // Fallback: record each message individually
+            // Note: This is less efficient than batch variant
             for (let i = 0; i < processedFromQueue; i++) {
-              await this.telemetry.trace(
-                "recordProcessed",
-                async (span) => {
-                  span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId);
-                  span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId);
-                  await this.scheduler.recordProcessed!(tenantId, queueId);
-                },
-                { kind: SpanKind.INTERNAL }
-              );
+              await this.scheduler.recordProcessed!(tenantId, queueId);
             }
           }

Creating a span per message in the fallback path may not add value since the messages are identical operations.

packages/redis-worker/src/fair-queue/visibility.ts (1)

686-686: Use # operator instead of deprecated table.getn in Lua.

table.getn was deprecated in Lua 5.1 and removed in later versions. Redis uses Lua 5.1, so this works, but #ARGV is the idiomatic approach and is more consistent with the rest of the codebase (e.g., line 594 uses #items).

🔎 Suggested fix
 -- Remaining args are: members..., messageIds...
 -- Calculate how many messages we have
-local numMessages = (table.getn(ARGV) - 2) / 2
+local numMessages = (#ARGV - 2) / 2
 local membersStart = 3
internal-packages/run-engine/src/batch-queue/index.ts (1)

125-125: Good: Dynamic shardCount with sensible default.

The change from a hardcoded value to options.shardCount ?? 1 improves configurability while maintaining backwards compatibility.

Consider adding validation for the shardCount value

While not critical, you might want to validate that shardCount is a positive integer to catch configuration errors early:

-      shardCount: options.shardCount ?? 1,
+      shardCount: Math.max(1, Math.floor(options.shardCount ?? 1)),

Or add validation in the constructor with a more descriptive error:

if (options.shardCount !== undefined && (options.shardCount < 1 || !Number.isInteger(options.shardCount))) {
  throw new Error(`Invalid shardCount: ${options.shardCount}. Must be a positive integer.`);
}
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 08f635d and 797ed36.

📒 Files selected for processing (16)
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • internal-packages/run-engine/src/engine/index.ts
  • internal-packages/run-engine/src/engine/types.ts
  • packages/redis-worker/src/fair-queue/concurrency.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/scheduler.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • packages/redis-worker/src/fair-queue/tests/concurrency.test.ts
  • packages/redis-worker/src/fair-queue/tests/drr.test.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
  • packages/redis-worker/src/fair-queue/types.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
🚧 Files skipped from review as they are similar to previous changes (7)
  • internal-packages/run-engine/src/engine/index.ts
  • apps/webapp/app/v3/runEngine.server.ts
  • apps/webapp/app/env.server.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • internal-packages/run-engine/src/engine/types.ts
  • packages/redis-worker/src/fair-queue/concurrency.ts
  • packages/redis-worker/src/fair-queue/types.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

Files:

  • packages/redis-worker/src/fair-queue/scheduler.ts
  • packages/redis-worker/src/fair-queue/tests/drr.test.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/concurrency.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • packages/redis-worker/src/fair-queue/scheduler.ts
  • packages/redis-worker/src/fair-queue/tests/drr.test.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/concurrency.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier

Files:

  • packages/redis-worker/src/fair-queue/scheduler.ts
  • packages/redis-worker/src/fair-queue/tests/drr.test.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/concurrency.test.ts
  • packages/redis-worker/src/fair-queue/visibility.ts
  • packages/redis-worker/src/fair-queue/index.ts
**/*.{test,spec}.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use vitest for all tests in the Trigger.dev repository

Files:

  • packages/redis-worker/src/fair-queue/tests/drr.test.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
  • packages/redis-worker/src/fair-queue/tests/concurrency.test.ts
**/*.test.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive describe and it blocks
Avoid mocks or stubs in tests; use helpers from @internal/testcontainers when Redis or Postgres are needed
Use vitest for unit tests

Files:

  • packages/redis-worker/src/fair-queue/tests/drr.test.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/visibility.test.ts
  • packages/redis-worker/src/fair-queue/tests/concurrency.test.ts
🧠 Learnings (1)
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/schedulers/drr.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
🧬 Code graph analysis (3)
packages/redis-worker/src/fair-queue/tests/drr.test.ts (3)
packages/redis-worker/src/fair-queue/schedulers/drr.ts (3)
  • keys (198-201)
  • redis (275-345)
  • DRRScheduler (25-346)
packages/redis-worker/src/fair-queue/keyProducer.ts (1)
  • DefaultFairQueueKeyProducer (16-133)
packages/redis-worker/src/fair-queue/schedulers/index.ts (1)
  • DRRScheduler (5-5)
packages/redis-worker/src/fair-queue/tests/visibility.test.ts (2)
packages/redis-worker/src/fair-queue/visibility.ts (3)
  • VisibilityManager (29-745)
  • redis (534-744)
  • queueId (504-506)
packages/redis-worker/src/fair-queue/index.ts (4)
  • redis (1826-1916)
  • queueId (1737-1750)
  • queueId (1752-1786)
  • queueId (1788-1790)
packages/redis-worker/src/fair-queue/tests/concurrency.test.ts (1)
packages/redis-worker/src/fair-queue/types.ts (1)
  • QueueDescriptor (31-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: claude-review
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (22)
packages/redis-worker/src/fair-queue/scheduler.ts (1)

31-38: LGTM! Well-documented batch processing hook.

The recordProcessedBatch method is properly designed as a batch-optimized variant of recordProcessed. The documentation clearly explains its purpose (reducing Redis calls when processing multiple messages), and the default no-op implementation follows the established pattern for base scheduler hooks.

packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts (1)

884-980: Test coverage for two-stage processing looks comprehensive.

The test properly validates:

  • Two-stage worker queue with concurrency limits
  • All messages are processed successfully
  • No messages remain in-flight after completion
  • Proper cleanup occurs
packages/redis-worker/src/fair-queue/tests/concurrency.test.ts (1)

225-384: Excellent test coverage for getAvailableCapacity.

The test suite comprehensively validates capacity calculation across multiple scenarios:

  • Single group capacity and dynamic updates after reservations
  • Minimum capacity calculation across multiple groups (correctly returns the bottleneck)
  • Edge case: zero capacity when any group is at limit
  • Edge case: zero capacity when no groups are configured

All tests properly clean up resources and follow vitest best practices.

packages/redis-worker/src/fair-queue/tests/visibility.test.ts (2)

248-448: Excellent batch claim test coverage with robust edge case handling.

The claimBatch test suite is comprehensive and includes:

  • Atomic batch operations with proper count verification
  • Empty queue handling
  • Partial batch behavior when fewer messages available
  • Particularly good: The corrupted message test (lines 381-447) validates that invalid JSON is gracefully skipped while valid messages continue processing

This defensive approach prevents one corrupted message from blocking an entire batch.


450-599: Comprehensive batch release test coverage.

The releaseBatch tests properly validate:

  • Atomic release of multiple messages with correct state updates
  • Graceful handling of empty input (no exceptions thrown)
  • Master queue timestamp updates with oldest message

All tests include proper verification of both in-flight and queue counts, ensuring state consistency.

packages/redis-worker/src/fair-queue/tests/drr.test.ts (1)

118-174: LGTM! Proper test coverage for batch deficit decrement.

The new tests validate recordProcessedBatch behavior:

  • Normal batch decrement (15 - 7 = 8)
  • Floor-at-zero when decrementing more than available (3 - 10 = 0)

Both tests follow the established pattern for deficit management tests and properly verify boundary conditions.
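
The decrement semantics being tested reduce to a simple floor-at-zero, sketched here with the values from the test descriptions:

// Floor-at-zero deficit decrement, mirroring the tested cases
const decrementDeficit = (deficit: number, count: number) => Math.max(0, deficit - count);
const afterNormal = decrementDeficit(15, 7); // 8
const afterFloor = decrementDeficit(3, 10); // 0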

packages/redis-worker/src/fair-queue/schedulers/drr.ts (4)

30-39: LGTM! Configurable master queue limit.

The addition of masterQueueLimit as a configurable field with a sensible default of 1000 improves flexibility over the previously hardcoded value.


143-153: LGTM! Batch deficit decrement implementation.

The recordProcessedBatch method correctly follows the existing pattern of recordProcessed while supporting batch operations. The delegation to #decrementDeficitBatch is clean.


264-273: LGTM! Atomic batch deficit decrement with proper floor.

The Lua script correctly:

  1. Decrements the deficit by the provided count atomically
  2. Floors at 0 to prevent negative deficits
  3. Returns the new deficit value as a string

This mirrors the single-message drrDecrementDeficit logic appropriately.

Also applies to: 325-344


359-364: LGTM! Interface extension for batch command.

The drrDecrementDeficitBatch interface declaration correctly matches the Lua command implementation.

packages/redis-worker/src/fair-queue/index.ts (6)

87-87: LGTM! Batch claim size configuration.

The batchClaimSize field with a default of 10 provides reasonable control over batch processing throughput.

Also applies to: 143-144


794-806: Memory leak fix correctly implemented.

The abort listener is now properly removed when the timeout fires (line 802), preventing listener accumulation during idle periods. This addresses the previously flagged memory leak issue.
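
A self-contained sketch of the wait-with-cleanup pattern this fix describes; the function name and error shape here are assumptions, not the actual implementation:

// Wait for `ms`, but reject early if the signal aborts; remove the abort listener
// when the timer fires so idle consumer iterations do not accumulate listeners.
function waitOrAbort(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout>;
    const onAbort = () => {
      clearTimeout(timer);
      const error = new Error("Aborted");
      error.name = "AbortError";
      reject(error);
    };
    signal.addEventListener("abort", onAbort, { once: true });
    timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort); // prevents listener build-up while idle
      resolve();
    }, ms);
  });
}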


872-880: LGTM! Per-tenant capacity check optimization.

Checking tenant capacity before processing queues and breaking early prevents unnecessary work when the tenant is at capacity. The stat tracking with tenant_capacity_skipped aids observability.
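
Roughly, the optimization could look like this sketch; getAvailableCapacity and the stat key are taken from the review, while the helper shape and signatures are assumptions:

// Sketch: skip a tenant's remaining queues when it has no concurrency headroom
type CapacityCheck = { getAvailableCapacity(tenantId: string): Promise<number> };

async function shouldSkipTenant(
  concurrency: CapacityCheck,
  tenantId: string,
  stats: Record<string, number>
): Promise<boolean> {
  const capacity = await concurrency.getAvailableCapacity(tenantId);
  if (capacity > 0) return false;
  stats["tenant_capacity_skipped"] = (stats["tenant_capacity_skipped"] ?? 0) + 1;
  return true; // caller breaks out of this tenant's queue loop
}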


1010-1020: Stranded messages fix correctly implemented.

When concurrency reservation fails, all remaining claimed messages (from index i onward) are now released back to the queue via releaseBatch. This prevents messages from being stranded in the in-flight set until visibility timeout expires. Good fix for the previously flagged issue.
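
A rough sketch of the release-on-failure path, with assumed signatures for the concurrency and visibility managers:

// Sketch: if a reservation fails partway through a claimed batch, release the rest
type ClaimedMessage = { messageId: string };
type RouteDeps = {
  reserve(tenantId: string, messageId: string): Promise<boolean>;
  releaseBatch(queueId: string, messages: ClaimedMessage[]): Promise<void>;
  pushToWorkerQueue(message: ClaimedMessage): Promise<void>;
};

async function routeClaimedBatch(
  deps: RouteDeps,
  tenantId: string,
  queueId: string,
  claimed: ClaimedMessage[]
): Promise<void> {
  for (let i = 0; i < claimed.length; i++) {
    const reserved = await deps.reserve(tenantId, claimed[i]!.messageId);
    if (!reserved) {
      // Return everything from index i onward to the queue instead of leaving it in-flight
      await deps.releaseBatch(queueId, claimed.slice(i));
      return;
    }
    await deps.pushToWorkerQueue(claimed[i]!);
  }
}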


993-999: Shared worker queue pattern is acceptable with documented trade-offs.

The comments clearly document the trade-off: simpler code and fair distribution vs. potential contention under very high load. The suggestion to add optional sharding in the future is noted.


311-312: LGTM! Consistent undefined handling for workerQueue.

Setting workerQueue to undefined when no resolver is provided (instead of potentially calling the resolver with incomplete data) is correct.

Also applies to: 407-408

packages/redis-worker/src/fair-queue/visibility.ts (5)

130-214: LGTM! Batch claim implementation with proper error handling.

The claimBatch method correctly:

  1. Handles empty results
  2. Parses alternating messageId/payload pairs
  3. Handles JSON parse errors gracefully by removing corrupted messages from in-flight
  4. Provides appropriate logging
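
For reference, the pair parsing described in point 2 could look like this sketch; field names are assumptions, and the real method additionally removes corrupted entries from the in-flight set:

// Sketch: turn a flat [messageId, payloadJson, messageId, payloadJson, ...] reply into objects
function parseClaimReply(reply: string[]): Array<{ messageId: string; payload: unknown }> {
  const claimed: Array<{ messageId: string; payload: unknown }> = [];
  for (let i = 0; i + 1 < reply.length; i += 2) {
    const messageId = reply[i];
    const payloadJson = reply[i + 1];
    if (!messageId || !payloadJson) continue;
    try {
      claimed.push({ messageId, payload: JSON.parse(payloadJson) });
    } catch {
      // Corrupted JSON is skipped so one bad message does not block the batch
    }
  }
  return claimed;
}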

313-363: LGTM! Batch release implementation.

The releaseBatch method correctly:

  1. Returns early on empty input
  2. Builds parallel arrays of members and messageIds for the Lua script
  3. Provides appropriate logging

579-627: LGTM! Atomic batch claim Lua script.

The script correctly handles the batch claim operation:

  1. Fetches up to maxCount messages using ZRANGE
  2. Processes each message atomically (remove from queue, add to in-flight)
  3. Handles missing payload data gracefully
  4. Returns alternating messageId/payload pairs

671-721: LGTM! Atomic batch release Lua script with single master queue update.

The script correctly:

  1. Calculates array positions for members and messageIds
  2. Releases each message atomically
  3. Updates the master queue only once at the end (performance optimization)
  4. Returns the count of released messages

760-768: LGTM! Redis interface extensions for batch operations.

The interface declarations correctly match the Lua command implementations for both claimMessageBatch and releaseMessageBatch.

Also applies to: 782-791

internal-packages/run-engine/src/batch-queue/index.ts (1)

111-111: The masterQueueLimit forwarding is correct.

The property is properly defined as optional in DRRConfig (line 192 of types.ts), and DRRScheduler correctly defaults undefined values to 1000 (line 39 of drr.ts). The forwarding at line 111 is safe and type-correct.

Comment on lines 135 to 142
// Enable two-stage processing with worker queues for better parallelism (when configured)
// Worker queues provide better concurrency by separating queue selection from message processing
workerQueue: options.workerQueueBlockingTimeoutSeconds
? {
enabled: true,
blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds,
}
: undefined,


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check BatchQueueOptions type definition
echo "=== BatchQueueOptions type definition ==="
if [ -f "internal-packages/run-engine/src/batch-queue/types.ts" ]; then
  cat -n internal-packages/run-engine/src/batch-queue/types.ts | head -100
else
  echo "File not found, searching for it..."
  fd "types.ts" internal-packages/run-engine/src/batch-queue/
fi

Repository: triggerdotdev/trigger.dev

Length of output: 4666


🏁 Script executed:

#!/bin/bash
# Search for workerQueueBlockingTimeoutSeconds usage and documentation
echo "=== Searching for workerQueueBlockingTimeoutSeconds ==="
rg -n "workerQueueBlockingTimeoutSeconds" --type ts -B 3 -A 3

echo -e "\n=== Searching for blockingTimeoutSeconds in type definitions ==="
rg -n "blockingTimeoutSeconds" --type ts -B 3 -A 3

Repository: triggerdotdev/trigger.dev

Length of output: 8707


🏁 Script executed:

#!/bin/bash
# Check the actual file to see context around lines 135-142
echo "=== Context around lines 135-142 in index.ts ==="
sed -n '125,150p' internal-packages/run-engine/src/batch-queue/index.ts | cat -n

Repository: triggerdotdev/trigger.dev

Length of output: 1497


Use explicit undefined check to allow 0 as a valid timeout value.

The truthiness check on options.workerQueueBlockingTimeoutSeconds prevents 0 from being used as a valid timeout value. In timeout contexts (Redis BLPOP), 0 is valid and typically means block indefinitely. If the timeout is set via an environment variable to "0", it will be coerced to the number 0 and incorrectly disable the worker queue.

Change the condition to:

-      workerQueue: options.workerQueueBlockingTimeoutSeconds
+      workerQueue: options.workerQueueBlockingTimeoutSeconds !== undefined
         ? {
             enabled: true,
             blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds,
           }
         : undefined,
🤖 Prompt for AI Agents
In @internal-packages/run-engine/src/batch-queue/index.ts around lines 135 -
142, The current truthiness check on options.workerQueueBlockingTimeoutSeconds
disables workerQueue when the value is 0; change the condition to explicitly
check for undefined (e.g., options.workerQueueBlockingTimeoutSeconds !==
undefined) so 0 is treated as a valid blockingTimeoutSeconds; update the object
creation for workerQueue (referencing workerQueue,
options.workerQueueBlockingTimeoutSeconds, and blockingTimeoutSeconds) to use
this explicit undefined check.

@claude

claude bot commented Jan 7, 2026

Code Review: PR #2841 - fix(batch): optimize processing batch trigger v2

Thank you for this PR! This is a substantial optimization to the batch queue processing system, introducing batch claiming, worker queue consolidation, and improved concurrency handling. Here's my detailed review:


✅ Strengths

  1. Batch Message Claiming - The new claimBatch and releaseBatch methods in VisibilityManager are well-implemented with atomic Lua scripts, reducing Redis round trips significantly.

  2. Improved Concurrency Checking - The new getAvailableCapacity() method in ConcurrencyManager is more efficient than repeatedly calling canProcess() when claiming batches.

  3. Single Worker Queue Pattern - Consolidating all workers to use a single "worker-queue" simplifies the architecture and provides fairer distribution across consumers.

  4. DRR Batch Decrement - The recordProcessedBatch() method with drrDecrementDeficitBatch Lua script efficiently handles deficit updates for batch operations.

  5. Comprehensive Test Coverage - Excellent test coverage for the new functionality including claimBatch, releaseBatch, getAvailableCapacity, and recordProcessedBatch.

  6. Configurable Sharding - The new BATCH_QUEUE_SHARD_COUNT and BATCH_QUEUE_MASTER_QUEUE_LIMIT environment variables provide good horizontal scaling options.


⚠️ Potential Issues & Suggestions

1. Consumer Loop Change - Potential Tight Loop

Location: packages/redis-worker/src/fair-queue/index.ts:763-810

The consumer loop changed from setInterval to a while loop with conditional waiting. When there's work (hadWork = true), it immediately processes the next batch without any delay. While this improves throughput, it could cause CPU spin under sustained high load.

Suggestion: Consider adding a small yield (e.g., setImmediate or process.nextTick) even when there's work to prevent event loop starvation:

if (!hadWork) {
  // existing timeout logic
} else {
  // Yield to event loop briefly
  await new Promise(resolve => setImmediate(resolve));
}

2. Hardcoded Worker Queue ID

Location: packages/redis-worker/src/fair-queue/index.ts:936, 1043

The worker queue ID is hardcoded as "worker-queue" in two places. This works but violates DRY and makes it harder to change later.

Suggestion: Extract to a constant or configuration option.

3. Tenant Capacity Skip Tracking

Location: packages/redis-worker/src/fair-queue/index.ts:873

When a tenant is at capacity, the code increments tenant_capacity_skipped stat and breaks the inner loop, skipping remaining queues for that tenant. This is good optimization, but the stat name could be more descriptive.

Minor: The stat tenant_capacity_skipped is incremented but it's not clear if this tracks skipped tenants or skipped queues. Consider renaming to tenant_at_capacity_queue_skips for clarity.

4. Abort Handler Memory Leak Protection

Location: packages/redis-worker/src/fair-queue/index.ts:795-806

Good job handling the abort listener cleanup! The comment explains why manual removal is needed. This prevents memory leaks from accumulated listeners.

5. Error Message Check

Location: packages/redis-worker/src/fair-queue/index.ts:819

Changed from error.name === "AbortError" to error.message === "AbortError". This is intentional since you're throwing new Error("AbortError") on line 797, but it's unconventional. Typically abort errors have .name === "AbortError".

Suggestion: Consider using a custom error class or setting error.name:

const error = new Error("Aborted");
error.name = "AbortError";
reject(error);

6. Worker Queue Default Behavior Change

Location: packages/redis-worker/src/fair-queue/index.ts:307, 405

The workerQueue field is now set to undefined instead of options.queueId when worker queues are disabled. This appears intentional but the semantic change should be verified to ensure no downstream code relies on the previous behavior.

7. Lua Script table.getn Deprecation

Location: packages/redis-worker/src/fair-queue/visibility.ts:686

The Lua script uses table.getn(ARGV), which is deprecated in Lua 5.1 (the version Redis embeds) and removed in later versions. It still works, but #ARGV is preferred.

Suggestion:

local numMessages = (#ARGV - 2) / 2

🔒 Security Considerations

No security concerns identified. The changes are internal queue processing optimizations with proper input validation already in place.


📊 Performance Considerations

  1. Positive: Batch claiming reduces Redis round trips from N to 1 per batch
  2. Positive: Single worker queue eliminates routing complexity
  3. Positive: Upfront capacity check prevents wasted claim operations
  4. Watch: The immediate loop continuation when work exists could increase CPU usage under sustained load
  5. Watch: The single worker queue pattern trades simplicity for potential contention at very high scale (documented in code comments)

🧪 Test Coverage

Test coverage is comprehensive:

  • claimBatch - multiple messages, empty queue, partial batch, corrupted messages
  • releaseBatch - multiple messages, empty array, master queue updates
  • getAvailableCapacity - single group, multiple groups, at capacity, no groups
  • recordProcessedBatch - normal decrement, floor at zero
  • ✅ Integration test for two-stage processing with concurrency limits

📝 Summary

This is a solid optimization PR that significantly improves batch processing efficiency. The code is well-structured, properly tested, and includes helpful comments explaining design decisions. The few suggestions above are minor improvements rather than blockers.

Recommendation: Approve with minor suggestions

@claude

claude bot commented Jan 7, 2026

Code Review: fix(batch): optimize processing batch trigger v2

Summary

This PR refactors the FairQueue and BatchQueue to implement a two-stage message processing architecture. FairQueue now handles fair scheduling and routes messages to worker queues, while external consumers (like BatchQueue) run their own worker queue consumer loops. This is a significant architectural change that improves separation of concerns and enables more flexible scaling.


👍 Strengths

  1. Clean Architecture Separation: The two-stage design (master queue → worker queue) separates scheduling concerns from message processing, making the system more modular and testable.

  2. New VisibilityManager Component: The extracted VisibilityManager class (visibility.ts) is well-designed with:

    • Atomic Lua scripts for claim, heartbeat, release, and batch operations
    • Clear separation of concerns
    • Proper handling of corrupted messages
    • Good test coverage
  3. Batch Claiming: The addition of claimBatch and releaseBatch operations reduces Redis round-trips and improves throughput when processing multiple messages.

  4. Configurable Sharding: The new shardCount and masterQueueLimit options enable horizontal scaling of the master queue.

  5. Comprehensive Test Coverage: New test files for visibility management and batch operations cover important edge cases:

    • Corrupted message handling
    • Empty queue scenarios
    • Master queue updates after release
    • Multiple consecutive heartbeats
  6. Good Error Handling: The worker queue consumer loops properly handle abort signals and continue operation after transient errors.


⚠️ Potential Issues & Suggestions

1. Message Key Parsing Fragility (Medium Priority)

Location: batch-queue/index.ts:630-639, fair-queue/index.ts:989

The message key format "messageId:queueId" is parsed by finding the first colon. If a messageId ever contained a colon (unlikely with UUIDs or the default nanoid alphabet, but possible with custom ID schemes), this parsing would silently mis-split the key:

const colonIndex = messageKey.indexOf(":");
if (colonIndex === -1) {
  this.logger.error("Invalid message key format", { messageKey });
  return false;
}
const messageId = messageKey.substring(0, colonIndex);
const queueId = messageKey.substring(colonIndex + 1);

Suggestion: Consider using lastIndexOf if queueIds are expected not to contain colons, or document the constraint that messageIds must not contain colons. Alternatively, use a delimiter that's less likely to appear in IDs (e.g., | or a null byte).

2. Missing Cleanup for Batched Span Manager (Low Priority)

Location: batch-queue/index.ts:667-669

When AbortError is thrown, cleanup is called but then returns immediately. However, the outer finally block also calls cleanup. This is harmless but redundant:

} catch (error) {
  if (error instanceof Error && error.name === "AbortError") {
    this.logger.debug("Worker queue consumer aborted", { loopId });
    this.batchedSpanManager.cleanup(loopId);  // Called here
    return;
  }
  throw error;
} finally {
  this.batchedSpanManager.cleanup(loopId);  // And here (finally still runs after the return above)
}

Suggestion: The finally block still runs after the early return, so cleanup executes twice on the abort path. Consider removing the duplicate cleanup call from the catch block and relying on finally alone.

3. Worker Queue Resolver Called Without Full Message Data (Low Priority)

Location: fair-queue/index.ts:986

In #claimAndPushToWorkerQueue, the worker queue resolver receives message.payload (the InFlightMessage.payload, which is StoredMessage<TPayload>), not the message itself:

const workerQueueId = this.workerQueueResolver(message.payload);

This works because InFlightMessage.payload is itself the StoredMessage, so the resolver does receive the shape its signature declares, but the indirection is easy to misread and would benefit from a clarifying comment or a more explicit variable name.

4. Potential Memory Leak in Long-Running Consumers (Low Priority)

Location: fair-queue/index.ts:110, batch-queue/index.ts:74

The workerQueueConsumerLoops array accumulates promises but is only cleared on stop. If start() is called multiple times without stop(), the array could grow. The guard if (this.isRunning) return prevents this currently, but worth noting for future changes.

5. Error Type Check for AbortError (Low Priority)

Location: Multiple locations checking error.name === "AbortError"

Some places check error.message === "AbortError" while others check error.name === "AbortError". The abort signal throws DOMException with name === "AbortError", so checking name is correct. However, in fair-queue/index.ts:772:

if (error instanceof Error && error.message === "AbortError") {

This should either check error.name === "AbortError" or explicitly handle the custom error thrown in the timeout handler.


🔒 Security Considerations

No security concerns identified. The changes are internal to the queue processing logic and don't introduce new attack surfaces.


📊 Performance Considerations

  1. Batch Claiming: The new batchClaimSize option (default 10) significantly reduces Redis round-trips. The batch Lua scripts are atomic and efficient.

  2. Blocking Pop: Using BLPOP with configurable timeout (workerQueueBlockingTimeoutSeconds) is efficient for worker queue consumption.

  3. Master Queue Limit: The masterQueueLimit option (default 1000) prevents loading too many queues into memory when many tenants are active.

  4. Potential Bottleneck: All batch items route to a single worker queue (BATCH_WORKER_QUEUE_ID). For very high throughput, this could become a bottleneck. The design allows for future extension to multiple worker queues based on tenant or other routing criteria.


🧪 Test Coverage

The PR includes good test coverage:

  • visibility.test.ts: Tests for heartbeat, claimBatch, and releaseBatch operations
  • drr.test.ts: Tests for DRR scheduler with masterQueueLimit
  • fairQueue.test.ts: Updated tests using the new TestFairQueueHelper pattern
  • raceConditions.test.ts: Updated concurrency tests
  • concurrency.test.ts: New tests for ConcurrencyManager.getAvailableCapacity

Suggestion: Consider adding tests for:

  • Worker queue consumer loop error recovery
  • Graceful shutdown with in-flight messages
  • Message key parsing edge cases (queueIds containing colons)

📝 Documentation

The code is well-documented with:

  • Clear class-level JSDoc comments explaining the two-stage architecture
  • Method-level documentation for public APIs
  • Comments explaining the reasoning for design decisions (e.g., single worker queue ID for BatchQueue)

✅ Verdict

This is a well-structured refactoring that improves the architecture of the queue system. The code is clean, well-tested, and follows good practices. The issues identified above are relatively minor and don't block merging.

Recommendation: Approve with minor suggestions to address before or after merge.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @packages/redis-worker/src/fair-queue/index.ts:
- Line 986: The call to workerQueueResolver is passing message.payload instead
of the full StoredMessage, causing a type/runtime mismatch; update the
invocation to pass the complete message object (the StoredMessage instance
containing messageId, queueId, tenantId, payload, etc.) rather than
message.payload so workerQueueResolver receives the expected shape (same pattern
used in the enqueue methods and as defined by the StoredMessage type).

In @packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts:
- Around line 1081-1084: The test name is misleading; inside the describe
"two-stage processing with concurrency limits" update the redisTest invocation's
first argument (the test title string) to reflect the actual behavior being
validated (successful processing under concurrency limits) — e.g., change the
title passed to redisTest from "should release remaining claimed messages when
concurrency reservation fails" to "should process all messages respecting
concurrency limits in two-stage mode" (or "should complete batch processing
without leaking in-flight messages") so the test name matches the scenario.
🧹 Nitpick comments (1)
packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts (1)

22-210: Consider extracting TestFairQueueHelper to a shared test utility.

The TestFairQueueHelper class is duplicated across both raceConditions.test.ts and fairQueue.test.ts. Extracting it to a shared test utility file would reduce duplication and improve maintainability.

♻️ Suggested refactor

Create a new file packages/redis-worker/src/fair-queue/tests/helpers.ts:

import { createRedisClient, type RedisOptions } from "@internal/redis";
import {
  FairQueue,
  WorkerQueueManager,
  type FairQueueOptions,
  type StoredMessage,
  type FairQueueKeyProducer,
} from "../index.js";
import { z } from "zod";

export const TEST_WORKER_QUEUE_ID = "test-worker-queue";

export class TestFairQueueHelper<TSchema extends z.ZodTypeAny> {
  // ... (move the entire class implementation here)
}

Then import in both test files:

import { TestFairQueueHelper, TEST_WORKER_QUEUE_ID } from "./helpers.js";
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2247b1f and ed25caa.

📒 Files selected for processing (6)
  • internal-packages/run-engine/src/batch-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/types.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • packages/redis-worker/src/fair-queue/types.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

Files:

  • internal-packages/run-engine/src/batch-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/types.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • internal-packages/run-engine/src/batch-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/types.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier

Files:

  • internal-packages/run-engine/src/batch-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/types.ts
**/*.{test,spec}.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use vitest for all tests in the Trigger.dev repository

Files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
**/*.test.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive describe and it blocks
Avoid mocks or stubs in tests; use helpers from @internal/testcontainers when Redis or Postgres are needed
Use vitest for unit tests

Files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
🧠 Learnings (8)
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • internal-packages/run-engine/src/batch-queue/types.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/types.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger.config.ts : Configure OpenTelemetry instrumentations and exporters in trigger.config.ts for enhanced logging

Applied to files:

  • internal-packages/run-engine/src/batch-queue/types.ts
📚 Learning: 2025-11-14T16:03:06.917Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2681
File: apps/webapp/app/services/platform.v3.server.ts:258-302
Timestamp: 2025-11-14T16:03:06.917Z
Learning: In `apps/webapp/app/services/platform.v3.server.ts`, the `getDefaultEnvironmentConcurrencyLimit` function intentionally throws an error (rather than falling back to org.maximumConcurrencyLimit) when the billing client returns undefined plan limits. This fail-fast behavior prevents users from receiving more concurrency than their plan entitles them to. The org.maximumConcurrencyLimit fallback is only for self-hosted deployments where no billing client exists.

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Avoid mocks or stubs in tests; use helpers from `internal/testcontainers` when Redis or Postgres are needed

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `runs.subscribeToBatch()` to subscribe to changes for all runs in a batch

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-08-14T18:35:44.370Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 2390
File: apps/webapp/app/env.server.ts:764-765
Timestamp: 2025-08-14T18:35:44.370Z
Learning: The BoolEnv helper in apps/webapp/app/utils/boolEnv.ts uses z.preprocess with inconsistent default value types across the codebase - some usages pass boolean defaults (correct) while others pass string defaults (incorrect), leading to type confusion. The helper should enforce boolean-only defaults or have clearer documentation.

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-08-14T10:09:02.528Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 2390
File: internal-packages/run-engine/src/engine/index.ts:466-467
Timestamp: 2025-08-14T10:09:02.528Z
Learning: In the triggerdotdev/trigger.dev codebase, it's acceptable to pass `string | undefined` types directly to Prisma operations (both create and update). The codebase consistently uses this pattern and the team is comfortable with how Prisma handles undefined values.

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Use `trigger.dev/redis-worker` for background job and worker system needs in the webapp and run engine

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
🧬 Code graph analysis (1)
packages/redis-worker/src/fair-queue/index.ts (7)
packages/redis-worker/src/fair-queue/workerQueue.ts (1)
  • WorkerQueueManager (26-275)
packages/redis-worker/src/fair-queue/types.ts (1)
  • StoredMessage (62-79)
packages/redis-worker/src/fair-queue/visibility.ts (3)
  • shardId (523-532)
  • queueId (504-506)
  • messageId (508-510)
packages/redis-worker/src/fair-queue/keyProducer.ts (4)
  • masterQueueKey (29-31)
  • queueKey (37-39)
  • queueItemsKey (41-43)
  • inflightDataKey (61-63)
packages/redis-worker/src/fair-queue/concurrency.ts (1)
  • isAtCapacity (170-176)
packages/redis-worker/src/fair-queue/telemetry.ts (1)
  • FairQueueAttributes (18-29)
packages/redis-worker/src/fair-queue/masterQueue.ts (1)
  • queueId (195-197)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: claude-review
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (15)
internal-packages/run-engine/src/batch-queue/types.ts (1)

126-128: LGTM! Configuration options are well-documented.

The new configuration fields are properly typed with clear documentation:

  • masterQueueLimit for controlling batch sizes from master queue
  • shardCount for horizontal scaling
  • workerQueueBlockingTimeoutSeconds enabling two-stage processing
  • Consumer trace rotation controls for better observability

All fields are optional with sensible defaults documented in comments.

Also applies to: 201-204, 225-228
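
To make the knobs concrete, here is a hedged sketch of wiring the env vars from the PR description into these options; the parsing and defaults shown are assumptions, not the webapp's actual env schema:

// Sketch: map env vars to BatchQueue tuning options (env var names from the PR description)
const batchQueueTuning = {
  shardCount: Number(process.env.BATCH_QUEUE_SHARD_COUNT ?? "1"),
  masterQueueLimit: Number(process.env.BATCH_QUEUE_MASTER_QUEUE_LIMIT ?? "1000"),
  workerQueueBlockingTimeoutSeconds:
    process.env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS !== undefined
      ? Number(process.env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS)
      : undefined,
};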

internal-packages/run-engine/src/batch-queue/index.ts (3)

455-510: Excellent lifecycle management with proper idempotency and cleanup.

The refactored lifecycle methods properly handle:

  • Idempotent start/stop with isRunning guard
  • Graceful shutdown via AbortController
  • Sequential cleanup of all resources (FairQueue → WorkerQueueManager → CompletionTracker → Redis)
  • Safe waiting for worker loops with Promise.allSettled

595-671: Worker queue consumer loop is well-structured.

The implementation follows best practices:

  • Uses BatchedSpanManager for efficient tracing with rotation
  • Properly handles abort signals for graceful shutdown
  • Tracks metrics (empty iterations, invalid keys, messages processed)
  • Cleanup in finally block ensures span manager resources are released

677-879: Message handling properly integrates with worker queue architecture.

The refactored message handler correctly:

  • Retrieves in-flight message data from FairQueue storage
  • Calculates accurate queue time metrics from stored timestamp
  • Ensures message completion in all paths (success, failure, unexpected error)
  • Preserves idempotency through itemIndex tracking
  • Maintains proper completion callback error handling
packages/redis-worker/src/fair-queue/types.ts (4)

189-194: Good addition of batch processing support to scheduler interface.

The recordProcessedBatch method enables efficient batch updates to scheduler state, reducing Redis calls when processing multiple messages. Being optional, it allows backwards compatibility with a fallback to individual recordProcessed calls.


380-382: Well-designed batch processing configuration options.

The new optional fields support batch processing optimizations:

  • batchClaimSize controls batch claiming for efficiency
  • masterQueueLimit caps queue fetching across schedulers

All are optional with clear documentation, maintaining backwards compatibility.

Also applies to: 529-530, 582-582


353-358: Breaking change: Worker queue configuration now required.

The workerQueue field in FairQueueOptions is now required (was optional). This enforces the two-stage processing model where FairQueue routes messages to worker queues. All FairQueue instantiations in the codebase have been updated to provide this configuration.


292-297: Use type instead of interface and address type signature mismatch.

WorkerQueueOptions should use type instead of interface per TypeScript guidelines. Additionally, the resolveWorkerQueue type signature expects (message: StoredMessage<TPayload>) => string, but implementations use () => string. Line 986 passes only message.payload while lines 286 and 382 construct and pass the full StoredMessage object. Align the type definition with actual usage patterns across the codebase.

Likely an incorrect or invalid review comment.

packages/redis-worker/src/fair-queue/index.ts (7)

757-769: Memory leak fix appears correct.

The manual listener removal on line 765 addresses the past review concern about event listener accumulation. When the timeout fires, the abort listener is explicitly removed, preventing unbounded accumulation. The { once: true } option handles removal when abort fires first.


970-982: Batch release logic correctly addresses past review concern.

The implementation now properly releases all remaining unclaimed messages when concurrency reservation fails (lines 973-979). Using releaseBatch with claimedMessages.slice(i) ensures messages aren't stranded in the inflight state, addressing the critical issue flagged in previous reviews.


1002-1203: Public API methods are well-structured.

The new lifecycle methods (getMessageData, heartbeatMessage, completeMessage, releaseMessage, failMessage) provide a clean external consumer interface. The consistent pattern of retrieving message data, performing operations, releasing concurrency, and cleaning up caches is correct and maintainable.
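
A hedged sketch of an external worker-queue consumer built on this lifecycle API; the messageKey format and method signatures below are assumptions based on the review, not the exact API:

// Sketch: external consumer handling one "messageId:queueId" entry from the worker queue
type LifecycleApi = {
  getMessageData(queueId: string, messageId: string): Promise<{ payload: unknown } | undefined>;
  completeMessage(queueId: string, messageId: string): Promise<void>;
  failMessage(queueId: string, messageId: string, error: unknown): Promise<void>;
};

async function handleWorkerQueueItem(
  fairQueue: LifecycleApi,
  messageKey: string,
  process: (payload: unknown) => Promise<void>
): Promise<boolean> {
  const colonIndex = messageKey.indexOf(":");
  if (colonIndex === -1) return false; // invalid key format
  const messageId = messageKey.substring(0, colonIndex);
  const queueId = messageKey.substring(colonIndex + 1);

  const data = await fairQueue.getMessageData(queueId, messageId);
  if (!data) return false; // already completed or expired

  try {
    await process(data.payload);
    await fairQueue.completeMessage(queueId, messageId);
    return true;
  } catch (error) {
    await fairQueue.failMessage(queueId, messageId, error);
    return false;
  }
}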


840-848: Tenant capacity check is a good optimization.

Checking tenant capacity before attempting to claim messages prevents unnecessary work when the tenant is already at its concurrency limit. The early break correctly skips remaining queues for that tenant, reducing contention and improving efficiency.


867-892: Batch recording implementation is efficient.

The logic prefers the batch variant (recordProcessedBatch) when available, falling back to individual calls for backward compatibility. This approach minimizes scheduler overhead while maintaining compatibility with schedulers that don't implement batch operations.


923-932: Available capacity calculation is correct.

The logic properly determines the claim size based on available concurrency capacity, capping it at batchClaimSize. The cooloff increment when capacity is zero prevents repeated futile claim attempts, which is good for performance.
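
In essence, the capping reduces to the following sketch (names assumed):

// Sketch: cap the batch claim size by the remaining concurrency headroom
function resolveClaimSize(batchClaimSize: number, availableCapacity: number): number {
  // A result of 0 means "claim nothing"; the caller bumps the queue's cooloff instead
  return Math.min(batchClaimSize, Math.max(0, availableCapacity));
}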


726-770: hadWork-based adaptive waiting is efficient.

The pattern of tracking whether work was performed and adjusting wait times accordingly (1ms when busy, consumerIntervalMs when idle) optimizes both throughput and CPU usage. This prevents busy-waiting while maintaining responsiveness when messages are available.

Also applies to: 787-904

// Determine worker queue
const workerQueueId = message.payload.workerQueue ?? queueId;
// Resolve which worker queue this message should go to
const workerQueueId = this.workerQueueResolver(message.payload);

⚠️ Potential issue | 🔴 Critical

Critical: Wrong parameter passed to workerQueueResolver.

The workerQueueResolver expects a StoredMessage (line 85), but message.payload is being passed instead of the full message object. This type mismatch will cause runtime errors since the resolver likely needs access to fields like messageId, queueId, and tenantId.

Compare with correct usage in enqueue methods (lines 285-295, 381-391) where the full message object is constructed and passed.

🐛 Proposed fix
-      // Resolve which worker queue this message should go to
-      const workerQueueId = this.workerQueueResolver(message.payload);
+      // Resolve which worker queue this message should go to
+      const workerQueueId = this.workerQueueResolver(message);
🤖 Prompt for AI Agents
In @packages/redis-worker/src/fair-queue/index.ts at line 986, The call to
workerQueueResolver is passing message.payload instead of the full
StoredMessage, causing a type/runtime mismatch; update the invocation to pass
the complete message object (the StoredMessage instance containing messageId,
queueId, tenantId, payload, etc.) rather than message.payload so
workerQueueResolver receives the expected shape (same pattern used in the
enqueue methods and as defined by the StoredMessage type).

Comment on lines +1081 to +1084
describe("two-stage processing with concurrency limits", () => {
redisTest(
"should release remaining claimed messages when concurrency reservation fails",
{ timeout: 30000 },

⚠️ Potential issue | 🟡 Minor

Test name doesn't match the scenario being tested.

The test name "should release remaining claimed messages when concurrency reservation fails" suggests testing a failure/cleanup scenario, but the test actually validates successful processing of all messages under concurrency limits without simulating any reservation failure.

Consider renaming to:

  • "should process all messages respecting concurrency limits in two-stage mode"
  • "should complete batch processing without leaking in-flight messages"
🤖 Prompt for AI Agents
In @packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts around lines
1081 - 1084, The test name is misleading; inside the describe "two-stage
processing with concurrency limits" update the redisTest invocation's first
argument (the test title string) to reflect the actual behavior being validated
(successful processing under concurrency limits) — e.g., change the title passed
to redisTest from "should release remaining claimed messages when concurrency
reservation fails" to "should process all messages respecting concurrency limits
in two-stage mode" (or "should complete batch processing without leaking
in-flight messages") so the test name matches the scenario.

@claude

claude bot commented Jan 7, 2026

PR Review: fix(batch): optimize processing batch trigger v2

Summary

This PR refactors the batch processing architecture to use a two-stage processing model where FairQueue handles fair scheduling and routes messages to worker queues, while BatchQueue runs its own consumer loops to process messages from those worker queues. This is a significant architectural change aimed at optimizing batch trigger processing.


🟢 Strengths

1. Clean Architectural Separation

The separation of concerns between FairQueue (scheduling/routing) and external consumers (processing) is well-designed. FairQueue now focuses on fair scheduling via DRR and message routing, while consumers handle actual processing. This makes the system more modular and easier to reason about.

2. Comprehensive Test Coverage

The PR includes extensive test updates and additions:

  • New tests in visibility.test.ts for batch claiming edge cases
  • Updated fairQueue.test.ts with TestFairQueueHelper that properly tests the new two-stage architecture
  • New tests for getAvailableCapacity in concurrency.test.ts
  • Tests for recordProcessedBatch in drr.test.ts
  • Race condition tests updated for the new architecture

3. Performance Optimizations

  • recordProcessedBatch() method reduces Redis calls when processing multiple messages
  • drrDecrementDeficitBatch Lua script for atomic batch deficit decrement
  • getAvailableCapacity() method with parallel fetching of current counts and limits
  • Configurable batchClaimSize for tuning throughput

4. Good Use of Atomic Operations

The Lua scripts for atomic operations are well-implemented:

  • claimMessageBatch for batch claiming
  • drrDecrementDeficitBatch for batch deficit updates
  • Proper handling of corrupted messages in batch claims

5. Configurable Sharding

New environment variables (BATCH_QUEUE_SHARD_COUNT, BATCH_QUEUE_MASTER_QUEUE_LIMIT, BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS) allow horizontal scaling configuration.


🟡 Suggestions for Improvement

1. Single Worker Queue Potential Bottleneck

In fair-queue/index.ts:990-996, all messages are routed to a single shared worker queue:

const workerQueueId = "worker-queue";

While the comment acknowledges this trade-off, consider documenting performance thresholds more precisely or adding metrics to monitor this potential bottleneck.

Suggestion: Add a gauge metric to track worker queue depth for proactive monitoring.
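
One possible shape for such a gauge, as a hedged sketch: the queue key, interval, and reportGauge helper are assumptions for illustration only.

// Sketch: periodically sample worker queue depth with LLEN and report it as a gauge
import Redis from "ioredis";

function monitorWorkerQueueDepth(
  redis: Redis,
  reportGauge: (name: string, value: number) => void
) {
  return setInterval(async () => {
    const depth = await redis.llen("worker-queue");
    reportGauge("batch_queue.worker_queue_depth", depth);
  }, 5_000);
}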

2. Error Handling in Consumer Loop

In batch-queue/index.ts, the worker queue consumer loop catches errors but continues silently:

} catch (error) {
  if (this.abortController.signal.aborted) {
    break;
  }
  this.logger.error("Worker queue consumer error", {
    loopId,
    error: error instanceof Error ? error.message : String(error),
  });
  this.batchedSpanManager.markForRotation(loopId);
}

Consider adding:

  • A counter metric for consumer errors
  • Optional circuit breaker pattern for repeated failures

3. Message Key Parsing

The message key format messageId:queueId with simple indexOf(":") parsing could be fragile if messageIds ever contain colons:

const colonIndex = messageKey.indexOf(":");
if (colonIndex === -1) {
  this.logger.error("Invalid message key format", { messageKey });
  return false;
}

Suggestion: Consider using a more robust delimiter or encoding scheme, or document the constraint that messageIds must not contain colons.

4. Memory Management for Cooloff States

Good that maxCooloffStatesSize is configurable, but clearing all entries when hitting the cap could cause temporary performance degradation:

if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
  this.logger.warn("Cooloff states cache hit size cap, clearing all entries"...);
  this.queueCooloffStates.clear();
}

Suggestion: Consider LRU eviction instead of full clear.
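
A hedged sketch of evicting only the oldest entry instead of clearing the whole Map; this is insertion-order eviction, and a true LRU would additionally re-insert entries on access.

// Sketch: evict the oldest entry once the cache reaches its cap
function evictOldestIfFull<K, V>(cache: Map<K, V>, maxSize: number): void {
  if (cache.size < maxSize) return;
  const oldestKey = cache.keys().next().value; // Maps iterate in insertion order
  if (oldestKey !== undefined) {
    cache.delete(oldestKey);
  }
}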

5. TypeScript Strict Null Checks

The non-null assertions in getAvailableCapacity:

const available = Math.max(0, limits[i]! - currents[i]!);

While safe in this context, the assertions could be replaced with explicit bounds checks to be more defensive.


🔴 Potential Issues

1. Removed consumerId Parameter

In visibility.ts, the claimMessageBatch Lua script no longer uses consumerId:

-local consumerId = ARGV[2]
-local deadline = tonumber(ARGV[3])
+local deadline = tonumber(ARGV[2])

Verify this doesn't break any debugging/tracing that relies on knowing which consumer claimed which messages.

2. Breaking Change in FairQueueOptions

The workerQueue option is now required (not optional):

workerQueue: WorkerQueueOptions<z.infer<TPayloadSchema>>;

This is a breaking change for existing consumers of FairQueue. Ensure this is documented in migration notes.


📊 Test Coverage Assessment

Area Coverage
Basic enqueue/process ✅ Good
Fair scheduling (DRR) ✅ Good
Visibility timeout/reclaim ✅ Good
Concurrency limiting ✅ Good
Batch operations ✅ Good
Worker queue integration ✅ Good
Race conditions ✅ Good
Error handling ⚠️ Could add more edge cases

📝 Minor Suggestions

  1. Documentation: The comment block explaining the two-stage architecture is helpful. Consider adding an architecture diagram to the README.

  2. Naming: BATCH_WORKER_QUEUE_ID = "batch-worker-queue" vs workerQueueId = "worker-queue" - consider consistent naming across the codebase.

  3. Magic Numbers: The 1ms delay when there's work (const waitMs = hadWork ? 1 : this.consumerIntervalMs) could be a named constant for clarity.


✅ Overall Assessment

This is a well-structured refactoring that improves the batch processing architecture. The code is clean, well-tested, and follows good practices for distributed systems (atomic operations, visibility timeouts, fair scheduling). The separation of scheduling from processing is a good architectural decision that will make the system more maintainable.

Recommendation: Approve with minor suggestions addressed.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts:
- Line 1094: The test title "should release remaining claimed messages when
concurrency reservation fails" is misleading relative to assertions that verify
successful processing of 5 messages with concurrency 2; update the test name
string in the test declaration (the describe/it/test call in fairQueue.test.ts)
to reflect the actual behavior such as "should process all messages respecting
concurrency limits in two-stage mode" or "should complete batch processing
without leaking messages under concurrency limits" so the test name matches the
validated scenario.
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed25caa and babb92d.

📒 Files selected for processing (1)
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

Files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
**/*.{test,spec}.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use vitest for all tests in the Trigger.dev repository

Files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier

Files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
**/*.test.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive describe and it blocks
Avoid mocks or stubs in tests; use helpers from @internal/testcontainers when Redis or Postgres are needed
Use vitest for unit tests

Files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
🧠 Learnings (3)
📚 Learning: 2025-11-27T16:27:48.109Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-27T16:27:48.109Z
Learning: Applies to **/*.test.{ts,tsx,js,jsx} : Avoid mocks or stubs in tests; use helpers from `internal/testcontainers` when Redis or Postgres are needed

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
📚 Learning: 2025-11-14T16:03:06.917Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2681
File: apps/webapp/app/services/platform.v3.server.ts:258-302
Timestamp: 2025-11-14T16:03:06.917Z
Learning: In `apps/webapp/app/services/platform.v3.server.ts`, the `getDefaultEnvironmentConcurrencyLimit` function intentionally throws an error (rather than falling back to org.maximumConcurrencyLimit) when the billing client returns undefined plan limits. This fail-fast behavior prevents users from receiving more concurrency than their plan entitles them to. The org.maximumConcurrencyLimit fallback is only for self-hosted deployments where no billing client exists.

Applied to files:

  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: claude-review
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts (1)

22-210: LGTM! Well-structured test helper for two-stage processing.

The TestFairQueueHelper effectively orchestrates FairQueue and WorkerQueueManager for testing. The consumer loop properly handles message key parsing, retrieves stored messages, and delegates lifecycle operations (heartbeat, complete, release, fail) to the underlying FairQueue.

@claude

claude bot commented Jan 7, 2026

PR Review: fix(batch): optimize processing batch trigger v2

I've reviewed this pull request which introduces significant architectural changes to the batch queue processing system. Here's my detailed feedback:


📋 Summary

This PR refactors the FairQueue system from a direct-processing model to a two-stage processing architecture where:

  1. FairQueue claims messages from queues and pushes them to worker queues
  2. External consumers (BatchQueue) are responsible for consuming from worker queues

Key changes include:

  • New VisibilityManager class for handling message visibility timeouts
  • Worker queue is now always enabled (no longer optional)
  • Batch claiming support (claimBatch, releaseBatch)
  • New configuration options for sharding and master queue limits
  • Comprehensive test coverage for the new architecture

✅ Strengths

  1. Solid Test Coverage: The PR includes comprehensive tests:

    • visibility.test.ts: Tests for heartbeat, batch claim/release operations
    • raceConditions.test.ts: Extensive race condition and concurrency testing
    • fairQueue.test.ts: Updated to work with the new architecture
    • drr.test.ts: Tests for DRR scheduler with new masterQueueLimit
  2. Clean Separation of Concerns: The VisibilityManager is a well-designed class that handles:

    • Atomic message claiming via Lua scripts
    • Heartbeat extension
    • Message release and reclaim
  3. Atomic Operations: Proper use of Lua scripts for atomic Redis operations (e.g., claimMessageBatch, releaseMessageBatch)

  4. Configuration Flexibility: New env variables allow tuning:

    • BATCH_QUEUE_SHARD_COUNT
    • BATCH_QUEUE_MASTER_QUEUE_LIMIT
    • BATCH_QUEUE_WORKER_QUEUE_ENABLED
    • BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS

⚠️ Potential Issues & Suggestions

1. Breaking Change: workerQueue is now required

packages/redis-worker/src/fair-queue/types.ts:358

workerQueue: WorkerQueueOptions<z.infer<TPayloadSchema>>;

The workerQueue option changed from optional to required. While the tests are updated, any external consumers of FairQueue will break. Consider:

  • Adding migration notes to the PR description
  • Providing a default worker queue resolver if possible

2. Potential Resource Leak in WorkerQueueManager.blockingPop

packages/redis-worker/src/fair-queue/workerQueue.ts:102-149

The blockingPop method creates a duplicate Redis connection for each blocking pop but only removes the abort listener on success. If an error occurs before the abort handler is removed, the listener remains attached:

if (signal) {
  const cleanup = () => {
    blockingClient.disconnect();
  };
  signal.addEventListener("abort", cleanup, { once: true });
  // cleanup is never explicitly removed on success
}

Consider wrapping the entire try/finally to ensure proper cleanup:

const cleanup = signal ? () => blockingClient.disconnect() : null;
try {
  if (cleanup) signal.addEventListener("abort", cleanup, { once: true });
  // ... existing logic
} finally {
  if (cleanup) signal.removeEventListener("abort", cleanup);
  await blockingClient.quit().catch(() => {});
}

3. Message Key Parsing Fragility

internal-packages/run-engine/src/batch-queue/index.ts:647-656

The message key format "messageId:queueId" is parsed using indexOf(":"), but if messageId contains a colon, this would break:

const colonIndex = messageKey.indexOf(":");
const messageId = messageKey.substring(0, colonIndex);
const queueId = messageKey.substring(colonIndex + 1);

This appears safe assuming message IDs are UUIDs, but consider documenting this assumption or using a safer delimiter.

4. Concurrent Connection Creation

packages/redis-worker/src/fair-queue/workerQueue.ts:102

Each blockingPop call creates a new Redis connection (this.redis.duplicate()). Under high load, this could create many connections. Consider:

  • Connection pooling for blocking operations (sketched below)
  • Rate limiting connection creation
  • Documenting expected connection usage
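
A hedged sketch of the pooling idea: reuse a small, bounded set of dedicated blocking connections instead of duplicating a new one per call. The pool size and class name are assumptions, not part of the PR:

import Redis from "ioredis";

// Blocking commands (BLPOP) monopolize a connection, so they must not share the
// main client, but they can share a small checkout pool.
class BlockingConnectionPool {
  private idle: Redis[] = [];

  constructor(private base: Redis, private maxIdle = 4) {}

  acquire(): Redis {
    // Reuse an idle connection if one exists, otherwise create a new duplicate.
    return this.idle.pop() ?? this.base.duplicate();
  }

  release(client: Redis): void {
    if (this.idle.length < this.maxIdle) {
      this.idle.push(client);
    } else {
      client.disconnect(); // keep the pool bounded
    }
  }

  async close(): Promise<void> {
    await Promise.all(this.idle.map((c) => c.quit().catch(() => {})));
    this.idle = [];
  }
}

// Usage inside blockingPop (sketch):
//   const client = pool.acquire();
//   try {
//     return await client.blpop(queueKey, timeoutSeconds);
//   } finally {
//     pool.release(client);
//   }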

5. Missing Error Context in BatchQueue Worker Loop

internal-packages/run-engine/src/batch-queue/index.ts:672-675

When encountering invalid message key format, the error is logged but processing continues:

if (colonIndex === -1) {
  this.logger.error("Invalid message key format", { messageKey });
  this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
  return false;
}

The loop already increments an invalid_message_keys stat; consider wiring that stat to an alert, since this condition indicates data corruption.

6. Potential Memory Accumulation in batchedSpanManager

The BatchedSpanManager tracks stats per loop ID. Ensure there's cleanup when loops terminate to prevent memory leaks over long-running processes.
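
A hedged illustration of the cleanup pattern being suggested: tie per-loop span state to the loop's lifetime, then sweep everything on shutdown. The method names (initLoop, cleanupLoop, cleanupAll) are illustrative; the PR's actual manager may differ:

type SpanManagerLike = {
  initLoop(loopId: string): void;
  cleanupLoop(loopId: string): void;
  cleanupAll(): void;
};

async function runConsumerLoop(
  loopId: string,
  spans: SpanManagerLike,
  iterate: () => Promise<void>,
  signal: AbortSignal
): Promise<void> {
  spans.initLoop(loopId);
  try {
    while (!signal.aborted) {
      await iterate();
    }
  } finally {
    // Per-loop state is dropped even if the loop throws.
    spans.cleanupLoop(loopId);
  }
}

// On shutdown, a safety net for any loops that never reached their finally block:
//   spans.cleanupAll();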


🔍 Performance Considerations

  1. Batch Claiming: The new claimBatch operation (batchClaimSize config) should improve throughput by reducing Redis round-trips.

  2. Sharding: The shardCount option enables horizontal scaling of master queues, which is good for high-throughput scenarios.

  3. Blocking Pop Timeout: The workerQueueBlockingTimeoutSeconds (default 10s) is reasonable, but consider documenting the trade-off between latency and connection usage.

  4. Master Queue Limit: The new masterQueueLimit (default 1000) in DRR scheduler prevents processing too many queues per iteration, which is good for predictable latency.


🔐 Security Considerations

  • No obvious security concerns. Redis operations are properly parameterized through Lua scripts, preventing injection attacks.
  • The environment variables are properly typed with Zod validation.

📝 Minor Suggestions

  1. Documentation: Consider adding JSDoc comments explaining the two-stage architecture at the top of FairQueue class.

  2. Type Safety: In VisibilityManager.claimBatch, the loop at line 174 already includes the explicit null checks it needs:

    for (let i = 0; i < result.length; i += 2) {
      const messageId = result[i];
      const payloadJson = result[i + 1];
      if (!messageId || !payloadJson) continue; // ✅ Already done
  3. Consistent Naming: The test helper class TestFairQueueHelper is duplicated in multiple test files. Consider extracting to a shared test utility.


✅ Verdict

This is a well-structured refactoring that improves the batch processing architecture. The test coverage is comprehensive, and the atomic Redis operations are properly implemented.

Recommended to merge after addressing:

  1. The abort listener cleanup in blockingPop
  2. Documenting the breaking change for workerQueue being required

🤖 Generated with Claude Code

@claude

claude bot commented Jan 7, 2026

PR Review: fix(batch): optimize processing batch trigger v2

Summary

This PR significantly refactors the batch queue and FairQueue architecture to improve performance through a two-stage processing model. The changes introduce horizontal scaling capabilities via sharding and worker queues.

Key Architectural Changes

  1. Two-Stage Processing Model: FairQueue now routes messages to worker queues, with BatchQueue running its own consumer loops to process messages
  2. VisibilityManager Extraction: Message visibility management is now a separate, well-structured class with atomic Lua scripts
  3. New Batch Claiming: Added claimBatch and releaseBatch for more efficient message processing
  4. Configurable Sharding: New shardCount and masterQueueLimit options for horizontal scaling
  5. Worker Queue Integration: FairQueue always routes to worker queues (previously optional)

✅ Strengths

Code Quality

  • Excellent separation of concerns: The VisibilityManager class is well-designed with clear responsibilities for claim, heartbeat, release, and reclaim operations
  • Atomic Lua scripts: All critical operations (claimMessage, claimMessageBatch, releaseMessage, releaseMessageBatch, heartbeat) use Lua scripts for atomicity, preventing race conditions
  • Consistent error handling: Proper cleanup of corrupted messages (e.g., invalid JSON parsing)
  • Good use of TypeScript: Proper type declarations for Redis custom commands via module augmentation

Performance

  • Batch operations: The new claimBatch and releaseBatch reduce Redis round trips significantly
  • Configurable limits: masterQueueLimit prevents fetching too many queues at once (default 1000)
  • Blocking pop with timeout: Worker queue consumers use efficient BLPOP with configurable timeout
  • Parallel capacity checks: ConcurrencyManager.getAvailableCapacity fetches limits and currents in parallel

Test Coverage

  • Comprehensive visibility tests: New visibility.test.ts covers heartbeat, batch claim, batch release, and corrupted message handling
  • New concurrency tests: Tests for getAvailableCapacity with single/multi group scenarios
  • DRR scheduler tests: Coverage for recordProcessedBatch and deficit management
  • Race condition tests: Extensive testing for concurrent operations, shutdown, and atomic consistency

⚠️ Potential Issues

1. Breaking Change: Worker Queue is Now Required

// FairQueueOptions - workerQueue is no longer optional
workerQueue: {
  resolveWorkerQueue: (message: StoredMessage<z.infer<TPayloadSchema>>) => string;
}

Impact: Any existing code that didn't configure worker queues will break.

Recommendation: Document this breaking change clearly or provide a migration path.

2. Message Key Parsing Assumption (batch-queue/index.ts:640-649)

const colonIndex = messageKey.indexOf(":");
if (colonIndex === -1) {
  this.logger.error("Invalid message key format", { messageKey });
  this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
  return false;
}

Issue: If the messageId itself contains a colon, parsing will be incorrect since only the first colon is used as delimiter.

Recommendation: Consider using a different delimiter or a fixed-length prefix for the messageId.

3. Unused workerQueueId Parameter in Enqueue

// In the enqueue method, workerQueueId is still computed behind a ternary guard:
const storedMessage: StoredMessage<z.infer<TPayloadSchema>> = {
  // ...
  workerQueueId: this.workerQueueResolver
    ? this.workerQueueResolver({
        // ...
      })
    : undefined,  // dead branch: the resolver is always defined now
};

Impact: The ternary guard and its undefined fallback are redundant since workerQueueResolver is now required and always defined.

4. table.getn is Deprecated in Lua 5.1+

-- In releaseMessageBatch Lua script (visibility.ts:686)
local numMessages = (table.getn(ARGV) - 2) / 2

Recommendation: Use #ARGV instead of table.getn(ARGV) for better compatibility.
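
For illustration, a hedged sketch of how a batch-release script could be registered with the # length operator via ioredis's defineCommand. The key and argument layout here is an assumption, not the PR's actual script:

import Redis from "ioredis";

const redis = new Redis();

redis.defineCommand("releaseMessageBatchSketch", {
  numberOfKeys: 1, // KEYS[1] = the queue's sorted set (assumed layout)
  lua: `
    -- ARGV layout (assumed): ARGV[1] = now, ARGV[2] = queueId, then messageId/score pairs
    local numMessages = (#ARGV - 2) / 2
    for i = 1, numMessages do
      local messageId = ARGV[2 + (i - 1) * 2 + 1]
      local score = ARGV[2 + (i - 1) * 2 + 2]
      redis.call("ZADD", KEYS[1], score, messageId)
    end
    return numMessages
  `,
});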

5. Potential Memory Leak in Loop Tracking

// BatchQueue maintains arrays that grow during operation:
private workerQueueConsumerLoops: Promise<void>[] = [];

Observation: These are cleared on stop, but if stop() is never called properly, references persist.


🔒 Security Considerations

  • No SQL/Command Injection: All Redis operations use proper parameterization through ioredis
  • No sensitive data exposure: Logging doesn't expose sensitive payload contents
  • Proper timeout handling: Visibility timeouts and blocking operations have configurable limits

📊 Performance Considerations

Positive

  • Batch claiming reduces Redis round trips from N to 1 for N messages
  • masterQueueLimit prevents memory issues from fetching too many queues
  • Parallel processing of concurrency checks

Areas to Monitor

  • Memory usage: Storing message data in inflightDataKey hash could grow large under high load
  • Blocking pop timeout: Default 10 seconds might be too long for some use cases, but is configurable

📝 Suggestions for Improvement

  1. Document the breaking change for workerQueue being required
  2. Consider using UUID format for messageIds to avoid colon collision issues
  3. Replace table.getn with #ARGV in Lua scripts
  4. Add metrics for batch claim/release operations to track efficiency
  5. Consider graceful degradation if worker queue is unavailable

Verdict

This is a well-architected performance optimization that introduces proper separation of concerns, atomic operations, and comprehensive test coverage. The changes are significant but carefully implemented.

Recommended: Approve with minor fixes for the Lua table.getn deprecation and documentation of breaking changes.

🤖 Generated with Claude Code

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
packages/redis-worker/src/fair-queue/workerQueue.ts (1)

114-120: Consider simplifying the redundant condition check.

Since cleanup is only non-null when signal is truthy (line 106), the condition if (signal && cleanup) could be simplified to if (cleanup). The same applies to line 153.

♻️ Proposed simplification
-      if (signal && cleanup) {
+      if (cleanup) {
         signal.addEventListener("abort", cleanup, { once: true });
 
         if (signal.aborted) {

And in the finally block:

-      if (cleanup && signal) {
+      if (cleanup) {
         signal.removeEventListener("abort", cleanup);
       }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c8a1df0 and 58a7bca.

📒 Files selected for processing (1)
  • packages/redis-worker/src/fair-queue/workerQueue.ts
🔇 Additional comments (2)
packages/redis-worker/src/fair-queue/workerQueue.ts (2)

104-110: LGTM! Proper cleanup function hoisting.

The cleanup function is correctly hoisted outside the try block, making it accessible in the finally block. This is essential for preventing listener accumulation.


151-159: Excellent fix for the memory leak!

The finally block correctly removes the abort listener, preventing accumulation on the AbortSignal. The comment clearly explains why manual cleanup is necessary despite { once: true }. The same pattern appears in fair-queue/index.ts and already has proper cleanup in place, so no similar issues remain in the codebase.

@claude

claude bot commented Jan 7, 2026

PR Review: fix(batch): optimize processing batch trigger v2

Summary

This PR significantly refactors the FairQueue and BatchQueue systems to implement a two-stage processing model with batch claiming and worker queue routing. The changes represent a substantial architectural improvement for horizontal scaling and efficiency.


Code Quality and Best Practices ✅

Strengths:

  1. Clean architectural separation - The extraction of VisibilityManager as a dedicated component is excellent. This follows the single responsibility principle and makes the code more testable.

  2. Atomic Lua scripts - The Lua scripts for claimMessageBatch, releaseMessageBatch, and DRR operations ensure atomicity and reduce round-trips to Redis.

  3. Consistent error handling - The code properly handles edge cases like corrupted JSON messages in claimBatch (visibility.ts:192-200).

  4. Good use of TypeScript - Proper typing throughout, with StoredMessage<TPayload> generics.

  5. Well-documented changes - Clear comments explaining the two-stage flow and worker queue responsibilities.

Minor suggestions:

  • Consider adding JSDoc comments to the new public methods in VisibilityManager for better API documentation.

Potential Bugs or Issues ⚠️

  1. Member parsing in #parseMember (visibility.ts:512-520):

    #parseMember(member: string): { messageId: string; queueId: string } {
      const colonIndex = member.indexOf(":");
      if (colonIndex === -1) {
        return { messageId: member, queueId: "" };
      }
      // remainder of the method (splits at the first colon):
      return { messageId: member.slice(0, colonIndex), queueId: member.slice(colonIndex + 1) };
    }

    If queueId contains colons (e.g., tenant:t1:queue:name), only the first colon is used to split. This appears intentional since messageId is a UUID without colons, but worth verifying this assumption holds.

  2. Redis connection duplication in blockingPop (workerQueue.ts:102):

    const blockingClient = this.redis.duplicate();

    While necessary for BLPOP, creating a new connection per call could lead to connection pool exhaustion under high load. Consider connection pooling or reusing blocking clients.

  3. Race condition potential in batch claiming (batch-queue/index.ts:630-654):
    The processItemCallback check happens before the blocking pop, but the callback could be unset between the check and the processing. The current continue fallback handles this gracefully, but it's worth noting.

  4. Abort error handling (batch-queue/index.ts:665-669):

    if (error instanceof Error && error.name === "AbortError") {

    Verify that the actual abort error from Redis has name === "AbortError". Depending on the Redis client version, this might be different.


Performance Considerations 🚀

Positive:

  1. Batch claiming (claimBatch, releaseBatch) reduces Redis round-trips significantly. The batchClaimSize default of 10 seems reasonable.

  2. Worker queue blocking pop - Using BLPOP eliminates polling overhead and provides instant response when messages arrive.

  3. Sharding support - The BATCH_QUEUE_SHARD_COUNT config enables horizontal scaling of the master queue.

  4. DRR scheduler optimization - The recordProcessedBatch method reduces individual Redis calls when processing multiple messages.

  5. Configurable masterQueueLimit - Prevents the scheduler from fetching unbounded data from large queues.

Considerations:

  1. Memory usage - The batchedSpanManager maintains state per consumer loop. Ensure cleanupAll() is called on shutdown to prevent memory leaks.

  2. Default timeout - BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS defaults to 10s. This is a reasonable trade-off between responsiveness and efficiency.


Security Concerns 🔒

No significant security concerns identified. The changes don't introduce new attack vectors:

  • No user input is directly used in Redis keys
  • Lua scripts don't execute arbitrary code
  • Connection handling follows established patterns

Test Coverage ✅

Excellent test additions:

  1. visibility.test.ts (353 lines) - Comprehensive tests for:

    • Heartbeat operations (success, failure, after complete)
    • Batch claiming (atomicity, empty queues, partial batches, corrupted messages)
    • Batch releasing (atomicity, master queue updates)
  2. concurrency.test.ts (161 lines) - Good coverage for:

    • getAvailableCapacity across single and multiple groups
    • KeyPrefix handling (important for multi-tenant scenarios)
  3. drr.test.ts (58 lines) - Tests for the new recordProcessedBatch method.

  4. raceConditions.test.ts - Updated tests reflect the new two-stage architecture.

Suggestions:

  • Consider adding integration tests that exercise the full flow: enqueue → FairQueue claim → worker queue push → BatchQueue consume → complete
  • Add tests for error scenarios during worker queue consumer loop shutdown

Breaking Changes ⚠️

This PR introduces breaking changes to FairQueueOptions:

  1. workerQueue is now required (no longer optional with enabled flag)
  2. Removed onMessage handler from FairQueue - consumers must now manage their own worker queue consumption

This is documented implicitly through the code changes, but should be called out in release notes.


Overall Assessment

This is a well-executed refactoring that improves scalability and separation of concerns. The batch operations will significantly improve throughput for high-volume batch processing scenarios.

Recommendation: ✅ Approve with minor suggestions

The changes are solid and the test coverage is good. The minor issues noted above are worth addressing but not blockers.


🤖 Generated with Claude Code

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @internal-packages/run-engine/src/batch-queue/index.ts:
- Around line 703-704: The queue time calculation using Date.now() -
storedMessage.timestamp is measuring time from batch creation
(storedMessage.timestamp is set from meta.createdAt + itemIndex) not the
per-item enqueue moment, which makes itemQueueTimeHistogram.record inaccurate
for streaming batches; to fix either (a) record and store a real per-item
enqueue timestamp when enqueueBatchItem is called (e.g., attach enqueueAt to
each storedMessage item and use that for queue time calculation before calling
itemQueueTimeHistogram.record), or (b) if you intentionally want batch-creation
latency, rename/document the metric to reflect “time since batch creation” and
keep using storedMessage.timestamp; update references to
storedMessage.timestamp, enqueueBatchItem, and itemQueueTimeHistogram.record
accordingly.
- Around line 683-688: When storedMessage is null after calling
this.fairQueue.getMessageData(messageId, queueId) the code currently logs and
returns without cleaning up visibility timeouts; change the branch so that after
logging the error (logger.error(...)) you call await
this.fairQueue.completeMessage(messageId, queueId) before returning. This
mirrors the other error paths and ensures visibilityManager.complete runs even
when storedMessage is missing; update the block around storedMessage, keeping
the log message and adding the await this.fairQueue.completeMessage call and
then return.
🧹 Nitpick comments (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)

634-640: Consider more robust message key parsing.

The indexOf approach works but could be more explicit about expected format. If message keys could contain multiple colons, this would split at the first colon which may or may not be correct.

♻️ Alternative parsing approach
-              const colonIndex = messageKey.indexOf(":");
-              if (colonIndex === -1) {
-                this.logger.error("Invalid message key format", { messageKey });
-                this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
-                return false;
-              }
-
-              const messageId = messageKey.substring(0, colonIndex);
-              const queueId = messageKey.substring(colonIndex + 1);
+              // Expected format: "<messageId>:<queueId>"; messageId is a UUID and never contains a colon
+              const [messageId, ...queueIdParts] = messageKey.split(":");
+              if (!messageId || queueIdParts.length === 0) {
+                this.logger.error("Invalid message key format, expected 'messageId:queueId'", {
+                  messageKey,
+                });
+                this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
+                return false;
+              }
+
+              const queueId = queueIdParts.join(":");
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 58a7bca and 51fb53a.

📒 Files selected for processing (1)
  • internal-packages/run-engine/src/batch-queue/index.ts
🔇 Additional comments (5)
internal-packages/run-engine/src/batch-queue/index.ts (5)

717-722: Defensive message completion is correct.

Completing the message even when processItemCallback is not set prevents queue blocking. This is the right approach for defensive error handling.


504-514: Past review comment has been addressed.

The missing batchedSpanManager.cleanupAll() call flagged in a previous review has been added at line 508. This ensures proper cleanup of tracing spans during shutdown.


54-202: Well-structured worker-queue architecture.

The introduction of a single shared BATCH_WORKER_QUEUE_ID with WorkerQueueManager and BatchedSpanManager provides a clean two-stage consumption model. FairQueue handles fair scheduling across environments, while BatchQueue's consumer loops process from the worker queue with proper tracing and lifecycle management.

The initialization sequence is correct:

  1. FairQueue for scheduling and routing
  2. WorkerQueueManager for worker queue consumption
  3. BatchedSpanManager for per-loop tracing

455-478: Lifecycle management is idempotent and well-coordinated.

The start() method correctly:

  • Guards against double-start with isRunning check
  • Creates fresh AbortController
  • Starts FairQueue's master consumers
  • Spawns worker queue consumer loops

This follows proper initialization order and enables clean shutdown via the abort signal.


863-868: Idempotency protection is in place for tracking, but completeMessage coordination remains a concern.

The recordSuccess and recordFailure methods use atomic Lua scripts with itemIndex as the idempotency key to prevent double-counting on redelivery. A SADD operation only increments the processed counter if the item hasn't been recorded before, ensuring idempotent behavior.

However, completeMessage (which finalizes the message in FairQueue after recording success/failure) does not employ explicit atomic coordination. If completeMessage fails after the counter has been incremented, the item will be counted as processed but the message won't be marked complete in FairQueue, risking redelivery. While the counter increment is protected from double-counting, the lack of atomic coordination between recording and completion creates a window where the batch could be redelivered with an inconsistent state.

@claude

claude bot commented Jan 7, 2026

PR Review: fix(batch): optimize processing batch trigger v2

This PR implements significant improvements to the BatchQueue system by adding a full two-phase dequeue process in FairQueue and moving worker queue consumption responsibility to BatchQueue. Overall, this is a well-designed architectural improvement with good test coverage.


✅ Strengths

Architecture & Design:

  • The two-phase processing model (claim → worker queue → process) is a solid architectural pattern for fair scheduling
  • Clean separation of concerns: FairQueue handles fair scheduling/routing, BatchQueue handles consumption
  • Good use of atomic Lua scripts for Redis operations (claimBatch, releaseBatch, releaseMessageBatch)
  • The getAvailableCapacity() method in ConcurrencyManager is a smart addition for batch claiming optimization

Configuration Flexibility:

  • Nice addition of BATCH_QUEUE_SHARD_COUNT, BATCH_QUEUE_MASTER_QUEUE_LIMIT, and BATCH_QUEUE_WORKER_QUEUE_ENABLED env vars for horizontal scaling control
  • The ability to disable worker queue processing via BATCH_QUEUE_WORKER_QUEUE_ENABLED provides a good escape hatch

Test Coverage:

  • Comprehensive new test files for visibility (visibility.test.ts) and concurrency (concurrency.test.ts)
  • Good edge case coverage: corrupted messages, empty queues, partial batches
  • The keyPrefix handling tests in concurrency.test.ts are particularly valuable for catching prefix issues

Code Quality:

  • Consistent use of TypeScript types and proper error handling
  • Good logging and telemetry instrumentation with batched span management
  • Clear code organization with section comments (e.g., // ============================================================================)

⚠️ Potential Issues & Suggestions

1. Possible Race Condition in Worker Queue Consumer (Medium)
internal-packages/run-engine/src/batch-queue/index.ts:681-689

async #handleMessage(consumerId: string, messageId: string, queueId: string): Promise<void> {
  const storedMessage = await this.fairQueue.getMessageData(messageId, queueId);

  if (!storedMessage) {
    this.logger.error("Message not found in in-flight data", { messageId, queueId });
    await this.fairQueue.completeMessage(messageId, queueId);
    return;
  }

When a message is not found, calling completeMessage on a non-existent message is fine, but consider whether this could indicate a more serious issue (visibility timeout expiry during processing). You might want to track this metric separately from normal completions.

2. Duplicate getAvailableCapacity Implementation (Low)
The getAvailableCapacity method appears twice in concurrency.ts (at roughly lines 114-153 and 121-154); the copy starting at line 114 looks like orphaned code. Consider removing the duplicate.

3. AbortController Recreation Pattern (Low)
packages/redis-worker/src/fair-queue/index.ts:624-625

this.isRunning = true;
this.abortController = new AbortController();

This is fine, but ensure any previous abort controller listeners are properly cleaned up. The pattern looks correct as stop() calls abort() first.

4. Missing Type Safety in Lua Script Array Length Calculation (Low)
packages/redis-worker/src/fair-queue/visibility.ts:686

local numMessages = (table.getn(ARGV) - 2) / 2

Note: table.getn is deprecated in Lua 5.1+ in favor of #ARGV. While Redis still supports it, consider using #ARGV for future compatibility.

5. Error Handling in Message Key Parsing (Low)
internal-packages/run-engine/src/batch-queue/index.ts:634-643

const colonIndex = messageKey.indexOf(":");
if (colonIndex === -1) {
  this.logger.error("Invalid message key format", { messageKey });
  this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
  return false;
}

This handles the error case well, but the message format "messageId:queueId" could be problematic if either messageId or queueId contains a colon. Consider using a delimiter that's less likely to appear in these IDs (e.g., \x00) or encoding/escaping.

6. Hardcoded Timeout Values (Minor)
internal-packages/run-engine/src/batch-queue/index.ts:147-148

visibilityTimeoutMs: 60_000, // 1 minute for batch item processing

Consider making this configurable via options, similar to other timeout parameters.
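
A hedged sketch of what exposing this as an option could look like; the option name and surrounding type are assumptions:

// Hypothetical options shape; only the 60-second value comes from the current code.
type BatchQueueTimeoutOptions = {
  /** How long a claimed batch item stays invisible before it can be reclaimed. */
  visibilityTimeoutMs?: number;
};

const DEFAULT_VISIBILITY_TIMEOUT_MS = 60_000; // current hardcoded value: 1 minute

function resolveVisibilityTimeout(options?: BatchQueueTimeoutOptions): number {
  return options?.visibilityTimeoutMs ?? DEFAULT_VISIBILITY_TIMEOUT_MS;
}

// e.g. new BatchQueue({ ...otherOptions, visibilityTimeoutMs: 120_000 })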


🔒 Security Considerations

No significant security concerns identified. The code:

  • Properly validates inputs where needed
  • Uses parameterized Redis commands
  • Avoids command injection in Lua scripts

📊 Performance Considerations

Positive:

  • Batch claiming (claimBatch) reduces Redis round trips
  • The masterQueueLimit parameter prevents fetching too many queues at once
  • Skipping tenants at capacity (isAtCapacity check in DRR) is a good optimization

To Watch:

  • The getAvailableCapacity() method makes multiple parallel Redis calls per queue. In high-throughput scenarios, this could add latency. Consider caching capacity information briefly (a sketch follows this list).
  • The blocking pop timeout (workerQueueBlockingTimeoutSeconds) defaults to 10s which is reasonable, but monitor for connection pooling issues at scale.
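
A hedged sketch of the brief-caching idea; the cache TTL and the fetcher signature are assumptions, not the ConcurrencyManager's actual API:

// Short-lived cache so hot loops don't re-fetch capacity for the same queue on
// every iteration. A stale answer only delays claiming by up to ttlMs.
type CapacityFetcher = (queueId: string) => Promise<number>;

function withCapacityCache(fetchCapacity: CapacityFetcher, ttlMs = 250): CapacityFetcher {
  const cache = new Map<string, { value: number; expiresAt: number }>();
  return async (queueId: string) => {
    const now = Date.now();
    const hit = cache.get(queueId);
    if (hit && hit.expiresAt > now) {
      return hit.value;
    }
    const value = await fetchCapacity(queueId);
    cache.set(queueId, { value, expiresAt: now + ttlMs });
    return value;
  };
}

// const cachedCapacity = withCapacityCache((q) => concurrencyManager.getAvailableCapacity(q));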

🧪 Test Coverage Assessment

Well Covered:

  • VisibilityManager: heartbeat, claimBatch, releaseBatch
  • ConcurrencyManager: single/multi group, capacity, keyPrefix handling
  • DRR Scheduler: queue selection, deficit management

Could Use More Tests:

  • End-to-end test of the full two-phase flow (enqueue → FairQueue claim → worker queue → BatchQueue consume)
  • Test for reclaim loop behavior when messages time out during the two-phase process
  • Concurrent consumer race condition tests

Summary

This is a solid PR that improves the batch processing architecture significantly. The two-phase model will help with fair scheduling and the ability to skip tenants at capacity should resolve the issue with low-concurrency tenants blocking higher-concurrency ones.

Recommendation: Approve with minor suggestions

The issues identified are mostly minor improvements rather than blockers. The core logic is sound and well-tested.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)

635-640: Consider adding metrics for invalid message keys.

When a message key has an invalid format, the code logs an error and returns false, effectively dropping the message. While this is appropriate (the message was already atomically removed by blockingPop), consider adding a counter metric to track these occurrences for monitoring and alerting.

📊 Optional metric addition
 const colonIndex = messageKey.indexOf(":");
 if (colonIndex === -1) {
   this.logger.error("Invalid message key format", { messageKey });
   this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
+  // Consider adding: this.someErrorCounter?.add(1, { error_type: "invalid_message_key" });
   return false;
 }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 51fb53a and 4ce1066.

📒 Files selected for processing (1)
  • internal-packages/run-engine/src/batch-queue/index.ts
🔇 Additional comments (3)
internal-packages/run-engine/src/batch-queue/index.ts (3)

54-57: LGTM: Clear architecture documentation.

The constant and its accompanying comments clearly explain the single shared worker queue design where FairQueue handles fair scheduling before routing all messages to this queue for BatchQueue's consumer loops.


505-514: LGTM: Comprehensive resource cleanup.

The close method properly sequences shutdown: stops consumers first, cleans up batched spans, then closes all Redis connections. The addition of batchedSpanManager.cleanupAll() at line 508 addresses the previous review concern about potential span leaks.


681-689: LGTM: Proper cleanup for missing messages.

The code correctly calls completeMessage even when stored message data is not found (line 687), ensuring visibility timeout cleanup. This addresses the previous review concern about potential resource leaks.

@claude

claude bot commented Jan 7, 2026

PR Review: fix(batch): optimize processing batch trigger v2

Thanks for submitting this significant refactor of the BatchQueue system. This is a well-structured PR that introduces important architectural improvements. Here's my detailed review:


🎯 Summary

This PR refactors the FairQueue to implement a full two-phase dequeue architecture where:

  1. FairQueue handles fair scheduling and routes messages to worker queues
  2. BatchQueue (and other consumers) run their own worker queue consumer loops

This is a solid separation of concerns that should improve scalability and flexibility.


✅ Code Quality & Best Practices

Strengths:

  • Well-documented code with clear JSDoc comments explaining the two-phase architecture
  • Good separation of concerns between FairQueue (scheduling) and BatchQueue (consumption)
  • Consistent use of Lua scripts for atomic Redis operations (claimMessageBatch, releaseMessageBatch, drrDecrementDeficitBatch)
  • New environment variables are well-documented with sensible defaults
  • The isAbortError utility function (packages/redis-worker/src/utils.ts:8-11) handles both custom and native abort errors properly

Minor Suggestions:

  1. In internal-packages/run-engine/src/batch-queue/index.ts:597-639, consider extracting the worker queue consumer loop error handling into a helper method to reduce nesting depth

  2. The batchClaimSize defaults to 10 (packages/redis-worker/src/fair-queue/index.ts:140) but isn't configurable via env vars. Consider adding BATCH_QUEUE_CLAIM_SIZE for tuning


🐛 Potential Bugs or Issues

  1. Message Key Format Parsing (internal-packages/run-engine/src/batch-queue/index.ts:632-640):
    The format messageId:queueId could be problematic if queueId contains colons (which it might, based on the format tenant:t1:queue:name). The current parsing uses indexOf(":") which only finds the first colon. However, since messageId appears to be a UUID (no colons), this should be safe, but the comment could clarify this assumption.

  2. Concurrency Reservation Failure Handling (packages/redis-worker/src/fair-queue/index.ts:969-980):
    When concurrency reservation fails mid-batch, all remaining messages are released with releaseBatch. This is correct, but if the release fails for any reason, those messages would be stranded. Consider adding error handling/logging for the releaseBatch call.

  3. Worker Queue Resolution When Not Provided (packages/redis-worker/src/fair-queue/index.ts:294 and 389):
    Changed from options.queueId to undefined when no resolver is provided. Verify this is intentional—it seems the stored workerQueue field will be undefined instead of falling back to the queueId.


⚡ Performance Considerations

Improvements:

  • Batch claiming (claimBatch) reduces Redis round-trips significantly - this is a great optimization
  • recordProcessedBatch in DRRScheduler reduces Redis calls when processing multiple messages
  • Skipping tenants at capacity (tenant_capacity_skipped stat at line 864) prevents wasted work
  • The getAvailableCapacity method in ConcurrencyManager fetches all data in parallel (lines 127-135); a sketch of the pattern follows this list
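
Roughly the pattern being praised, sketched with assumed key names and data structures (the real implementation's keys and grouping logic will differ):

import Redis from "ioredis";

// Fetch every group's limit and current count concurrently, then take the
// minimum remaining headroom across groups.
async function getAvailableCapacitySketch(
  redis: Redis,
  groups: { limitKey: string; currentKey: string; defaultLimit: number }[]
): Promise<number> {
  const headrooms = await Promise.all(
    groups.map(async ({ limitKey, currentKey, defaultLimit }) => {
      const [limitRaw, current] = await Promise.all([
        redis.get(limitKey),
        redis.scard(currentKey), // assuming current concurrency is tracked in a set
      ]);
      const limit = limitRaw ? Number(limitRaw) : defaultLimit;
      return Math.max(0, limit - current);
    })
  );
  return headrooms.length === 0 ? Number.POSITIVE_INFINITY : Math.min(...headrooms);
}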

Considerations:

  1. Capacity Check Frequency: The scheduler now calls isAtCapacity for every tenant on every iteration (packages/redis-worker/src/fair-queue/schedulers/drr.ts:93-101). For many tenants, this could be expensive. Consider caching capacity state briefly or batching these checks.

  2. Master Queue Size Query: getShardQueueCount is called on every iteration (packages/redis-worker/src/fair-queue/index.ts:790). While useful for observability, it's an extra Redis call per iteration. Consider making this optional or sampling.

  3. Consumer Loop Wait Logic (packages/redis-worker/src/fair-queue/index.ts:756-768): The wait logic correctly uses 1ms when there's work and consumerIntervalMs when idle, but the abort handler pattern creates/removes event listeners on every iteration. This is fine for low frequency but worth noting.


🔒 Security Concerns

No security issues identified. The changes are internal queue processing logic with no user input handling or authentication changes.


🧪 Test Coverage

Strong test coverage:

  • New visibility.test.ts covers claimBatch, releaseBatch, and heartbeat scenarios comprehensively
  • New concurrency.test.ts includes getAvailableCapacity tests with single and multi-group scenarios
  • drr.test.ts tests the new recordProcessedBatch functionality
  • The test helper class TestFairQueueHelper in fairQueue.test.ts nicely demonstrates the expected usage pattern

Suggestions:

  1. Consider adding an integration test that exercises the full BatchQueue -> FairQueue -> WorkerQueue flow with concurrency limits

  2. Test edge case: What happens if workerQueueBlockingTimeoutSeconds is set very low (e.g., 0 or 1) and there's high contention?

  3. The race condition tests in raceConditions.test.ts look good but could benefit from a test where concurrency limits are hit during batch claiming


📝 Additional Notes

  1. Breaking Change?: The workerQueue option is now required in FairQueueOptions (no longer optional with enabled flag). Consumers must provide resolveWorkerQueue. This is documented in the types but worth noting in release notes.

  2. Cleanup: Good job removing the direct consumer loop and worker queue consumer loop from FairQueue—the code is cleaner with this responsibility moved to consumers like BatchQueue.

  3. Observable Stats: The new stats like tenant_capacity_skipped, messages_claimed, and master_queue_size_sum are valuable for monitoring—well thought out.


🏁 Overall

This is a well-executed refactor that improves the architecture while maintaining backward compatibility in the API surface (via the optional new env vars). The test coverage is thorough, and the code quality is high.

Recommendation: Approve with minor suggestions

Consider addressing the performance considerations around capacity checking frequency if you expect high tenant counts, but the current implementation should work well for typical use cases.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @packages/redis-worker/src/fair-queue/index.ts:
- Around line 986-987: The bug is passing message.payload to workerQueueResolver
instead of the full StoredMessage; update the call so workerQueueResolver
receives the complete StoredMessage (same shape constructed in the enqueue
paths) so it can access messageId/queueId/tenantId/etc. Locate the resolver
invocation (workerQueueResolver(...)) and replace the argument with the full
message object (not message.payload), matching the StoredMessage used by the
enqueue methods.
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4ce1066 and 6221282.

📒 Files selected for processing (4)
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/index.ts
  • packages/redis-worker/src/utils.ts
🔇 Additional comments (11)
packages/redis-worker/src/utils.ts (1)

1-12: LGTM! Well-designed utility for consistent abort error detection.

This helper correctly handles both native Node.js AbortError (which sets .name) and custom abort errors created with new Error("AbortError") (which sets .message). The documentation clearly explains the dual-check rationale.
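
Based on that description, the helper presumably looks roughly like the following (a sketch, not the file's verbatim contents):

function isAbortError(error: unknown): boolean {
  return (
    error instanceof Error &&
    (error.name === "AbortError" || error.message === "AbortError")
  );
}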

packages/redis-worker/src/index.ts (1)

3-3: LGTM!

Clean addition following the existing barrel export pattern.

packages/redis-worker/src/fair-queue/index.ts (4)

758-770: Good fix for the event listener memory leak.

The explicit removeEventListener call on timeout (line 766) correctly addresses the accumulation issue. The { once: true } option only removes the listener when it fires on abort, so this manual cleanup is necessary when the timeout resolves first.


972-983: Good fix for stranded claimed messages.

When concurrency reservation fails, releaseBatch now correctly releases all remaining messages in the batch (from index i onward) back to the queue. This prevents messages from being stranded in the in-flight set until visibility timeout expires.


773-777: LGTM!

Using the new isAbortError utility for consistent abort detection.


1053-1099: LGTM on completeMessage implementation.

The method correctly handles the case where stored message data is missing, builds a fallback descriptor, and properly sequences the cleanup operations (visibility completion, concurrency release, master queue update).

internal-packages/run-engine/src/batch-queue/index.ts (5)

505-515: Good addition of batchedSpanManager.cleanupAll() in close().

This provides a safety net for cleaning up any remaining batched spans that weren't cleaned up by individual consumer loop finally blocks.


686-690: Good fix: completeMessage is now called when stored message is not found.

This ensures proper cleanup of visibility timeout entries even when the message data is missing (e.g., due to expiration or race conditions).


666-671: Good fix: Using isAbortError helper for consistent abort detection.

This correctly handles both native AbortError (which sets .name) and custom abort errors (which set .message), consistent with the FairQueue implementation.


604-676: LGTM on worker queue consumer loop implementation.

Clean loop structure with proper:

  • Batched span management (init, per-iteration spans, cleanup in finally)
  • Abort signal handling at multiple levels
  • Message key parsing with error handling
  • Statistics tracking for observability

55-58: Clear documentation of the single worker queue design.

Good explanation of why BatchQueue uses a single shared worker queue - FairQueue handles fair scheduling at the master queue level, then routes all messages to this queue for BatchQueue's own consumer loop to process.

@ericallam ericallam merged commit 062766e into main Jan 7, 2026
35 checks passed
@ericallam ericallam deleted the ea-branch-113-3 branch January 7, 2026 15:00