Workflows | Koze | Primitives Docs

Workflows

Build durable workflows with typed live status via koze:workflow

Koze auto-discovers .workflow.ts files in src/server/. Each file becomes a Cloudflare Workflow, and the filename without its .workflow.ts suffix becomes the workflow's live-status name (e.g. migration.workflow.ts → 'migration').

// src/server/migration.workflow.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

export class MigrationWorkflow extends WorkflowEntrypoint {
  async run(event: WorkflowEvent, step: WorkflowStep) {
    const data = await step.do('fetch-data', async () => {
      return await fetchLegacyData();
    });

    // Keep the transformed result so the import step can use it.
    const transformed = await step.do('transform-data', async () => {
      return await transformData(data);
    });

    await step.do('import-data', async () => {
      return await importToNewSystem(transformed);
    });

    return { migrated: data.length };
  }
}

Naming Convention

Both names are derived from the filename. The env binding is the basename upper-cased, with hyphens replaced by underscores and _WORKFLOW appended. The name passed to workflowStatus(name, ...) is the basename unchanged:

File                         Binding (env)             workflowStatus name
migration.workflow.ts        MIGRATION_WORKFLOW        'migration'
data-sync.workflow.ts        DATA_SYNC_WORKFLOW        'data-sync'
email-campaign.workflow.ts   EMAIL_CAMPAIGN_WORKFLOW   'email-campaign'

The discovered names are baked into a compile-time string-literal union (WorkflowName) so passing an unknown name is a type error. Run kuratchi types to regenerate src/app.d.ts whenever you add or rename a workflow file.
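
The naming rules in the table above can be sketched as a small function. This is only an illustration inferred from the three rows shown: deriveWorkflowNames and WORKFLOW_SUFFIX are hypothetical names, not part of Koze, and the WorkflowName union is written by hand here to show what a generated type might look like for those three files.

```typescript
// Hypothetical helper that mirrors the naming table; not Koze's actual implementation.
interface WorkflowNames {
  statusName: string; // value passed to workflowStatus(name, ...)
  binding: string;    // property name on `env`
}

const WORKFLOW_SUFFIX = '.workflow.ts';

function deriveWorkflowNames(filename: string): WorkflowNames {
  if (!filename.endsWith(WORKFLOW_SUFFIX)) {
    throw new Error(`Not a workflow file: ${filename}`);
  }
  // 'data-sync.workflow.ts' -> 'data-sync'
  const statusName = filename.slice(0, -WORKFLOW_SUFFIX.length);
  // 'data-sync' -> 'DATA_SYNC_WORKFLOW'
  const binding = `${statusName.replace(/-/g, '_').toUpperCase()}_WORKFLOW`;
  return { statusName, binding };
}

// Hand-written stand-in for the generated union (assumption: one literal per file).
type WorkflowName = 'migration' | 'data-sync' | 'email-campaign';

// Assigning any other string to a WorkflowName is rejected at compile time.
const known: WorkflowName = 'data-sync';
```

Because the union is derived from filenames, renaming a file changes the set of valid names, which is why the types need regenerating afterwards.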

Starting Workflows

Start a workflow from server code:

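// In a server action
import { env } from 'cloudflare:workers';

// Note: `redirect` is assumed to be provided by the framework; its import is not shown here.
export async function startMigration({ formData }: { formData: FormData }) {
  // create() starts a new instance; `params` becomes event.payload inside run()
  const instance = await env.MIGRATION_WORKFLOW.create({
    params: { sourceId: formData.get('sourceId') },
  });

  // instance.id is the ID you later pass to workflowStatus()
  redirect(`/migrations/${instance.id}`);
}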

Live Status with workflowStatus

Import workflowStatus from the koze:workflow virtual module. The first argument is the workflow name (typed), the second is the instance ID, and the optional third enables live refresh.

<script>
  import { params } from 'koze:request';
  import { workflowStatus } from 'koze:workflow';

  const status = await workflowStatus('migration', params.id, { poll: '2s' });
</script>

if (status.error) {
  <ErrorBanner error={status.error} />
} else if (status.status === 'running') {
  <ProgressBar progress={status.output?.progress} />
} else if (status.status === 'complete') {
  <CompletedBanner result={status.output} />
}

status is an AsyncValue with:

  • status — the current workflow phase ('queued' | 'running' | 'complete' | 'errored' | ...)
  • output — whatever your workflow run() returned
  • error — non-null when the status fetch itself failed
  • pending / success — standard AsyncValue flags
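
The field list above separates two kinds of failure: error means the status could not be fetched, while a status of 'errored' means the workflow itself failed. A minimal sketch of handling both follows. The WorkflowStatusValue interface is an assumed shape reconstructed from that list and describeStatus is a hypothetical helper; the real AsyncValue type may differ.

```typescript
// Assumed shape, reconstructed from the field list; the real AsyncValue type may differ.
interface WorkflowStatusValue<Output = unknown> {
  status?: string;  // workflow phase, e.g. 'running'
  output?: Output;  // return value of run(), once available
  error: unknown;   // non-null only when fetching the status failed
  pending: boolean;
  success: boolean;
}

// Hypothetical helper that turns a status value into a display label.
function describeStatus(s: WorkflowStatusValue): string {
  // The fetch failed, so nothing is known about the workflow itself.
  if (s.error != null) return 'Could not load workflow status';
  if (s.pending) return 'Loading status';
  // The fetch succeeded, but the workflow reported a failure.
  if (s.status === 'errored') return 'Workflow failed';
  return `Workflow is ${s.status ?? 'unknown'}`;
}
```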

{ poll } auto-refresh

When you pass poll, the framework:

  1. Records the interval on request-scoped state while rendering.
  2. Injects a tiny directive script into the page before sending it.
  3. Uses that directive on the client to re-fetch the URL on each tick and swap <body> with the freshly rendered HTML.
  4. Stops polling when until(status) returns true. The default predicate treats 'complete', 'completed', 'errored', or 'terminated' as terminal.

interface WorkflowStatusOptions<T> {
  poll?: string | number;        // e.g. '2s', 500, '1m'
  until?: (value: T) => boolean; // override the default terminal predicate
}

Because every tick is a full server render, any {status.*} reference in your template reflects the newest data with no client-side reactivity to wire up.
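
To make the poll and until options more concrete, here is a rough sketch of how an interval such as '2s' or 500 could be normalised and how the default terminal check could look. parsePollInterval and defaultUntil are hypothetical names, and the sketch assumes that bare numbers are milliseconds and that only the ms, s, m, and h suffixes are accepted; the framework's real parser may behave differently.

```typescript
// Hypothetical parser. Assumption: bare numbers are milliseconds.
const UNIT_MS: Record<string, number> = { ms: 1, s: 1_000, m: 60_000, h: 3_600_000 };

function parsePollInterval(poll: string | number): number {
  if (typeof poll === 'number') return poll;
  const match = /^(\d+(?:\.\d+)?)\s*(ms|s|m|h)$/.exec(poll.trim());
  if (!match) throw new Error(`Unrecognised poll interval: ${poll}`);
  const [, amount = '', unit = ''] = match;
  const factor = UNIT_MS[unit];
  if (factor === undefined) throw new Error(`Unknown unit: ${unit}`);
  return Number(amount) * factor;
}

// The four terminal statuses named in the list above.
const TERMINAL_STATUSES = new Set(['complete', 'completed', 'errored', 'terminated']);

// Sketch of the default `until` predicate: stop once the status is terminal.
function defaultUntil(value: { status?: string }): boolean {
  return value.status !== undefined && TERMINAL_STATUSES.has(value.status);
}
```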

Multiple polls on one page

You can call workflowStatus(..., { poll }) several times on the same page (e.g. one per active instance). The framework uses the shortest interval that was registered and only stops when every registered call reports terminal.

<script>
  import { workflowStatus } from 'koze:workflow';

  const statuses = Object.fromEntries(await Promise.all(
    activeMigrations.map(async (m) => [
      m.id,
      await workflowStatus('migration', m.id, { poll: '2s' }),
    ])
  ));
</script>
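
The merge rule described above (shortest interval wins, stop only when every call is terminal) can be sketched as follows. PollRegistration and combinePolls are hypothetical names used for illustration, not framework APIs.

```typescript
// Hypothetical record of one workflowStatus(..., { poll }) call on the page.
interface PollRegistration {
  intervalMs: number; // normalised poll interval
  terminal: boolean;  // result of until(status) for this call
}

// Returns null when nothing on the page asked for polling.
function combinePolls(
  registrations: PollRegistration[],
): { intervalMs: number; stop: boolean } | null {
  if (registrations.length === 0) return null;
  return {
    // The page refreshes at the fastest requested rate.
    intervalMs: Math.min(...registrations.map((r) => r.intervalMs)),
    // Polling continues while at least one instance is still in flight.
    stop: registrations.every((r) => r.terminal),
  };
}
```

One consequence of this rule is that a single long-running instance keeps the whole page refreshing, even after every other instance has finished.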

Custom until

const status = await workflowStatus('migration', params.id, {
  poll: '2s',
  until: (s) => s.status === 'ready-for-review',
});

Without polling

Omit poll for a one-shot read. The route won't auto-refresh, but you still get the same resolved AsyncValue.

const status = await workflowStatus('migration', params.id);

Workflow Steps

Use step.do() for durable steps that survive restarts:

async run(event: WorkflowEvent, step: WorkflowStep) {
  // Each step is durable - if the workflow restarts,
  // completed steps are skipped
  const users = await step.do('fetch-users', async () => {
    return await db.users.findMany();
  });

  // Steps can depend on previous results
  const processed = await step.do('process-users', async () => {
    return await processUsers(users);
  });

  return { processed: processed.length };
}
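
To illustrate why completed steps are skipped after a restart, here is a toy model of step replay. It is not how Cloudflare implements Workflows: ReplayingStep and runDemo are hypothetical, and an in-memory Map stands in for the platform's durable storage. The sketch assumes results are keyed by step name, which is why step names should stay stable between runs.

```typescript
// Toy model only: a Map stands in for durable storage.
class ReplayingStep {
  readonly completed: Map<string, unknown>;

  constructor(completed: Map<string, unknown>) {
    this.completed = completed;
  }

  async do<T>(name: string, fn: () => Promise<T>): Promise<T> {
    if (this.completed.has(name)) {
      // The step already finished in an earlier run: return the stored result.
      return this.completed.get(name) as T;
    }
    const result = await fn();
    this.completed.set(name, result);
    return result;
  }
}

// Runs the same two steps; `calls` records which callbacks actually executed.
async function runDemo(store: Map<string, unknown>, calls: string[]): Promise<number> {
  const step = new ReplayingStep(store);
  const users = await step.do('fetch-users', async () => {
    calls.push('fetch-users');
    return ['ada', 'lin'];
  });
  const processed = await step.do('process-users', async () => {
    calls.push('process-users');
    return users.map((u) => u.toUpperCase());
  });
  return processed.length;
}
```

Calling runDemo twice with the same store simulates a restart: the second call returns the same result without invoking either callback again.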

Workflow Events

Access the triggering event in your workflow:

async run(event: WorkflowEvent<{ sourceId: string }>, step: WorkflowStep) {
  // event.payload holds the params passed to create()
  const { sourceId } = event.payload;
  
  const data = await step.do('fetch', async () => {
    return await fetchFromSource(sourceId);
  });
  
  return { fetched: data.length };
}

Sleep and Delays

Use step.sleep() for durable delays:

async run(event: WorkflowEvent<{ userId: string }>, step: WorkflowStep) {
  await step.do('send-welcome', async () => {
    await sendWelcomeEmail(event.payload.userId);
  });

  // Wait 24 hours (survives restarts)
  await step.sleep('wait-for-followup', '24 hours');

  await step.do('send-followup', async () => {
    await sendFollowupEmail(event.payload.userId);
  });
}

Error Handling

Workflows automatically retry failed steps. Once a step has exhausted its retries, step.do() throws, so you can catch the error to run cleanup when needed:

async run(event: WorkflowEvent, step: WorkflowStep) {
  try {
    await step.do('risky-operation', async () => {
      return await riskyOperation();
    });
  } catch (err) {
    await step.do('handle-failure', async () => {
      await notifyAdmin(err);
      await cleanupPartialWork();
    });
    throw err; // Re-throw to mark workflow as errored
  }
}
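
The "retry, then throw" behaviour that the try/catch above relies on can be pictured with a toy retry loop. This is a simplified illustration, not the platform's scheduler: doWithRetries is a hypothetical helper, it retries immediately with no delay or backoff, and it assumes limit counts retries made after the first attempt.

```typescript
// Toy retry loop: illustrates "retry, then throw" without delays or backoff.
async function doWithRetries<T>(fn: () => Promise<T>, limit: number): Promise<T> {
  let lastError: unknown;
  // One initial attempt plus up to `limit` retries (assumption).
  for (let attempt = 0; attempt <= limit; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  // Every attempt failed: surface the last error to the caller's catch block.
  throw lastError;
}
```

Under this model, the catch block in run() only executes after the final attempt fails, so cleanup code there does not run on transient failures that a retry resolves.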

Wrangler Configuration

The framework auto-syncs workflow bindings to wrangler.jsonc:

{
  "workflows": [
    {
      "name": "migration-workflow",
      "binding": "MIGRATION_WORKFLOW",
      "class_name": "MigrationWorkflow",
      "script_name": "src/worker.ts"
    }
  ]
}