Skip to content

Commit 3ed008d

Browse files
committed
better dequeuing from fair queue
1 parent dcb03ef commit 3ed008d

File tree

2 files changed

+34
-16
lines changed

2 files changed

+34
-16
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,5 @@ apps/**/public/build
6262
/packages/trigger-sdk/src/package.json
6363
/packages/python/src/package.json
6464
.claude
65-
.mcp.log
65+
.mcp.log
66+
.cursor/debug.log

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -960,25 +960,42 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
960960
return;
961961
}
962962

963-
// Process one message from each selected queue
963+
// Process messages from each selected tenant
964+
// For fairness, process up to available concurrency slots per tenant
964965
for (const { tenantId, queues } of tenantQueues) {
965-
for (const queueId of queues) {
966-
// Check cooloff
967-
if (this.cooloffEnabled && this.#isInCooloff(queueId)) {
968-
continue;
969-
}
966+
// Get available concurrency for this tenant
967+
let availableSlots = 1; // Default to 1 for backwards compatibility
968+
if (this.concurrencyManager) {
969+
const [current, limit] = await Promise.all([
970+
this.concurrencyManager.getCurrentConcurrency("tenant", tenantId),
971+
this.concurrencyManager.getConcurrencyLimit("tenant", tenantId),
972+
]);
973+
availableSlots = Math.max(1, limit - current);
974+
}
970975

971-
const processed = await this.#processOneMessage(loopId, queueId, tenantId, shardId);
976+
// Process up to availableSlots messages from this tenant's queues
977+
let slotsUsed = 0;
978+
queueLoop: for (const queueId of queues) {
979+
while (slotsUsed < availableSlots) {
980+
// Check cooloff
981+
if (this.cooloffEnabled && this.#isInCooloff(queueId)) {
982+
break; // Try next queue
983+
}
972984

973-
if (processed) {
974-
await this.scheduler.recordProcessed?.(tenantId, queueId);
975-
this.#resetCooloff(queueId);
976-
} else {
977-
this.#incrementCooloff(queueId);
978-
}
985+
const processed = await this.#processOneMessage(loopId, queueId, tenantId, shardId);
979986

980-
// Only process one message per queue per iteration for fairness
981-
break;
987+
if (processed) {
988+
await this.scheduler.recordProcessed?.(tenantId, queueId);
989+
this.#resetCooloff(queueId);
990+
slotsUsed++;
991+
} else {
992+
this.#incrementCooloff(queueId);
993+
break; // Queue empty or blocked, try next queue
994+
}
995+
}
996+
if (slotsUsed >= availableSlots) {
997+
break queueLoop;
998+
}
982999
}
9831000
}
9841001
}

0 commit comments

Comments
 (0)