Skip to content
GitHub Get Started
Orchestration

Workflow Automation

Orchestrate multi-step agent tasks with durable workflows that survive crashes and restarts. Build them with RivetKit’s workflow() run handler, where each ctx.step() is recorded, retried, and resumed independently, and the output of one step can feed into the next.

A workflow is the durable run handler of an actor. Wrap it in workflow() and drive a multi-step agent task as an ordered series of steps: clone the repo, let an agent fix the bug, then run the tests. Trigger work by sending to a queue; the workflow loops and waits durably for the next message.

Session creation and prompting happen within the step that uses them, so a session never has to outlive the work it backs (sessions are ephemeral and would not survive a replay). Steps reach the Agent OS VM, a separate actor, through ctx.client().

server.ts
import { agentOS, setup } from "@rivet-dev/agentos";
import { actor, queue } from "rivetkit";
import {
type WorkflowLoopContextOf,
workflow,
} from "rivetkit/workflow";
import pi from "./software/pi";
// The Agent OS VM that each workflow step drives. It is its own actor, kept
// separate from the workflow orchestrator so steps can reach it over the client.
const vm = agentOS({ software: [pi] });
// A durable workflow actor. Its `run` handler is built with `workflow()`, so
// every `ctx.step(...)` is recorded, retried, and resumed independently: if the
// process crashes mid-run, replay skips completed steps and continues where it
// left off. Trigger work by sending to the `fixBug` queue; the workflow loops,
// waiting durably for the next message.
const bugFixer = actor({
state: {
lastIssue: null as string | null,
lastExitCode: null as number | null,
},
queues: {
fixBug: queue<{ repo: string; issue: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("fix-bug-loop", async (loopCtx) => {
// Wait durably for the next bug-fix request.
const message = await loopCtx.queue.next("wait-fix-bug");
const { repo, issue } = message.body;
// Step 1: Clone the repo. Each step is an isolated, retryable unit of
// work; a crash here resumes from this step on replay.
await loopCtx.step("clone-repo", () => cloneRepo(loopCtx, repo));
// Step 2: An agent fixes the bug. The session is created and closed
// inside the step, so it never has to outlive the work it backs (sessions
// are ephemeral and would not survive a replay).
await loopCtx.step("fix-bug", () => fixBugWithAgent(loopCtx, issue));
// Step 3: Run the tests. The exit code feeds into the next step.
const exitCode = await loopCtx.step("run-tests", () => runTests(loopCtx));
// State changes are only valid inside a step callback, so they are
// recorded as part of replay.
await loopCtx.step("record-result", async () => {
loopCtx.state.lastIssue = issue;
loopCtx.state.lastExitCode = exitCode;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function cloneRepo(
ctx: WorkflowLoopContextOf<typeof bugFixer>,
repo: string,
): Promise<void> {
const agentHandle = ctx.client<typeof registry>().vm.getOrCreate("bug-fixer");
await agentHandle.exec(`git clone ${repo} /home/user/repo`);
}
async function fixBugWithAgent(
ctx: WorkflowLoopContextOf<typeof bugFixer>,
issue: string,
): Promise<void> {
const agentHandle = ctx.client<typeof registry>().vm.getOrCreate("bug-fixer");
const session = await agentHandle.createSession("claude", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Fix the bug described in issue: ${issue}`,
);
await agentHandle.closeSession(session.sessionId);
}
async function runTests(
ctx: WorkflowLoopContextOf<typeof bugFixer>,
): Promise<number> {
const agentHandle = ctx.client<typeof registry>().vm.getOrCreate("bug-fixer");
const tests = await agentHandle.exec("cd /home/user/repo && npm test");
return tests.exitCode;
}
export const registry = setup({ use: { vm, bugFixer, codeReviewer } });
registry.start();

See Full Example

Output of one agent session feeds into the next. Each session is created and completed within its own step, and data passes between steps through the VM filesystem (a review file) and step return values.

server.ts
// Agent chaining: the output of one agent session feeds into the next. Each
// session is created and completed within its own step, and data passes between
// steps through the VM filesystem (a review file) and step return values.
const codeReviewer = actor({
state: {
reviewedFiles: 0,
},
queues: {
codeReview: queue<{ filePath: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("code-review-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-code-review");
const { filePath } = message.body;
// Step 1: An agent reviews the code and writes findings to a file.
await loopCtx.step("review", () => reviewCode(loopCtx, filePath));
// Step 2: Read the review back from the VM filesystem. Its text is the
// step return value, so it flows into the next step.
const review = await loopCtx.step("read-review", () => readReview(loopCtx));
// Step 3: A second session applies fixes based on the review.
await loopCtx.step("fix", () => applyReview(loopCtx, review));
await loopCtx.step("record-review", async () => {
loopCtx.state.reviewedFiles += 1;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function reviewCode(
ctx: WorkflowLoopContextOf<typeof codeReviewer>,
filePath: string,
): Promise<void> {
const agentHandle = ctx.client<typeof registry>().vm.getOrCreate("reviewer");
const session = await agentHandle.createSession("claude", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Review the code at ${filePath} and write your findings to /home/user/review.md`,
);
await agentHandle.closeSession(session.sessionId);
}
async function readReview(
ctx: WorkflowLoopContextOf<typeof codeReviewer>,
): Promise<string> {
const agentHandle = ctx.client<typeof registry>().vm.getOrCreate("reviewer");
const content = await agentHandle.readFile("/home/user/review.md");
return new TextDecoder().decode(content);
}
async function applyReview(
ctx: WorkflowLoopContextOf<typeof codeReviewer>,
review: string,
): Promise<void> {
const agentHandle = ctx.client<typeof registry>().vm.getOrCreate("reviewer");
const session = await agentHandle.createSession("claude", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Apply the following review feedback:\n\n${review}`,
);
await agentHandle.closeSession(session.sessionId);
}

See Full Example

  • Build the actor’s run handler with workflow() so each ctx.step() is durable: recorded, retried, and resumed independently across crashes and restarts.
  • Keep step names stable across code changes. Renaming a step breaks replay for in-progress workflows.
  • Create and close sessions within the step that uses them. Sessions are ephemeral, so keep their lifetime scoped to one unit of work.
  • Pass data between steps via the filesystem or step return values, not session state.
  • Keep state changes and other actor-local side effects inside ctx.step() callbacks; use non-step workflow code (queue waits, loops, sleeps) only for orchestration.
  • Reach the Agent OS VM, a separate actor, from inside a step with ctx.client().
  • See Workflows for the full workflow API reference including timers, joins, and races.