Getting Started

From code to production in three steps. Define tasks, compose workflows, and run your workers.

1. Define Tasks

Create task handlers that each do one thing well. Timeouts, retries, and progress reporting are built in.

# email_service, EmailInput, and EmailResult are assumed to be defined elsewhere.
@task(name="send-email")
class SendEmailTask:
    async def run(self, ctx: TaskContext, input: EmailInput) -> EmailResult:
        # Report progress as a fraction between 0 and 1 with a status message.
        ctx.report_progress(0.1, "Connecting to SMTP...")
        ctx.report_progress(0.5, "Sending email...")

        message_id = await email_service.send(input)

        ctx.report_progress(1.0, "Email sent")
        return EmailResult(message_id=message_id, status="sent")
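
Because the handler only touches the context through `report_progress`, it can be exercised without a running server. The sketch below is one way to do that, under stated assumptions: `FakeTaskContext`, `FakeEmailService`, and the field layouts of `EmailInput` and `EmailResult` are hypothetical stand-ins invented for illustration, and the `@task` decorator is left out so the block runs without the Flovyn SDK installed.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class EmailInput:
    # Assumed fields; the real EmailInput type may differ.
    to: str
    subject: str
    body: str


@dataclass
class EmailResult:
    message_id: str
    status: str


@dataclass
class FakeTaskContext:
    """Hypothetical stand-in for TaskContext that records progress updates."""
    updates: list = field(default_factory=list)

    def report_progress(self, fraction: float, message: str) -> None:
        self.updates.append((fraction, message))


class FakeEmailService:
    """Hypothetical stand-in for email_service; returns a fixed message ID."""

    async def send(self, email: EmailInput) -> str:
        return "msg-001"


email_service = FakeEmailService()


class SendEmailTask:
    # Same body as the handler above, without the @task decorator.
    async def run(self, ctx, input: EmailInput) -> EmailResult:
        ctx.report_progress(0.1, "Connecting to SMTP...")
        ctx.report_progress(0.5, "Sending email...")
        message_id = await email_service.send(input)
        ctx.report_progress(1.0, "Email sent")
        return EmailResult(message_id=message_id, status="sent")


def run_send_email_locally():
    """Run the handler once against the fakes and return (result, progress updates)."""
    ctx = FakeTaskContext()
    email = EmailInput(to="a@example.com", subject="Welcome!", body="...")
    result = asyncio.run(SendEmailTask().run(ctx, email))
    return result, ctx.updates
```

Calling `run_send_email_locally()` returns the handler's result together with the recorded progress tuples, which makes it straightforward to assert on both in a unit test.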

2. Compose Workflows

Build workflows using the WorkflowContext API. Orchestrate tasks, handle failures, and add compensation logic.

@workflow(name="user-onboarding")
class OnboardingWorkflow:
    async def run(self, ctx: WorkflowContext, input: UserInput) -> OnboardingResult:
        user = await ctx.run("create-user", lambda: user_service.create(input))

        # Run email and setup in parallel: schedule_task returns a handle
        # immediately, so both tasks are scheduled before either is awaited.
        email_handle = ctx.schedule_task("send-email", {
            "to": input.email, "subject": "Welcome!", "body": "..."
        })
        setup_handle = ctx.schedule_task("setup-account", {"user_id": user["id"]})

        # Wait for both tasks to finish before completing the workflow.
        email_result = await email_handle.result()
        setup_result = await setup_handle.result()

        return OnboardingResult(user_id=user["id"], status="completed")
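
The workflow above does not show compensation logic. As a rough illustration of the general pattern (undo completed steps in reverse order when a later step fails), here is a minimal sketch in plain `asyncio`. It does not use the WorkflowContext API; `run_with_compensation` and the step functions are hypothetical names, and how compensation is expressed in an actual Flovyn workflow may look quite different.

```python
import asyncio


async def run_with_compensation(steps):
    """Run (action, compensate) pairs in order.

    If an action raises, run the compensations of the steps that already
    completed, in reverse order, then re-raise the original error.
    """
    completed = []
    try:
        for action, compensate in steps:
            await action()
            completed.append(compensate)
    except Exception:
        for compensate in reversed(completed):
            await compensate()
        raise


def demo_compensation():
    """Simulate an onboarding run whose second step fails; return the event log."""
    log = []

    async def create_user():
        log.append("create-user")

    async def delete_user():
        log.append("delete-user")

    async def setup_account():
        raise RuntimeError("setup failed")

    async def teardown_account():
        log.append("teardown-account")

    async def scenario():
        try:
            await run_with_compensation([
                (create_user, delete_user),
                (setup_account, teardown_account),
            ])
        except RuntimeError:
            log.append("workflow-failed")

    asyncio.run(scenario())
    return log
```

In this run only `create_user` completed, so only `delete_user` is invoked as compensation; the failed step's own compensation is never called.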

3. Register and Start

Register your workflows and tasks with the Flovyn client. Start the worker to poll for and execute jobs.

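import asyncio


async def main():
    client = FlovynClient(
        server_url="localhost:9090",
        org_id=my_org_id,  # placeholder: your organization ID
        queue="onboarding-workers",
    )

    # Register every workflow and task this worker should handle.
    # SetupAccountTask is assumed to be defined like SendEmailTask.
    client.register_workflow(OnboardingWorkflow)
    client.register_task(SendEmailTask)
    client.register_task(SetupAccountTask)

    async with client:
        await client.run()  # Blocks and polls for work


if __name__ == "__main__":
    asyncio.run(main())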
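
To make "register, then poll" concrete, the following is a toy in-memory model of a worker loop. It is a conceptual sketch only, not how the Flovyn client is implemented: `MiniWorker`, its `register` method, and the `None` shutdown sentinel are all hypothetical, and a real worker would poll a server rather than a local queue.

```python
import asyncio


class MiniWorker:
    """Toy in-memory worker; a conceptual model, not the Flovyn client."""

    def __init__(self):
        self.handlers = {}          # maps a registered name to its handler
        self.jobs = asyncio.Queue() # stands in for the server-side queue
        self.results = []

    def register(self, name, handler):
        self.handlers[name] = handler

    async def run(self):
        # Pull jobs until a None sentinel arrives, dispatching by registered name.
        while True:
            job = await self.jobs.get()
            if job is None:
                break
            name, payload = job
            self.results.append(await self.handlers[name](payload))


async def _demo_worker():
    # Build the worker inside the running event loop.
    worker = MiniWorker()

    async def send_email(payload):
        return f"sent to {payload['to']}"

    worker.register("send-email", send_email)
    await worker.jobs.put(("send-email", {"to": "a@example.com"}))
    await worker.jobs.put(None)  # tell the loop to stop
    await worker.run()
    return worker.results


def demo_worker():
    return asyncio.run(_demo_worker())
```

The point of the model is the dispatch step: a job names a handler, and only names registered on this worker can be executed by it, which is why every workflow and task is registered before the worker starts.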