From f55f48711890072633fce3711edcc95d42c925dd Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 6 Jan 2026 19:20:08 +0000 Subject: [PATCH 01/14] fix(batch): optimize processing batch trigger v2 --- apps/webapp/app/env.server.ts | 6 + apps/webapp/app/v3/runEngine.server.ts | 3 + .../run-engine/src/batch-queue/index.ts | 11 +- .../run-engine/src/batch-queue/types.ts | 6 + .../run-engine/src/engine/index.ts | 3 + .../run-engine/src/engine/types.ts | 4 + .../src/fair-queue/concurrency.ts | 20 +++ packages/redis-worker/src/fair-queue/index.ts | 170 +++++++++++------- .../src/fair-queue/schedulers/drr.ts | 4 +- packages/redis-worker/src/fair-queue/types.ts | 2 + .../redis-worker/src/fair-queue/visibility.ts | 149 +++++++++++++++ 11 files changed, 313 insertions(+), 65 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 1cc0db0bf0..d772a44438 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -951,6 +951,12 @@ const EnvironmentSchema = z BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100), BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3), BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50), + // Number of master queue shards for horizontal scaling + BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1), + // Maximum queues to fetch from master queue per iteration + BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000), + // Worker queue blocking timeout in seconds (for two-stage processing) + BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10), // Global rate limit: max items processed per second across all consumers // If not set, no global rate limiting is applied BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 5f88d5f6a4..4d18245324 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -171,7 +171,10 @@ function createRunEngine() { drr: { quantum: env.BATCH_QUEUE_DRR_QUANTUM, maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT, + masterQueueLimit: env.BATCH_QUEUE_MASTER_QUEUE_LIMIT, }, + shardCount: env.BATCH_QUEUE_SHARD_COUNT, + workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS, consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT, consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS, // Default processing concurrency when no specific limit is set diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 4b3904548a..ba662e2619 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -108,6 +108,7 @@ export class BatchQueue { keys: keyProducer, quantum: options.drr.quantum, maxDeficit: options.drr.maxDeficit, + masterQueueLimit: options.drr.masterQueueLimit, logger: { debug: (msg, ctx) => this.logger.debug(msg, ctx), error: (msg, ctx) => this.logger.error(msg, ctx), @@ -121,7 +122,7 @@ export class BatchQueue { scheduler, payloadSchema: BatchItemPayloadSchema, validateOnEnqueue: false, // We control the payload - shardCount: 1, // Batches don't need sharding + shardCount: options.shardCount ?? 
1, consumerCount: options.consumerCount, consumerIntervalMs: options.consumerIntervalMs, visibilityTimeoutMs: 60_000, // 1 minute for batch item processing @@ -131,6 +132,14 @@ export class BatchQueue { threshold: 5, periodMs: 5_000, }, + // Enable two-stage processing with worker queues for better parallelism (when configured) + // Worker queues provide better concurrency by separating queue selection from message processing + workerQueue: options.workerQueueBlockingTimeoutSeconds + ? { + enabled: true, + blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds, + } + : undefined, // Concurrency group based on tenant (environment) // This limits how many batch items can be processed concurrently per environment // Items wait in queue until capacity frees up diff --git a/internal-packages/run-engine/src/batch-queue/types.ts b/internal-packages/run-engine/src/batch-queue/types.ts index 6e8e8ad1c1..db91c87acc 100644 --- a/internal-packages/run-engine/src/batch-queue/types.ts +++ b/internal-packages/run-engine/src/batch-queue/types.ts @@ -123,6 +123,8 @@ export type DRRConfig = { quantum: number; /** Maximum accumulated deficit (prevents starvation) */ maxDeficit: number; + /** Maximum queues to fetch from master queue (default: 1000) */ + masterQueueLimit?: number; }; // ============================================================================ @@ -196,6 +198,10 @@ export type BatchQueueOptions = { consumerCount: number; /** Interval between consumer iterations (ms) */ consumerIntervalMs: number; + /** Number of master queue shards (default: 1) */ + shardCount?: number; + /** Worker queue blocking timeout in seconds (enables two-stage processing) */ + workerQueueBlockingTimeoutSeconds?: number; /** Whether to start consumers on initialization */ startConsumers?: boolean; /** diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 1b53d6378d..83b354fbd9 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -340,7 +340,10 @@ export class RunEngine { drr: { quantum: options.batchQueue?.drr?.quantum ?? 5, maxDeficit: options.batchQueue?.drr?.maxDeficit ?? 50, + masterQueueLimit: options.batchQueue?.drr?.masterQueueLimit, }, + shardCount: options.batchQueue?.shardCount, + workerQueueBlockingTimeoutSeconds: options.batchQueue?.workerQueueBlockingTimeoutSeconds, consumerCount: options.batchQueue?.consumerCount ?? 2, consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100, defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 
10, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 3b2ae8c9a1..01990f12b1 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -76,6 +76,10 @@ export type RunEngineOptions = { batchQueue?: { redis: RedisOptions; drr?: Partial; + /** Number of master queue shards (default: 1) */ + shardCount?: number; + /** Worker queue blocking timeout in seconds (enables two-stage processing) */ + workerQueueBlockingTimeoutSeconds?: number; consumerCount?: number; consumerIntervalMs?: number; /** Default processing concurrency per environment when no specific limit is set */ diff --git a/packages/redis-worker/src/fair-queue/concurrency.ts b/packages/redis-worker/src/fair-queue/concurrency.ts index 23f5293a19..a976794c81 100644 --- a/packages/redis-worker/src/fair-queue/concurrency.ts +++ b/packages/redis-worker/src/fair-queue/concurrency.ts @@ -114,6 +114,26 @@ export class ConcurrencyManager { return await this.redis.scard(key); } + /** + * Get available capacity for a queue across all concurrency groups. + * Returns the minimum available capacity across all groups. + */ + async getAvailableCapacity(queue: QueueDescriptor): Promise { + let minCapacity = Infinity; + + for (const group of this.groups) { + const groupId = group.extractGroupId(queue); + const key = this.keys.concurrencyKey(group.name, groupId); + const current = await this.redis.scard(key); + const limit = (await group.getLimit(groupId)) || group.defaultLimit; + const available = Math.max(0, limit - current); + + minCapacity = Math.min(minCapacity, available); + } + + return minCapacity === Infinity ? 0 : minCapacity; + } + /** * Get concurrency limit for a specific group. 
*/ diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 2e0e6c3a0c..cb43cb837e 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -305,7 +305,7 @@ export class FairQueue { attempt: 1, metadata: options.metadata, }) - : options.queueId, + : undefined, metadata: options.metadata, }; @@ -401,7 +401,7 @@ export class FairQueue { attempt: 1, metadata: options.metadata, }) - : options.queueId, + : undefined, metadata: options.metadata, }; @@ -756,15 +756,19 @@ export class FairQueue { this.batchedSpanManager.initializeLoop(loopId); try { - for await (const _ of setInterval(this.consumerIntervalMs, null, { - signal: this.abortController.signal, - })) { + while (this.isRunning) { + // Check abort signal + if (this.abortController.signal.aborted) { + break; + } + + let hadWork = false; try { - await this.batchedSpanManager.withBatchedSpan( + hadWork = await this.batchedSpanManager.withBatchedSpan( loopId, async (span) => { span.setAttribute("shard_id", shardId); - await this.#processMasterQueueShard(loopId, shardId, span); + return await this.#processMasterQueueShard(loopId, shardId, span); }, { iterationSpanName: "processMasterQueueShard", @@ -779,9 +783,25 @@ export class FairQueue { }); this.batchedSpanManager.markForRotation(loopId); } + + // Only wait if there was no work (avoid spinning when idle) + // When there's work, immediately process the next batch + if (!hadWork) { + await new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, this.consumerIntervalMs); + this.abortController.signal.addEventListener( + "abort", + () => { + clearTimeout(timeout); + reject(new Error("AbortError")); + }, + { once: true } + ); + }); + } } } catch (error) { - if (error instanceof Error && error.name === "AbortError") { + if (error instanceof Error && error.message === "AbortError") { this.logger.debug("Master queue consumer aborted", { loopId }); this.batchedSpanManager.cleanup(loopId); return; @@ -796,7 +816,7 @@ export class FairQueue { loopId: string, shardId: number, parentSpan?: Span - ): Promise { + ): Promise { const masterQueueKey = this.keys.masterQueueKey(shardId); // Create scheduler context @@ -821,7 +841,7 @@ export class FairQueue { if (tenantQueues.length === 0) { this.batchedSpanManager.incrementStat(loopId, "empty_iterations"); - return; + return false; } // Track stats @@ -832,6 +852,8 @@ export class FairQueue { tenantQueues.reduce((acc, t) => acc + t.queues.length, 0) ); + let messagesProcessed = 0; + // Process queues and push to worker queues for (const { tenantId, queues } of tenantQueues) { for (const queueId of queues) { @@ -841,32 +863,47 @@ export class FairQueue { continue; } - const processed = await this.telemetry.trace( + // Check tenant capacity before attempting to process + // If tenant is at capacity, skip ALL remaining queues for this tenant + if (this.concurrencyManager) { + const isAtCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId); + if (isAtCapacity) { + this.batchedSpanManager.incrementStat(loopId, "tenant_capacity_skipped"); + break; // Skip remaining queues for this tenant + } + } + + const processedFromQueue = await this.telemetry.trace( "claimAndPushToWorkerQueue", async (span) => { span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId); span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId); span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); - return 
this.#claimAndPushToWorkerQueue(loopId, queueId, tenantId, shardId); + const count = await this.#claimAndPushToWorkerQueue(loopId, queueId, tenantId, shardId); + span.setAttribute("messages_claimed", count); + return count; }, { kind: SpanKind.INTERNAL } ); - if (processed) { - this.batchedSpanManager.incrementStat(loopId, "messages_claimed"); + if (processedFromQueue > 0) { + messagesProcessed += processedFromQueue; + this.batchedSpanManager.incrementStat(loopId, "messages_claimed", processedFromQueue); if (this.scheduler.recordProcessed) { - await this.telemetry.trace( - "recordProcessed", - async (span) => { - span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId); - span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId); - await this.scheduler.recordProcessed!(tenantId, queueId); - }, - { kind: SpanKind.INTERNAL } - ); + // Record each processed message for DRR deficit tracking + for (let i = 0; i < processedFromQueue; i++) { + await this.telemetry.trace( + "recordProcessed", + async (span) => { + span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId); + span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId); + await this.scheduler.recordProcessed!(tenantId, queueId); + }, + { kind: SpanKind.INTERNAL } + ); + } } - this.#resetCooloff(queueId); } else { // Don't increment cooloff here - the queue was either: // 1. Empty (removed from master, cache cleaned up) @@ -876,6 +913,9 @@ export class FairQueue { } } } + + // Return true if we processed any messages (had work) + return messagesProcessed > 0; } async #claimAndPushToWorkerQueue( @@ -883,7 +923,7 @@ export class FairQueue { queueId: string, tenantId: string, shardId: number - ): Promise { + ): Promise { const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); const masterQueueKey = this.keys.masterQueueKey(shardId); @@ -893,14 +933,16 @@ export class FairQueue { metadata: {}, }; - // Check concurrency before claiming + // Determine how many messages we can claim based on concurrency + let maxClaimCount = 10; // Default batch size cap if (this.concurrencyManager) { - const check = await this.concurrencyManager.canProcess(descriptor); - if (!check.allowed) { + const availableCapacity = await this.concurrencyManager.getAvailableCapacity(descriptor); + if (availableCapacity === 0) { // Queue at max concurrency, back off to avoid repeated attempts this.#incrementCooloff(queueId); - return false; + return 0; } + maxClaimCount = Math.min(maxClaimCount, availableCapacity); } // Check global rate limit - wait if rate limited @@ -915,53 +957,55 @@ export class FairQueue { } } - // Claim message with visibility timeout - const claimResult = await this.visibilityManager.claim>>( - queueId, - queueKey, - queueItemsKey, - loopId, - this.visibilityTimeoutMs - ); + // Claim batch of messages with visibility timeout + const claimedMessages = await this.visibilityManager.claimBatch< + StoredMessage> + >(queueId, queueKey, queueItemsKey, loopId, maxClaimCount, this.visibilityTimeoutMs); - if (!claimResult.claimed || !claimResult.message) { + if (claimedMessages.length === 0) { // Queue is empty, update master queue and clean up caches const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); if (removed === 1) { this.queueDescriptorCache.delete(queueId); this.queueCooloffStates.delete(queueId); } - return false; + return 0; } - const { message } = claimResult; + // Use single shared worker queue - all messages go to one queue, all consumers pop atomically + 
const workerQueueId = "worker-queue"; + let processedCount = 0; - // Reserve concurrency slot - if (this.concurrencyManager) { - const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId); - if (!reserved) { - // Release message back to queue (and ensure it's in master queue) - await this.visibilityManager.release( - message.messageId, - queueId, - queueKey, - queueItemsKey, - masterQueueKey - ); - // Concurrency reservation failed, back off to avoid repeated attempts - this.#incrementCooloff(queueId); - return false; + // Reserve concurrency and push each message to worker queue + for (const message of claimedMessages) { + // Reserve concurrency slot + if (this.concurrencyManager) { + const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId); + if (!reserved) { + // Release message back to queue (and ensure it's in master queue) + await this.visibilityManager.release( + message.messageId, + queueId, + queueKey, + queueItemsKey, + masterQueueKey + ); + // Stop processing more messages from this queue since we're at capacity + break; + } } - } - // Determine worker queue - const workerQueueId = message.payload.workerQueue ?? queueId; + // Push to worker queue + const messageKey = `${message.messageId}:${queueId}`; + await this.workerQueueManager!.push(workerQueueId, messageKey); + processedCount++; + } - // Push to worker queue - const messageKey = `${message.messageId}:${queueId}`; - await this.workerQueueManager!.push(workerQueueId, messageKey); + if (processedCount > 0) { + this.#resetCooloff(queueId); + } - return true; + return processedCount; } // ============================================================================ @@ -970,7 +1014,7 @@ export class FairQueue { async #runWorkerQueueConsumerLoop(consumerId: number): Promise { const loopId = `worker-${consumerId}`; - const workerQueueId = loopId; // Each consumer has its own worker queue by default + const workerQueueId = "worker-queue"; // All consumers share a single worker queue // Initialize batched span tracking for this loop this.batchedSpanManager.initializeLoop(loopId); diff --git a/packages/redis-worker/src/fair-queue/schedulers/drr.ts b/packages/redis-worker/src/fair-queue/schedulers/drr.ts index fbb7f704a4..26d8963485 100644 --- a/packages/redis-worker/src/fair-queue/schedulers/drr.ts +++ b/packages/redis-worker/src/fair-queue/schedulers/drr.ts @@ -27,6 +27,7 @@ export class DRRScheduler extends BaseScheduler { private keys: FairQueueKeyProducer; private quantum: number; private maxDeficit: number; + private masterQueueLimit: number; private logger: NonNullable; constructor(private config: DRRSchedulerConfig) { @@ -35,6 +36,7 @@ export class DRRScheduler extends BaseScheduler { this.keys = config.keys; this.quantum = config.quantum; this.maxDeficit = config.maxDeficit; + this.masterQueueLimit = config.masterQueueLimit ?? 1000; this.logger = config.logger ?? 
{ debug: () => {}, error: () => {}, @@ -195,7 +197,7 @@ export class DRRScheduler extends BaseScheduler { "WITHSCORES", "LIMIT", 0, - 1000 // Limit for performance + this.masterQueueLimit ); const queues: QueueWithScore[] = []; diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index da37868e24..6e44689db0 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -511,6 +511,8 @@ export interface DRRSchedulerConfig { quantum: number; /** Maximum accumulated deficit (prevents starvation) */ maxDeficit: number; + /** Maximum queues to fetch from master queue (default: 1000) */ + masterQueueLimit?: number; /** Redis options for state storage */ redis: RedisOptions; /** Key producer */ diff --git a/packages/redis-worker/src/fair-queue/visibility.ts b/packages/redis-worker/src/fair-queue/visibility.ts index d7e2850b62..52be9a3325 100644 --- a/packages/redis-worker/src/fair-queue/visibility.ts +++ b/packages/redis-worker/src/fair-queue/visibility.ts @@ -127,6 +127,93 @@ export class VisibilityManager { } } + /** + * Claim multiple messages for processing (batch claim). + * Moves up to maxCount messages from the queue to the in-flight set. + * + * @param queueId - The queue to claim from + * @param queueKey - The Redis key for the queue sorted set + * @param queueItemsKey - The Redis key for the queue items hash + * @param consumerId - ID of the consumer claiming the messages + * @param maxCount - Maximum number of messages to claim + * @param timeoutMs - Visibility timeout in milliseconds + * @returns Array of claimed messages + */ + async claimBatch( + queueId: string, + queueKey: string, + queueItemsKey: string, + consumerId: string, + maxCount: number, + timeoutMs?: number + ): Promise>> { + const timeout = timeoutMs ?? this.defaultTimeoutMs; + const deadline = Date.now() + timeout; + const shardId = this.#getShardForQueue(queueId); + const inflightKey = this.keys.inflightKey(shardId); + const inflightDataKey = this.keys.inflightDataKey(shardId); + + // Use Lua script to atomically claim up to maxCount messages + const result = await this.redis.claimMessageBatch( + queueKey, + queueItemsKey, + inflightKey, + inflightDataKey, + queueId, + consumerId, + deadline.toString(), + maxCount.toString() + ); + + if (!result || result.length === 0) { + return []; + } + + const messages: Array> = []; + + // Results come in pairs: [messageId1, payload1, messageId2, payload2, ...] + for (let i = 0; i < result.length; i += 2) { + const messageId = result[i]; + const payloadJson = result[i + 1]; + + // Skip if either value is missing + if (!messageId || !payloadJson) { + continue; + } + + try { + const payload = JSON.parse(payloadJson) as TPayload; + messages.push({ + messageId, + queueId, + payload, + deadline, + consumerId, + }); + } catch (error) { + // JSON parse error - skip this message + this.logger.error("Failed to parse claimed message in batch", { + messageId, + queueId, + error: error instanceof Error ? error.message : String(error), + }); + // Remove the corrupted message from in-flight + await this.#removeFromInflight(shardId, messageId, queueId); + } + } + + if (messages.length > 0) { + this.logger.debug("Batch claimed messages", { + queueId, + consumerId, + count: messages.length, + deadline, + }); + } + + return messages; + } + /** * Extend the visibility timeout for a message (heartbeat). 
* @@ -438,6 +525,57 @@ return {messageId, payload} `, }); + // Atomic batch claim: pop up to N messages from queue, add to in-flight + this.redis.defineCommand("claimMessageBatch", { + numberOfKeys: 4, + lua: ` +local queueKey = KEYS[1] +local queueItemsKey = KEYS[2] +local inflightKey = KEYS[3] +local inflightDataKey = KEYS[4] + +local queueId = ARGV[1] +local consumerId = ARGV[2] +local deadline = tonumber(ARGV[3]) +local maxCount = tonumber(ARGV[4]) + +-- Get up to maxCount oldest messages from queue +local items = redis.call('ZRANGE', queueKey, 0, maxCount - 1) +if #items == 0 then + return {} +end + +local results = {} + +for i, messageId in ipairs(items) do + -- Get message data + local payload = redis.call('HGET', queueItemsKey, messageId) + + if payload then + -- Remove from queue + redis.call('ZREM', queueKey, messageId) + redis.call('HDEL', queueItemsKey, messageId) + + -- Add to in-flight set with deadline + local member = messageId .. ':' .. queueId + redis.call('ZADD', inflightKey, deadline, member) + + -- Store message data for potential release + redis.call('HSET', inflightDataKey, messageId, payload) + + -- Add to results + table.insert(results, messageId) + table.insert(results, payload) + else + -- Message data missing, remove from queue + redis.call('ZREM', queueKey, messageId) + end +end + +return results + `, + }); + // Atomic release: remove from in-flight, add back to queue, update master queue this.redis.defineCommand("releaseMessage", { numberOfKeys: 5, @@ -517,6 +655,17 @@ declare module "@internal/redis" { deadline: string ): Promise<[string, string] | null>; + claimMessageBatch( + queueKey: string, + queueItemsKey: string, + inflightKey: string, + inflightDataKey: string, + queueId: string, + consumerId: string, + deadline: string, + maxCount: string + ): Promise; + releaseMessage( inflightKey: string, inflightDataKey: string, From e7a5010a047ddfec01353575497e6f746635e95d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 6 Jan 2026 21:22:21 +0000 Subject: [PATCH 02/14] fix two-stage processing with concurrency limit releasing in batches --- packages/redis-worker/src/fair-queue/index.ts | 11 +- .../src/fair-queue/tests/fairQueue.test.ts | 102 ++++++++ .../src/fair-queue/tests/visibility.test.ts | 231 ++++++++++++++++++ .../redis-worker/src/fair-queue/visibility.ts | 115 +++++++++ 4 files changed, 455 insertions(+), 4 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index cb43cb837e..0129eea438 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -977,14 +977,17 @@ export class FairQueue { let processedCount = 0; // Reserve concurrency and push each message to worker queue - for (const message of claimedMessages) { + for (let i = 0; i < claimedMessages.length; i++) { + const message = claimedMessages[i]!; + // Reserve concurrency slot if (this.concurrencyManager) { const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId); if (!reserved) { - // Release message back to queue (and ensure it's in master queue) - await this.visibilityManager.release( - message.messageId, + // Release ALL remaining messages (from index i onward) back to queue + // This prevents messages from being stranded in the in-flight set + await this.visibilityManager.releaseBatch( + claimedMessages.slice(i), queueId, queueKey, queueItemsKey, diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts 
b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts index b01b90d804..f2bd2793ff 100644 --- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts @@ -876,4 +876,106 @@ describe("FairQueue", () => { await queue.close(); }); }); + + describe("two-stage processing with concurrency limits", () => { + redisTest( + "should release remaining claimed messages when concurrency reservation fails", + { timeout: 30000 }, + async ({ redisOptions }) => { + const processed: string[] = []; + const processingMessages = new Set(); + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 10, + maxDeficit: 100, + }); + + // Create queue with: + // - Worker queue enabled (two-stage processing) + // - Concurrency limit of 2 per tenant + const queue = new FairQueue({ + redis: redisOptions, + keys, + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: 1, + consumerCount: 1, + consumerIntervalMs: 50, + visibilityTimeoutMs: 10000, + workerQueue: { + enabled: true, + blockingTimeoutSeconds: 1, + }, + concurrency: { + groups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => 2, // Limit to 2 concurrent per tenant + defaultLimit: 2, + }, + ], + }, + startConsumers: false, + }); + + // Message handler that tracks what's being processed + queue.onMessage(async (ctx) => { + const value = ctx.message.payload.value; + processingMessages.add(value); + + // Simulate some work + await new Promise((resolve) => setTimeout(resolve, 100)); + + processed.push(value); + processingMessages.delete(value); + await ctx.complete(); + }); + + // Enqueue 5 messages to the same tenant queue + await queue.enqueueBatch({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + messages: [ + { payload: { value: "msg-1" } }, + { payload: { value: "msg-2" } }, + { payload: { value: "msg-3" } }, + { payload: { value: "msg-4" } }, + { payload: { value: "msg-5" } }, + ], + }); + + // Start processing + queue.start(); + + // Wait for all messages to be processed + // With concurrency limit of 2, it should process in batches + await vi.waitFor( + () => { + expect(processed.length).toBe(5); + }, + { timeout: 20000 } + ); + + // All messages should have been processed + expect(processed).toContain("msg-1"); + expect(processed).toContain("msg-2"); + expect(processed).toContain("msg-3"); + expect(processed).toContain("msg-4"); + expect(processed).toContain("msg-5"); + + // Wait a bit for any cleanup to complete + await new Promise((resolve) => setTimeout(resolve, 100)); + + // No messages should be stuck in-flight + const inflightCount = await queue.getTotalInflightCount(); + expect(inflightCount).toBe(0); + + await queue.close(); + } + ); + }); }); diff --git a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts index 42cb8f2a15..817a23dbf6 100644 --- a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts @@ -244,5 +244,236 @@ describe("VisibilityManager", () => { } ); }); + + describe("claimBatch", () => { + redisTest( + "should claim multiple messages atomically", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 
5000, + }); + + const redis = createRedisClient(redisOptions); + const queueId = "tenant:t1:queue:claim-batch"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + + // Add multiple messages to the queue + for (let i = 1; i <= 5; i++) { + const messageId = `msg-${i}`; + const storedMessage = { + id: messageId, + queueId, + tenantId: "t1", + payload: { value: `test-${i}` }, + timestamp: Date.now() - (6 - i) * 1000, + attempt: 1, + }; + await redis.zadd(queueKey, storedMessage.timestamp, messageId); + await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage)); + } + + // Claim batch of 3 messages + const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 3); + + expect(claimed).toHaveLength(3); + expect(claimed[0]!.messageId).toBe("msg-1"); + expect(claimed[1]!.messageId).toBe("msg-2"); + expect(claimed[2]!.messageId).toBe("msg-3"); + + // Verify messages are in in-flight set + const inflightCount = await manager.getTotalInflightCount(); + expect(inflightCount).toBe(3); + + // Verify messages are removed from queue + const remainingCount = await redis.zcard(queueKey); + expect(remainingCount).toBe(2); + + await manager.close(); + await redis.quit(); + } + ); + + redisTest( + "should return empty array when queue is empty", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 5000, + }); + + const queueId = "tenant:t1:queue:empty"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + + const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 5); + expect(claimed).toHaveLength(0); + + await manager.close(); + } + ); + }); + + describe("releaseBatch", () => { + redisTest( + "should release multiple messages back to queue atomically", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 5000, + }); + + const redis = createRedisClient(redisOptions); + const queueId = "tenant:t1:queue:release-batch"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + // Add messages to queue and claim them + for (let i = 1; i <= 5; i++) { + const messageId = `msg-${i}`; + const storedMessage = { + id: messageId, + queueId, + tenantId: "t1", + payload: { value: `test-${i}` }, + timestamp: Date.now() - (6 - i) * 1000, + attempt: 1, + }; + await redis.zadd(queueKey, storedMessage.timestamp, messageId); + await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage)); + } + + // Claim all 5 messages + const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 5); + expect(claimed).toHaveLength(5); + + // Verify all messages are in-flight + let inflightCount = await manager.getTotalInflightCount(); + expect(inflightCount).toBe(5); + + // Queue should be empty + let queueCount = await redis.zcard(queueKey); + expect(queueCount).toBe(0); + + // Release messages 3, 4, 5 back to queue (batch release) + const messagesToRelease = claimed.slice(2); + await manager.releaseBatch( + messagesToRelease, + queueId, + queueKey, + queueItemsKey, + masterQueueKey + ); + + // Verify 2 
messages still in-flight + inflightCount = await manager.getTotalInflightCount(); + expect(inflightCount).toBe(2); + + // Verify 3 messages back in queue + queueCount = await redis.zcard(queueKey); + expect(queueCount).toBe(3); + + // Verify the correct messages are back in queue + const queueMembers = await redis.zrange(queueKey, 0, -1); + expect(queueMembers).toContain("msg-3"); + expect(queueMembers).toContain("msg-4"); + expect(queueMembers).toContain("msg-5"); + + await manager.close(); + await redis.quit(); + } + ); + + redisTest( + "should handle empty messages array", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 5000, + }); + + const queueId = "tenant:t1:queue:empty-release"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + // Should not throw when releasing empty array + await manager.releaseBatch([], queueId, queueKey, queueItemsKey, masterQueueKey); + + await manager.close(); + } + ); + + redisTest( + "should update master queue with oldest message timestamp", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 5000, + }); + + const redis = createRedisClient(redisOptions); + const queueId = "tenant:t1:queue:master-update"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + // Add and claim messages + const baseTime = Date.now(); + for (let i = 1; i <= 3; i++) { + const messageId = `msg-${i}`; + const storedMessage = { + id: messageId, + queueId, + tenantId: "t1", + payload: { value: `test-${i}` }, + timestamp: baseTime + i * 1000, // Different timestamps + attempt: 1, + }; + await redis.zadd(queueKey, storedMessage.timestamp, messageId); + await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage)); + } + + const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 3); + + // Release all messages back + await manager.releaseBatch(claimed, queueId, queueKey, queueItemsKey, masterQueueKey); + + // Master queue should have been updated + const masterScore = await redis.zscore(masterQueueKey, queueId); + expect(masterScore).not.toBeNull(); + + await manager.close(); + await redis.quit(); + } + ); + }); }); diff --git a/packages/redis-worker/src/fair-queue/visibility.ts b/packages/redis-worker/src/fair-queue/visibility.ts index 52be9a3325..f0fbf13815 100644 --- a/packages/redis-worker/src/fair-queue/visibility.ts +++ b/packages/redis-worker/src/fair-queue/visibility.ts @@ -311,6 +311,58 @@ export class VisibilityManager { }); } + /** + * Release multiple messages back to their queue in a single operation. + * Used when processing fails or consumer wants to retry later. + * All messages must belong to the same queue. 
+ * + * @param messages - Array of messages to release (must all have same queueId) + * @param queueId - The queue ID + * @param queueKey - The Redis key for the queue + * @param queueItemsKey - The Redis key for the queue items hash + * @param masterQueueKey - The Redis key for the master queue + * @param score - Optional score for the messages (defaults to now) + */ + async releaseBatch( + messages: Array<{ messageId: string }>, + queueId: string, + queueKey: string, + queueItemsKey: string, + masterQueueKey: string, + score?: number + ): Promise { + if (messages.length === 0) { + return; + } + + const shardId = this.#getShardForQueue(queueId); + const inflightKey = this.keys.inflightKey(shardId); + const inflightDataKey = this.keys.inflightDataKey(shardId); + const messageScore = score ?? Date.now(); + + // Build arrays of members and messageIds for the Lua script + const messageIds = messages.map((m) => m.messageId); + const members = messages.map((m) => this.#makeMember(m.messageId, queueId)); + + await this.redis.releaseMessageBatch( + inflightKey, + inflightDataKey, + queueKey, + queueItemsKey, + masterQueueKey, + messageScore.toString(), + queueId, + ...members, + ...messageIds + ); + + this.logger.debug("Batch messages released", { + queueId, + count: messages.length, + score: messageScore, + }); + } + /** * Reclaim timed-out messages from a shard. * Returns messages to their original queues. @@ -618,6 +670,58 @@ return 1 `, }); + // Atomic batch release: release multiple messages back to queue + this.redis.defineCommand("releaseMessageBatch", { + numberOfKeys: 5, + lua: ` +local inflightKey = KEYS[1] +local inflightDataKey = KEYS[2] +local queueKey = KEYS[3] +local queueItemsKey = KEYS[4] +local masterQueueKey = KEYS[5] + +local score = tonumber(ARGV[1]) +local queueId = ARGV[2] + +-- Remaining args are: members..., messageIds... 
+-- Calculate how many messages we have +local numMessages = (table.getn(ARGV) - 2) / 2 +local membersStart = 3 +local messageIdsStart = membersStart + numMessages + +local releasedCount = 0 + +for i = 0, numMessages - 1 do + local member = ARGV[membersStart + i] + local messageId = ARGV[messageIdsStart + i] + + -- Get message data from in-flight + local payload = redis.call('HGET', inflightDataKey, messageId) + if payload then + -- Remove from in-flight + redis.call('ZREM', inflightKey, member) + redis.call('HDEL', inflightDataKey, messageId) + + -- Add back to queue + redis.call('ZADD', queueKey, score, messageId) + redis.call('HSET', queueItemsKey, messageId, payload) + + releasedCount = releasedCount + 1 + end +end + +-- Update master queue with oldest message timestamp (only once at the end) +if releasedCount > 0 then + local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') + if #oldest >= 2 then + redis.call('ZADD', masterQueueKey, oldest[2], queueId) + end +end + +return releasedCount + `, + }); + // Atomic heartbeat: check if member exists and update score // ZADD XX returns 0 even on successful updates (it counts new additions only) // So we need to check existence first with ZSCORE @@ -678,6 +782,17 @@ declare module "@internal/redis" { queueId: string ): Promise; + releaseMessageBatch( + inflightKey: string, + inflightDataKey: string, + queueKey: string, + queueItemsKey: string, + masterQueueKey: string, + score: string, + queueId: string, + ...membersAndMessageIds: string[] + ): Promise; + heartbeatMessage(inflightKey: string, member: string, newDeadline: string): Promise; } } From a4c6e836b8301049bbfebf5af8b70e28e1fd9932 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 6 Jan 2026 21:25:16 +0000 Subject: [PATCH 03/14] fix leaking event listeners --- packages/redis-worker/src/fair-queue/index.ts | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 0129eea438..ba2526eb99 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -788,15 +788,17 @@ export class FairQueue { // When there's work, immediately process the next batch if (!hadWork) { await new Promise((resolve, reject) => { - const timeout = setTimeout(resolve, this.consumerIntervalMs); - this.abortController.signal.addEventListener( - "abort", - () => { - clearTimeout(timeout); - reject(new Error("AbortError")); - }, - { once: true } - ); + const abortHandler = () => { + clearTimeout(timeout); + reject(new Error("AbortError")); + }; + const timeout = setTimeout(() => { + // Must remove listener when timeout fires, otherwise listeners accumulate + // (the { once: true } option only removes on abort, not on timeout) + this.abortController.signal.removeEventListener("abort", abortHandler); + resolve(); + }, this.consumerIntervalMs); + this.abortController.signal.addEventListener("abort", abortHandler, { once: true }); }); } } From 797ed3679495cda3cce6e311f793b08ac20e2f7a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 6 Jan 2026 21:54:37 +0000 Subject: [PATCH 04/14] address claude code PR feedback --- .../src/fair-queue/concurrency.ts | 33 +++- packages/redis-worker/src/fair-queue/index.ts | 29 +++- .../redis-worker/src/fair-queue/scheduler.ts | 9 + .../src/fair-queue/schedulers/drr.ts | 50 ++++++ .../src/fair-queue/tests/concurrency.test.ts | 161 ++++++++++++++++++ .../src/fair-queue/tests/drr.test.ts | 58 +++++++ 
.../src/fair-queue/tests/visibility.test.ts | 122 +++++++++++++ packages/redis-worker/src/fair-queue/types.ts | 11 ++ .../redis-worker/src/fair-queue/visibility.ts | 7 +- 9 files changed, 464 insertions(+), 16 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/concurrency.ts b/packages/redis-worker/src/fair-queue/concurrency.ts index a976794c81..a48b4b99d3 100644 --- a/packages/redis-worker/src/fair-queue/concurrency.ts +++ b/packages/redis-worker/src/fair-queue/concurrency.ts @@ -119,15 +119,34 @@ export class ConcurrencyManager { * Returns the minimum available capacity across all groups. */ async getAvailableCapacity(queue: QueueDescriptor): Promise { - let minCapacity = Infinity; + if (this.groups.length === 0) { + return 0; + } - for (const group of this.groups) { - const groupId = group.extractGroupId(queue); - const key = this.keys.concurrencyKey(group.name, groupId); - const current = await this.redis.scard(key); - const limit = (await group.getLimit(groupId)) || group.defaultLimit; - const available = Math.max(0, limit - current); + // Build group data for parallel fetching + const groupData = this.groups.map((group) => ({ + group, + groupId: group.extractGroupId(queue), + })); + + // Fetch all current counts and limits in parallel + const [currents, limits] = await Promise.all([ + Promise.all( + groupData.map(({ group, groupId }) => + this.redis.scard(this.keys.concurrencyKey(group.name, groupId)) + ) + ), + Promise.all( + groupData.map(({ group, groupId }) => + group.getLimit(groupId).then((limit) => limit || group.defaultLimit) + ) + ), + ]); + // Calculate minimum available capacity across all groups + let minCapacity = Infinity; + for (let i = 0; i < groupData.length; i++) { + const available = Math.max(0, limits[i]! - currents[i]!); minCapacity = Math.min(minCapacity, available); } diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index ba2526eb99..d55315cf42 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -84,6 +84,7 @@ export class FairQueue { private workerQueueEnabled: boolean; private workerQueueBlockingTimeoutSeconds: number; private workerQueueResolver?: (message: StoredMessage>) => string; + private batchClaimSize: number; // Cooloff state private cooloffEnabled: boolean; @@ -139,6 +140,9 @@ export class FairQueue { this.workerQueueBlockingTimeoutSeconds = options.workerQueue?.blockingTimeoutSeconds ?? 10; this.workerQueueResolver = options.workerQueue?.resolveWorkerQueue; + // Batch claiming + this.batchClaimSize = options.batchClaimSize ?? 10; + // Cooloff this.cooloffEnabled = options.cooloff?.enabled ?? true; this.cooloffThreshold = options.cooloff?.threshold ?? 
10; @@ -892,8 +896,20 @@ export class FairQueue { messagesProcessed += processedFromQueue; this.batchedSpanManager.incrementStat(loopId, "messages_claimed", processedFromQueue); - if (this.scheduler.recordProcessed) { - // Record each processed message for DRR deficit tracking + // Record processed messages for DRR deficit tracking + // Use batch variant if available for efficiency, otherwise fall back to single calls + if (this.scheduler.recordProcessedBatch) { + await this.telemetry.trace( + "recordProcessedBatch", + async (span) => { + span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId); + span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId); + span.setAttribute("count", processedFromQueue); + await this.scheduler.recordProcessedBatch!(tenantId, queueId, processedFromQueue); + }, + { kind: SpanKind.INTERNAL } + ); + } else if (this.scheduler.recordProcessed) { for (let i = 0; i < processedFromQueue; i++) { await this.telemetry.trace( "recordProcessed", @@ -936,7 +952,7 @@ export class FairQueue { }; // Determine how many messages we can claim based on concurrency - let maxClaimCount = 10; // Default batch size cap + let maxClaimCount = this.batchClaimSize; if (this.concurrencyManager) { const availableCapacity = await this.concurrencyManager.getAvailableCapacity(descriptor); if (availableCapacity === 0) { @@ -974,7 +990,12 @@ export class FairQueue { return 0; } - // Use single shared worker queue - all messages go to one queue, all consumers pop atomically + // Single shared worker queue pattern: + // All consumers pop from one queue ("worker-queue") for atomic distribution. + // Trade-off: Simpler code and fair distribution vs. potential contention + // under very high load (>10k messages/sec). For most workloads, Redis + // can handle 100k+ ops/sec on a single key, so this is rarely a bottleneck. + // Future: Consider adding optional worker queue sharding if needed. const workerQueueId = "worker-queue"; let processedCount = 0; diff --git a/packages/redis-worker/src/fair-queue/scheduler.ts b/packages/redis-worker/src/fair-queue/scheduler.ts index 8acc641e3e..00a22bb97a 100644 --- a/packages/redis-worker/src/fair-queue/scheduler.ts +++ b/packages/redis-worker/src/fair-queue/scheduler.ts @@ -28,6 +28,15 @@ export abstract class BaseScheduler implements FairScheduler { // Default: no state tracking } + /** + * Called after processing multiple messages to update scheduler state. + * Batch variant for efficiency - reduces Redis calls when processing multiple messages. + * Default implementation does nothing. + */ + async recordProcessedBatch(_tenantId: string, _queueId: string, _count: number): Promise { + // Default: no state tracking + } + /** * Initialize the scheduler. * Default implementation does nothing. diff --git a/packages/redis-worker/src/fair-queue/schedulers/drr.ts b/packages/redis-worker/src/fair-queue/schedulers/drr.ts index 26d8963485..d06da05891 100644 --- a/packages/redis-worker/src/fair-queue/schedulers/drr.ts +++ b/packages/redis-worker/src/fair-queue/schedulers/drr.ts @@ -140,6 +140,18 @@ export class DRRScheduler extends BaseScheduler { await this.#decrementDeficit(tenantId); } + /** + * Record that multiple messages were processed from a tenant. + * Decrements the tenant's deficit by count atomically. 
+ */ + override async recordProcessedBatch( + tenantId: string, + _queueId: string, + count: number + ): Promise { + await this.#decrementDeficitBatch(tenantId, count); + } + override async close(): Promise { await this.redis.quit(); } @@ -249,6 +261,17 @@ export class DRRScheduler extends BaseScheduler { return parseFloat(result); } + /** + * Decrement deficit for a tenant by a count atomically. + */ + async #decrementDeficitBatch(tenantId: string, count: number): Promise { + const key = this.#deficitKey(); + + // Use Lua script to decrement by count and ensure non-negative + const result = await this.redis.drrDecrementDeficitBatch(key, tenantId, count.toString()); + return parseFloat(result); + } + #registerCommands(): void { // Atomic quantum addition with capping for multiple tenants this.redis.defineCommand("drrAddQuantum", { @@ -295,6 +318,27 @@ if newDeficit < 0 then newDeficit = 0 end +return tostring(newDeficit) + `, + }); + + // Atomic deficit decrement by count with floor at 0 + this.redis.defineCommand("drrDecrementDeficitBatch", { + numberOfKeys: 1, + lua: ` +local deficitKey = KEYS[1] +local tenantId = ARGV[1] +local count = tonumber(ARGV[2]) + +local newDeficit = redis.call('HINCRBYFLOAT', deficitKey, tenantId, -count) +newDeficit = tonumber(newDeficit) + +-- Floor at 0 +if newDeficit < 0 then + redis.call('HSET', deficitKey, tenantId, 0) + newDeficit = 0 +end + return tostring(newDeficit) `, }); @@ -312,6 +356,12 @@ declare module "@internal/redis" { ): Promise; drrDecrementDeficit(deficitKey: string, tenantId: string): Promise; + + drrDecrementDeficitBatch( + deficitKey: string, + tenantId: string, + count: string + ): Promise; } } diff --git a/packages/redis-worker/src/fair-queue/tests/concurrency.test.ts b/packages/redis-worker/src/fair-queue/tests/concurrency.test.ts index 4f6035f21e..7137ea8ac8 100644 --- a/packages/redis-worker/src/fair-queue/tests/concurrency.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/concurrency.test.ts @@ -222,6 +222,167 @@ describe("ConcurrencyManager", () => { ); }); + describe("getAvailableCapacity", () => { + redisTest( + "should return available capacity for single group", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new ConcurrencyManager({ + redis: redisOptions, + keys, + groups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => 10, + defaultLimit: 10, + }, + ], + }); + + const queue: QueueDescriptor = { + id: "queue-1", + tenantId: "t1", + metadata: {}, + }; + + // Initial capacity should be full + let capacity = await manager.getAvailableCapacity(queue); + expect(capacity).toBe(10); + + // Reserve 3 slots + await manager.reserve(queue, "msg-1"); + await manager.reserve(queue, "msg-2"); + await manager.reserve(queue, "msg-3"); + + // Capacity should be reduced + capacity = await manager.getAvailableCapacity(queue); + expect(capacity).toBe(7); + + await manager.close(); + } + ); + + redisTest( + "should return minimum capacity across multiple groups", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new ConcurrencyManager({ + redis: redisOptions, + keys, + groups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => 5, + defaultLimit: 5, + }, + { + name: "organization", + extractGroupId: (q) => (q.metadata.orgId as string) ?? 
"default", + getLimit: async () => 20, + defaultLimit: 20, + }, + ], + }); + + const queue: QueueDescriptor = { + id: "queue-1", + tenantId: "t1", + metadata: { orgId: "org1" }, + }; + + // Initial capacity should be minimum (5 for tenant, 20 for org) + let capacity = await manager.getAvailableCapacity(queue); + expect(capacity).toBe(5); + + // Reserve 3 slots + await manager.reserve(queue, "msg-1"); + await manager.reserve(queue, "msg-2"); + await manager.reserve(queue, "msg-3"); + + // Now tenant has 2 left, org has 17 left - minimum is 2 + capacity = await manager.getAvailableCapacity(queue); + expect(capacity).toBe(2); + + await manager.close(); + } + ); + + redisTest( + "should return 0 when any group is at capacity", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new ConcurrencyManager({ + redis: redisOptions, + keys, + groups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => 3, + defaultLimit: 3, + }, + { + name: "organization", + extractGroupId: (q) => (q.metadata.orgId as string) ?? "default", + getLimit: async () => 10, + defaultLimit: 10, + }, + ], + }); + + const queue: QueueDescriptor = { + id: "queue-1", + tenantId: "t1", + metadata: { orgId: "org1" }, + }; + + // Fill up tenant capacity + await manager.reserve(queue, "msg-1"); + await manager.reserve(queue, "msg-2"); + await manager.reserve(queue, "msg-3"); + + // Tenant is at 3/3, org is at 3/10 + const capacity = await manager.getAvailableCapacity(queue); + expect(capacity).toBe(0); + + await manager.close(); + } + ); + + redisTest( + "should return 0 when no groups are configured", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new ConcurrencyManager({ + redis: redisOptions, + keys, + groups: [], + }); + + const queue: QueueDescriptor = { + id: "queue-1", + tenantId: "t1", + metadata: {}, + }; + + const capacity = await manager.getAvailableCapacity(queue); + expect(capacity).toBe(0); + + await manager.close(); + } + ); + }); + describe("atomic reservation", () => { redisTest( "should atomically reserve across groups", diff --git a/packages/redis-worker/src/fair-queue/tests/drr.test.ts b/packages/redis-worker/src/fair-queue/tests/drr.test.ts index eb7e3e8337..66575d6486 100644 --- a/packages/redis-worker/src/fair-queue/tests/drr.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/drr.test.ts @@ -115,6 +115,64 @@ describe("DRRScheduler", () => { await redis.quit(); }); + redisTest( + "should decrement deficit by count when using recordProcessedBatch", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 5, + maxDeficit: 50, + }); + + // Manually set deficit + const deficitKey = `test:drr:deficit`; + await redis.hset(deficitKey, "t1", "15"); + + // Record batch processing of 7 messages + await scheduler.recordProcessedBatch("t1", "queue:q1", 7); + + const deficit = await scheduler.getDeficit("t1"); + expect(deficit).toBe(8); + + await scheduler.close(); + await redis.quit(); + } + ); + + redisTest( + "should not go below 0 when recordProcessedBatch decrements more than available", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = 
createRedisClient(redisOptions); + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 5, + maxDeficit: 50, + }); + + // Manually set deficit to 3 + const deficitKey = `test:drr:deficit`; + await redis.hset(deficitKey, "t1", "3"); + + // Record batch processing of 10 messages (more than deficit) + await scheduler.recordProcessedBatch("t1", "queue:q1", 10); + + const deficit = await scheduler.getDeficit("t1"); + expect(deficit).toBe(0); + + await scheduler.close(); + await redis.quit(); + } + ); + redisTest("should reset deficit for tenant", { timeout: 10000 }, async ({ redisOptions }) => { keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); const redis = createRedisClient(redisOptions); diff --git a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts index 817a23dbf6..b35eb874fc 100644 --- a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts @@ -323,6 +323,128 @@ describe("VisibilityManager", () => { await manager.close(); } ); + + redisTest( + "should claim all available messages when queue has fewer than maxCount", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 5000, + }); + + const redis = createRedisClient(redisOptions); + const queueId = "tenant:t1:queue:partial-batch"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + + // Add only 3 messages to the queue + for (let i = 1; i <= 3; i++) { + const messageId = `msg-${i}`; + const storedMessage = { + id: messageId, + queueId, + tenantId: "t1", + payload: { value: `test-${i}` }, + timestamp: Date.now() - (4 - i) * 1000, + attempt: 1, + }; + await redis.zadd(queueKey, storedMessage.timestamp, messageId); + await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage)); + } + + // Request 10 messages but only 3 exist + const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 10); + + expect(claimed).toHaveLength(3); + expect(claimed[0]!.messageId).toBe("msg-1"); + expect(claimed[1]!.messageId).toBe("msg-2"); + expect(claimed[2]!.messageId).toBe("msg-3"); + + // Verify all messages are in in-flight set + const inflightCount = await manager.getTotalInflightCount(); + expect(inflightCount).toBe(3); + + // Verify queue is empty + const remainingCount = await redis.zcard(queueKey); + expect(remainingCount).toBe(0); + + await manager.close(); + await redis.quit(); + } + ); + + redisTest( + "should skip corrupted messages and continue claiming valid ones", + { timeout: 10000 }, + async ({ redisOptions }) => { + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const manager = new VisibilityManager({ + redis: redisOptions, + keys, + shardCount: 1, + defaultTimeoutMs: 5000, + }); + + const redis = createRedisClient(redisOptions); + const queueId = "tenant:t1:queue:corrupted-batch"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + + // Add valid message 1 + const storedMessage1 = { + id: "msg-1", + queueId, + tenantId: "t1", + payload: { value: "test-1" }, + timestamp: Date.now() - 3000, + attempt: 1, + }; + await redis.zadd(queueKey, storedMessage1.timestamp, "msg-1"); + await redis.hset(queueItemsKey, "msg-1", 
JSON.stringify(storedMessage1)); + + // Add corrupted message 2 (invalid JSON) + await redis.zadd(queueKey, Date.now() - 2000, "msg-2"); + await redis.hset(queueItemsKey, "msg-2", "not-valid-json{{{"); + + // Add valid message 3 + const storedMessage3 = { + id: "msg-3", + queueId, + tenantId: "t1", + payload: { value: "test-3" }, + timestamp: Date.now() - 1000, + attempt: 1, + }; + await redis.zadd(queueKey, storedMessage3.timestamp, "msg-3"); + await redis.hset(queueItemsKey, "msg-3", JSON.stringify(storedMessage3)); + + // Claim all 3 messages + const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 5); + + // Should only return the 2 valid messages + expect(claimed).toHaveLength(2); + expect(claimed[0]!.messageId).toBe("msg-1"); + expect(claimed[1]!.messageId).toBe("msg-3"); + + // Corrupted message should have been removed from in-flight + // Valid messages should be in in-flight + const inflightCount = await manager.getTotalInflightCount(); + expect(inflightCount).toBe(2); + + // Queue should be empty (all messages processed or removed) + const remainingCount = await redis.zcard(queueKey); + expect(remainingCount).toBe(0); + + await manager.close(); + await redis.quit(); + } + ); }); describe("releaseBatch", () => { diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index 6e44689db0..42b7bbf207 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -186,6 +186,13 @@ export interface FairScheduler { */ recordProcessed?(tenantId: string, queueId: string): Promise; + /** + * Called after processing multiple messages to update scheduler state. + * Batch variant for efficiency - reduces Redis calls when processing multiple messages. + * Optional - falls back to calling recordProcessed multiple times if not implemented. + */ + recordProcessedBatch?(tenantId: string, queueId: string, count: number): Promise; + /** * Initialize the scheduler (called once on startup). 
*/ @@ -366,6 +373,10 @@ export interface FairQueueOptions; From 2247b1f4910215cb44913cda8e167d40572d3eb8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 10:30:26 +0000 Subject: [PATCH 05/14] fix typecheck error --- .../src/fair-queue/tests/fairQueue.test.ts | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts index f2bd2793ff..28448a9fe4 100644 --- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts @@ -909,16 +909,14 @@ describe("FairQueue", () => { enabled: true, blockingTimeoutSeconds: 1, }, - concurrency: { - groups: [ - { - name: "tenant", - extractGroupId: (q) => q.tenantId, - getLimit: async () => 2, // Limit to 2 concurrent per tenant - defaultLimit: 2, - }, - ], - }, + concurrencyGroups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => 2, // Limit to 2 concurrent per tenant + defaultLimit: 2, + }, + ], startConsumers: false, }); From 08753e0c5359368d103843e03e03f17d71065aba Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 10:32:46 +0000 Subject: [PATCH 06/14] allow setting workerQueueBlockingTimeoutSeconds to 0 --- .../run-engine/src/batch-queue/index.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index ba662e2619..0e10b11c6f 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -134,12 +134,13 @@ export class BatchQueue { }, // Enable two-stage processing with worker queues for better parallelism (when configured) // Worker queues provide better concurrency by separating queue selection from message processing - workerQueue: options.workerQueueBlockingTimeoutSeconds - ? { - enabled: true, - blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds, - } - : undefined, + workerQueue: + options.workerQueueBlockingTimeoutSeconds !== undefined + ? 
{ + enabled: true, + blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds, + } + : undefined, // Concurrency group based on tenant (environment) // This limits how many batch items can be processed concurrently per environment // Items wait in queue until capacity frees up From 50c462fc7eb4379bb80de70aea4f71f62e8eed9e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 10:36:21 +0000 Subject: [PATCH 07/14] Add a short delay for the master queue consumer loop --- packages/redis-worker/src/fair-queue/index.ts | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index d55315cf42..23cf305bcc 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -788,23 +788,22 @@ export class FairQueue { this.batchedSpanManager.markForRotation(loopId); } - // Only wait if there was no work (avoid spinning when idle) - // When there's work, immediately process the next batch - if (!hadWork) { - await new Promise((resolve, reject) => { - const abortHandler = () => { - clearTimeout(timeout); - reject(new Error("AbortError")); - }; - const timeout = setTimeout(() => { - // Must remove listener when timeout fires, otherwise listeners accumulate - // (the { once: true } option only removes on abort, not on timeout) - this.abortController.signal.removeEventListener("abort", abortHandler); - resolve(); - }, this.consumerIntervalMs); - this.abortController.signal.addEventListener("abort", abortHandler, { once: true }); - }); - } + // Wait between iterations to prevent CPU spin + // Short delay when there's work (yield to event loop), longer delay when idle + const waitMs = hadWork ? 
1 : this.consumerIntervalMs;
+      await new Promise((resolve, reject) => {
+        const abortHandler = () => {
+          clearTimeout(timeout);
+          reject(new Error("AbortError"));
+        };
+        const timeout = setTimeout(() => {
+          // Must remove listener when timeout fires, otherwise listeners accumulate
+          // (the { once: true } option only removes on abort, not on timeout)
+          this.abortController.signal.removeEventListener("abort", abortHandler);
+          resolve();
+        }, waitMs);
+        this.abortController.signal.addEventListener("abort", abortHandler, { once: true });
+      });
     }
   } catch (error) {
     if (error instanceof Error && error.message === "AbortError") {

From ed25caa92b1bd6a76511ef894d198b7646c106e6 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Wed, 7 Jan 2026 12:32:27 +0000
Subject: [PATCH 08/14] move the worker queue consumer to the batch queue
 instead of being inside the fair queue

---
 .../run-engine/src/batch-queue/index.ts       | 216 ++++-
 .../run-engine/src/batch-queue/types.ts       |   4 +
 packages/redis-worker/src/fair-queue/index.ts | 735 +++++------------
 .../src/fair-queue/tests/fairQueue.test.ts    | 327 ++++++--
 .../fair-queue/tests/raceConditions.test.ts   | 221 +++++-
 packages/redis-worker/src/fair-queue/types.ts |  22 +-
 6 files changed, 831 insertions(+), 694 deletions(-)

diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts
index 0e10b11c6f..7c67c6824d 100644
--- a/internal-packages/run-engine/src/batch-queue/index.ts
+++ b/internal-packages/run-engine/src/batch-queue/index.ts
@@ -11,7 +11,10 @@ import {
   FairQueue,
   DRRScheduler,
   CallbackFairQueueKeyProducer,
+  WorkerQueueManager,
+  BatchedSpanManager,
   type FairQueueOptions,
+  type StoredMessage,
 } from "@trigger.dev/redis-worker";
 import { Logger } from "@trigger.dev/core/logger";
 import type {
@@ -48,8 +51,14 @@ export { BatchCompletionTracker } from "./completionTracker.js";
 // Redis key for environment concurrency limits
 const ENV_CONCURRENCY_KEY_PREFIX = "batch:env_concurrency";
 
+// Single worker queue ID for all batch items
+// BatchQueue uses a single shared worker queue - FairQueue handles fair scheduling,
+// then all messages are routed to this queue for BatchQueue's own consumer loop.
+const BATCH_WORKER_QUEUE_ID = "batch-worker-queue";
+
 export class BatchQueue {
   private fairQueue: FairQueue;
+  private workerQueueManager: WorkerQueueManager;
   private completionTracker: BatchCompletionTracker;
   private logger: Logger;
   private tracer?: Tracer;
@@ -59,6 +68,13 @@ export class BatchQueue {
   private processItemCallback?: ProcessBatchItemCallback;
   private completionCallback?: BatchCompletionCallback;
 
+  // Consumer loop state
+  private isRunning = false;
+  private abortController: AbortController;
+  private workerQueueConsumerLoops: Promise<void>[] = [];
+  private workerQueueBlockingTimeoutSeconds: number;
+  private batchedSpanManager: BatchedSpanManager;
+
   // Metrics
   private batchesEnqueuedCounter?: Counter;
   private itemsEnqueuedCounter?: Counter;
@@ -72,6 +88,8 @@ export class BatchQueue {
     this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
     this.tracer = options.tracer;
     this.defaultConcurrency = options.defaultConcurrency ?? 10;
+    this.abortController = new AbortController();
+    this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 
10; // Initialize metrics if meter is provided if (options.meter) { @@ -116,6 +134,8 @@ export class BatchQueue { }); // Create FairQueue with telemetry and environment-based concurrency limiting + // FairQueue handles fair scheduling and routes messages to the batch worker queue + // BatchQueue runs its own consumer loop to process messages from the worker queue const fairQueueOptions: FairQueueOptions = { redis: options.redis, keys: keyProducer, @@ -132,15 +152,11 @@ export class BatchQueue { threshold: 5, periodMs: 5_000, }, - // Enable two-stage processing with worker queues for better parallelism (when configured) - // Worker queues provide better concurrency by separating queue selection from message processing - workerQueue: - options.workerQueueBlockingTimeoutSeconds !== undefined - ? { - enabled: true, - blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds, - } - : undefined, + // Worker queue configuration - FairQueue routes all messages to our single worker queue + workerQueue: { + // All batch items go to the same worker queue - BatchQueue handles consumption + resolveWorkerQueue: () => BATCH_WORKER_QUEUE_ID, + }, // Concurrency group based on tenant (environment) // This limits how many batch items can be processed concurrently per environment // Items wait in queue until capacity frees up @@ -167,6 +183,24 @@ export class BatchQueue { this.fairQueue = new FairQueue(fairQueueOptions); + // Create worker queue manager for consuming from the batch worker queue + this.workerQueueManager = new WorkerQueueManager({ + redis: options.redis, + keys: keyProducer, + logger: { + debug: (msg, ctx) => this.logger.debug(msg, ctx), + error: (msg, ctx) => this.logger.error(msg, ctx), + }, + }); + + // Initialize batched span manager for worker queue consumer tracing + this.batchedSpanManager = new BatchedSpanManager({ + tracer: options.tracer, + name: "batch-queue-worker", + maxIterations: options.consumerTraceMaxIterations ?? 1000, + timeoutSeconds: options.consumerTraceTimeoutSeconds ?? 60, + }); + // Create completion tracker this.completionTracker = new BatchCompletionTracker({ redis: options.redis, @@ -177,11 +211,6 @@ export class BatchQueue { }, }); - // Set up message handler - this.fairQueue.onMessage(async (ctx) => { - await this.#handleMessage(ctx); - }); - // Register telemetry gauge callbacks for observable metrics // Note: observedTenants is not provided since tenant list is dynamic this.fairQueue.registerTelemetryGauges(); @@ -420,13 +449,31 @@ export class BatchQueue { /** * Start the consumer loops. + * FairQueue runs the master queue consumer loop (claim and push to worker queue). + * BatchQueue runs its own worker queue consumer loops to process messages. */ start(): void { + if (this.isRunning) { + return; + } + + this.isRunning = true; + this.abortController = new AbortController(); + + // Start FairQueue's master queue consumers (routes messages to worker queue) this.fairQueue.start(); + + // Start worker queue consumer loops + for (let consumerId = 0; consumerId < this.options.consumerCount; consumerId++) { + const loop = this.#runWorkerQueueConsumerLoop(consumerId); + this.workerQueueConsumerLoops.push(loop); + } + this.logger.info("BatchQueue consumers started", { consumerCount: this.options.consumerCount, intervalMs: this.options.consumerIntervalMs, drrQuantum: this.options.drr.quantum, + workerQueueId: BATCH_WORKER_QUEUE_ID, }); } @@ -434,7 +481,20 @@ export class BatchQueue { * Stop the consumer loops gracefully. 
*/ async stop(): Promise { + if (!this.isRunning) { + return; + } + + this.isRunning = false; + this.abortController.abort(); + + // Stop FairQueue's master queue consumers await this.fairQueue.stop(); + + // Wait for worker queue consumer loops to finish + await Promise.allSettled(this.workerQueueConsumerLoops); + this.workerQueueConsumerLoops = []; + this.logger.info("BatchQueue consumers stopped"); } @@ -442,7 +502,9 @@ export class BatchQueue { * Close the BatchQueue and all Redis connections. */ async close(): Promise { + await this.stop(); await this.fairQueue.close(); + await this.workerQueueManager.close(); await this.completionTracker.close(); await this.concurrencyRedis.quit(); } @@ -526,26 +588,102 @@ export class BatchQueue { }); } + // ============================================================================ + // Private - Worker Queue Consumer Loop + // ============================================================================ + + /** + * Run a worker queue consumer loop. + * This pops messages from the batch worker queue and processes them. + */ + async #runWorkerQueueConsumerLoop(consumerId: number): Promise { + const loopId = `batch-worker-${consumerId}`; + + // Initialize batched span tracking for this loop + this.batchedSpanManager.initializeLoop(loopId); + + try { + while (this.isRunning) { + if (!this.processItemCallback) { + await new Promise((resolve) => setTimeout(resolve, 100)); + continue; + } + + try { + await this.batchedSpanManager.withBatchedSpan( + loopId, + async (span) => { + span.setAttribute("consumer_id", consumerId); + + // Blocking pop from worker queue + const messageKey = await this.workerQueueManager.blockingPop( + BATCH_WORKER_QUEUE_ID, + this.workerQueueBlockingTimeoutSeconds, + this.abortController.signal + ); + + if (!messageKey) { + this.batchedSpanManager.incrementStat(loopId, "empty_iterations"); + return false; // Timeout, no work + } + + // Parse message key (format: "messageId:queueId") + const colonIndex = messageKey.indexOf(":"); + if (colonIndex === -1) { + this.logger.error("Invalid message key format", { messageKey }); + this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys"); + return false; + } + + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + + await this.#handleMessage(loopId, messageId, queueId); + this.batchedSpanManager.incrementStat(loopId, "messages_processed"); + return true; // Had work + }, + { + iterationSpanName: "processWorkerQueueMessage", + attributes: { consumer_id: consumerId }, + } + ); + } catch (error) { + if (this.abortController.signal.aborted) { + break; + } + this.logger.error("Worker queue consumer error", { + loopId, + error: error instanceof Error ? 
error.message : String(error), + }); + this.batchedSpanManager.markForRotation(loopId); + } + } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + this.logger.debug("Worker queue consumer aborted", { loopId }); + this.batchedSpanManager.cleanup(loopId); + return; + } + throw error; + } finally { + this.batchedSpanManager.cleanup(loopId); + } + } + // ============================================================================ // Private - Message Handling // ============================================================================ - async #handleMessage(ctx: { - message: { - id: string; - queueId: string; - payload: BatchItemPayload; - timestamp: number; - attempt: number; - }; - queue: { id: string; tenantId: string }; - consumerId: string; - heartbeat: () => Promise; - complete: () => Promise; - release: () => Promise; - fail: (error?: Error) => Promise; - }): Promise { - const { batchId, friendlyId, itemIndex, item } = ctx.message.payload; + async #handleMessage(consumerId: string, messageId: string, queueId: string): Promise { + // Get message data from FairQueue's in-flight storage + const storedMessage = await this.fairQueue.getMessageData(messageId, queueId); + + if (!storedMessage) { + this.logger.error("Message not found in in-flight data", { messageId, queueId }); + return; + } + + const { batchId, friendlyId, itemIndex, item } = storedMessage.payload; return this.#startSpan("BatchQueue.handleMessage", async (span) => { span?.setAttributes({ @@ -553,13 +691,13 @@ export class BatchQueue { "batch.friendlyId": friendlyId, "batch.itemIndex": itemIndex, "batch.task": item.task, - "batch.consumerId": ctx.consumerId, - "batch.attempt": ctx.message.attempt, + "batch.consumerId": consumerId, + "batch.attempt": storedMessage.attempt, }); // Record queue time metric (time from enqueue to processing) - const queueTimeMs = Date.now() - ctx.message.timestamp; - this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId }); + const queueTimeMs = Date.now() - storedMessage.timestamp; + this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: storedMessage.tenantId }); span?.setAttribute("batch.queueTimeMs", queueTimeMs); this.logger.debug("Processing batch item", { @@ -567,15 +705,15 @@ export class BatchQueue { friendlyId, itemIndex, task: item.task, - consumerId: ctx.consumerId, - attempt: ctx.message.attempt, + consumerId, + attempt: storedMessage.attempt, queueTimeMs, }); if (!this.processItemCallback) { this.logger.error("No process item callback set", { batchId, itemIndex }); // Still complete the message to avoid blocking - await ctx.complete(); + await this.fairQueue.completeMessage(messageId, queueId); return; } @@ -586,7 +724,7 @@ export class BatchQueue { if (!meta) { this.logger.error("Batch metadata not found", { batchId, itemIndex }); - await ctx.complete(); + await this.fairQueue.completeMessage(messageId, queueId); return; } @@ -722,7 +860,7 @@ export class BatchQueue { // This must happen after recording success/failure to ensure the counter // is updated before the message is considered done await this.#startSpan("BatchQueue.completeMessage", async () => { - return ctx.complete(); + return this.fairQueue.completeMessage(messageId, queueId); }); // Check if all items have been processed using atomic counter diff --git a/internal-packages/run-engine/src/batch-queue/types.ts b/internal-packages/run-engine/src/batch-queue/types.ts index db91c87acc..3ff34fd4a6 100644 --- a/internal-packages/run-engine/src/batch-queue/types.ts 
+++ b/internal-packages/run-engine/src/batch-queue/types.ts @@ -222,6 +222,10 @@ export type BatchQueueOptions = { tracer?: Tracer; /** OpenTelemetry meter for metrics */ meter?: Meter; + /** Maximum iterations before rotating consumer loop trace span (default: 1000) */ + consumerTraceMaxIterations?: number; + /** Maximum seconds before rotating consumer loop trace span (default: 60) */ + consumerTraceTimeoutSeconds?: number; }; /** diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 23cf305bcc..8ed2772066 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -22,11 +22,8 @@ import type { FairQueueOptions, FairScheduler, GlobalRateLimiter, - MessageHandler, - MessageHandlerContext, QueueCooloffState, QueueDescriptor, - QueueMessage, SchedulerContext, StoredMessage, } from "./types.js"; @@ -46,17 +43,21 @@ export * from "./retry.js"; export * from "./telemetry.js"; /** - * FairQueue is the main orchestrator for fair queue processing. + * FairQueue is the main orchestrator for fair queue message routing. * - * It coordinates: + * FairQueue handles: * - Master queue with sharding (using jump consistent hash) * - Fair scheduling via pluggable schedulers * - Multi-level concurrency limiting * - Visibility timeouts with heartbeats - * - Worker queues with blocking pop + * - Routing messages to worker queues * - Retry strategies with dead letter queue * - OpenTelemetry tracing and metrics * + * External consumers are responsible for: + * - Running their own worker queue consumer loops + * - Calling complete/release/fail APIs after processing + * * @typeParam TPayloadSchema - Zod schema for message payload validation */ export class FairQueue { @@ -66,7 +67,7 @@ export class FairQueue { private masterQueue: MasterQueue; private concurrencyManager?: ConcurrencyManager; private visibilityManager: VisibilityManager; - private workerQueueManager?: WorkerQueueManager; + private workerQueueManager: WorkerQueueManager; private telemetry: FairQueueTelemetry; private logger: Logger; @@ -81,9 +82,7 @@ export class FairQueue { private visibilityTimeoutMs: number; private heartbeatIntervalMs: number; private reclaimIntervalMs: number; - private workerQueueEnabled: boolean; - private workerQueueBlockingTimeoutSeconds: number; - private workerQueueResolver?: (message: StoredMessage>) => string; + private workerQueueResolver: (message: StoredMessage>) => string; private batchClaimSize: number; // Cooloff state @@ -102,11 +101,9 @@ export class FairQueue { private batchedSpanManager: BatchedSpanManager; // Runtime state - private messageHandler?: MessageHandler>; private isRunning = false; private abortController: AbortController; private masterQueueConsumerLoops: Promise[] = []; - private workerQueueConsumerLoops: Promise[] = []; private reclaimLoop?: Promise; // Queue descriptor cache for message processing @@ -135,10 +132,8 @@ export class FairQueue { this.heartbeatIntervalMs = options.heartbeatIntervalMs ?? this.visibilityTimeoutMs / 3; this.reclaimIntervalMs = options.reclaimIntervalMs ?? 5_000; - // Worker queue - this.workerQueueEnabled = options.workerQueue?.enabled ?? false; - this.workerQueueBlockingTimeoutSeconds = options.workerQueue?.blockingTimeoutSeconds ?? 
10; - this.workerQueueResolver = options.workerQueue?.resolveWorkerQueue; + // Worker queue resolver (required) + this.workerQueueResolver = options.workerQueue.resolveWorkerQueue; // Batch claiming this.batchClaimSize = options.batchClaimSize ?? 10; @@ -201,16 +196,15 @@ export class FairQueue { }, }); - if (this.workerQueueEnabled) { - this.workerQueueManager = new WorkerQueueManager({ - redis: options.redis, - keys: options.keys, - logger: { - debug: (msg, ctx) => this.logger.debug(msg, ctx), - error: (msg, ctx) => this.logger.error(msg, ctx), - }, - }); - } + // Worker queue manager for pushing messages to worker queues + this.workerQueueManager = new WorkerQueueManager({ + redis: options.redis, + keys: options.keys, + logger: { + debug: (msg, ctx) => this.logger.debug(msg, ctx), + error: (msg, ctx) => this.logger.error(msg, ctx), + }, + }); this.#registerCommands(); @@ -246,17 +240,6 @@ export class FairQueue { }); } - // ============================================================================ - // Public API - Message Handler - // ============================================================================ - - /** - * Set the message handler for processing dequeued messages. - */ - onMessage(handler: MessageHandler>): void { - this.messageHandler = handler; - } - // ============================================================================ // Public API - Enqueueing // ============================================================================ @@ -629,7 +612,9 @@ export class FairQueue { // ============================================================================ /** - * Start the consumer loops and reclaim loop. + * Start the master queue consumer loops and reclaim loop. + * FairQueue claims messages and pushes them to worker queues. + * External consumers are responsible for consuming from worker queues. 
*/ start(): void { if (this.isRunning) { @@ -639,36 +624,19 @@ export class FairQueue { this.isRunning = true; this.abortController = new AbortController(); - if (this.workerQueueEnabled && this.workerQueueManager) { - // Two-stage processing: master queue consumers push to worker queues - // Start master queue consumers (one per shard) - for (let shardId = 0; shardId < this.shardCount; shardId++) { - const loop = this.#runMasterQueueConsumerLoop(shardId); - this.masterQueueConsumerLoops.push(loop); - } - - // Start worker queue consumers (multiple per consumer count) - for (let consumerId = 0; consumerId < this.consumerCount; consumerId++) { - const loop = this.#runWorkerQueueConsumerLoop(consumerId); - this.workerQueueConsumerLoops.push(loop); - } - } else { - // Direct processing: consumers process from message queues directly - for (let consumerId = 0; consumerId < this.consumerCount; consumerId++) { - for (let shardId = 0; shardId < this.shardCount; shardId++) { - const loop = this.#runDirectConsumerLoop(consumerId, shardId); - this.masterQueueConsumerLoops.push(loop); - } - } + // Start master queue consumers (one per shard) + // These claim messages from queues and push to worker queues + for (let shardId = 0; shardId < this.shardCount; shardId++) { + const loop = this.#runMasterQueueConsumerLoop(shardId); + this.masterQueueConsumerLoops.push(loop); } - // Start reclaim loop + // Start reclaim loop for handling timed-out messages this.reclaimLoop = this.#runReclaimLoop(); this.logger.info("FairQueue started", { consumerCount: this.consumerCount, shardCount: this.shardCount, - workerQueueEnabled: this.workerQueueEnabled, consumerIntervalMs: this.consumerIntervalMs, }); } @@ -684,14 +652,9 @@ export class FairQueue { this.isRunning = false; this.abortController.abort(); - await Promise.allSettled([ - ...this.masterQueueConsumerLoops, - ...this.workerQueueConsumerLoops, - this.reclaimLoop, - ]); + await Promise.allSettled([...this.masterQueueConsumerLoops, this.reclaimLoop]); this.masterQueueConsumerLoops = []; - this.workerQueueConsumerLoops = []; this.reclaimLoop = undefined; this.logger.info("FairQueue stopped"); @@ -710,7 +673,7 @@ export class FairQueue { this.masterQueue.close(), this.concurrencyManager?.close(), this.visibilityManager.close(), - this.workerQueueManager?.close(), + this.workerQueueManager.close(), this.scheduler.close?.(), this.redis.quit(), ]); @@ -824,6 +787,11 @@ export class FairQueue { ): Promise { const masterQueueKey = this.keys.masterQueueKey(shardId); + // Get total queues in this master queue shard for observability + const masterQueueSize = await this.masterQueue.getShardQueueCount(shardId); + parentSpan?.setAttribute("master_queue_size", masterQueueSize); + this.batchedSpanManager.incrementStat(loopId, "master_queue_size_sum", masterQueueSize); + // Create scheduler context const schedulerContext = this.#createSchedulerContext(); @@ -833,6 +801,7 @@ export class FairQueue { async (span) => { span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId); + span.setAttribute("master_queue_size", masterQueueSize); const result = await this.scheduler.selectQueues(masterQueueKey, loopId, schedulerContext); span.setAttribute("tenant_count", result.length); span.setAttribute( @@ -989,13 +958,6 @@ export class FairQueue { return 0; } - // Single shared worker queue pattern: - // All consumers pop from one queue ("worker-queue") for atomic distribution. 
- // Trade-off: Simpler code and fair distribution vs. potential contention - // under very high load (>10k messages/sec). For most workloads, Redis - // can handle 100k+ ops/sec on a single key, so this is rarely a bottleneck. - // Future: Consider adding optional worker queue sharding if needed. - const workerQueueId = "worker-queue"; let processedCount = 0; // Reserve concurrency and push each message to worker queue @@ -1020,9 +982,12 @@ export class FairQueue { } } - // Push to worker queue + // Resolve which worker queue this message should go to + const workerQueueId = this.workerQueueResolver(message.payload); + + // Push to worker queue with format "messageId:queueId" const messageKey = `${message.messageId}:${queueId}`; - await this.workerQueueManager!.push(workerQueueId, messageKey); + await this.workerQueueManager.push(workerQueueId, messageKey); processedCount++; } @@ -1034,550 +999,212 @@ export class FairQueue { } // ============================================================================ - // Private - Worker Queue Consumer Loop (Two-Stage) + // Public API - Message Lifecycle (for external consumers) // ============================================================================ - async #runWorkerQueueConsumerLoop(consumerId: number): Promise { - const loopId = `worker-${consumerId}`; - const workerQueueId = "worker-queue"; // All consumers share a single worker queue - - // Initialize batched span tracking for this loop - this.batchedSpanManager.initializeLoop(loopId); - - try { - while (this.isRunning) { - if (!this.messageHandler) { - await new Promise((resolve) => setTimeout(resolve, this.consumerIntervalMs)); - continue; - } - - try { - // Blocking pop from worker queue - const messageKey = await this.workerQueueManager!.blockingPop( - workerQueueId, - this.workerQueueBlockingTimeoutSeconds, - this.abortController.signal - ); - - if (!messageKey) { - this.batchedSpanManager.incrementStat(loopId, "blocking_pop_timeouts"); - continue; // Timeout, loop again - } - - // Parse message key - const colonIndex = messageKey.indexOf(":"); - if (colonIndex === -1) { - this.logger.error("Invalid message key format", { messageKey }); - this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys"); - continue; - } - - const messageId = messageKey.substring(0, colonIndex); - const queueId = messageKey.substring(colonIndex + 1); - - await this.batchedSpanManager.withBatchedSpan( - loopId, - async (span) => { - span.setAttribute("consumer_id", consumerId); - span.setAttribute(FairQueueAttributes.MESSAGE_ID, messageId); - span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId); - await this.#processMessageFromWorkerQueue(loopId, messageId, queueId); - this.batchedSpanManager.incrementStat(loopId, "messages_processed"); - }, - { - iterationSpanName: "processMessageFromWorkerQueue", - attributes: { - consumer_id: consumerId, - [FairQueueAttributes.MESSAGE_ID]: messageId, - [FairQueueAttributes.QUEUE_ID]: queueId, - }, - } - ); - } catch (error) { - if (this.abortController.signal.aborted) { - break; - } - this.logger.error("Worker queue consumer error", { - loopId, - error: error instanceof Error ? 
error.message : String(error), - }); - this.batchedSpanManager.markForRotation(loopId); - } - } - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - this.logger.debug("Worker queue consumer aborted", { loopId }); - this.batchedSpanManager.cleanup(loopId); - return; - } - throw error; - } finally { - this.batchedSpanManager.cleanup(loopId); - } - } - - async #processMessageFromWorkerQueue( - loopId: string, + /** + * Get message data from in-flight storage. + * External consumers use this to retrieve the stored message after popping from worker queue. + * + * @param messageId - The ID of the message + * @param queueId - The queue ID the message belongs to + * @returns The stored message or null if not found + */ + async getMessageData( messageId: string, queueId: string - ): Promise { - // Get message data from in-flight + ): Promise> | null> { const shardId = this.masterQueue.getShardForQueue(queueId); const inflightDataKey = this.keys.inflightDataKey(shardId); const dataJson = await this.redis.hget(inflightDataKey, messageId); if (!dataJson) { - this.logger.error("Message not found in in-flight data", { messageId, queueId }); - return; + return null; } - let storedMessage: StoredMessage>; try { - storedMessage = JSON.parse(dataJson); + return JSON.parse(dataJson) as StoredMessage>; } catch { this.logger.error("Failed to parse message data", { messageId, queueId }); - return; + return null; } - - await this.#processMessage(loopId, storedMessage, queueId); } - // ============================================================================ - // Private - Direct Consumer Loop (No Worker Queue) - // ============================================================================ - - async #runDirectConsumerLoop(consumerId: number, shardId: number): Promise { - const loopId = `consumer-${consumerId}-shard-${shardId}`; - - // Initialize batched span tracking for this loop - this.batchedSpanManager.initializeLoop(loopId); - - try { - for await (const _ of setInterval(this.consumerIntervalMs, null, { - signal: this.abortController.signal, - })) { - if (!this.messageHandler) { - continue; - } - - try { - await this.batchedSpanManager.withBatchedSpan( - loopId, - async (span) => { - span.setAttribute("consumer_id", consumerId); - span.setAttribute("shard_id", shardId); - await this.#processDirectIteration(loopId, shardId, span); - }, - { - iterationSpanName: "processDirectIteration", - attributes: { consumer_id: consumerId, shard_id: shardId }, - } - ); - } catch (error) { - this.logger.error("Direct consumer iteration error", { - loopId, - error: error instanceof Error ? error.message : String(error), - }); - this.batchedSpanManager.markForRotation(loopId); - } - } - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - this.logger.debug("Direct consumer loop aborted", { loopId }); - this.batchedSpanManager.cleanup(loopId); - return; - } - throw error; - } finally { - this.batchedSpanManager.cleanup(loopId); - } + /** + * Extend the visibility timeout for a message. + * External consumers should call this periodically during long-running processing. 
+ * + * @param messageId - The ID of the message + * @param queueId - The queue ID the message belongs to + * @returns true if heartbeat was successful + */ + async heartbeatMessage(messageId: string, queueId: string): Promise { + return this.visibilityManager.heartbeat(messageId, queueId, this.heartbeatIntervalMs); } - async #processDirectIteration(loopId: string, shardId: number, parentSpan?: Span): Promise { + /** + * Mark a message as successfully processed. + * This removes the message from in-flight and releases concurrency. + * + * @param messageId - The ID of the message + * @param queueId - The queue ID the message belongs to + */ + async completeMessage(messageId: string, queueId: string): Promise { + const shardId = this.masterQueue.getShardForQueue(queueId); + const queueKey = this.keys.queueKey(queueId); const masterQueueKey = this.keys.masterQueueKey(shardId); + const inflightDataKey = this.keys.inflightDataKey(shardId); - // Create scheduler context - const schedulerContext = this.#createSchedulerContext(); - - // Get queues to process from scheduler - const tenantQueues = await this.telemetry.trace( - "selectQueues", - async (span) => { - span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); - span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId); - const result = await this.scheduler.selectQueues(masterQueueKey, loopId, schedulerContext); - span.setAttribute("tenant_count", result.length); - span.setAttribute( - "queue_count", - result.reduce((acc, t) => acc + t.queues.length, 0) - ); - return result; - }, - { kind: SpanKind.INTERNAL } - ); - - if (tenantQueues.length === 0) { - this.batchedSpanManager.incrementStat(loopId, "empty_iterations"); - return; + // Get stored message for concurrency release + const dataJson = await this.redis.hget(inflightDataKey, messageId); + let storedMessage: StoredMessage> | null = null; + if (dataJson) { + try { + storedMessage = JSON.parse(dataJson); + } catch { + // Ignore parse error, proceed with completion + } } - // Track stats - this.batchedSpanManager.incrementStat(loopId, "tenants_selected", tenantQueues.length); - this.batchedSpanManager.incrementStat( - loopId, - "queues_selected", - tenantQueues.reduce((acc, t) => acc + t.queues.length, 0) - ); + const descriptor: QueueDescriptor = storedMessage + ? this.queueDescriptorCache.get(queueId) ?? { + id: queueId, + tenantId: storedMessage.tenantId, + metadata: storedMessage.metadata ?? 
{}, + } + : { id: queueId, tenantId: "", metadata: {} }; - // Process messages from each selected tenant - // For fairness, process up to available concurrency slots per tenant - for (const { tenantId, queues } of tenantQueues) { - // Get available concurrency for this tenant - let availableSlots = 1; // Default to 1 for backwards compatibility - if (this.concurrencyManager) { - const [current, limit] = await Promise.all([ - this.concurrencyManager.getCurrentConcurrency("tenant", tenantId), - this.concurrencyManager.getConcurrencyLimit("tenant", tenantId), - ]); - availableSlots = Math.max(1, limit - current); - } + // Complete in visibility manager + await this.visibilityManager.complete(messageId, queueId); - // Process up to availableSlots messages from this tenant's queues - let slotsUsed = 0; - queueLoop: for (const queueId of queues) { - while (slotsUsed < availableSlots) { - // Check cooloff - if (this.cooloffEnabled && this.#isInCooloff(queueId)) { - this.batchedSpanManager.incrementStat(loopId, "cooloff_skipped"); - break; // Try next queue - } + // Release concurrency + if (this.concurrencyManager && storedMessage) { + await this.concurrencyManager.release(descriptor, messageId); + } - const processed = await this.telemetry.trace( - "processOneMessage", - async (span) => { - span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId); - span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId); - span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); - return this.#processOneMessage(loopId, queueId, tenantId, shardId); - }, - { kind: SpanKind.INTERNAL } - ); + // Update master queue if queue is now empty, and clean up caches + const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); + if (removed === 1) { + this.queueDescriptorCache.delete(queueId); + this.queueCooloffStates.delete(queueId); + } - if (processed) { - this.batchedSpanManager.incrementStat(loopId, "messages_processed"); + this.telemetry.recordComplete(); - if (this.scheduler.recordProcessed) { - await this.telemetry.trace( - "recordProcessed", - async (span) => { - span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId); - span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId); - await this.scheduler.recordProcessed!(tenantId, queueId); - }, - { kind: SpanKind.INTERNAL } - ); - } - this.#resetCooloff(queueId); - slotsUsed++; - } else { - // Don't increment cooloff here - the queue was either: - // 1. Empty (removed from master, cache cleaned up) - // 2. Concurrency blocked (message released back to queue) - // Neither case warrants cooloff as they're not failures - this.batchedSpanManager.incrementStat(loopId, "process_skipped"); - break; // Queue empty or blocked, try next queue - } - } - if (slotsUsed >= availableSlots) { - break queueLoop; - } - } - } + this.logger.debug("Message completed", { + messageId, + queueId, + }); } - async #processOneMessage( - loopId: string, - queueId: string, - tenantId: string, - shardId: number - ): Promise { + /** + * Release a message back to the queue for processing by another consumer. + * The message is placed at the back of the queue. 
+ * + * @param messageId - The ID of the message + * @param queueId - The queue ID the message belongs to + */ + async releaseMessage(messageId: string, queueId: string): Promise { + const shardId = this.masterQueue.getShardForQueue(queueId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); const masterQueueKey = this.keys.masterQueueKey(shardId); - const descriptor = this.queueDescriptorCache.get(queueId) ?? { - id: queueId, - tenantId, - metadata: {}, - }; + const inflightDataKey = this.keys.inflightDataKey(shardId); - // Check concurrency before claiming - if (this.concurrencyManager) { - const check = await this.concurrencyManager.canProcess(descriptor); - if (!check.allowed) { - // Queue at max concurrency, back off to avoid repeated attempts - this.#incrementCooloff(queueId); - return false; + // Get stored message for concurrency release + const dataJson = await this.redis.hget(inflightDataKey, messageId); + let storedMessage: StoredMessage> | null = null; + if (dataJson) { + try { + storedMessage = JSON.parse(dataJson); + } catch { + // Ignore parse error } } - // Check global rate limit - wait if rate limited - if (this.globalRateLimiter) { - const result = await this.globalRateLimiter.limit(); - if (!result.allowed && result.resetAt) { - const waitMs = Math.max(0, result.resetAt - Date.now()); - if (waitMs > 0) { - this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId }); - await new Promise((resolve) => setTimeout(resolve, waitMs)); + const descriptor: QueueDescriptor = storedMessage + ? this.queueDescriptorCache.get(queueId) ?? { + id: queueId, + tenantId: storedMessage.tenantId, + metadata: storedMessage.metadata ?? {}, } - } - } + : { id: queueId, tenantId: "", metadata: {} }; - // Claim message with visibility timeout - const claimResult = await this.visibilityManager.claim>>( + // Release back to queue + await this.visibilityManager.release( + messageId, queueId, queueKey, queueItemsKey, - loopId, - this.visibilityTimeoutMs + masterQueueKey, + Date.now() // Put at back of queue ); - if (!claimResult.claimed || !claimResult.message) { - // Queue is empty, update master queue and clean up caches - const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); - if (removed === 1) { - this.queueDescriptorCache.delete(queueId); - this.queueCooloffStates.delete(queueId); - } - return false; - } - - const { message } = claimResult; - - // Reserve concurrency slot - if (this.concurrencyManager) { - const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId); - if (!reserved) { - // Release message back to queue (and ensure it's in master queue) - await this.visibilityManager.release( - message.messageId, - queueId, - queueKey, - queueItemsKey, - masterQueueKey - ); - // Concurrency reservation failed, back off to avoid repeated attempts - this.#incrementCooloff(queueId); - return false; - } + // Release concurrency + if (this.concurrencyManager && storedMessage) { + await this.concurrencyManager.release(descriptor, messageId); } - await this.#processMessage(loopId, message.payload, queueId); - return true; + this.logger.debug("Message released", { + messageId, + queueId, + }); } - // ============================================================================ - // Private - Message Processing - // ============================================================================ - - async #processMessage( - loopId: string, - storedMessage: StoredMessage>, - 
queueId: string - ): Promise { - const startTime = Date.now(); + /** + * Mark a message as failed. This will trigger retry logic if configured, + * or move the message to the dead letter queue. + * + * @param messageId - The ID of the message + * @param queueId - The queue ID the message belongs to + * @param error - Optional error that caused the failure + */ + async failMessage(messageId: string, queueId: string, error?: Error): Promise { + const shardId = this.masterQueue.getShardForQueue(queueId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); - const shardId = this.masterQueue.getShardForQueue(queueId); const masterQueueKey = this.keys.masterQueueKey(shardId); + const inflightDataKey = this.keys.inflightDataKey(shardId); - const descriptor = this.queueDescriptorCache.get(queueId) ?? { - id: queueId, - tenantId: storedMessage.tenantId, - metadata: storedMessage.metadata ?? {}, - }; - - // Parse payload with schema if provided - let payload: z.infer; - if (this.payloadSchema) { - const result = this.payloadSchema.safeParse(storedMessage.payload); - if (!result.success) { - this.logger.error("Payload validation failed on dequeue", { - messageId: storedMessage.id, - queueId, - error: result.error.message, - }); - // Move to DLQ - await this.#moveToDeadLetterQueue(storedMessage, "Payload validation failed"); - - // Release reserved concurrency slot - if (this.concurrencyManager) { - try { - await this.concurrencyManager.release(descriptor, storedMessage.id); - } catch (releaseError) { - this.logger.error( - "Failed to release concurrency slot after payload validation failure", - { - messageId: storedMessage.id, - queueId, - error: releaseError instanceof Error ? releaseError.message : String(releaseError), - } - ); - } - } - - return; - } - payload = result.data; - } else { - payload = storedMessage.payload; + // Get stored message + const dataJson = await this.redis.hget(inflightDataKey, messageId); + if (!dataJson) { + this.logger.error("Cannot fail message: not found in in-flight data", { messageId, queueId }); + return; } - // Build queue message - const queueMessage: QueueMessage> = { - id: storedMessage.id, - queueId, - payload, - timestamp: storedMessage.timestamp, - attempt: storedMessage.attempt, - metadata: storedMessage.metadata, - }; - - // Record queue time - const queueTime = startTime - storedMessage.timestamp; - this.telemetry.recordQueueTime(queueTime); - - // Build handler context - const handlerContext: MessageHandlerContext> = { - message: queueMessage, - queue: descriptor, - consumerId: loopId, - heartbeat: async () => { - return this.visibilityManager.heartbeat( - storedMessage.id, - queueId, - this.heartbeatIntervalMs - ); - }, - complete: async () => { - await this.#completeMessage(storedMessage, queueId, queueKey, masterQueueKey, descriptor); - this.telemetry.recordComplete(); - this.telemetry.recordProcessingTime(Date.now() - startTime); - }, - release: async () => { - await this.#releaseMessage( - storedMessage, - queueId, - queueKey, - queueItemsKey, - masterQueueKey, - descriptor - ); - }, - fail: async (error?: Error) => { - await this.#handleMessageFailure( - storedMessage, - queueId, - queueKey, - queueItemsKey, - masterQueueKey, - descriptor, - error - ); - }, - }; - - // Call message handler + let storedMessage: StoredMessage>; try { - await this.telemetry.trace( - "processMessage", - async (span) => { - span.setAttributes({ - [FairQueueAttributes.QUEUE_ID]: queueId, - [FairQueueAttributes.TENANT_ID]: 
storedMessage.tenantId, - [FairQueueAttributes.MESSAGE_ID]: storedMessage.id, - [FairQueueAttributes.ATTEMPT]: storedMessage.attempt, - [FairQueueAttributes.CONSUMER_ID]: loopId, - }); - - await this.messageHandler!(handlerContext); - }, - { - kind: SpanKind.CONSUMER, - attributes: { - [MessagingAttributes.OPERATION]: "process", - }, - } - ); - } catch (error) { - this.logger.error("Message handler error", { - messageId: storedMessage.id, + storedMessage = JSON.parse(dataJson); + } catch { + this.logger.error("Cannot fail message: failed to parse stored message", { + messageId, queueId, - error: error instanceof Error ? error.message : String(error), }); - // Trigger failure handling - await handlerContext.fail(error instanceof Error ? error : new Error(String(error))); - } - } - - async #completeMessage( - storedMessage: StoredMessage>, - queueId: string, - queueKey: string, - masterQueueKey: string, - descriptor: QueueDescriptor - ): Promise { - const shardId = this.masterQueue.getShardForQueue(queueId); - - // Complete in visibility manager - await this.visibilityManager.complete(storedMessage.id, queueId); - - // Release concurrency - if (this.concurrencyManager) { - await this.concurrencyManager.release(descriptor, storedMessage.id); - } - - // Update master queue if queue is now empty, and clean up caches - const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); - if (removed === 1) { - this.queueDescriptorCache.delete(queueId); - this.queueCooloffStates.delete(queueId); + return; } - this.logger.debug("Message completed", { - messageId: storedMessage.id, - queueId, - }); - } + const descriptor = this.queueDescriptorCache.get(queueId) ?? { + id: queueId, + tenantId: storedMessage.tenantId, + metadata: storedMessage.metadata ?? 
{}, + }; - async #releaseMessage( - storedMessage: StoredMessage>, - queueId: string, - queueKey: string, - queueItemsKey: string, - masterQueueKey: string, - descriptor: QueueDescriptor - ): Promise { - // Release back to queue (and update master queue to ensure the queue is picked up) - await this.visibilityManager.release( - storedMessage.id, + await this.#handleMessageFailure( + storedMessage, queueId, queueKey, queueItemsKey, masterQueueKey, - Date.now() // Put at back of queue + descriptor, + error ); - - // Release concurrency - if (this.concurrencyManager) { - await this.concurrencyManager.release(descriptor, storedMessage.id); - } - - this.logger.debug("Message released", { - messageId: storedMessage.id, - queueId, - }); } + // ============================================================================ + // Private - Message Processing Helpers + // ============================================================================ + async #handleMessageFailure( storedMessage: StoredMessage>, queueId: string, diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts index 28448a9fe4..2d615427a2 100644 --- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts @@ -7,13 +7,208 @@ import { DRRScheduler, FixedDelayRetry, NoRetry, + WorkerQueueManager, } from "../index.js"; -import type { FairQueueKeyProducer } from "../types.js"; +import type { FairQueueKeyProducer, FairQueueOptions, StoredMessage } from "../types.js"; +import type { RedisOptions } from "@internal/redis"; // Define a common payload schema for tests const TestPayloadSchema = z.object({ value: z.string() }); type TestPayload = z.infer; +// Constant for test worker queue ID +const TEST_WORKER_QUEUE_ID = "test-worker-queue"; + +/** + * TestFairQueueHelper wraps FairQueue for easier testing. + * It manages the worker queue consumer loop and provides a simple onMessage interface. 
+ */ +class TestFairQueueHelper { + public fairQueue: FairQueue; + private workerQueueManager: WorkerQueueManager; + private isRunning = false; + private abortController: AbortController; + private consumerLoops: Promise[] = []; + private messageHandler?: (ctx: { + message: { + id: string; + queueId: string; + payload: TestPayload; + timestamp: number; + attempt: number; + }; + queue: { id: string; tenantId: string }; + consumerId: string; + heartbeat: () => Promise; + complete: () => Promise; + release: () => Promise; + fail: (error?: Error) => Promise; + }) => Promise; + + constructor( + private redisOptions: RedisOptions, + private keys: FairQueueKeyProducer, + options: Omit, "redis" | "keys" | "workerQueue"> + ) { + this.abortController = new AbortController(); + + // Create FairQueue with worker queue resolver + this.fairQueue = new FairQueue({ + ...options, + redis: redisOptions, + keys, + workerQueue: { + resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID, + }, + }); + + // Create worker queue manager for consuming + this.workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + } + + onMessage( + handler: (ctx: { + message: { + id: string; + queueId: string; + payload: TestPayload; + timestamp: number; + attempt: number; + }; + queue: { id: string; tenantId: string }; + consumerId: string; + heartbeat: () => Promise; + complete: () => Promise; + release: () => Promise; + fail: (error?: Error) => Promise; + }) => Promise + ): void { + this.messageHandler = handler; + } + + start(): void { + if (this.isRunning) return; + this.isRunning = true; + this.abortController = new AbortController(); + + // Start FairQueue's master queue consumers + this.fairQueue.start(); + + // Start worker queue consumer loop + const loop = this.#runConsumerLoop(); + this.consumerLoops.push(loop); + } + + async stop(): Promise { + if (!this.isRunning) return; + this.isRunning = false; + this.abortController.abort(); + await this.fairQueue.stop(); + await Promise.allSettled(this.consumerLoops); + this.consumerLoops = []; + } + + async close(): Promise { + await this.stop(); + await this.fairQueue.close(); + await this.workerQueueManager.close(); + } + + // Delegate methods to fairQueue + async enqueue(options: Parameters[0]) { + return this.fairQueue.enqueue(options); + } + + async enqueueBatch(options: Parameters[0]) { + return this.fairQueue.enqueueBatch(options); + } + + async getQueueLength(queueId: string) { + return this.fairQueue.getQueueLength(queueId); + } + + async getTotalInflightCount() { + return this.fairQueue.getTotalInflightCount(); + } + + + registerTelemetryGauges(options?: { observedTenants?: string[] }) { + return this.fairQueue.registerTelemetryGauges(options); + } + + async getDeadLetterQueueLength(tenantId: string) { + return this.fairQueue.getDeadLetterQueueLength(tenantId); + } + + async getDeadLetterMessages(tenantId: string) { + return this.fairQueue.getDeadLetterMessages(tenantId); + } + + async redriveMessage(tenantId: string, messageId: string) { + return this.fairQueue.redriveMessage(tenantId, messageId); + } + + async #runConsumerLoop(): Promise { + const loopId = "test-consumer-0"; + + try { + while (this.isRunning) { + if (!this.messageHandler) { + await new Promise((resolve) => setTimeout(resolve, 50)); + continue; + } + + try { + const messageKey = await this.workerQueueManager.blockingPop( + TEST_WORKER_QUEUE_ID, + 1, + this.abortController.signal + ); + + if (!messageKey) continue; + + const colonIndex = messageKey.indexOf(":"); + if (colonIndex 
=== -1) continue; + + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + + const storedMessage = await this.fairQueue.getMessageData(messageId, queueId); + if (!storedMessage) continue; + + const ctx = { + message: { + id: storedMessage.id, + queueId: storedMessage.queueId, + payload: storedMessage.payload, + timestamp: storedMessage.timestamp, + attempt: storedMessage.attempt, + }, + queue: { + id: queueId, + tenantId: storedMessage.tenantId, + }, + consumerId: loopId, + heartbeat: () => this.fairQueue.heartbeatMessage(messageId, queueId), + complete: () => this.fairQueue.completeMessage(messageId, queueId), + release: () => this.fairQueue.releaseMessage(messageId, queueId), + fail: (error?: Error) => this.fairQueue.failMessage(messageId, queueId, error), + }; + + await this.messageHandler(ctx); + } catch (error) { + if (this.abortController.signal.aborted) break; + } + } + } catch { + // Ignore abort errors + } + } +} + describe("FairQueue", () => { let keys: FairQueueKeyProducer; @@ -32,9 +227,7 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -87,9 +280,7 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -152,9 +343,7 @@ describe("FairQueue", () => { maxDeficit: 5, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -226,9 +415,7 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -289,9 +476,7 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -364,9 +549,7 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -429,9 +612,7 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -495,9 +676,7 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -589,6 +768,9 @@ describe("FairQueue", () => { payloadSchema: PayloadSchema, validateOnEnqueue: true, startConsumers: false, + workerQueue: { + resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID, + }, }); // Valid payload should work @@ -626,6 +808,12 @@ describe("FairQueue", () => { maxDeficit: 100, }); + // Create worker queue manager for consuming + const workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + const 
queue = new FairQueue({ redis: redisOptions, keys, @@ -636,13 +824,13 @@ describe("FairQueue", () => { consumerIntervalMs: 50, visibilityTimeoutMs: 5000, startConsumers: false, + workerQueue: { + resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID, + }, }); - queue.onMessage(async (ctx) => { - // TypeScript should infer ctx.message.payload as { name: string; count: number } - processed.push(ctx.message.payload); - await ctx.complete(); - }); + // Start the queue (which routes messages to worker queue) + queue.start(); await queue.enqueue({ queueId: "tenant:t1:queue:q1", @@ -650,18 +838,29 @@ describe("FairQueue", () => { payload: { name: "typed", count: 42 }, }); - queue.start(); - - await vi.waitFor( - () => { - expect(processed).toHaveLength(1); - }, - { timeout: 5000 } - ); + // Consume from worker queue + let attempts = 0; + while (processed.length === 0 && attempts < 50) { + const messageKey = await workerQueueManager.blockingPop(TEST_WORKER_QUEUE_ID, 1); + if (messageKey) { + const colonIndex = messageKey.indexOf(":"); + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + const storedMessage = await queue.getMessageData(messageId, queueId); + if (storedMessage) { + // TypeScript should infer storedMessage.payload as { name: string; count: number } + processed.push(storedMessage.payload); + await queue.completeMessage(messageId, queueId); + } + } + attempts++; + } + expect(processed).toHaveLength(1); expect(processed[0]).toEqual({ name: "typed", count: 42 }); await queue.close(); + await workerQueueManager.close(); } ); }); @@ -680,9 +879,8 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const processed: string[] = []; + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -697,6 +895,11 @@ describe("FairQueue", () => { startConsumers: false, }); + queue.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + // Start without any messages (will trigger empty dequeues) queue.start(); @@ -711,12 +914,6 @@ describe("FairQueue", () => { payload: { value: "after-cooloff" }, }); - const processed: string[] = []; - queue.onMessage(async (ctx) => { - processed.push(ctx.message.payload.value); - await ctx.complete(); - }); - // Message should still be processed (cooloff is per-queue, not global) await vi.waitFor( () => { @@ -742,9 +939,9 @@ describe("FairQueue", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const processed: string[] = []; + + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -760,6 +957,12 @@ describe("FairQueue", () => { startConsumers: false, }); + // Handler that always fails to trigger cooloff + queue.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.fail(new Error("Forced failure")); + }); + // Enqueue messages to multiple queues for (let i = 0; i < 10; i++) { await queue.enqueue({ @@ -769,14 +972,6 @@ describe("FairQueue", () => { }); } - const processed: string[] = []; - - // Handler that always fails to trigger cooloff - queue.onMessage(async (ctx) => { - processed.push(ctx.message.payload.value); - await ctx.fail(new Error("Forced failure")); - }); - queue.start(); // Wait for some messages to be processed and fail @@ -788,7 +983,7 @@ describe("FairQueue", () => { ); // The 
cooloff states size should be capped (test that it doesn't grow unbounded) - const cacheSizes = queue.getCacheSizes(); + const cacheSizes = queue.fairQueue.getCacheSizes(); expect(cacheSizes.cooloffStatesSize).toBeLessThanOrEqual(10); // Some buffer for race conditions await queue.close(); @@ -812,6 +1007,9 @@ describe("FairQueue", () => { keys, scheduler, startConsumers: false, + workerQueue: { + resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID, + }, }); // Initially empty @@ -852,6 +1050,9 @@ describe("FairQueue", () => { scheduler, shardCount: 2, startConsumers: false, + workerQueue: { + resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID, + }, }); // Initially empty @@ -896,19 +1097,13 @@ describe("FairQueue", () => { // Create queue with: // - Worker queue enabled (two-stage processing) // - Concurrency limit of 2 per tenant - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, consumerCount: 1, consumerIntervalMs: 50, visibilityTimeoutMs: 10000, - workerQueue: { - enabled: true, - blockingTimeoutSeconds: 1, - }, concurrencyGroups: [ { name: "tenant", diff --git a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts index 5de24350e0..d6ee70a450 100644 --- a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts @@ -8,12 +8,191 @@ import { ConcurrencyManager, VisibilityManager, MasterQueue, + WorkerQueueManager, FixedDelayRetry, } from "../index.js"; -import type { FairQueueKeyProducer, QueueDescriptor } from "../types.js"; -import { createRedisClient } from "@internal/redis"; +import type { FairQueueKeyProducer, FairQueueOptions, QueueDescriptor, StoredMessage } from "../types.js"; +import { createRedisClient, type RedisOptions } from "@internal/redis"; const TestPayloadSchema = z.object({ id: z.number(), value: z.string() }); +type TestPayload = z.infer; + +// Constant for test worker queue ID +const TEST_WORKER_QUEUE_ID = "test-worker-queue"; + +/** + * TestFairQueueHelper wraps FairQueue for easier testing in race condition tests. 
+ */
+class TestFairQueueHelper {
+  public fairQueue: FairQueue;
+  private workerQueueManager: WorkerQueueManager;
+  private isRunning = false;
+  private abortController: AbortController;
+  private consumerLoops: Promise<void>[] = [];
+  private messageHandler?: (ctx: {
+    message: {
+      id: string;
+      queueId: string;
+      payload: TestPayload;
+      timestamp: number;
+      attempt: number;
+    };
+    queue: { id: string; tenantId: string };
+    consumerId: string;
+    heartbeat: () => Promise<void>;
+    complete: () => Promise<void>;
+    release: () => Promise<void>;
+    fail: (error?: Error) => Promise<void>;
+  }) => Promise<void>;
+
+  constructor(
+    private redisOptions: RedisOptions,
+    private keys: FairQueueKeyProducer,
+    options: Omit<FairQueueOptions<typeof TestPayloadSchema>, "redis" | "keys" | "workerQueue">
+  ) {
+    this.abortController = new AbortController();
+
+    this.fairQueue = new FairQueue({
+      ...options,
+      redis: redisOptions,
+      keys,
+      workerQueue: {
+        resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID,
+      },
+    });
+
+    this.workerQueueManager = new WorkerQueueManager({
+      redis: redisOptions,
+      keys,
+    });
+  }
+
+  onMessage(
+    handler: (ctx: {
+      message: {
+        id: string;
+        queueId: string;
+        payload: TestPayload;
+        timestamp: number;
+        attempt: number;
+      };
+      queue: { id: string; tenantId: string };
+      consumerId: string;
+      heartbeat: () => Promise<void>;
+      complete: () => Promise<void>;
+      release: () => Promise<void>;
+      fail: (error?: Error) => Promise<void>;
+    }) => Promise<void>
+  ): void {
+    this.messageHandler = handler;
+  }
+
+  start(): void {
+    if (this.isRunning) return;
+    this.isRunning = true;
+    this.abortController = new AbortController();
+    this.fairQueue.start();
+    const loop = this.#runConsumerLoop();
+    this.consumerLoops.push(loop);
+  }
+
+  async stop(): Promise<void> {
+    if (!this.isRunning) return;
+    this.isRunning = false;
+    this.abortController.abort();
+    await this.fairQueue.stop();
+    await Promise.allSettled(this.consumerLoops);
+    this.consumerLoops = [];
+  }
+
+  async close(): Promise<void> {
+    await this.stop();
+    await this.fairQueue.close();
+    await this.workerQueueManager.close();
+  }
+
+  async enqueue(options: Parameters[0]) {
+    return this.fairQueue.enqueue(options);
+  }
+
+  async enqueueBatch(options: Parameters[0]) {
+    return this.fairQueue.enqueueBatch(options);
+  }
+
+  async getQueueLength(queueId: string) {
+    return this.fairQueue.getQueueLength(queueId);
+  }
+
+  async getTotalInflightCount() {
+    return this.fairQueue.getTotalInflightCount();
+  }
+
+  async getDeadLetterQueueLength(tenantId: string) {
+    return this.fairQueue.getDeadLetterQueueLength(tenantId);
+  }
+
+  async getDeadLetterMessages(tenantId: string, limit?: number) {
+    return this.fairQueue.getDeadLetterMessages(tenantId, limit);
+  }
+
+  async getTotalQueueCount() {
+    return this.fairQueue.getTotalQueueCount();
+  }
+
+  async #runConsumerLoop(): Promise<void> {
+    const loopId = "test-consumer-0";
+    try {
+      while (this.isRunning) {
+        if (!this.messageHandler) {
+          await new Promise((resolve) => setTimeout(resolve, 50));
+          continue;
+        }
+        try {
+          const messageKey = await this.workerQueueManager.blockingPop(
+            TEST_WORKER_QUEUE_ID,
+            1,
+            this.abortController.signal
+          );
+          if (!messageKey) continue;
+
+          const colonIndex = messageKey.indexOf(":");
+          if (colonIndex === -1) continue;
+
+          const messageId = messageKey.substring(0, colonIndex);
+          const queueId = messageKey.substring(colonIndex + 1);
+
+          const storedMessage = await this.fairQueue.getMessageData(messageId, queueId);
+          if (!storedMessage) continue;
+
+          const ctx = {
+            message: {
+              id: storedMessage.id,
+              queueId: storedMessage.queueId,
+              payload: storedMessage.payload,
+              timestamp: 
storedMessage.timestamp, + attempt: storedMessage.attempt, + }, + queue: { + id: queueId, + tenantId: storedMessage.tenantId, + }, + consumerId: loopId, + heartbeat: () => this.fairQueue.heartbeatMessage(messageId, queueId), + complete: () => this.fairQueue.completeMessage(messageId, queueId), + release: () => this.fairQueue.releaseMessage(messageId, queueId), + fail: (error?: Error) => this.fairQueue.failMessage(messageId, queueId, error), + }; + + await this.messageHandler(ctx); + } catch (error) { + if (this.abortController.signal.aborted) break; + } + } + } catch { + // Ignore abort errors + } + } +} describe("Race Condition Tests", () => { let keys: FairQueueKeyProducer; @@ -39,6 +218,9 @@ describe("Race Condition Tests", () => { payloadSchema: TestPayloadSchema, shardCount: 1, startConsumers: false, + workerQueue: { + resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID, + }, }); const CONCURRENT_ENQUEUES = 100; @@ -87,6 +269,9 @@ describe("Race Condition Tests", () => { payloadSchema: TestPayloadSchema, shardCount: 4, // Multiple shards startConsumers: false, + workerQueue: { + resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID, + }, }); const QUEUES = 10; @@ -144,9 +329,7 @@ describe("Race Condition Tests", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -225,9 +408,7 @@ describe("Race Condition Tests", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -624,9 +805,7 @@ describe("Race Condition Tests", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -711,9 +890,7 @@ describe("Race Condition Tests", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -793,9 +970,7 @@ describe("Race Condition Tests", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -863,9 +1038,7 @@ describe("Race Condition Tests", () => { // Track concurrency over time let maxConcurrency = 0; - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -953,9 +1126,7 @@ describe("Race Condition Tests", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 1, @@ -1024,9 +1195,7 @@ describe("Race Condition Tests", () => { maxDeficit: 100, }); - const queue = new FairQueue({ - redis: redisOptions, - keys, + const queue = new TestFairQueueHelper(redisOptions, keys, { scheduler, payloadSchema: TestPayloadSchema, shardCount: 2, // Multiple shards to test diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index 42b7bbf207..002a2089a7 100644 --- 
a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -286,14 +286,15 @@ export interface FairQueueKeyProducer { /** * Worker queue configuration options. + * Worker queues are always enabled - FairQueue routes messages to worker queues, + * and external consumers are responsible for consuming from those queues. */ export interface WorkerQueueOptions { - /** Whether to enable worker queues (default: false for backwards compatibility) */ - enabled: boolean; - /** Blocking pop timeout in seconds (default: 10) */ - blockingTimeoutSeconds?: number; - /** Function to resolve which worker queue a message should go to */ - resolveWorkerQueue?: (message: StoredMessage) => string; + /** + * Function to resolve which worker queue a message should go to. + * This is called during the claim-and-push phase to determine the target queue. + */ + resolveWorkerQueue: (message: StoredMessage) => string; } /** @@ -349,9 +350,12 @@ export interface FairQueueOptions>; + // Worker queue (required) + /** + * Worker queue configuration. + * FairQueue routes messages to worker queues; external consumers handle consumption. + */ + workerQueue: WorkerQueueOptions>; // Retry and DLQ /** Retry and dead letter queue configuration */ From babb92d88b5e20e221d2907dc6f1d0c530c29e6d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 12:50:09 +0000 Subject: [PATCH 09/14] fix tests --- .../src/fair-queue/tests/fairQueue.test.ts | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts index 2d615427a2..c7357f2fc3 100644 --- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts @@ -386,14 +386,25 @@ describe("FairQueue", () => { { timeout: 15000 } ); - // Check that messages were interleaved (not all t1 before t2) - const firstFive = processed.slice(0, 5); - const t1InFirstFive = firstFive.filter((p) => p.tenant === "t1").length; - const t2InFirstFive = firstFive.filter((p) => p.tenant === "t2").length; - - // DRR should ensure some interleaving - expect(t1InFirstFive).toBeGreaterThan(0); - expect(t2InFirstFive).toBeGreaterThan(0); + // With two-stage architecture, fairness is at the claiming level, not processing order. + // Both tenants' queues are serviced in the same scheduler round, but messages are + // pushed to a shared worker queue and processed in FIFO order. + // The fairness guarantee is that both tenants' messages ARE processed, not that + // they're interleaved in the processing order. 
+ const t1Count = processed.filter((p) => p.tenant === "t1").length; + const t2Count = processed.filter((p) => p.tenant === "t2").length; + + // DRR ensures both tenants get their messages claimed and processed + expect(t1Count).toBe(5); + expect(t2Count).toBe(5); + + // Verify all messages were processed + expect(processed.filter((p) => p.tenant === "t1").map((p) => p.value)).toEqual( + expect.arrayContaining(["t1-0", "t1-1", "t1-2", "t1-3", "t1-4"]) + ); + expect(processed.filter((p) => p.tenant === "t2").map((p) => p.value)).toEqual( + expect.arrayContaining(["t2-0", "t2-1", "t2-2", "t2-3", "t2-4"]) + ); await queue.close(); } From c8a1df0b9d74eb4ee903877ab6d9bc4f27554763 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 12:57:22 +0000 Subject: [PATCH 10/14] Add BATCH_QUEUE_WORKER_QUEUE_ENABLED to control which service dequeues from the worker queue --- apps/webapp/app/env.server.ts | 4 +++- apps/webapp/app/v3/runEngine.server.ts | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d772a44438..f1b3e901c7 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -955,7 +955,9 @@ const EnvironmentSchema = z BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1), // Maximum queues to fetch from master queue per iteration BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000), - // Worker queue blocking timeout in seconds (for two-stage processing) + // Enable worker queue for two-stage processing (claim messages, push to worker queue, process from worker queue) + BATCH_QUEUE_WORKER_QUEUE_ENABLED: BoolEnv.default(true), + // Worker queue blocking timeout in seconds (for two-stage processing, only used when BATCH_QUEUE_WORKER_QUEUE_ENABLED is true) BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10), // Global rate limit: max items processed per second across all consumers // If not set, no global rate limiting is applied diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 4d18245324..10f8e550b3 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -174,7 +174,9 @@ function createRunEngine() { masterQueueLimit: env.BATCH_QUEUE_MASTER_QUEUE_LIMIT, }, shardCount: env.BATCH_QUEUE_SHARD_COUNT, - workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS, + workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_ENABLED + ? 
env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS + : undefined, consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT, consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS, // Default processing concurrency when no specific limit is set From 58a7bcaa25fecc36bbe918c23a00296bacd2d3a8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 13:06:08 +0000 Subject: [PATCH 11/14] prevent memory leak in abort listeners --- .../redis-worker/src/fair-queue/workerQueue.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/workerQueue.ts b/packages/redis-worker/src/fair-queue/workerQueue.ts index b60201c90a..b3b75e9db3 100644 --- a/packages/redis-worker/src/fair-queue/workerQueue.ts +++ b/packages/redis-worker/src/fair-queue/workerQueue.ts @@ -101,12 +101,17 @@ export class WorkerQueueManager { // This is required because BLPOP blocks the connection const blockingClient = this.redis.duplicate(); + // Define cleanup outside try so it's accessible in finally + // This prevents listener accumulation on the AbortSignal + const cleanup = signal + ? () => { + blockingClient.disconnect(); + } + : null; + try { // Set up abort handler - if (signal) { - const cleanup = () => { - blockingClient.disconnect(); - }; + if (signal && cleanup) { signal.addEventListener("abort", cleanup, { once: true }); if (signal.aborted) { @@ -143,6 +148,11 @@ export class WorkerQueueManager { throw error; } finally { + // Always remove the listener to prevent accumulation on the AbortSignal + // (once: true only removes if abort fires, not on normal completion) + if (cleanup && signal) { + signal.removeEventListener("abort", cleanup); + } await blockingClient.quit().catch(() => { // Ignore quit errors (may already be disconnected) }); From 51fb53a968eb9c914ba1b4e1cf611eefca8b0714 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 14:10:32 +0000 Subject: [PATCH 12/14] cleanup batch span processor in the batch queue on stop --- internal-packages/run-engine/src/batch-queue/index.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 7c67c6824d..769bddcc1f 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -503,6 +503,10 @@ export class BatchQueue { */ async close(): Promise { await this.stop(); + + // Clean up any remaining batched spans (safety net for spans not cleaned up by consumer loops) + this.batchedSpanManager.cleanupAll(); + await this.fairQueue.close(); await this.workerQueueManager.close(); await this.completionTracker.close(); From 4ce106606ba2e201f852cda29772cc701413860a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 14:22:05 +0000 Subject: [PATCH 13/14] complete a message if the stored message is not found --- internal-packages/run-engine/src/batch-queue/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 769bddcc1f..9f13fce89f 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -684,6 +684,7 @@ export class BatchQueue { if (!storedMessage) { this.logger.error("Message not found in in-flight data", { messageId, queueId }); + await this.fairQueue.completeMessage(messageId, queueId); return; } From 
6221282bd7d92b2b0d913eae62eafc12a04e729e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 7 Jan 2026 14:50:38 +0000 Subject: [PATCH 14/14] better checking if an error is an abort error --- .../run-engine/src/batch-queue/index.ts | 3 ++- packages/redis-worker/src/fair-queue/index.ts | 5 +++-- packages/redis-worker/src/index.ts | 1 + packages/redis-worker/src/utils.ts | 12 ++++++++++++ 4 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 packages/redis-worker/src/utils.ts diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 9f13fce89f..4ee337f7b4 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -13,6 +13,7 @@ import { CallbackFairQueueKeyProducer, WorkerQueueManager, BatchedSpanManager, + isAbortError, type FairQueueOptions, type StoredMessage, } from "@trigger.dev/redis-worker"; @@ -663,7 +664,7 @@ export class BatchQueue { } } } catch (error) { - if (error instanceof Error && error.name === "AbortError") { + if (isAbortError(error)) { this.logger.debug("Worker queue consumer aborted", { loopId }); this.batchedSpanManager.cleanup(loopId); return; diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 8ed2772066..b2281cfa59 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -7,6 +7,7 @@ import { type z } from "zod"; import { ConcurrencyManager } from "./concurrency.js"; import { MasterQueue } from "./masterQueue.js"; import { type RetryStrategy, ExponentialBackoffRetry } from "./retry.js"; +import { isAbortError } from "../utils.js"; import { FairQueueTelemetry, FairQueueAttributes, @@ -769,7 +770,7 @@ export class FairQueue { }); } } catch (error) { - if (error instanceof Error && error.message === "AbortError") { + if (isAbortError(error)) { this.logger.debug("Master queue consumer aborted", { loopId }); this.batchedSpanManager.cleanup(loopId); return; @@ -1330,7 +1331,7 @@ export class FairQueue { } } } catch (error) { - if (error instanceof Error && error.name === "AbortError") { + if (isAbortError(error)) { this.logger.debug("Reclaim loop aborted"); return; } diff --git a/packages/redis-worker/src/index.ts b/packages/redis-worker/src/index.ts index 6163c8faa6..1c5147ea48 100644 --- a/packages/redis-worker/src/index.ts +++ b/packages/redis-worker/src/index.ts @@ -1,5 +1,6 @@ export * from "./queue.js"; export * from "./worker.js"; +export * from "./utils.js"; // Fair Queue System export * from "./fair-queue/index.js"; diff --git a/packages/redis-worker/src/utils.ts b/packages/redis-worker/src/utils.ts new file mode 100644 index 0000000000..a577de3b0e --- /dev/null +++ b/packages/redis-worker/src/utils.ts @@ -0,0 +1,12 @@ +/** + * Check if an error is an AbortError. + * + * This handles both: + * - Custom abort errors created with `new Error("AbortError")` (sets .message) + * - Native Node.js AbortError from timers/promises (sets .name) + */ +export function isAbortError(error: unknown): boolean { + return ( + error instanceof Error && (error.name === "AbortError" || error.message === "AbortError") + ); +}
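
Taken together, the series moves FairQueue to a claim-and-push model: FairQueue claims messages and pushes message keys to a worker queue, and an external consumer pops keys, loads the stored message, and acks it. The sketch below is illustrative only; it mirrors the test helper above rather than any file in this series, and the workerQueueId value, the generic parameter on FairQueue, and the surrounding setup (a started FairQueue whose resolveWorkerQueue routes to the same workerQueueId) are assumptions, not part of the patch.

    import { FairQueue, WorkerQueueManager, isAbortError } from "@trigger.dev/redis-worker";

    async function consumeWorkerQueue(
      fairQueue: FairQueue<any>, // generic parameter assumed for illustration
      workerQueueManager: WorkerQueueManager,
      workerQueueId: string, // assumed value, e.g. "worker-main"
      signal: AbortSignal
    ): Promise<void> {
      while (!signal.aborted) {
        try {
          // Stage two: block until stage one (queue selection + claim) pushes a key.
          const messageKey = await workerQueueManager.blockingPop(workerQueueId, 10, signal);
          if (!messageKey) continue;

          // Keys are "<messageId>:<queueId>". Split on the first colon only, because
          // queue IDs themselves contain colons (e.g. "tenant:t1:queue:q1").
          const colonIndex = messageKey.indexOf(":");
          if (colonIndex === -1) continue;
          const messageId = messageKey.substring(0, colonIndex);
          const queueId = messageKey.substring(colonIndex + 1);

          const storedMessage = await fairQueue.getMessageData(messageId, queueId);
          if (!storedMessage) {
            // Mirrors the batch queue fix above: ack missing messages so the
            // in-flight entry is not reclaimed and redelivered forever.
            await fairQueue.completeMessage(messageId, queueId);
            continue;
          }

          try {
            // ... process storedMessage.payload here ...
            await fairQueue.completeMessage(messageId, queueId);
          } catch (err) {
            await fairQueue.failMessage(messageId, queueId, err instanceof Error ? err : undefined);
          }
        } catch (error) {
          // Shutdown path: blockingPop's connection is torn down when the signal aborts.
          if (isAbortError(error) || signal.aborted) return;
          throw error;
        }
      }
    }

The first-colon split matters because only the message ID is guaranteed colon-free; everything after the first separator is treated as the queue ID, which carries its own tenant/queue segments.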