Arke
BuildWorkflows

SDK

Build klados workers with the @arke-institute/rhiza TypeScript package.

Overview

The @arke-institute/rhiza package provides everything you need to build klados workers — the processing agents that execute within Arke workflows. It handles job lifecycle, API communication, logging, and handoff orchestration so you can focus on your processing logic.

npm install @arke-institute/rhiza

KladosJob — The Main Abstraction

KladosJob is the primary class for building klados workers. It wraps the incoming request, provides an authenticated API client, handles logging, and manages the full job lifecycle including handoffs to the next step in a workflow.

import { KladosJob } from '@arke-institute/rhiza';
import type { KladosRequest, Output } from '@arke-institute/rhiza';

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    const req = await request.json<KladosRequest>();

    // Accept the job immediately
    const job = KladosJob.accept(req, {
      agentId: env.AGENT_ID,
      agentVersion: env.AGENT_VERSION,
      authToken: env.ARKE_AGENT_KEY,
    });

    // Process in background
    ctx.waitUntil(
      job.run(async () => {
        job.log.info('Starting processing');
        const target = await job.fetchTarget();

        // Your processing logic here
        const result = await processEntity(target);

        job.log.success('Processing complete');
        return [target.id]; // Return output entity IDs
      })
    );

    // Return immediate acceptance
    return Response.json(job.acceptResponse);
  }
};

The pattern is:

  1. Accept — Parse the request and create a job (synchronous, immediate)
  2. Run — Execute your logic in the background via ctx.waitUntil
  3. Return outputs — The job handles logging, handoff to the next step, and finalization

Job Properties

PropertyTypeDescription
job.clientArkeClientAuthenticated API client for the Arke API
job.logKladosLoggerStructured logger (messages written to job collection)
job.requestKladosRequestThe original job request
job.configKladosJobConfigAgent ID, version, auth token
job.logIdstringGenerated log entity ID
job.isWorkflowbooleanWhether this job is part of a rhiza
job.batchContextBatchContext?Scatter/gather batch info (if present)
job.recurseDepthnumberCurrent recursion depth (0 if not recursive)

Job Methods

MethodDescription
job.run(fn)Execute processing function with full lifecycle management
job.fetchTarget()Fetch the target entity from the API
job.fetchTargets()Fetch all target entities (cardinality: many)
job.start()Mark job as running (called automatically by run)
job.complete(outputs)Mark job as done with outputs (called automatically by run)
job.fail(error)Mark job as failed (called automatically if run throws)

The Job Request

When a klados is invoked, it receives a KladosRequest:

interface KladosRequest {
  job_id: string;              // Unique job identifier
  target_entity?: string;      // Single target (cardinality: one)
  target_entities?: string[];  // Multiple targets (cardinality: many)
  target_collection: string;   // Collection with granted permissions
  job_collection: string;      // Where to write logs
  input?: Record<string, unknown>;  // Parameters from invocation or workflow
  api_base: string;            // Arke API URL
  expires_at: string;          // Permission expiration (ISO 8601)
  network: 'test' | 'main';   // Network designation
  rhiza?: RhizaContext;        // Present when running inside a workflow
}

The rhiza field is present when the klados is running as part of a workflow:

interface RhizaContext {
  id: string;           // Rhiza entity ID
  path: string[];       // Step path from entry to current
  parent_logs: string[];  // Parent log IDs for chain building
  batch?: {
    id: string;         // Batch entity ID
    index: number;      // This slot's index (0-based)
    total: number;      // Total slots in batch
  };
  scatter_total?: number;  // For scatter without gather
  recurse_depth?: number;  // Current recursion level
}

Outputs

Your processing function returns an array of outputs — the entity IDs (or enriched items) that this klados produced:

// Simple: just entity IDs
return ['01KFNR849AZNBWE9DYJRZR7PSA'];

// Enriched: entity IDs with routing properties
return [
  { entity_id: '01K_COPY_1', entity_class: 'canonical' },
  { entity_id: '01K_COPY_2', entity_class: 'mention' },
];

When a workflow has route rules, the additional properties on OutputItem are used for routing decisions. For example, items with entity_class: 'mention' might be routed to a different step than items with entity_class: 'canonical'.

type Output = string | OutputItem;

interface OutputItem {
  entity_id: string;
  [key: string]: unknown;  // Properties used for routing
}

Logging

KladosLogger collects structured log messages that get written to the job collection:

job.log.info('Fetching target entity', { entityId: target.id });
job.log.warning('Large file detected', { size: file.size });
job.log.error('OCR failed for page', { page: 3, error: err.message });
job.log.success('Processing complete', { outputCount: 5 });

