Pipelines
Send events to Cloudflare Pipelines with convention-based bindings
Koze auto-discovers .pipeline.ts files in src/server/. Each file creates a Cloudflare Pipelines Worker binding in wrangler.jsonc, a typed koze:pipeline handle you can use from routes, actions, RPC, and server modules, and optional generated setup artifacts under _cloudflare/pipelines/<name>/.
Cloudflare Pipelines bindings send JSON records to streams from Workers without managing API tokens. Koze wires the binding and generates schema, SQL, and setup commands when you declare them. Cloudflare still owns the remote stream, sink, and pipeline resources, so creation is an explicit provisioning step instead of a build-time side effect.
Quick start
// src/server/analytics.pipeline.ts
// Empty is valid. Koze uses the filename as the framework name,
// binding name, and Cloudflare stream/pipeline name.
This produces:
{
  "pipelines": [
    {
      "pipeline": "analytics",
      "binding": "ANALYTICS_PIPELINE"
    }
  ]
}
Use it from server code:
import { pipelines } from 'koze:pipeline';

type AnalyticsEvent = {
  userId: string;
  event: 'view' | 'click';
  path: string;
};

export async function POST({ request }: { request: Request }) {
  const event = await request.json<AnalyticsEvent>();
  await pipelines.analytics.send(event);
  return new Response(null, { status: 204 });
}
send() accepts one record or an array of records. The underlying Cloudflare binding receives an array.
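The one-or-many behavior can be pictured as a thin wrapper that normalizes its argument before calling the binding. The sketch below is an illustration under stated assumptions, not Koze's actual implementation: `PipelineBinding`, `toBatch`, and `makeHandle` are hypothetical names, and the in-memory binding only stands in for the real Cloudflare binding so the forwarded batches can be inspected.

```typescript
// Hypothetical stand-in for the shape of a Pipelines binding:
// a single send() that takes an array of records.
interface PipelineBinding<T> {
  send(records: T[]): Promise<void>;
}

// Normalize "one record or many" into the array the binding expects.
function toBatch<T>(input: T | T[]): T[] {
  return Array.isArray(input) ? input : [input];
}

// Wrap a binding in a handle whose send() accepts either form.
function makeHandle<T>(binding: PipelineBinding<T>) {
  return {
    send: (input: T | T[]): Promise<void> => binding.send(toBatch(input)),
  };
}

// In-memory binding, used only to observe what the handle forwards.
const received: Array<Array<{ event: string }>> = [];
const fakeBinding: PipelineBinding<{ event: string }> = {
  async send(records) {
    received.push(records);
  },
};

const handle = makeHandle(fakeBinding);
void handle.send({ event: 'view' }); // forwarded as a one-element array
void handle.send([{ event: 'view' }, { event: 'click' }]); // forwarded as given
```

Batching several records into one call is usually preferable to calling send() once per record in a loop, since each call is a separate round trip to the binding.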
Naming
| File | Binding | pipelines handle | Cloudflare resource |
|---|---|---|---|
| analytics.pipeline.ts | ANALYTICS_PIPELINE | pipelines.analytics | analytics unless overridden |
| clickstream.pipeline.ts | CLICKSTREAM_PIPELINE | pipelines.clickstream | clickstream unless overridden |
| data-lake.pipeline.ts | DATA_LAKE_PIPELINE | pipelines['data-lake'] | data-lake unless overridden |
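The naming rows above appear to follow one rule: strip the .pipeline.ts suffix, upper-case the rest, turn hyphens into underscores, and append _PIPELINE. The helper below is a sketch of that inferred rule, not Koze's code; `namesFor` is a hypothetical function, and names with characters outside the examples shown may be handled differently by the framework.

```typescript
// Hypothetical helper that reproduces the naming table from a file name.
// The rule is inferred from the documented examples only.
function namesFor(file: string): { name: string; binding: string; handle: string } {
  // "data-lake.pipeline.ts" -> "data-lake"
  const name = file.replace(/\.pipeline\.ts$/, '');

  // "data-lake" -> "DATA_LAKE_PIPELINE"
  const binding = `${name.toUpperCase().replace(/-/g, '_')}_PIPELINE`;

  // Names that are not valid identifiers need bracket access on `pipelines`.
  const isIdentifier = /^[A-Za-z_$][\w$]*$/.test(name);
  const handle = isIdentifier ? `pipelines.${name}` : `pipelines['${name}']`;

  return { name, binding, handle };
}

for (const file of ['analytics.pipeline.ts', 'data-lake.pipeline.ts']) {
  console.log(file, namesFor(file));
}
```

The bracket form matters in practice: a hyphenated file name still works, but every call site has to use pipelines['data-lake'] rather than dot access.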
Export pipeline to point the binding at a different Cloudflare stream or pipeline. Override the binding only when you need to match an existing Worker binding name:
export const pipeline = 'analytics-stream-id-or-name';
export const binding = 'ANALYTICS';
Generated setup
Declare a stream schema and an R2 Data Catalog sink in the same .pipeline.ts file when the pipeline should be provisioned from the app's source of truth:
// src/server/activity-log.pipeline.ts
export const pipeline = 'pims_activity_log_stream';
export const pipelineName = 'pims-activity-log';
export const schema = {
  schema_version: 'int32!',
  id: 'string!',
  created_at: 'string!',
  action: 'string!',
  status: 'string!',
  searchable_text: 'string',
};
export const sink = {
  type: 'r2-data-catalog',
  name: 'activity_log_sink',
  bucket: 'pims-activity-log',
  namespace: 'default',
  table: 'activity_logs',
  rollInterval: 10,
};
Koze emits:
_cloudflare/pipelines/activity-log/schema.json
_cloudflare/pipelines/activity-log/pipeline.sql
_cloudflare/pipelines/activity-log/setup.ps1
_cloudflare/pipelines/activity-log/README.md
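To make the shorthand in the schema export easier to read, here is a sketch of how it might expand into a field list. Two things are assumptions rather than documented behavior: that a trailing ! marks a field as required, and that the expanded shape is a fields array of name/type/required entries. `expandSchema` and `FieldSpec` are hypothetical names, and the real contents of the generated schema.json may differ.

```typescript
type FieldSpec = { name: string; type: string; required: boolean };

// Assumption: "int32!" means type int32, required; "string" means optional string.
function expandSchema(shorthand: Record<string, string>): { fields: FieldSpec[] } {
  const fields = Object.entries(shorthand).map(([name, spec]) => {
    const required = spec.endsWith('!');
    // Drop the trailing "!" to recover the bare type name.
    const type = required ? spec.slice(0, -1) : spec;
    return { name, type, required };
  });
  return { fields };
}

const expanded = expandSchema({
  schema_version: 'int32!',
  id: 'string!',
  searchable_text: 'string',
});

console.log(JSON.stringify(expanded, null, 2));
```

Whatever the exact expansion, compare the generated schema.json against the shorthand before running the setup script, since the stream schema is created on the Cloudflare side from that file.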
The generated stream command disables the HTTP endpoint by default because Koze apps write through the Worker Pipeline binding. Enable HTTP in Cloudflare only when non-Worker systems need to push directly to the stream.
Types
Discovered pipeline names are emitted into src/app.d.ts as a string-literal union and as typed properties on pipelines. Passing an unknown name to pipeline() or sendPipeline() is a type error after the Vite plugin regenerates types.
import { pipelines, sendPipeline } from 'koze:pipeline';
await pipelines.analytics.send({ userId: 'u_123', event: 'view', path: '/dashboard' });
await sendPipeline('analytics', [{ userId: 'u_123', event: 'click', path: '/pricing' }]);
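The type-safety claim can be illustrated with a self-contained sketch of what a string-literal union and typed properties buy you. Everything here is hypothetical: the declarations are not copied from a generated src/app.d.ts, `stubHandle` is an in-memory stand-in, and the local `pipelines` and `sendPipeline` only mimic the names exported by koze:pipeline.

```typescript
// Hypothetical equivalent of the generated name union.
type PipelineName = 'analytics' | 'clickstream' | 'data-lake';

type PipelineHandle = { send(records: unknown): Promise<void> };

// A mapped type yields one typed property per discovered name.
type Pipelines = { [K in PipelineName]: PipelineHandle };

// In-memory log so the sketch runs without a Cloudflare binding.
const sent: Array<{ name: PipelineName; records: unknown }> = [];

function stubHandle(name: PipelineName): PipelineHandle {
  return {
    async send(records) {
      sent.push({ name, records });
    },
  };
}

const pipelines: Pipelines = {
  analytics: stubHandle('analytics'),
  clickstream: stubHandle('clickstream'),
  'data-lake': stubHandle('data-lake'),
};

// Accepts only names in the union; anything else fails type checking.
function sendPipeline(name: PipelineName, records: unknown): Promise<void> {
  return pipelines[name].send(records);
}

void sendPipeline('analytics', [{ event: 'click' }]);
void pipelines['data-lake'].send({ event: 'view' });

// sendPipeline('analytic', []);  // would not compile: not a PipelineName
// pipelines.metrics.send({});    // would not compile: no such property
```

Because the union is regenerated from the files on disk, renaming or deleting a .pipeline.ts file surfaces stale call sites as compile errors rather than runtime failures.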
Cloudflare setup
For source-controlled apps, declare the schema and sink in .pipeline.ts, then run the commands in the generated _cloudflare/pipelines/<name>/setup.ps1 whenever you create or recreate the Cloudflare resources.
For one-off or externally managed pipelines, create the stream/pipeline with Wrangler or the Cloudflare dashboard, then put its name or ID in the .pipeline.ts file. Koze still keeps the Worker binding in wrangler.jsonc current.