# Pipelines

> Send events to Cloudflare Pipelines with convention-based bindings

Package: Koze
Canonical: https://kuratchi.dev/docs/koze/pipelines
Markdown: https://kuratchi.dev/docs/koze/pipelines.md

Koze auto-discovers `.pipeline.ts` files in `src/server/`. Each file creates:

- a Cloudflare Pipelines Worker binding in `wrangler.jsonc`
- a typed `koze:pipeline` handle you can use from routes, actions, RPC, and server modules
- optional generated setup artifacts under `_cloudflare/pipelines/<name>/`

Cloudflare Pipelines bindings send JSON records to streams from Workers without managing API tokens. Koze wires the binding and generates schema, SQL, and setup commands when you declare them. Cloudflare still owns the remote stream, sink, and pipeline resources, so creation is an explicit provisioning step instead of a build-time side effect.

## Quick start

```ts
// src/server/analytics.pipeline.ts
// Empty is valid. Koze uses the filename as the framework name,
// binding name, and Cloudflare stream/pipeline name.
```

This produces:

```jsonc
{
  "pipelines": [
    {
      "pipeline": "analytics",
      "binding": "ANALYTICS_PIPELINE"
    }
  ]
}
```

Use it from server code:

```ts
import { pipelines } from 'koze:pipeline';

type AnalyticsEvent = {
  userId: string;
  event: 'view' | 'click';
  path: string;
};

export async function POST({ request }: { request: Request }) {
  const event = await request.json<AnalyticsEvent>();
  await pipelines.analytics.send(event);
  return new Response(null, { status: 204 });
}
```

`send()` accepts a single record or an array of records. The underlying Cloudflare binding always receives an array, so a single record is sent as a one-element batch.
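The one-or-many behavior can be pictured as a small normalization step in front of the binding. This is a minimal sketch, not Koze's implementation: `toBatch`, `FakeBinding`, and `sendVia` are hypothetical names, and the stand-in binding only records the batches it is given.

```ts
// Hypothetical helper: wrap a single record so the binding always gets an array.
function toBatch<T>(input: T | T[]): T[] {
  return Array.isArray(input) ? input : [input];
}

// Stand-in for a Pipelines binding; it only records the batches it receives.
class FakeBinding<T> {
  readonly batches: T[][] = [];

  async send(records: T[]): Promise<void> {
    this.batches.push(records);
  }
}

// Hypothetical wrapper showing where the normalization would sit.
async function sendVia<T>(binding: FakeBinding<T>, input: T | T[]): Promise<void> {
  await binding.send(toBatch(input));
}
```

With this shape, callers never need to branch on whether they hold one event or many.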

## Naming

| File | Binding | `pipelines` handle | Cloudflare resource |
| --- | --- | --- | --- |
| `analytics.pipeline.ts` | `ANALYTICS_PIPELINE` | `pipelines.analytics` | `analytics` unless overridden |
| `clickstream.pipeline.ts` | `CLICKSTREAM_PIPELINE` | `pipelines.clickstream` | `clickstream` unless overridden |
| `data-lake.pipeline.ts` | `DATA_LAKE_PIPELINE` | `pipelines['data-lake']` | `data-lake` unless overridden |
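The convention in the table above can be sketched as a pure function. `describePipelineFile` is a hypothetical helper written only to illustrate the mapping; it is not a Koze export, and Koze's actual derivation may handle cases this sketch does not.

```ts
// Hypothetical helper mirroring the naming table; expects a bare filename.
function describePipelineFile(file: string) {
  // Strip the ".pipeline.ts" suffix to get the framework name.
  const name = file.replace(/\.pipeline\.ts$/, '');

  // Upper-case the name, turn separators into underscores, append "_PIPELINE".
  const binding = `${name.replace(/[^A-Za-z0-9]+/g, '_').toUpperCase()}_PIPELINE`;

  // Dot access only works for valid identifiers; otherwise use bracket notation.
  const isIdentifier = /^[A-Za-z_$][A-Za-z0-9_$]*$/.test(name);
  const handle = isIdentifier ? `pipelines.${name}` : `pipelines['${name}']`;

  // The Cloudflare resource defaults to the framework name unless overridden.
  return { name, binding, handle, resource: name };
}
```

For example, `describePipelineFile('data-lake.pipeline.ts')` yields the binding `DATA_LAKE_PIPELINE` and the handle `pipelines['data-lake']`, matching the last row of the table.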

Export `pipeline` to point the binding at a specific Cloudflare stream or pipeline by name or ID. Override `binding` only when you need to match an existing Worker binding name:

```ts
export const pipeline = 'analytics-stream-id-or-name';
export const binding = 'ANALYTICS';
```

## Generated setup

Declare a stream schema and an R2 Data Catalog sink in the same `.pipeline.ts` file when the pipeline should be provisioned from the app's source of truth:

```ts
// src/server/activity-log.pipeline.ts
export const pipeline = 'pims_activity_log_stream';
export const pipelineName = 'pims-activity-log';

export const schema = {
  schema_version: 'int32!',
  id: 'string!',
  created_at: 'string!',
  action: 'string!',
  status: 'string!',
  searchable_text: 'string',
};

export const sink = {
  type: 'r2-data-catalog',
  name: 'activity_log_sink',
  bucket: 'pims-activity-log',
  namespace: 'default',
  table: 'activity_logs',
  rollInterval: 10,
};
```

Koze emits:

```txt
_cloudflare/pipelines/activity-log/schema.json
_cloudflare/pipelines/activity-log/pipeline.sql
_cloudflare/pipelines/activity-log/setup.ps1
_cloudflare/pipelines/activity-log/README.md
```
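This page does not spell out the schema shorthand. Assuming a trailing `!` marks a required field and the rest of the string is the field type, the expansion into a `fields` list might look like the sketch below. `expandSchema` and `FieldSpec` are hypothetical names, and the output shape is an assumption about what `schema.json` contains, so check the generated file rather than relying on this.

```ts
// Assumed output shape: one entry per field with an explicit "required" flag.
type FieldSpec = { name: string; type: string; required: boolean };

// Assumption: a trailing "!" means required; anything else is optional.
function expandSchema(shorthand: Record<string, string>): { fields: FieldSpec[] } {
  const fields = Object.entries(shorthand).map(([name, spec]) => {
    const required = spec.endsWith('!');
    return { name, type: required ? spec.slice(0, -1) : spec, required };
  });
  return { fields };
}

// A subset of the activity-log schema declared above.
const expanded = expandSchema({
  schema_version: 'int32!',
  id: 'string!',
  searchable_text: 'string',
});
```

Under these assumptions, `searchable_text` is the only optional field in the example.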

The generated stream command disables the HTTP endpoint by default because Koze apps write through the Worker Pipeline binding. Enable HTTP in Cloudflare only when non-Worker systems need to push directly to the stream.

## Types

Discovered pipeline names are emitted into `src/app.d.ts` as a string-literal union and as typed properties on `pipelines`. Passing an unknown name to `pipeline()` or `sendPipeline()` is a type error after the Vite plugin regenerates types.

```ts
import { pipelines, sendPipeline } from 'koze:pipeline';

await pipelines.analytics.send({ userId: 'u_123', event: 'view', path: '/dashboard' });
await sendPipeline('analytics', [{ userId: 'u_123', event: 'click', path: '/pricing' }]);
```
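To show why an unknown name fails to compile, here is a self-contained approximation of the generated types. The union members, the `PipelineHandle` shape, and the in-memory `sent` store are all assumptions for illustration; the real declarations are whatever Koze writes to `src/app.d.ts`.

```ts
// Assumed shape of the generated union; real names come from your .pipeline.ts files.
type PipelineName = 'analytics' | 'clickstream' | 'data-lake';

type PipelineHandle = { send(records: unknown): Promise<void> };

// In-memory stand-in so the sketch runs without Cloudflare.
const sent: Record<PipelineName, unknown[]> = {
  analytics: [],
  clickstream: [],
  'data-lake': [],
};

function pipeline(name: PipelineName): PipelineHandle {
  return {
    async send(records) {
      // Accept one record or an array, and store them as a flat list.
      sent[name].push(...(Array.isArray(records) ? records : [records]));
    },
  };
}

async function sendPipeline(name: PipelineName, records: unknown): Promise<void> {
  await pipeline(name).send(records);
}

// @ts-expect-error 'analytic' is not a member of the PipelineName union.
const typo = () => sendPipeline('analytic', { path: '/' });
```

Because the name parameter is a string-literal union rather than `string`, a misspelled pipeline name is caught by the compiler instead of failing at runtime.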

## Cloudflare setup

For source-controlled apps, declare the schema and sink in `.pipeline.ts`, then run the commands in the generated `_cloudflare/pipelines/<name>/setup.ps1` whenever you create or recreate the Cloudflare resources.

For one-off or externally managed pipelines, create the stream/pipeline with Wrangler or the Cloudflare dashboard, then put its name or ID in the `.pipeline.ts` file. Koze still keeps the Worker binding in `wrangler.jsonc` current.

Related Cloudflare docs:

- [Writing to streams](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/)
- [Pipelines overview](https://developers.cloudflare.com/pipelines/)
