# Queues

> Build queue consumers with convention-based discovery

Package: Koze
Canonical: https://kuratchi.dev/docs/koze/queues
Markdown: https://kuratchi.dev/docs/koze/queues.md

## Queue Consumers

Koze auto-discovers `.queue.ts` files in `src/server/`. Each file becomes a queue consumer handler.

```ts
// src/server/notifications.queue.ts
import type { MessageBatch } from 'cloudflare:workers';

export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  for (const message of batch.messages) {
    console.log('Processing:', message.body);
    message.ack();
  }
}
```

## Naming Convention

Queue binding names are derived from the filename:

| File | Binding |
|------|---------|
| `notifications.queue.ts` | `NOTIFICATIONS` |
| `email-jobs.queue.ts` | `EMAIL_JOBS` |
| `data-sync.queue.ts` | `DATA_SYNC` |

## Export Patterns

The framework supports two export patterns:

**Default export (recommended):**
```ts
export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  // handle messages
}
```

**Named export:**
```ts
export async function queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  // handle messages
}
```

## Wrangler Configuration

Add the queue binding to `wrangler.jsonc`:

```jsonc
{
  "queues": {
    "consumers": [
      {
        "queue": "notifications-queue",
        "max_batch_size": 10,
        "max_batch_timeout": 30
      }
    ]
  }
}
```

The binding name in Wrangler must match the auto-generated binding from the filename.

## Producing Messages

To send messages to a queue, use the binding directly in your server code:

```ts
// In a server action or RPC
await env.NOTIFICATIONS.send({
  type: 'welcome',
  userId: user.id,
});
```

## Message Types

Define your message types for type safety:

```ts
// src/server/notifications.queue.ts
import type { MessageBatch, Message } from 'cloudflare:workers';

interface NotificationMessage {
  type: 'welcome' | 'reminder' | 'alert';
  userId: string;
  data?: Record<string, unknown>;
}

export default async function (
  batch: MessageBatch<NotificationMessage>,
  env: Env,
  ctx: ExecutionContext
) {
  for (const message of batch.messages) {
    const { type, userId, data } = message.body;
    
    switch (type) {
      case 'welcome':
        await sendWelcomeEmail(userId);
        break;
      case 'reminder':
        await sendReminder(userId, data);
        break;
      case 'alert':
        await sendAlert(userId, data);
        break;
    }
    
    message.ack();
  }
}
```

## Error Handling

Messages that throw errors are automatically retried by Cloudflare. Use `message.ack()` to mark successful processing:

```ts
export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  for (const message of batch.messages) {
    try {
      await processMessage(message.body);
      message.ack();
    } catch (err) {
      // Message will be retried
      console.error('Failed to process:', err);
    }
  }
}
```

## Batch Processing

Process entire batches for efficiency:

```ts
export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
  const results = await Promise.allSettled(
    batch.messages.map(async (message) => {
      await processMessage(message.body);
      message.ack();
    })
  );
  
  const failed = results.filter(r => r.status === 'rejected');
  if (failed.length > 0) {
    console.error(`${failed.length} messages failed`);
  }
}
```
