
Flovyn vs Celery

Celery is a distributed task queue. Flovyn is a workflow orchestration platform with event sourcing. They solve different problems.

Core Difference

Celery

Fire tasks, hope they complete, and handle failures manually.

Flovyn

Define workflows as code; the platform guarantees completion with full history.

Simple Task Execution

Celery

from celery import Celery

celery = Celery("tasks", broker="redis://localhost:6379/0")  # example broker URL

@celery.task(bind=True, max_retries=3)
def process_payment(self, order_id, amount):
    try:
        return payment_gateway.charge(amount)
    except PaymentError as e:
        # retry() raises celery.exceptions.Retry, so re-raise it
        raise self.retry(exc=e, countdown=60)

Flovyn

from dataclasses import dataclass

@dataclass
class OrderInput:
    order_id: str
    amount: float

# Result type assumed for illustration
@dataclass
class PaymentResult:
    transaction_id: str
    status: str

class OrderWorkflow(WorkflowDefinition[OrderInput, PaymentResult]):
    kind = "order-workflow"

    async def execute(self, ctx: WorkflowContext, input: OrderInput) -> PaymentResult:
        # Automatic retries; the result is cached on replay
        return await ctx.schedule("process-payment", {"order_id": input.order_id, "amount": input.amount})
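
The string passed to ctx.schedule names a task registered with a worker. The registration API isn't shown on this page; as a minimal sketch, assuming the Task interface used in the Migration Path section below, the "process-payment" task might look like this (ProcessPaymentTask and its return schema are illustrative):

class ProcessPaymentTask(Task):
    type = "process-payment"

    async def execute(self, input: dict, context: TaskContext) -> dict:
        # The platform retries this task on failure, so keep the charge idempotent
        result = payment_gateway.charge(input["amount"])
        return {"transaction_id": result.transaction_id, "status": "charged"}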

Multi-Step Process

Celery

Chain tasks and hope nothing fails in between.
@celery.task
def charge_payment(order_id, amount):
    return payment_gateway.charge(amount)

@celery.task
def reserve_inventory(order_id, items):
    return inventory_service.reserve(items)

@celery.task
def create_shipment(order_id, address):
    return shipping_service.create(address)

# Orchestration via chains - no compensation on failure.
# .si() makes each signature immutable, so the previous task's return
# value is not prepended to the next task's arguments.
pipeline = (
    charge_payment.si(order_id, amount) |
    reserve_inventory.si(order_id, items) |
    create_shipment.si(order_id, address)
)
pipeline.apply_async()

# What if create_shipment fails after payment succeeded?
# You need separate error handling, dead-letter queues, and manual intervention

Flovyn

A single workflow with compensation logic in one place.

# Expanded input/output types for the multi-step version (fields illustrative)
@dataclass
class OrderInput:
    payment_details: dict
    items: list[dict]
    shipping_address: dict

@dataclass
class OrderResult:
    status: str
    tracking_number: str

class OrderWorkflow(WorkflowDefinition[OrderInput, OrderResult]):
    kind = "order-workflow"

    async def execute(self, ctx: WorkflowContext, input: OrderInput) -> OrderResult:
        # Step 1: Payment
        payment = await ctx.schedule("charge-payment", input.payment_details)

        # Step 2: Inventory
        inventory = await ctx.schedule("reserve-inventory", input.items)
        if inventory["status"] == "failed":
            # Compensate: refund payment
            await ctx.run("refund", lambda: refund_payment(payment["transaction_id"]))
            raise NonRetryableError("Inventory unavailable")

        # Step 3: Shipment
        try:
            shipment = await ctx.schedule("create-shipment", input.shipping_address)
        except Exception:
            # Compensate: release inventory, refund payment
            await ctx.run("release", lambda: release_inventory(inventory["reservation_id"]))
            await ctx.run("refund", lambda: refund_payment(payment["transaction_id"]))
            raise

        return OrderResult(status="completed", tracking_number=shipment["tracking_number"])

Waiting for External Events

Celery

Poll the database and manage state manually.

import time

@celery.task
def wait_for_approval(request_id):
    # Poll database every minute for 7 days
    for _ in range(10080):  # 7 days * 24 hours * 60 minutes
        approval = db.get_approval(request_id)
        if approval:
            return approval
        time.sleep(60)
    return {"status": "timeout"}

# Problems:
# - Worker tied up polling for days
# - Server restart loses progress
# - No way to know current state

Flovyn

Durable promises that can sleep for days.

from datetime import timedelta

class ApprovalWorkflow(WorkflowDefinition[ApprovalRequest, ApprovalResult]):
    kind = "approval-workflow"

    async def execute(self, ctx: WorkflowContext, input: ApprovalRequest) -> ApprovalResult:
        await ctx.run("send-request", lambda: send_approval_request(input))

        # Wait up to 7 days - worker is NOT blocked
        manager_approval = await ctx.promise("manager-approval", timeout=timedelta(days=7))

        try:
            approval = await manager_approval.wait()
        except TimeoutError:
            await ctx.run("escalate", lambda: escalate_to_director(input))
            director_approval = await ctx.promise("director-approval", timeout=timedelta(days=3))
            approval = await director_approval.wait()

        return ApprovalResult(approved=approval["approved"], approver=approval["approver_id"])

# External system calls: POST /workflows/{id}/signals/manager-approval
# Workflow resumes exactly where it left off
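
From the approver's side, resolving the promise is a single HTTP call. A minimal sketch, assuming the signal endpoint named in the comment above; the host and payload schema are illustrative, and the posted JSON becomes the promise's resolved value:

import requests

# Hypothetical approval-service callback (host and payload are assumptions)
def approve(workflow_id: str, approver_id: str) -> None:
    resp = requests.post(
        f"https://flovyn.example.com/workflows/{workflow_id}/signals/manager-approval",
        json={"approved": True, "approver_id": approver_id},
        timeout=10,
    )
    resp.raise_for_status()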

Scheduled Retries with Backoff

Celery

Manual retry state management.

@celery.task(bind=True)
def sync_with_api(self, resource_id):
    try:
        return api_client.sync(resource_id)
    except RateLimitError:
        # Exponential backoff, capped at 1 hour
        retry_count = self.request.retries
        countdown = min(2 ** retry_count * 60, 3600)
        raise self.retry(countdown=countdown, max_retries=10)

# Problems:
# - Retry count lost on worker restart
# - No visibility into retry history
# - Hard to debug failed syncs

Flovyn

State preserved across restarts.

from datetime import timedelta

class SyncWorkflow(WorkflowDefinition[SyncInput, SyncResult]):
    kind = "sync-workflow"

    async def execute(self, ctx: WorkflowContext, input: SyncInput) -> SyncResult:
        max_retries = 10

        for attempt in range(max_retries):
            await ctx.set("attempt", attempt + 1)

            try:
                return await ctx.schedule("api-sync", input)
            except RateLimitError:
                delay = min(2 ** attempt * 60, 3600)
                await ctx.sleep(timedelta(seconds=delay))

        raise NonRetryableError(f"Failed after {max_retries} attempts")

# Server can restart mid-sleep - workflow resumes at correct point
# Full history: see every attempt, every delay, every error

Parallel Execution with Aggregation

Celery

The chord pattern, with complex error handling.

from celery import chord

@celery.task
def process_document(doc_id):
    return ocr_service.process(doc_id)

@celery.task
def aggregate_results(results):
    return {"processed": len(results), "results": results}

# Fan-out/fan-in with chord
chord(
    [process_document.s(doc_id) for doc_id in document_ids],
    aggregate_results.s()
).apply_async()

# Problems:
# - One failure can break entire chord
# - Partial results lost
# - No way to retry specific documents

Flovyn

Async execution with partial-failure handling.
@dataclass
class BatchInput:
    document_ids: list[str]

@dataclass
class BatchResult:
    processed: int
    failed: int
    results: list[dict]
    failures: list[dict]

class BatchProcessWorkflow(WorkflowDefinition[BatchInput, BatchResult]):
    kind = "batch-process"

    async def execute(self, ctx: WorkflowContext, input: BatchInput) -> BatchResult:
        # Fan-out: schedule all in parallel
        tasks = [
            await ctx.schedule_async("process-document", {"doc_id": doc_id})
            for doc_id in input.document_ids
        ]

        # Fan-in: collect results, handle partial failures
        results = []
        failures = []

        for i, task in enumerate(tasks):
            try:
                result = await task
                results.append(result)
            except Exception as e:
                failures.append({"doc_id": input.document_ids[i], "error": str(e)})

        return BatchResult(
            processed=len(results),
            failed=len(failures),
            results=results,
            failures=failures
        )

# Partial failures don't break the workflow
# Can retry individual documents by re-running workflow
# Full audit trail of what succeeded and what failed
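
Retrying only the failures can then be a small client-side step. A sketch, assuming a hypothetical client.start_workflow helper for launching workflows by kind:

# Re-run the batch with just the failed document IDs (client API assumed)
retry_input = BatchInput(document_ids=[f["doc_id"] for f in result.failures])
await client.start_workflow("batch-process", retry_input)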

Feature Comparison

Feature                  | Celery                     | Flovyn
Task execution           | Yes                        | Yes
Automatic retries        | Yes (per-task)             | Yes (per-task and workflow-level)
State persistence        | No (use Redis/DB manually) | Yes (event sourcing)
Long-running waits       | Blocks worker              | Durable sleep (no worker blocked)
External signals         | Manual polling             | Durable promises
Workflow history         | Logs only                  | Complete event history
Replay/debug             | No                         | Yes (time-travel debugging)
Compensation (saga)      | Manual                     | Built-in pattern
Server restart           | Tasks may be lost          | Workflows resume exactly
Multi-step orchestration | Chains/chords (fragile)    | Native workflow code

When to Use What

Use Celery when:

  • Simple fire-and-forget tasks
  • No need for multi-step coordination
  • State doesn't matter (idempotent operations)
  • You already have Celery infrastructure

Use Flovyn when:

  • Multi-step business processes
  • Need compensation/rollback on failures
  • Long-running workflows (hours/days/weeks)
  • Audit trail required
  • External system coordination (webhooks, approvals)
  • Debugging production issues matters

Migration Path

Flovyn tasks can wrap existing Celery tasks:

migration.py
import asyncio

# Existing Celery task
@celery.task
def legacy_process(data):
    return process_data(data)

# Flovyn task that calls Celery
class LegacyProcessTask(Task):
    type = "legacy-process"

    async def execute(self, input: dict, context: TaskContext) -> dict:
        # .get() blocks, so run it in a thread to keep the event loop free
        return await asyncio.to_thread(
            lambda: legacy_process.delay(input["data"]).get()
        )

# Orchestrate with Flovyn, execute with Celery
class MigrationWorkflow(WorkflowDefinition):
    kind = "migration-workflow"

    async def execute(self, ctx: WorkflowContext, input: dict):
        # Step 1: Use existing Celery task
        legacy_result = await ctx.schedule("legacy-process", input)

        # Step 2: New Flovyn-native task
        new_result = await ctx.schedule("new-process", legacy_result)

        return new_result

Migrate incrementally: wrap existing tasks, add new tasks natively, and gain workflow orchestration immediately.
