Workflows
Build durable workflows with typed live status via koze:workflow
Koze auto-discovers .workflow.ts files in src/server/. Each file becomes a Cloudflare Workflow, and the filename basename becomes its live-status name (e.g. migration.workflow.ts → 'migration').
// src/server/migration.workflow.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

// fetchLegacyData, transformData, and importToNewSystem are application helpers defined elsewhere.
export class MigrationWorkflow extends WorkflowEntrypoint {
  async run(event: WorkflowEvent<unknown>, step: WorkflowStep) {
    const data = await step.do('fetch-data', async () => {
      return await fetchLegacyData();
    });
    // Keep the transformed result so the import step receives it.
    const transformed = await step.do('transform-data', async () => {
      return await transformData(data);
    });
    await step.do('import-data', async () => {
      return await importToNewSystem(transformed);
    });
    return { migrated: data.length };
  }
}
Naming Convention
Workflow binding names are derived from the filename, and the workflowStatus(name, ...) name is simply the filename basename:
| File | Binding (env) | workflowStatus name |
|---|---|---|
| migration.workflow.ts | MIGRATION_WORKFLOW | 'migration' |
| data-sync.workflow.ts | DATA_SYNC_WORKFLOW | 'data-sync' |
| email-campaign.workflow.ts | EMAIL_CAMPAIGN_WORKFLOW | 'email-campaign' |
The discovered names are baked into a compile-time string-literal union (WorkflowName) so passing an unknown name is a type error. Run kuratchi types to regenerate src/app.d.ts whenever you add or rename a workflow file.
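The mapping in the table can be sketched as a small pure function. This is only an illustration of the convention, not Koze's actual discovery code; `deriveWorkflowNames` and `WorkflowNames` are hypothetical names introduced here.

```typescript
// Hypothetical helper mirroring the naming table; not Koze's real implementation.
interface WorkflowNames {
  statusName: string; // name passed to workflowStatus(name, ...)
  binding: string;    // env binding, e.g. env.MIGRATION_WORKFLOW
}

const WORKFLOW_SUFFIX = '.workflow.ts';

function deriveWorkflowNames(filePath: string): WorkflowNames {
  // Keep only the last path segment, accepting both / and \ separators.
  const fileName = filePath.split(/[\\/]/).pop() ?? filePath;
  if (!fileName.endsWith(WORKFLOW_SUFFIX)) {
    throw new Error(`Not a workflow file: ${filePath}`);
  }
  // The status name is the basename without the .workflow.ts suffix.
  const statusName = fileName.slice(0, -WORKFLOW_SUFFIX.length);
  // Upper-case the basename, turn dashes into underscores, append _WORKFLOW.
  const binding = `${statusName.toUpperCase().replace(/-/g, '_')}_WORKFLOW`;
  return { statusName, binding };
}
```

For example, `deriveWorkflowNames('src/server/data-sync.workflow.ts')` yields the status name `'data-sync'` and the binding `DATA_SYNC_WORKFLOW`, matching the second row of the table.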
Starting Workflows
Start a workflow from server code:
// In a server action
import { env } from 'cloudflare:workers';

// redirect() is not imported in this snippet; import it the same way the rest of your server code does.
export async function startMigration({ formData }: { formData: FormData }) {
  const instance = await env.MIGRATION_WORKFLOW.create({
    params: { sourceId: formData.get('sourceId') },
  });
  redirect(`/migrations/${instance.id}`);
}
Live Status with workflowStatus
Import workflowStatus from the koze:workflow virtual module. The first argument is the workflow name (typed), the second is the instance ID, and the optional third enables live refresh.
<script>
  import { params } from 'koze:request';
  import { workflowStatus } from 'koze:workflow';
  const status = await workflowStatus('migration', params.id, { poll: '2s' });
</script>

if (status.error) {
  <ErrorBanner error={status.error} />
} else if (status.status === 'running') {
  <ProgressBar progress={status.output?.progress} />
} else if (status.status === 'complete') {
  <CompletedBanner result={status.output} />
}
status is an AsyncValue with:
- status: the current workflow phase ('queued' | 'running' | 'complete' | 'errored' | ...)
- output: whatever your workflow run() returned
- error: non-null when the status fetch itself failed
- pending / success: standard AsyncValue flags
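Note that error and an 'errored' phase are different things: the first means the status could not be fetched, the second means the workflow itself failed. The sketch below makes that distinction explicit. The `WorkflowStatusValue` interface is an assumed shape reconstructed from the field list above, not Koze's published type, and `describeStatus` is a hypothetical helper.

```typescript
// Assumed shape, reconstructed from the field list; not copied from Koze's types.
interface WorkflowStatusValue<TOutput = unknown> {
  status?: string;      // 'queued' | 'running' | 'complete' | 'errored' | ...
  output?: TOutput;     // return value of run()
  error: Error | null;  // non-null when the status fetch itself failed
  pending: boolean;
  success: boolean;
}

// Hypothetical helper that turns a status value into a short UI label.
function describeStatus(value: WorkflowStatusValue): string {
  // A failed fetch is reported first: the workflow phase is unknown in that case.
  if (value.error) return 'Could not load workflow status';
  if (value.pending) return 'Loading status';
  switch (value.status) {
    case 'queued': return 'Waiting to start';
    case 'running': return 'In progress';
    case 'complete': return 'Finished';
    case 'errored': return 'Workflow failed';
    default: return `Status: ${value.status ?? 'unknown'}`;
  }
}
```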
{ poll } auto-refresh
When you pass poll, the framework:
- Records the interval on request-scoped state while rendering.
- Injects a tiny directive script into the page before sending it.
- Uses that directive on the client to re-fetch the URL on each tick and swap <body> with the freshly rendered HTML.
- Stops polling when until(status) returns true. The default predicate treats 'complete', 'completed', 'errored', or 'terminated' as terminal.
interface WorkflowStatusOptions<T> {
  poll?: string | number;          // e.g. '2s', 500, '1m'
  until?: (value: T) => boolean;   // override the default terminal predicate
}
Because every tick is a full server render, any {status.*} reference in your template reflects the newest data with no client-side reactivity to wire up.
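To make the option semantics concrete, here is a minimal sketch of how a poll value and the default terminal predicate might be interpreted. It assumes that bare numbers are milliseconds and that strings use `ms`, `s`, or `m` suffixes; the docs only show '2s', 500, and '1m', so treat the parsing rules as an assumption. `parsePollInterval` and `defaultUntil` are hypothetical names, not Koze exports.

```typescript
// Assumption: bare numbers are milliseconds; strings carry an ms/s/m suffix.
const UNIT_MS: Record<string, number> = { ms: 1, s: 1000, m: 60000 };

function parsePollInterval(poll: string | number): number {
  if (typeof poll === 'number') return poll;
  const match = /^(\d+(?:\.\d+)?)\s*(ms|s|m)$/.exec(poll.trim());
  if (!match) throw new Error(`Unrecognized poll interval: ${poll}`);
  const amount = Number(match[1]);
  const unit = UNIT_MS[match[2] ?? 'ms'] ?? 1;
  return amount * unit;
}

// The four terminal phases named in the list above.
const TERMINAL_STATUSES = new Set(['complete', 'completed', 'errored', 'terminated']);

// Sketch of the default predicate: polling stops once the phase is terminal.
function defaultUntil(value: { status?: string }): boolean {
  return value.status !== undefined && TERMINAL_STATUSES.has(value.status);
}
```

Under these assumptions, `parsePollInterval('2s')` gives 2000 and `defaultUntil({ status: 'running' })` gives false, so the page keeps refreshing every two seconds until the workflow reaches a terminal phase.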
Multiple polls on one page
You can call workflowStatus(..., { poll }) several times on the same page (e.g. one per active instance). The framework uses the shortest interval that was registered and only stops when every registered call reports terminal.
<script>
  import { workflowStatus } from 'koze:workflow';
  const statuses = Object.fromEntries(await Promise.all(
    activeMigrations.map(async (m) => [
      m.id,
      await workflowStatus('migration', m.id, { poll: '2s' }),
    ])
  ));
</script>
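The combination rule for several polls (shortest interval wins, stop only when every call is terminal) can be expressed as a small function. This is a sketch of the described behavior, not the framework's code; `PollRegistration` and `nextPollDelay` are hypothetical names.

```typescript
// One entry per workflowStatus(..., { poll }) call made while rendering.
interface PollRegistration {
  intervalMs: number; // interval already normalized to milliseconds
  done: boolean;      // result of until(status) for this call
}

// Returns the delay before the next refresh, or null when polling should stop.
function nextPollDelay(registrations: PollRegistration[]): number | null {
  // No poll was registered: the page is a one-shot render.
  if (registrations.length === 0) return null;
  // Stop only when every registered call reports a terminal state.
  if (registrations.every((r) => r.done)) return null;
  // Otherwise refresh at the shortest registered interval.
  return Math.min(...registrations.map((r) => r.intervalMs));
}
```

With one finished call at 500 ms and one running call at 2000 ms, this returns 500: the page keeps refreshing at the shortest registered interval until the last instance finishes.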
Custom until
const status = await workflowStatus('migration', params.id, {
  poll: '2s',
  until: (s) => s.status === 'ready-for-review',
});
Without polling
Omit poll for a one-shot read. The route won't auto-refresh, but you still get the same resolved AsyncValue.
const status = await workflowStatus('migration', params.id);
Workflow Steps
Use step.do() for durable steps that survive restarts:
async run(event: WorkflowEvent<unknown>, step: WorkflowStep) {
  // Each step is durable: if the workflow restarts,
  // completed steps are skipped and their stored results are reused
  const users = await step.do('fetch-users', async () => {
    return await db.users.findMany();
  });
  // Steps can depend on previous results
  const processed = await step.do('process-users', async () => {
    return await processUsers(users);
  });
  return { processed: processed.length };
}
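To see why completed steps are skipped after a restart, it can help to model step.do() as a lookup keyed by step name. The class below is a toy in-memory model for illustration only; real Workflows persist step results on the platform, and `ReplayableSteps` and `demoReplay` are hypothetical names.

```typescript
// Toy in-memory model of step durability; not how Cloudflare stores results.
class ReplayableSteps {
  private readonly completed: Map<string, unknown>;

  constructor(completed: Map<string, unknown>) {
    this.completed = completed;
  }

  async do<T>(name: string, callback: () => Promise<T>): Promise<T> {
    if (this.completed.has(name)) {
      // Finished in an earlier run: reuse the stored result, skip the callback.
      return this.completed.get(name) as T;
    }
    const result = await callback();
    this.completed.set(name, result);
    return result;
  }
}

async function demoReplay(): Promise<{ calls: number; first: number[]; second: number[] }> {
  const store = new Map<string, unknown>();
  let calls = 0;
  const fetchUsers = async (): Promise<number[]> => {
    calls += 1;
    return [1, 2, 3];
  };
  // First run executes the callback and records its result.
  const first = await new ReplayableSteps(store).do('fetch-users', fetchUsers);
  // A simulated restart with the same store returns the recorded result.
  const second = await new ReplayableSteps(store).do('fetch-users', fetchUsers);
  return { calls, first, second };
}
```

In this model the step name acts as the lookup key, which is one reason to give every step a stable, distinct name.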
Workflow Events
Access the triggering event in your workflow:
async run(event: WorkflowEvent<{ sourceId: string }>, step: WorkflowStep) {
  // event.payload holds the params object passed to create({ params })
  const { sourceId } = event.payload;
  const data = await step.do('fetch', async () => {
    return await fetchFromSource(sourceId);
  });
  return { fetched: data.length };
}
Sleep and Delays
Use step.sleep() for durable delays:
async run(event: WorkflowEvent<{ userId: string }>, step: WorkflowStep) {
  await step.do('send-welcome', async () => {
    await sendWelcomeEmail(event.payload.userId);
  });
  // Wait 24 hours (survives restarts)
  await step.sleep('wait-for-followup', '24 hours');
  await step.do('send-followup', async () => {
    await sendFollowupEmail(event.payload.userId);
  });
}
Error Handling
Workflows automatically retry failed steps. Handle errors explicitly when needed:
async run(event: WorkflowEvent<unknown>, step: WorkflowStep) {
  try {
    await step.do('risky-operation', async () => {
      return await riskyOperation();
    });
  } catch (err) {
    // Reached only after the step has exhausted its retries
    await step.do('handle-failure', async () => {
      await notifyAdmin(err);
      await cleanupPartialWork();
    });
    throw err; // Re-throw to mark workflow as errored
  }
}
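Retry behavior can also be tuned per step. The sketch below assumes the optional config argument that Cloudflare documents for step.do() (a `retries` object with `limit`, `delay`, and `backoff`, plus a `timeout`); check the current Cloudflare Workflows reference before relying on the exact field names. `StepConfig`, `StepLike`, and `runRiskyWithRetries` are local, hypothetical names so the snippet compiles outside a Worker; in a real workflow, `step` is the WorkflowStep passed to run().

```typescript
// Local structural types standing in for Cloudflare's step types (assumed shape).
interface StepConfig {
  retries?: {
    limit: number;
    delay: string | number;
    backoff?: 'constant' | 'linear' | 'exponential';
  };
  timeout?: string | number;
}

interface StepLike {
  do<T>(name: string, config: StepConfig, callback: () => Promise<T>): Promise<T>;
}

// Runs one step with an explicit retry policy instead of the platform defaults.
async function runRiskyWithRetries<T>(
  step: StepLike,
  riskyOperation: () => Promise<T>,
): Promise<T> {
  return step.do(
    'risky-operation',
    {
      retries: { limit: 3, delay: '10 seconds', backoff: 'exponential' },
      timeout: '5 minutes',
    },
    riskyOperation,
  );
}
```

The values here (three retries, ten-second base delay, five-minute timeout) are illustrative only; choose limits that fit how long a failure is likely to last.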
Wrangler Configuration
The framework auto-syncs workflow bindings to wrangler.jsonc:
{
  "workflows": [
    {
      "name": "migration-workflow",
      "binding": "MIGRATION_WORKFLOW",
      "class_name": "MigrationWorkflow",
      "script_name": "src/worker.ts"
    }
  ]
}