Queues
Build queue consumers with convention-based discovery
Queue Consumers
Koze auto-discovers .queue.ts files in src/server/. Each file becomes a queue consumer handler.
// src/server/notifications.queue.ts
import type { MessageBatch } from 'cloudflare:workers';
export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
for (const message of batch.messages) {
console.log('Processing:', message.body);
message.ack();
}
}
Naming Convention
Queue binding names are derived from the filename:
| File | Binding |
|---|---|
notifications.queue.ts |
NOTIFICATIONS |
email-jobs.queue.ts |
EMAIL_JOBS |
data-sync.queue.ts |
DATA_SYNC |
Export Patterns
The framework supports two export patterns:
Default export (recommended):
export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
// handle messages
}
Named export:
export async function queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
// handle messages
}
Wrangler Configuration
Add the queue binding to wrangler.jsonc:
{
"queues": {
"consumers": [
{
"queue": "notifications-queue",
"max_batch_size": 10,
"max_batch_timeout": 30
}
]
}
}
The binding name in Wrangler must match the auto-generated binding from the filename.
Producing Messages
To send messages to a queue, use the binding directly in your server code:
// In a server action or RPC
await env.NOTIFICATIONS.send({
type: 'welcome',
userId: user.id,
});
Message Types
Define your message types for type safety:
// src/server/notifications.queue.ts
import type { MessageBatch, Message } from 'cloudflare:workers';
interface NotificationMessage {
type: 'welcome' | 'reminder' | 'alert';
userId: string;
data?: Record<string, unknown>;
}
export default async function (
batch: MessageBatch<NotificationMessage>,
env: Env,
ctx: ExecutionContext
) {
for (const message of batch.messages) {
const { type, userId, data } = message.body;
switch (type) {
case 'welcome':
await sendWelcomeEmail(userId);
break;
case 'reminder':
await sendReminder(userId, data);
break;
case 'alert':
await sendAlert(userId, data);
break;
}
message.ack();
}
}
Error Handling
Messages that throw errors are automatically retried by Cloudflare. Use message.ack() to mark successful processing:
export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
for (const message of batch.messages) {
try {
await processMessage(message.body);
message.ack();
} catch (err) {
// Message will be retried
console.error('Failed to process:', err);
}
}
}
Batch Processing
Process entire batches for efficiency:
export default async function (batch: MessageBatch, env: Env, ctx: ExecutionContext) {
const results = await Promise.allSettled(
batch.messages.map(async (message) => {
await processMessage(message.body);
message.ack();
})
);
const failed = results.filter(r => r.status === 'rejected');
if (failed.length > 0) {
console.error(`${failed.length} messages failed`);
}
}