Getting Started
From code to production in three steps. Define tasks, compose workflows, and run your workers.
1
Define Tasks
Create task handlers that do one thing well. Built-in timeout, retry, and progress reporting.
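The task example on this page exercises progress reporting only. As a rough picture of what a timeout-and-retry policy around a handler amounts to, here is a minimal sketch in plain `asyncio`. It is not Flovyn's implementation or API: `run_with_retry`, `make_flaky`, and their parameters are hypothetical names chosen for illustration.

```python
import asyncio


async def run_with_retry(handler, *, timeout_s, max_attempts, backoff_s=0.0):
    """Await handler() up to max_attempts times, each capped at timeout_s seconds.

    Hypothetical helper for illustration; not part of the Flovyn SDK.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            # wait_for cancels the attempt and raises TimeoutError on overrun.
            return await asyncio.wait_for(handler(), timeout=timeout_s)
        except Exception as exc:  # also catches asyncio.TimeoutError
            last_error = exc
            if attempt < max_attempts:
                # Linear backoff before the next attempt.
                await asyncio.sleep(backoff_s * attempt)
    raise last_error


def make_flaky(failures):
    """Build a handler that fails `failures` times, then succeeds."""
    calls = {"n": 0}

    async def handler():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise ConnectionError("transient failure")
        return f"ok after {calls['n']} attempts"

    return handler
```

Calling `asyncio.run(run_with_retry(make_flaky(2), timeout_s=1.0, max_attempts=3))` succeeds on the third attempt. With a built-in policy, a task handler would not need to carry this loop itself.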
@task(name="send-email")
class SendEmailTask:
    async def run(self, ctx: TaskContext, input: EmailInput) -> EmailResult:
        ctx.report_progress(0.1, "Connecting to SMTP...")
        ctx.report_progress(0.5, "Sending email...")
        message_id = await email_service.send(input)
        ctx.report_progress(1.0, "Email sent")
        return EmailResult(message_id=message_id, status="sent")
2
Compose Workflows
Build workflows using the WorkflowContext API. Orchestrate tasks, handle failures, add compensation logic.
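Compensation is not shown in the workflow example on this page. As a rough illustration of the idea, the sketch below runs steps in order and, if one fails, undoes the completed steps in reverse. It uses plain `asyncio` and does not reflect the WorkflowContext API: `run_with_compensation` and `make_step` are hypothetical names.

```python
import asyncio


async def run_with_compensation(steps):
    """Run (action, undo) pairs in order; on failure, undo completed steps in reverse.

    Hypothetical helper for illustration; not part of the Flovyn SDK.
    """
    completed = []
    try:
        for action, undo in steps:
            await action()
            # Only steps that finished successfully get compensated later.
            completed.append(undo)
    except Exception:
        for undo in reversed(completed):
            await undo()
        raise  # Surface the original failure after rolling back.


def make_step(log, name, fail=False):
    """Build an (action, undo) pair that records what it did in `log`."""

    async def action():
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(f"do {name}")

    async def undo():
        log.append(f"undo {name}")

    return action, undo
```

If the third of three steps fails, the first two are undone in reverse order and the original error is re-raised, so the caller still sees the failure.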
@workflow(name="user-onboarding")
class OnboardingWorkflow:
    async def run(self, ctx: WorkflowContext, input: UserInput) -> OnboardingResult:
        user = await ctx.run("create-user", lambda: user_service.create(input))
        # Run email and setup in parallel (schedule_task returns handles)
        email_handle = ctx.schedule_task("send-email", {
            "to": input.email, "subject": "Welcome!", "body": "..."
        })
        setup_handle = ctx.schedule_task("setup-account", {"user_id": user["id"]})
        email_result = await email_handle.result()
        setup_result = await setup_handle.result()
        return OnboardingResult(user_id=user["id"], status="completed")
3
Register and Start
Register your workflows and tasks with the Flovyn client. Start the worker to poll for and execute jobs.
import asyncio

async def main():
    client = FlovynClient(
        server_url="localhost:9090",
        org_id=my_org_id,
        queue="onboarding-workers",
    )
    client.register_workflow(OnboardingWorkflow)
    client.register_task(SendEmailTask)
    client.register_task(SetupAccountTask)
    async with client:
        await client.run()  # Blocks and polls for work

if __name__ == "__main__":
    asyncio.run(main())
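To make "register, then poll and execute" concrete, here is a toy in-memory stand-in: a registry that maps task names to handlers, and a loop that pulls jobs from a queue and dispatches them. It is only a sketch of the pattern. A real worker would poll the Flovyn server over the network, and `ToyWorker`, its methods, and `demo` are hypothetical names, not SDK classes.

```python
import asyncio


class ToyWorker:
    """In-memory stand-in for a worker: a handler registry plus a polling loop.

    Hypothetical class for illustration; not part of the Flovyn SDK.
    """

    def __init__(self):
        self._handlers = {}  # task name -> async handler
        self.results = []

    def register_task(self, name, handler):
        self._handlers[name] = handler

    async def run(self, jobs):
        """Process (name, payload) jobs from an asyncio.Queue until a None sentinel."""
        while True:
            job = await jobs.get()  # Waits here until work arrives.
            if job is None:
                break
            name, payload = job
            handler = self._handlers[name]
            self.results.append(await handler(payload))


async def demo():
    worker = ToyWorker()

    async def send_email(payload):
        return f"sent to {payload['to']}"

    worker.register_task("send-email", send_email)

    jobs = asyncio.Queue()
    await jobs.put(("send-email", {"to": "a@example.com"}))
    await jobs.put(None)  # Sentinel so the demo loop terminates.
    await worker.run(jobs)
    return worker.results
```

Running `asyncio.run(demo())` processes the single queued job and returns the collected results. The sentinel exists only so the demo ends. A production worker would normally keep polling until it is shut down.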