Queues

Build queue consumers with convention-based discovery

Queue Consumers

Koze auto-discovers .queue.ts files in src/server/. Each file becomes a queue consumer handler.

// src/server/notifications.queue.ts
import type { MessageBatch } from 'cloudflare:workers';

export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  for (const message of batch.messages) {
    console.log('Processing:', message.body);
    message.ack();
  }
}

Naming Convention

Queue binding names are derived from the filename:

File                      Binding
notifications.queue.ts    NOTIFICATIONS
email-jobs.queue.ts       EMAIL_JOBS
data-sync.queue.ts        DATA_SYNC
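The table suggests a simple rule: drop the .queue.ts suffix, replace hyphens with underscores, and upper-case the result. The sketch below expresses that rule in code. bindingNameFromFile is a hypothetical helper written for illustration; it is not part of Koze's API, and the framework's real derivation may cover cases the table does not show.

```typescript
// Hypothetical helper: mirrors the filename-to-binding mapping shown in the table.
// An illustration of the convention, not Koze's actual implementation.
function bindingNameFromFile(filename: string): string {
  const suffix = '.queue.ts';
  if (!filename.endsWith(suffix)) {
    throw new Error(`Not a queue file: ${filename}`);
  }
  // Keep only the base name, e.g. "src/server/email-jobs.queue.ts" -> "email-jobs"
  const base = filename.slice(filename.lastIndexOf('/') + 1, -suffix.length);
  // Hyphens become underscores, then everything is upper-cased: "email-jobs" -> "EMAIL_JOBS"
  return base.replace(/-/g, '_').toUpperCase();
}
```

A helper like this can be handy in a test that checks the bindings declared in wrangler.jsonc against the files present in src/server/.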

Export Patterns

The framework supports two export patterns:

Default export (recommended):

export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  // handle messages
}

Named export:

export async function queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  // handle messages
}

Wrangler Configuration

Register the queue consumer in wrangler.jsonc:

{
  "queues": {
    "consumers": [
      {
        "queue": "notifications-queue",
        "max_batch_size": 10,
        "max_batch_timeout": 30
      }
    ]
  }
}

The binding name in Wrangler must match the binding derived from the filename, for example NOTIFICATIONS for notifications.queue.ts.

Producing Messages

To send messages to a queue, use the binding directly in your server code:

// In a server action or RPC
await env.NOTIFICATIONS.send({
  type: 'welcome',
  userId: user.id,
});
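When many messages need to be enqueued at once, Cloudflare's producer binding also offers sendBatch(), which takes a list of { body } entries. The sketch below splits a large list into chunks before sending. QueueLike is a local stand-in for the binding type so the example is self-contained, enqueueAll is a hypothetical helper, and the default chunk size of 100 is an assumption that should be checked against Cloudflare's current limits.

```typescript
// Minimal structural stand-in for a Cloudflare queue producer binding,
// declared locally so the sketch is self-contained.
interface QueueLike<T> {
  sendBatch(messages: Iterable<{ body: T }>): Promise<void>;
}

// Hypothetical helper: sends bodies in chunks so that no single sendBatch()
// call carries more than maxPerBatch messages.
// Returns the number of sendBatch() calls that were made.
async function enqueueAll<T>(
  queue: QueueLike<T>,
  bodies: T[],
  maxPerBatch = 100, // assumed per-call limit; verify against current Cloudflare limits
): Promise<number> {
  let calls = 0;
  for (let i = 0; i < bodies.length; i += maxPerBatch) {
    // Wrap each body in the { body } envelope that sendBatch() expects
    const chunk = bodies.slice(i, i + maxPerBatch).map((body) => ({ body }));
    await queue.sendBatch(chunk);
    calls++;
  }
  return calls;
}
```

In a server action this might be called as enqueueAll(env.NOTIFICATIONS, userIds.map((userId) => ({ type: 'welcome', userId }))), assuming userIds is an array of strings.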

Message Types

Define your message types for type safety:

// src/server/notifications.queue.ts
import type { MessageBatch, Message } from 'cloudflare:workers';

interface NotificationMessage {
  type: 'welcome' | 'reminder' | 'alert';
  userId: string;
  data?: Record<string, unknown>;
}

export default async function (
  batch: MessageBatch<NotificationMessage>,
  env: Env,
  ctx: ExecutionContext
) {
  for (const message of batch.messages) {
    const { type, userId, data } = message.body;
    
    switch (type) {
      case 'welcome':
        await sendWelcomeEmail(userId);
        break;
      case 'reminder':
        await sendReminder(userId, data);
        break;
      case 'alert':
        await sendAlert(userId, data);
        break;
    }
    
    message.ack();
  }
}
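The generic parameter on MessageBatch is checked only at compile time, so a malformed body from another producer would still reach the handler. One way to guard against that is a runtime check before the switch. The sketch below repeats the NotificationMessage interface to stay self-contained; isNotificationMessage is a hypothetical helper, not something Koze provides.

```typescript
interface NotificationMessage {
  type: 'welcome' | 'reminder' | 'alert';
  userId: string;
  data?: Record<string, unknown>;
}

const NOTIFICATION_TYPES = ['welcome', 'reminder', 'alert'];

// Hypothetical runtime guard: narrows an unknown message body to NotificationMessage.
function isNotificationMessage(body: unknown): body is NotificationMessage {
  if (typeof body !== 'object' || body === null) return false;
  const candidate = body as Record<string, unknown>;
  // data is optional, but when present it must be a plain (non-array) object
  const dataOk =
    candidate.data === undefined ||
    (typeof candidate.data === 'object' &&
      candidate.data !== null &&
      !Array.isArray(candidate.data));
  return (
    typeof candidate.type === 'string' &&
    NOTIFICATION_TYPES.indexOf(candidate.type) !== -1 &&
    typeof candidate.userId === 'string' &&
    dataOk
  );
}
```

A body that fails the guard will not become valid on a later attempt, so logging it and acknowledging the message is usually more useful than retrying it.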

Error Handling

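If the handler throws, Cloudflare retries every message in the batch that was not explicitly acknowledged. If the handler returns normally, any message that was neither acknowledged nor marked for retry is treated as delivered. To control the outcome per message, call message.ack() after successful processing and message.retry() after a failure:

export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  for (const message of batch.messages) {
    try {
      await processMessage(message.body);
      // Success: this message will not be redelivered
      message.ack();
    } catch (err) {
      console.error('Failed to process:', err);
      // Failure: request redelivery; a caught error alone does not trigger a retry
      message.retry();
    }
  }
}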
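Cloudflare's message API also exposes an attempts counter and accepts a delaySeconds option on retry(), which makes it possible to back off between attempts instead of retrying immediately. The sketch below shows one way to do that. RetryableMessage is a local structural subset of the message type, and backoffSeconds, processWithBackoff, and the default base and cap values are hypothetical choices made for illustration.

```typescript
// Hypothetical helper: exponential backoff delay in seconds for a given
// delivery attempt (1 = first delivery), capped at maxSeconds.
function backoffSeconds(attempts: number, baseSeconds = 5, maxSeconds = 3600): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(maxSeconds, baseSeconds * Math.pow(2, exponent));
}

// Structural subset of a Cloudflare queue message, declared locally
// so the sketch is self-contained.
interface RetryableMessage {
  readonly attempts: number;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

// Acknowledges the message when work() succeeds; otherwise requests a
// delayed retry whose delay grows with the number of attempts so far.
async function processWithBackoff(
  message: RetryableMessage,
  work: () => Promise<void>,
): Promise<void> {
  try {
    await work();
    message.ack();
  } catch (err) {
    console.error('Failed to process:', err);
    message.retry({ delaySeconds: backoffSeconds(message.attempts) });
  }
}
```

With the defaults above, the delays for attempts 1, 2, and 3 are 5, 10, and 20 seconds. Inside a handler, the call might look like processWithBackoff(message, () => processMessage(message.body)).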

Batch Processing

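Process the messages in a batch concurrently for efficiency. Promise.allSettled() never rejects, so failed messages must be marked for retry explicitly; otherwise they are treated as delivered when the handler returns:

export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  // Start all messages at once and wait for every one to finish
  const results = await Promise.allSettled(
    batch.messages.map(async (message) => {
      await processMessage(message.body);
      message.ack();
    })
  );

  // results[i] corresponds to batch.messages[i]
  let failed = 0;
  results.forEach((result, i) => {
    if (result.status === 'rejected') {
      batch.messages[i].retry();
      failed++;
    }
  });

  if (failed > 0) {
    console.error(`${failed} messages failed`);
  }
}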
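When the whole batch succeeds or fails together, for example with a single bulk database write, Cloudflare's batch-level ackAll() and retryAll() methods can replace the per-message calls. The sketch below illustrates the idea. BatchLike is a local structural subset of MessageBatch, and handleAsUnit and its writeAll parameter are hypothetical names introduced for this example.

```typescript
// Structural subset of Cloudflare's MessageBatch, declared locally
// so the sketch is self-contained.
interface BatchLike<T> {
  readonly messages: readonly { readonly body: T }[];
  ackAll(): void;
  retryAll(): void;
}

// Hypothetical handler body: performs one bulk operation for the whole batch,
// then acknowledges or retries every message together.
async function handleAsUnit<T>(
  batch: BatchLike<T>,
  writeAll: (bodies: T[]) => Promise<void>, // e.g. a single bulk insert
): Promise<void> {
  try {
    await writeAll(batch.messages.map((m) => m.body));
    batch.ackAll();
  } catch (err) {
    console.error(`Batch of ${batch.messages.length} messages failed:`, err);
    batch.retryAll();
  }
}
```

This trades granularity for simplicity: one bad message causes the whole batch to be redelivered, so it fits best when the bulk operation is idempotent.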