メインコンテンツまでスキップ

Using BullMQ

BullMQ is a powerful job queue library for Node.js backed by Redis. This guide shows how to integrate BullMQ with Zelt using dependency injection and lifecycle management.

Installation

pnpm add bullmq ioredis

Basic Setup

Create a service that manages the Redis connection and exposes the BullMQ client:

@Config
class BullMQConfig {
  static readonly Token = BullMQConfig;

  constructor(private env = inject(Env)) {}

  get connection(): RedisOptions {
    return {
      host: this.env.getString('REDIS_HOST', 'localhost'),
      port: Number(this.env.getString('REDIS_PORT', '6379')),
    };
  }
}

@Injectable()
class BullMQService implements Lifecycle {
  readonly client: Redis;

  constructor(
    private config = inject(BullMQConfig),
    lifecycle = inject(LifecycleManager),
  ) {
    this.client = new Redis(this.config.connection);
    lifecycle.register(this);
  }

  async startup(): Promise<void> {}

  async shutdown(): Promise<void> {
    await this.client.quit();
  }
}

Creating Queues

Inject BullMQService and create queues using the shared connection:

@Injectable()
class EmailService {
  private readonly queue: Queue;

  constructor(bullmq = inject(BullMQService)) {
    this.queue = new Queue('email', { connection: bullmq.client });
  }

  async sendWelcomeEmail(to: string): Promise<void> {
    await this.queue.add('welcome', { to, subject: 'Welcome!', body: '...' });
  }

  async sendPasswordReset(to: string, token: string): Promise<void> {
    await this.queue.add('password-reset', { to, token }, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
    });
  }
}

Creating Workers

Workers process jobs from the queue. Register them with the lifecycle manager for graceful shutdown:

@Injectable()
class EmailWorker implements Lifecycle {
  private readonly worker: Worker<EmailJobData>;

  constructor(
    bullmq = inject(BullMQService),
    private emailClient = inject(EmailClient),
    lifecycle = inject(LifecycleManager),
  ) {
    this.worker = new Worker<EmailJobData>('email', this.process.bind(this), {
      connection: bullmq.client,
      concurrency: 5,
    });
    lifecycle.register(this);
  }

  private async process(job: Job<EmailJobData>): Promise<void> {
    switch (job.name) {
      case 'welcome':
        await this.emailClient.send(job.data.to, job.data.subject!, job.data.body!);
        break;
      case 'password-reset':
        await this.emailClient.sendPasswordReset(job.data.to, job.data.token!);
        break;
    }
  }

  async startup(): Promise<void> {}

  async shutdown(): Promise<void> {
    await this.worker.close();
  }
}

Using in Controllers

Enqueue jobs from your HTTP controllers:

@Controller('/users')
class UserController {
  constructor(private emailService = inject(EmailService)) {}

  @Post('/register')
  async register() {
    const body = validated(v.object({ email: v.string() }));
    
    // ... create user
    
    await this.emailService.sendWelcomeEmail(body.email);
    
    return { message: 'User registered' };
  }
}

App Configuration

Register your services in the app:

const app = createApp([http({ controllers: [UserController] })], { configs: [BullMQConfig] });

export default app;

To start workers, ensure they are instantiated at startup:

const app = createApp([http({ controllers: [UserController] })], { configs: [BullMQConfig] });

// Instantiate worker to start processing
app.createRuntime().then(() => {
  inject(EmailWorker);
});

Custom Configuration

Extend BullMQConfig for different environments:

@Config
class ProductionBullMQConfig extends BullMQConfig {
  override get connection(): ConnectionOptions {
    return {
      host: this.env.getRequired('REDIS_HOST'),
      port: Number(this.env.getString('REDIS_PORT', '6379')),
      password: this.env.getString('REDIS_PASSWORD'),
      tls: this.env.getString('REDIS_TLS') === 'true' ? {} : undefined,
    };
  }
}

Job Options

BullMQ supports many job options. Use them directly:

await queue.add('report', { userId: 123 }, {
  delay: 60000,                    // Delay 1 minute
  attempts: 5,                     // Retry 5 times
  backoff: { type: 'exponential', delay: 2000 },
  priority: 1,                     // Higher priority
  removeOnComplete: 100,           // Keep last 100 completed
  removeOnFail: 50,                // Keep last 50 failed
});

Scheduled Jobs

For recurring jobs, use BullMQ's repeat feature:

await queue.add('daily-report', {}, {
  repeat: {
    pattern: '0 9 * * *',          // Every day at 9:00
    tz: 'Asia/Tokyo',
  },
});

Monitoring

Use Bull Board or Arena to monitor your queues. These integrate directly with BullMQ.

Testing

For testing, use a separate Redis instance or mock the queue:

describe('EmailService', () => {
  it('enqueues welcome email', async () => {
    const mockQueue = { add: vi.fn() };
    const service = new EmailService({ client: {} } as any);
    (service as any).queue = mockQueue;

    await service.sendWelcomeEmail('test@example.com');

    expect(mockQueue.add).toHaveBeenCalledWith('welcome', {
      to: 'test@example.com',
      subject: 'Welcome!',
      body: '...',
    });
  });
});

For integration tests, use Testcontainers with a test config override:

const redis = await new GenericContainer('redis:7').withExposedPorts(6379).start();

@Config
class TestBullMQConfig extends BullMQConfig {
  override get connection() {
    return {
      host: redis.getHost(),
      port: redis.getMappedPort(6379),
    };
  }
}

// Use TestBullMQConfig in your test app setup