platform-queue

Provider-agnostic queue infrastructure for NestJS. Exposes a unified IQueue contract with four ready-made implementations: in-memory FIFO (MemoryQueue), BullMQ/Redis (BullMqQueue), Azure Service Bus (AzureQueue), and Vercel Queues (VercelQueue). Jobs follow a lifecycle from pending through processing to completed or failed, with full metadata tracking.

Package: @breadstone/archipel-platform-queue

npm install @breadstone/archipel-platform-queue

Architecture

mermaid
graph TD
    Core["@breadstone/archipel-platform-queue<br/><i>IQueue, models, errors, MemoryQueue</i>"]
    BullMQ["@breadstone/archipel-platform-queue/bullmq<br/><i>BullMqQueue</i>"]
    Azure["@breadstone/archipel-platform-queue/azure<br/><i>AzureQueue</i>"]
    Vercel["@breadstone/archipel-platform-queue/vercel<br/><i>VercelQueue</i>"]

    BullMQ -->|implements| Core
    Azure -->|implements| Core
    Vercel -->|implements| Core

    BullMQ -.-|requires| R["bullmq"]
    Azure -.-|requires| S["@azure/service-bus"]
    Vercel -.-|requires| V["@vercel/queue"]

Each provider SDK is an optional peer dependency. Install only the one you need. MemoryQueue ships with the core entry point and requires no additional dependencies.


Quick Start

1. Install the provider SDK (optional)

bash
# Pick one (or use MemoryQueue without any extra SDK):
yarn add bullmq
yarn add @azure/service-bus
yarn add @vercel/queue

2. Register the queue

typescript
import { Module } from '@nestjs/common';
import { IQueue, MemoryQueue } from '@breadstone/archipel-platform-queue';

@Module({
  providers: [
    {
      provide: 'IQueue',
      useFactory: () => new MemoryQueue({ maxJobs: 5_000 }),
    },
  ],
  exports: ['IQueue'],
})
export class AppModule {}

Switching providers

Replace the import and the provider factory. Code that injects 'IQueue' needs no changes:

typescript
import { BullMqQueue } from '@breadstone/archipel-platform-queue/bullmq';

{
  provide: 'IQueue',
  useFactory: () => new BullMqQueue({ redisUrl: 'redis://localhost:6379' }),
}

Queue Providers

Memory (built-in)

Entry point: @breadstone/archipel-platform-queue

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| QUEUE_MAX_JOBS | No | 10000 | Maximum jobs retained before eviction |
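
The table states that jobs are evicted once the limit is exceeded, but not which jobs go first. The following is a minimal sketch of one possible policy, assuming oldest-first eviction. BoundedJobStore is a hypothetical name used only for illustration and is not part of the package.

```typescript
// Hypothetical illustration; this is not the package's MemoryQueue implementation.
// Assumption: once the cap is exceeded, the oldest retained job is evicted first.
class BoundedJobStore<T> {
  // A Map iterates in insertion order, so its first key is the oldest job.
  private readonly jobs = new Map<string, T>();
  private readonly maxJobs: number;

  constructor(maxJobs: number = 10000) {
    if (!Number.isInteger(maxJobs) || maxJobs < 1) {
      throw new RangeError('maxJobs must be a positive integer');
    }
    this.maxJobs = maxJobs;
  }

  add(id: string, job: T): void {
    this.jobs.set(id, job);
    // Drop the oldest entries until the store is back within its limit.
    while (this.jobs.size > this.maxJobs) {
      const oldest = this.jobs.keys().next().value;
      if (oldest === undefined) {
        break;
      }
      this.jobs.delete(oldest);
    }
  }

  has(id: string): boolean {
    return this.jobs.has(id);
  }

  get size(): number {
    return this.jobs.size;
  }
}
```

With a limit of 2, adding jobs a, b, and c in that order leaves only b and c in the store.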

BullMQ

Subpath: @breadstone/archipel-platform-queue/bullmq
SDK: bullmq ≥ 5.0.0

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| BULLMQ_REDIS_URL | Yes | | Redis connection URL (e.g. redis://localhost:6379) |
| BULLMQ_PREFIX | No | "" | Optional key prefix for all BullMQ queues |

typescript
import { BullMqQueue, BULLMQ_CONFIG_ENTRIES } from '@breadstone/archipel-platform-queue/bullmq';

Azure Service Bus

Subpath: @breadstone/archipel-platform-queue/azure
SDK: @azure/service-bus ≥ 7.0.0

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| AZURE_CONNECTION_STRING | Yes | | Azure Service Bus connection string |
| AZURE_RECEIVE_WAIT_MS | No | 5000 | Max wait time (ms) when receiving messages |

typescript
import { AzureQueue, AZURE_CONFIG_ENTRIES } from '@breadstone/archipel-platform-queue/azure';

Vercel Queues

Subpath: @breadstone/archipel-platform-queue/vercel
SDK: @vercel/queue ≥ 0.1.0

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| VERCEL_QUEUE_REGION | Yes | | Vercel region code (e.g. iad1, fra1, sfo1) |
| VERCEL_QUEUE_CONSUMER_GROUP | Yes | | Consumer group name for receiving messages |
| VERCEL_QUEUE_TOKEN | No | | OIDC bearer token (auto-fetched when omitted) |
| VERCEL_QUEUE_DEPLOYMENT_ID | No | | Deployment ID for per-deployment message isolation |
| VERCEL_QUEUE_VISIBILITY_TIMEOUT_SECONDS | No | 300 | Visibility timeout in seconds for received messages |

typescript
import { VercelQueue, VERCEL_CONFIG_ENTRIES } from '@breadstone/archipel-platform-queue/vercel';
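
The Required and Default columns in the provider tables imply a validation step before a queue is constructed. The sketch below shows one way to read the Vercel variables from an environment-like object. The variable names and the default of 300 come from the table. The VercelQueueEnvConfig shape and the function names are assumptions made for illustration; the package's own VERCEL_CONFIG_ENTRIES may be structured differently.

```typescript
// Hypothetical config shape; the field names are illustrative, not the package's API.
interface VercelQueueEnvConfig {
  region: string;
  consumerGroup: string;
  token: string | undefined;
  deploymentId: string | undefined;
  visibilityTimeoutSeconds: number;
}

type Env = Record<string, string | undefined>;

// Returns the variable's value, or throws when it is missing or blank.
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

function readVercelQueueEnv(env: Env): VercelQueueEnvConfig {
  // Fall back to the documented default of 300 seconds when the variable is unset.
  const rawTimeout = env['VERCEL_QUEUE_VISIBILITY_TIMEOUT_SECONDS'];
  const visibilityTimeoutSeconds = rawTimeout === undefined ? 300 : Number(rawTimeout);
  if (!Number.isInteger(visibilityTimeoutSeconds) || visibilityTimeoutSeconds <= 0) {
    throw new Error('VERCEL_QUEUE_VISIBILITY_TIMEOUT_SECONDS must be a positive integer');
  }

  return {
    region: requireVar(env, 'VERCEL_QUEUE_REGION'),
    consumerGroup: requireVar(env, 'VERCEL_QUEUE_CONSUMER_GROUP'),
    token: env['VERCEL_QUEUE_TOKEN'],
    deploymentId: env['VERCEL_QUEUE_DEPLOYMENT_ID'],
    visibilityTimeoutSeconds,
  };
}
```

In a Node.js application the function would typically be called with process.env. Taking the environment as a parameter keeps the sketch easy to test.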

Job Lifecycle

| Status | Description |
| --- | --- |
| pending | Job is waiting to be picked up |
| processing | Job has been dequeued and is being processed |
| completed | Job finished successfully via markCompleted() |
| failed | Job finished with an error via markFailed() |
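
The lifecycle can be read as a small state machine. The sketch below models it with plain functions, assuming the only legal moves are pending to processing and processing to completed or failed. The status names come from the table. TrackedJob, transition, and the signatures of the local markCompleted and markFailed helpers are stand-ins, not the package's actual API.

```typescript
// Statuses come from the table above; the transition map is an assumption.
type JobStatus = 'pending' | 'processing' | 'completed' | 'failed';

const ALLOWED_TRANSITIONS: Record<JobStatus, readonly JobStatus[]> = {
  pending: ['processing'],
  processing: ['completed', 'failed'],
  completed: [], // terminal
  failed: [], // terminal
};

// Hypothetical job record, reduced to the fields this sketch needs.
interface TrackedJob {
  id: string;
  status: JobStatus;
  error?: string;
}

// Returns a copy of the job in the next status, or throws on an illegal move.
function transition(job: TrackedJob, next: JobStatus): TrackedJob {
  if (!ALLOWED_TRANSITIONS[job.status].includes(next)) {
    throw new Error(`Job ${job.id}: cannot move from ${job.status} to ${next}`);
  }
  return { ...job, status: next };
}

// Local stand-ins named after the methods mentioned in the table.
function markCompleted(job: TrackedJob): TrackedJob {
  return transition(job, 'completed');
}

function markFailed(job: TrackedJob, errorMessage: string): TrackedJob {
  return { ...transition(job, 'failed'), error: errorMessage };
}
```

Under these assumptions, a pending job must pass through processing before markCompleted or markFailed succeeds, and a completed or failed job cannot change status again.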

Error Handling

| Error Class | Base Class | When Thrown |
| --- | --- | --- |
| QueueError | Error | Base class for all queue errors |
| QueueJobNotFoundError | QueueError | Job ID does not exist |
| QueueJobStateError | QueueError | Job is in the wrong state for the requested transition |
| QueueValidationError | QueueError | Queue name or error message is empty |
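
Because every error derives from QueueError, callers can check for the specific subclasses first and fall back to the base class. The sketch below declares local stand-in classes that mirror the hierarchy in the table so that it runs on its own. Their constructor signatures are assumptions, and describeQueueFailure is a hypothetical helper. In application code the classes would be imported from the package instead.

```typescript
// Local stand-ins mirroring the documented hierarchy; constructor signatures are assumed.
class QueueError extends Error {
  constructor(message: string) {
    super(message);
    this.name = new.target.name;
    // Keeps instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
class QueueJobNotFoundError extends QueueError {}
class QueueJobStateError extends QueueError {}
class QueueValidationError extends QueueError {}

// Maps a thrown value to a coarse category, most specific class first.
function describeQueueFailure(err: unknown): string {
  if (err instanceof QueueJobNotFoundError) {
    return 'not-found';
  }
  if (err instanceof QueueJobStateError) {
    return 'invalid-state';
  }
  if (err instanceof QueueValidationError) {
    return 'invalid-input';
  }
  if (err instanceof QueueError) {
    return 'queue-error';
  }
  // Not a queue error: rethrow so the caller's generic handling applies.
  throw err;
}
```

Checking the base class last matters: every subclass instance also satisfies instanceof QueueError, so testing it first would hide the more specific categories.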

Health Check

The QueueHealthIndicator reports the queue subsystem as available. Import it from the /health subpath:

typescript
import { Module } from '@nestjs/common';
import { QueueHealthIndicator } from '@breadstone/archipel-platform-queue/health';
import { HealthModule } from '@breadstone/archipel-platform-health';

@Module({
  imports: [
    QueueModule.register({ /* ... */ }),
    HealthModule.withIndicators([QueueHealthIndicator]),
  ],
})
export class AppModule {}

| Key | Check | Dependencies |
| --- | --- | --- |
| queue | Always reports up | None |

Peer Dependencies

| Package | Required | Notes |
| --- | --- | --- |
| @nestjs/common | Yes | NestJS core |
| bullmq | Optional | Required for BullMQ provider |
| @azure/service-bus | Optional | Required for Azure Service Bus |
| @vercel/queue | Optional | Required for Vercel Queues |

Released under the MIT License.