Each message includes:

  • levelinfo, warning, error, or success
  • message — Human-readable description
  • metadata — Optional structured data
  • timestamp — Automatically added

Messages are collected in memory during execution and written to the Arke API when the job completes or fails.

Full Worker Example

A complete klados worker that stamps metadata onto entities:

import { KladosJob } from '@arke-institute/rhiza';
import type { KladosRequest, Output } from '@arke-institute/rhiza';

interface Env {
  AGENT_ID: string;
  AGENT_VERSION: string;
  ARKE_AGENT_KEY: string;
}

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    // Handle verification endpoint
    if (new URL(request.url).pathname === '/.well-known/arke-verification') {
      return Response.json({
        klados_id: env.AGENT_ID,
        token: env.VERIFICATION_TOKEN,
      });
    }

    const req = await request.json<KladosRequest>();

    const job = KladosJob.accept(req, {
      agentId: env.AGENT_ID,
      agentVersion: env.AGENT_VERSION,
      authToken: env.ARKE_AGENT_KEY,
    });

    ctx.waitUntil(
      job.run(async (): Promise<Output[]> => {
        // Fetch the target entity
        const target = await job.fetchTarget();
        job.log.info('Processing entity', { id: target.id, type: target.type });

        // Get current tip for CAS update
        const { data: tip } = await job.client.api.GET('/entities/{id}/tip', {
          params: { path: { id: target.id } },
        });

        // Add a processing stamp
        const stamps = (target.properties.stamps as string[]) || [];
        stamps.push(`${env.AGENT_ID}@${new Date().toISOString()}`);

        // Update the entity
        await job.client.api.PUT('/entities/{id}', {
          params: { path: { id: target.id } },
          body: {
            expect_tip: tip.cid,
            properties: { ...target.properties, stamps },
          },
        });

        job.log.success('Stamp added', { stampCount: stamps.length });
        return [target.id];
      })
    );

    return Response.json(job.acceptResponse);
  },
};

Scatter Worker Example

A klados that produces multiple outputs for scatter:

job.run(async (): Promise<Output[]> => {
  const target = await job.fetchTarget();
  const pages = await splitIntoPages(target);

  const outputs: Output[] = [];

  for (const page of pages) {
    // Create a new entity for each page
    const { data: created } = await job.client.api.POST('/entities', {
      body: {
        type: 'file',
        collection: job.request.target_collection,
        properties: {
          label: `Page ${page.number}`,
          page_number: page.number,
        },
        relationships: [
          { predicate: 'extracted_from', peer: target.id },
        ],
      },
    });

    outputs.push({
      entity_id: created.id,
      content_type: page.hasImages ? 'image' : 'text',
    });
  }

  job.log.success('Split complete', { pageCount: outputs.length });
  return outputs;
});

The content_type property on each output can be used by workflow route rules to direct items to different processing steps.

Registration Utilities

The package includes utilities for registering kladoi and rhizai programmatically:

import {
  registerKlados,
  registerRhiza,
  configureWorkspace,
} from '@arke-institute/rhiza';

// Configure workspace settings
const workspace = configureWorkspace({
  apiBase: 'https://arke-v1.arke.institute',
  authToken: process.env.ARKE_API_KEY,
  network: 'main',
  collection: '01KFNR0H0Q791Y1SMZWEQ09FGV',
});

// Register a klados
const klados = await registerKlados(workspace, {
  label: 'My Processor',
  endpoint: 'https://my-worker.example.com',
  actions_required: ['entity:view', 'entity:update'],
  accepts: { types: ['file'], cardinality: 'one' },
  produces: { types: ['file'], cardinality: 'one' },
});

Registration handles creation, key management, and state persistence so you can redeploy without recreating entities.

Validation

The package exports pure validation functions for checking workflow definitions before registration:

import { validateRhiza, validateKlados } from '@arke-institute/rhiza';

const result = validateRhiza({
  entry: 'step_one',
  flow: {
    step_one: { klados: { id: '...' }, then: { pass: 'step_two' } },
    step_two: { klados: { id: '...' }, then: { done: true } },
  },
});

if (!result.valid) {
  console.error(result.errors);
  // e.g. "Unreachable step: orphan_step"
  // e.g. "Cycle detected: step_a → step_b → step_a"
  // e.g. "Step 'missing' referenced but not defined in flow"
}

Validation checks:

  • Entry step exists in the flow
  • All referenced steps exist
  • No cycles
  • All paths terminate
  • No unreachable steps

On this page