SDK
Build klados workers with the @arke-institute/rhiza TypeScript package.
Overview
The @arke-institute/rhiza package provides everything you need to build klados workers — the processing agents that execute within Arke workflows. It handles job lifecycle, API communication, logging, and handoff orchestration so you can focus on your processing logic.
```shell
npm install @arke-institute/rhiza
```

KladosJob — The Main Abstraction
KladosJob is the primary class for building klados workers. It wraps the incoming request, provides an authenticated API client, handles logging, and manages the full job lifecycle including handoffs to the next step in a workflow.
```typescript
import { KladosJob } from '@arke-institute/rhiza';
import type { KladosRequest, Output } from '@arke-institute/rhiza';

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    const req = await request.json<KladosRequest>();

    // Accept the job immediately
    const job = KladosJob.accept(req, {
      agentId: env.AGENT_ID,
      agentVersion: env.AGENT_VERSION,
      authToken: env.ARKE_AGENT_KEY,
    });

    // Process in background
    ctx.waitUntil(
      job.run(async () => {
        job.log.info('Starting processing');
        const target = await job.fetchTarget();

        // Your processing logic here
        const result = await processEntity(target);

        job.log.success('Processing complete');
        return [target.id]; // Return output entity IDs
      })
    );

    // Return immediate acceptance
    return Response.json(job.acceptResponse);
  },
};
```

The pattern is:
- Accept — Parse the request and create a job (synchronous, immediate)
- Run — Execute your logic in the background via `ctx.waitUntil`
- Return outputs — The job handles logging, handoff to the next step, and finalization
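Under the hood, `run` presumably sequences `start`, your processing function, and then `complete` or `fail`. A self-contained sketch of that lifecycle — a simplification with illustrative hook signatures, not the package's actual implementation:

```typescript
// Simplified sketch of the lifecycle that job.run() manages: mark the job
// running, execute the processing function, then complete or fail.
type Outputs = string[];

interface LifecycleHooks {
  start: () => Promise<void>;
  complete: (outputs: Outputs) => Promise<void>;
  fail: (err: unknown) => Promise<void>;
}

async function runLifecycle(
  hooks: LifecycleHooks,
  fn: () => Promise<Outputs>,
): Promise<Outputs | undefined> {
  await hooks.start();
  try {
    const outputs = await fn();
    await hooks.complete(outputs);
    return outputs;
  } catch (err) {
    // An exception anywhere in fn marks the job failed instead of completed.
    await hooks.fail(err);
    return undefined;
  }
}
```

Because `fail` is reached via the `catch`, a thrown error in your function never leaves the job dangling in a running state.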
Job Properties
| Property | Type | Description |
|---|---|---|
| `job.client` | `ArkeClient` | Authenticated API client for the Arke API |
| `job.log` | `KladosLogger` | Structured logger (messages written to the job collection) |
| `job.request` | `KladosRequest` | The original job request |
| `job.config` | `KladosJobConfig` | Agent ID, version, and auth token |
| `job.logId` | `string` | Generated log entity ID |
| `job.isWorkflow` | `boolean` | Whether this job is part of a rhiza |
| `job.batchContext` | `BatchContext?` | Scatter/gather batch info (if present) |
| `job.recurseDepth` | `number` | Current recursion depth (0 if not recursive) |
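When a job runs inside a scatter batch, `job.batchContext` carries the slot position. A self-contained sketch of using it for a log-friendly label — the helper name is hypothetical, and the interface mirrors the `batch` field of `RhizaContext`:

```typescript
// Mirrors the batch info exposed on the job (id, 0-based index, total slots).
interface BatchContext {
  id: string;
  index: number;
  total: number;
}

// Hypothetical helper: human-readable position of this slot in its batch.
function batchLabel(batch?: BatchContext): string {
  if (!batch) return 'single';
  return `slot ${batch.index + 1} of ${batch.total}`; // index is 0-based
}
```

A worker might pass `batchLabel(job.batchContext)` into its log metadata so batch slots are distinguishable in the job collection.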
Job Methods
| Method | Description |
|---|---|
| `job.run(fn)` | Execute the processing function with full lifecycle management |
| `job.fetchTarget()` | Fetch the target entity from the API |
| `job.fetchTargets()` | Fetch all target entities (cardinality: many) |
| `job.start()` | Mark the job as running (called automatically by `run`) |
| `job.complete(outputs)` | Mark the job as done with outputs (called automatically by `run`) |
| `job.fail(error)` | Mark the job as failed (called automatically if `run` throws) |
The Job Request
When a klados is invoked, it receives a KladosRequest:
```typescript
interface KladosRequest {
  job_id: string;                   // Unique job identifier
  target_entity?: string;           // Single target (cardinality: one)
  target_entities?: string[];       // Multiple targets (cardinality: many)
  target_collection: string;        // Collection with granted permissions
  job_collection: string;           // Where to write logs
  input?: Record<string, unknown>;  // Parameters from invocation or workflow
  api_base: string;                 // Arke API URL
  expires_at: string;               // Permission expiration (ISO 8601)
  network: 'test' | 'main';         // Network designation
  rhiza?: RhizaContext;             // Present when running inside a workflow
}
```

The `rhiza` field is present when the klados is running as part of a workflow:
```typescript
interface RhizaContext {
  id: string;             // Rhiza entity ID
  path: string[];         // Step path from entry to current
  parent_logs: string[];  // Parent log IDs for chain building
  batch?: {
    id: string;           // Batch entity ID
    index: number;        // This slot's index (0-based)
    total: number;        // Total slots in batch
  };
  scatter_total?: number; // For scatter without gather
  recurse_depth?: number; // Current recursion level
}
```

Outputs
Your processing function returns an array of outputs — the entity IDs (or enriched items) that this klados produced:
```typescript
// Simple: just entity IDs
return ['01KFNR849AZNBWE9DYJRZR7PSA'];

// Enriched: entity IDs with routing properties
return [
  { entity_id: '01K_COPY_1', entity_class: 'canonical' },
  { entity_id: '01K_COPY_2', entity_class: 'mention' },
];
```

When a workflow has route rules, the additional properties on `OutputItem` are used for routing decisions. For example, items with `entity_class: 'mention'` might be routed to a different step than items with `entity_class: 'canonical'`.
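Both shapes can be mixed in a single return value. A self-contained sketch (the helper name is hypothetical, and the types are repeated here so the snippet stands alone) that normalizes a mixed array down to plain entity IDs, e.g. for logging:

```typescript
// The two output shapes a klados may return.
type Output = string | OutputItem;

interface OutputItem {
  entity_id: string;
  [key: string]: unknown;
}

// Hypothetical helper: collapse mixed outputs to their entity IDs.
function toEntityIds(outputs: Output[]): string[] {
  return outputs.map((o) => (typeof o === 'string' ? o : o.entity_id));
}
```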
```typescript
type Output = string | OutputItem;

interface OutputItem {
  entity_id: string;
  [key: string]: unknown; // Properties used for routing
}
```

Logging
KladosLogger collects structured log messages that get written to the job collection:
```typescript
job.log.info('Fetching target entity', { entityId: target.id });
job.log.warning('Large file detected', { size: file.size });
job.log.error('OCR failed for page', { page: 3, error: err.message });
job.log.success('Processing complete', { outputCount: 5 });
```

Each message includes:
- level — `info`, `warning`, `error`, or `success`
- message — Human-readable description
- metadata — Optional structured data
- timestamp — Automatically added
Messages are collected in memory during execution and written to the Arke API when the job completes or fails.
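That collect-then-flush behavior can be pictured with a simplified, self-contained sketch — not the package's actual `KladosLogger`, just the buffering pattern it describes:

```typescript
type LogLevel = 'info' | 'warning' | 'error' | 'success';

interface LogEntry {
  level: LogLevel;
  message: string;
  metadata?: Record<string, unknown>;
  timestamp: string; // added automatically
}

// Simplified sketch: messages buffer in memory, then flush in one batch.
class BufferedLogger {
  private entries: LogEntry[] = [];

  private push(level: LogLevel, message: string, metadata?: Record<string, unknown>) {
    this.entries.push({ level, message, metadata, timestamp: new Date().toISOString() });
  }

  info(message: string, metadata?: Record<string, unknown>) { this.push('info', message, metadata); }
  warning(message: string, metadata?: Record<string, unknown>) { this.push('warning', message, metadata); }
  error(message: string, metadata?: Record<string, unknown>) { this.push('error', message, metadata); }
  success(message: string, metadata?: Record<string, unknown>) { this.push('success', message, metadata); }

  // Stand-in for the API write that happens when the job completes or fails.
  flush(): LogEntry[] {
    const out = this.entries;
    this.entries = [];
    return out;
  }
}
```

Buffering keeps log writes off the hot path; the trade-off is that messages only become visible once the job finishes.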
Full Worker Example
A complete klados worker that stamps metadata onto entities:
```typescript
import { KladosJob } from '@arke-institute/rhiza';
import type { KladosRequest, Output } from '@arke-institute/rhiza';

interface Env {
  AGENT_ID: string;
  AGENT_VERSION: string;
  ARKE_AGENT_KEY: string;
  VERIFICATION_TOKEN: string;
}

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    // Handle verification endpoint
    if (new URL(request.url).pathname === '/.well-known/arke-verification') {
      return Response.json({
        klados_id: env.AGENT_ID,
        token: env.VERIFICATION_TOKEN,
      });
    }

    const req = await request.json<KladosRequest>();
    const job = KladosJob.accept(req, {
      agentId: env.AGENT_ID,
      agentVersion: env.AGENT_VERSION,
      authToken: env.ARKE_AGENT_KEY,
    });

    ctx.waitUntil(
      job.run(async (): Promise<Output[]> => {
        // Fetch the target entity
        const target = await job.fetchTarget();
        job.log.info('Processing entity', { id: target.id, type: target.type });

        // Get current tip for CAS update
        const { data: tip } = await job.client.api.GET('/entities/{id}/tip', {
          params: { path: { id: target.id } },
        });

        // Add a processing stamp
        const stamps = (target.properties.stamps as string[]) || [];
        stamps.push(`${env.AGENT_ID}@${new Date().toISOString()}`);

        // Update the entity
        await job.client.api.PUT('/entities/{id}', {
          params: { path: { id: target.id } },
          body: {
            expect_tip: tip.cid,
            properties: { ...target.properties, stamps },
          },
        });

        job.log.success('Stamp added', { stampCount: stamps.length });
        return [target.id];
      })
    );

    return Response.json(job.acceptResponse);
  },
};
```

Scatter Worker Example
A klados that produces multiple outputs for scatter:
```typescript
job.run(async (): Promise<Output[]> => {
  const target = await job.fetchTarget();
  const pages = await splitIntoPages(target);

  const outputs: Output[] = [];
  for (const page of pages) {
    // Create a new entity for each page
    const { data: created } = await job.client.api.POST('/entities', {
      body: {
        type: 'file',
        collection: job.request.target_collection,
        properties: {
          label: `Page ${page.number}`,
          page_number: page.number,
        },
        relationships: [
          { predicate: 'extracted_from', peer: target.id },
        ],
      },
    });

    outputs.push({
      entity_id: created.id,
      content_type: page.hasImages ? 'image' : 'text',
    });
  }

  job.log.success('Split complete', { pageCount: outputs.length });
  return outputs;
});
```

The `content_type` property on each output can be used by workflow route rules to direct items to different processing steps.
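How such routing might evaluate can be sketched in a self-contained way. The rule shape and helper below are hypothetical (the real route-rule syntax lives in the workflow definition, not this package): an item goes to the first rule whose `match` properties all equal the item's, else to a fallback step.

```typescript
interface OutputItem {
  entity_id: string;
  [key: string]: unknown;
}

// Hypothetical equality-match route rule.
interface RouteRule {
  match: Record<string, unknown>;
  step: string;
}

// First rule whose match properties all equal the item's wins.
function routeFor(item: OutputItem, rules: RouteRule[], fallback: string): string {
  for (const rule of rules) {
    if (Object.entries(rule.match).every(([k, v]) => item[k] === v)) {
      return rule.step;
    }
  }
  return fallback;
}
```

Under this sketch, pages stamped `content_type: 'image'` could be sent to an OCR step while text pages fall through to a default step.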
Registration Utilities
The package includes utilities for registering kladoi and rhizai programmatically:
```typescript
import {
  registerKlados,
  registerRhiza,
  configureWorkspace,
} from '@arke-institute/rhiza';

// Configure workspace settings
const workspace = configureWorkspace({
  apiBase: 'https://arke-v1.arke.institute',
  authToken: process.env.ARKE_API_KEY,
  network: 'main',
  collection: '01KFNR0H0Q791Y1SMZWEQ09FGV',
});

// Register a klados
const klados = await registerKlados(workspace, {
  label: 'My Processor',
  endpoint: 'https://my-worker.example.com',
  actions_required: ['entity:view', 'entity:update'],
  accepts: { types: ['file'], cardinality: 'one' },
  produces: { types: ['file'], cardinality: 'one' },
});
```

Registration handles creation, key management, and state persistence so you can redeploy without recreating entities.
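`registerRhiza` is exported alongside `registerKlados` but not shown above. A hedged sketch, assuming it takes the same workspace handle plus a definition using the flow shape from the Validation section — the label, step names, and second klados ID here are hypothetical:

```typescript
// Hypothetical: registering a two-step workflow against the same workspace.
// Assumes registerRhiza mirrors registerKlados's (workspace, definition) shape.
const rhiza = await registerRhiza(workspace, {
  label: 'My Pipeline',
  entry: 'process',
  flow: {
    process: { klados: { id: klados.id }, then: { pass: 'finalize' } },
    finalize: { klados: { id: '01K_FINALIZER_ID' }, then: { done: true } },
  },
});
```

Running `validateRhiza` on the definition before registering catches structural errors without touching the API.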
Validation
The package exports pure validation functions for checking workflow definitions before registration:
```typescript
import { validateRhiza, validateKlados } from '@arke-institute/rhiza';

const result = validateRhiza({
  entry: 'step_one',
  flow: {
    step_one: { klados: { id: '...' }, then: { pass: 'step_two' } },
    step_two: { klados: { id: '...' }, then: { done: true } },
  },
});

if (!result.valid) {
  console.error(result.errors);
  // e.g. "Unreachable step: orphan_step"
  // e.g. "Cycle detected: step_a → step_b → step_a"
  // e.g. "Step 'missing' referenced but not defined in flow"
}
```

Validation checks:
- Entry step exists in the flow
- All referenced steps exist
- No cycles
- All paths terminate
- No unreachable steps
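The reachability check, for instance, is a simple graph walk. A self-contained sketch over the flow shape shown above, following only `pass` edges (the real validator presumably also follows route and gather targets):

```typescript
// Minimal reachability sketch: walk `pass` edges from the entry step and
// report any step the walk never reaches.
interface FlowStep {
  then: { pass?: string; done?: boolean };
}

function unreachableSteps(entry: string, flow: Record<string, FlowStep>): string[] {
  const seen = new Set<string>();
  const stack = [entry];
  while (stack.length > 0) {
    const step = stack.pop()!;
    if (seen.has(step) || !(step in flow)) continue;
    seen.add(step);
    const next = flow[step].then.pass;
    if (next !== undefined) stack.push(next);
  }
  return Object.keys(flow).filter((s) => !seen.has(s));
}
```

A step with no path from the entry would surface here, matching the "Unreachable step: orphan_step" error shown earlier.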