The WorkflowContext API

Durable execution means your code can crash and resume exactly where it left off. But this only works if every operation is recorded and can be replayed. The WorkflowContext is your interface to make this happen.

Wrapping Side Effects

When a workflow replays after a crash, it re-executes your code from the beginning. Without protection, this means HTTP calls fire twice, database writes are duplicated, and external APIs see the same request again.

The ctx.run() function solves this. It executes your side effect once, records the result, and on replay returns the cached result instead of re-executing. Your code looks normal, but behaves correctly across restarts.

# Without ctx.run() - this HTTP call would fire on every replay
result = await http_client.get(url)  # BAD: duplicates on crash

# With ctx.run() - executes once, replays from cache
result = await ctx.run("fetch-data", lambda: http_client.get(url))

Scheduling Tasks

For operations that take longer or need their own retry policies, use tasks. Tasks are separate units of work that run independently and can be distributed across workers. They have built-in timeouts, retries, and progress reporting.

Use ctx.execute_task() to run a task and wait for its result. Use ctx.schedule_task() to start multiple tasks in parallel, then await their results when needed.

# Execute a task and wait for result
payment = await ctx.execute_task("payment-task", {"amount": 100})

# Run multiple tasks in parallel
handle_a = ctx.schedule_task("send-email", email_data)
handle_b = ctx.schedule_task("send-sms", sms_data)

# Wait for both to complete
email_result = await handle_a.result()
sms_result = await handle_b.result()
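
A task itself is defined as its own unit of work and registered by name. The following is a minimal sketch that follows the @task pattern shown in the streaming section below; the handler body, payload shape, and payment_gateway client are illustrative assumptions, not confirmed API.

# Hypothetical definition of the "payment-task" used above.
# The @task decorator and run() signature follow the pattern shown later;
# payment_gateway is a placeholder for whatever client you actually use.
@task(name="payment-task")
class PaymentTask:
    async def run(self, ctx: TaskContext, payload: dict) -> dict:
        charge = await payment_gateway.charge(amount=payload["amount"])
        return {"charge_id": charge.id, "status": "captured"}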

Deterministic Operations

Replay only works if your code produces the same results every time. But some operations are inherently non-deterministic: the current time changes, UUIDs are random, and random.random() returns different values.

The context provides deterministic alternatives. These record their value on first execution and return the same value on replay. Use them instead of the standard library equivalents.

# These return the same value on replay
order_id = ctx.random_uuid()
created_at = ctx.current_time()
delay_ms = int(ctx.random() * 4000) + 1000
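
One practical use is retry backoff inside a workflow: because the jitter comes from ctx.random(), the recorded delay is replayed exactly, so a resumed workflow waits the same amount of time it originally chose. A minimal sketch that reuses delay_ms from above, assuming ctx.sleep() (covered in the timers section below) accepts any timedelta:

from datetime import timedelta

# Jittered backoff that replays deterministically
await ctx.sleep(timedelta(milliseconds=delay_ms))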

Persisting State

Sometimes you need to store data that persists across workflow restarts but isn't part of the execution log. Maybe you're tracking progress through a multi-step process, or caching expensive computations.

The state API gives you a key-value store scoped to each workflow execution. State survives crashes and restarts, and is automatically cleaned up when the workflow completes.

# Store progress through a multi-step process
await ctx.set_state("current_step", "processing_payment")
await ctx.set_state("items_processed", 42)

# Read state (survives restarts)
step = await ctx.get_state("current_step", type_hint=str)
count = await ctx.get_state("items_processed", type_hint=int)

# Clean up when done
await ctx.clear_state("current_step")
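
Combined, these calls let a workflow record how far a multi-step process has progressed and pick up from that point after a restart. A minimal sketch, assuming get_state() returns None for a missing key; the items list and the "process-item" task name are placeholders:

# Resume a batch from wherever the previous attempt stopped
start = await ctx.get_state("items_processed", type_hint=int) or 0

for index in range(start, len(items)):
    await ctx.execute_task("process-item", items[index])
    await ctx.set_state("items_processed", index + 1)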

Timers and External Signals

Long-running workflows often need to wait. Maybe you're implementing a reminder that fires in 24 hours, or waiting for human approval before proceeding. Normal sleep calls don't survive restarts—if your worker crashes, the timer is lost.

Durable timers are persisted to the database. The workflow can sleep for days, and even if every worker restarts, the timer will fire at the right time. Promises let you pause execution until an external system sends a signal.

from datetime import timedelta

# Sleep for 24 hours - survives worker restarts
await ctx.sleep(timedelta(hours=24))

# Wait for external approval with 7-day timeout
approval = await ctx.promise(
    "manager-approval",
    timeout=timedelta(days=7)
)

if approval["approved"]:
    # proceed with the workflow
    ...
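
The 24-hour reminder mentioned earlier is just these two pieces combined: a durable timer followed by a task. A minimal sketch, with the task name and payload as illustrative placeholders:

# Remind the user in 24 hours, even if every worker restarts in between
await ctx.sleep(timedelta(hours=24))
await ctx.execute_task("send-reminder", {"user_id": user_id})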

Real-time Streaming

Modern AI applications need to stream responses as they're generated. Users expect to see LLM tokens appear in real-time, not wait for the entire response. But streaming and durable execution seem incompatible—how do you replay a stream?

Flovyn separates streaming from execution. The streaming APIs send data directly to clients via Server-Sent Events without affecting the execution log. On replay, the final result is returned immediately while any active subscribers receive the cached stream.

@task(name="llm-generate")
class LLMGenerateTask:
    async def run(self, ctx: TaskContext, prompt: str) -> str:
        response = ""

        async for token in llm.stream(prompt):
            response += token
            ctx.stream_token(token)  # Sent to client immediately

        ctx.stream_progress(1.0, "Done")
        return response  # This is what gets recorded
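
From the workflow's perspective, a streaming task is invoked like any other: the tokens reach subscribers as they are produced, while the workflow simply awaits the recorded final string. A brief sketch, assuming the task is called by the name it was registered with:

# Inside a workflow: tokens stream to clients, the workflow gets the final result
summary = await ctx.execute_task("llm-generate", prompt)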