Flovyn vs Celery
Celery is a distributed task queue. Flovyn is a workflow orchestration platform with event sourcing. They solve different problems.
Core Difference
Celery
Fire tasks, hope they complete, manually handle failures.
Flovyn
Define workflows as code; the platform guarantees completion and keeps a full history.
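To make "full history" concrete, here is a toy model of replay from recorded results. It is a sketch of the general event-sourcing idea only: `ToyContext`, `run`, and `order_workflow` are hypothetical names for illustration, not Flovyn's API or implementation, and the history here lives in memory rather than in durable storage.

```python
from typing import Any, Callable, Optional


class ToyContext:
    """Toy stand-in for a workflow context; NOT Flovyn's implementation."""

    def __init__(self, history: Optional[dict] = None) -> None:
        # Maps step name -> recorded result. A real platform would persist this.
        self.history: dict = dict(history or {})
        # Names of the steps that actually executed on this run.
        self.executed: list = []

    def run(self, name: str, fn: Callable[[], Any]) -> Any:
        # On replay, a step with a recorded result is not executed again.
        if name in self.history:
            return self.history[name]
        result = fn()
        self.history[name] = result
        self.executed.append(name)
        return result


def order_workflow(ctx: ToyContext) -> str:
    payment = ctx.run("charge", lambda: "txn-1")
    return ctx.run("ship", lambda: f"tracking-for-{payment}")


# Simulate a crash after the "charge" step: only that result was recorded.
crashed = ToyContext()
crashed.run("charge", lambda: "txn-1")

# Replay the whole workflow against the saved history.
replay = ToyContext(history=crashed.history)
result = order_workflow(replay)
```

On replay the workflow function runs from the top, but only `ship` actually executes; `charge` returns its recorded result, so the payment is not taken twice.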
Simple Task Execution
Celery
@celery.task(bind=True, max_retries=3)
def process_payment(self, order_id, amount):
    try:
        result = payment_gateway.charge(amount)
        return result
    except PaymentError as e:
        self.retry(exc=e, countdown=60)
Flovyn
@dataclass
class OrderInput:
    order_id: str
    amount: float

class OrderWorkflow(WorkflowDefinition[OrderInput, PaymentResult]):
    kind = "order-workflow"

    async def execute(self, ctx: WorkflowContext, input: OrderInput) -> PaymentResult:
        # Automatic retries, result cached on replay
        return await ctx.schedule("process-payment", {"order_id": input.order_id, "amount": input.amount})
Multi-Step Process
Celery
- Chain tasks, hope nothing fails in between
@celery.task
def charge_payment(order_id, amount):
    return payment_gateway.charge(amount)

@celery.task
def reserve_inventory(order_id, items):
    return inventory_service.reserve(items)

@celery.task
def create_shipment(order_id, address):
    return shipping_service.create(address)

# Orchestration via chains - no compensation on failure
# .si() creates immutable signatures, so each task gets only the arguments
# listed here instead of also receiving the previous task's return value
chain = (
    charge_payment.si(order_id, amount) |
    reserve_inventory.si(order_id, items) |
    create_shipment.si(order_id, address)
)
chain.apply_async()
# What if create_shipment fails after payment succeeded?
# You need separate error handling, dead letter queues, manual intervention
Flovyn
- Single workflow with automatic compensation
class OrderWorkflow(WorkflowDefinition[OrderInput, OrderResult]):
    kind = "order-workflow"

    async def execute(self, ctx: WorkflowContext, input: OrderInput) -> OrderResult:
        # Step 1: Payment
        payment = await ctx.schedule("charge-payment", input.payment_details)
        # Step 2: Inventory
        inventory = await ctx.schedule("reserve-inventory", input.items)
        if inventory["status"] == "failed":
            # Compensate: refund payment
            await ctx.run("refund", lambda: refund_payment(payment["transaction_id"]))
            raise NonRetryableError("Inventory unavailable")
        # Step 3: Shipment
        try:
            shipment = await ctx.schedule("create-shipment", input.shipping_address)
        except Exception:
            # Compensate: release inventory, refund payment
            await ctx.run("release", lambda: release_inventory(inventory["reservation_id"]))
            await ctx.run("refund", lambda: refund_payment(payment["transaction_id"]))
            raise
        return OrderResult(status="completed", tracking_number=shipment["tracking_number"])
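The compensation logic above follows the saga pattern: every completed step has an undo action, and a failure runs the undo actions in reverse order. A minimal framework-free sketch, assuming nothing about Flovyn itself (`run_saga` and the step names are hypothetical):

```python
from typing import Callable


def run_saga(steps: list) -> None:
    """Run (action, compensation) pairs; on failure, undo completed steps in reverse."""
    completed: list = []
    for action, compensate in steps:
        try:
            action()
        except Exception:
            # Undo the most recent successful step first.
            for undo in reversed(completed):
                undo()
            raise
        completed.append(compensate)


log: list = []


def fail_shipment() -> None:
    raise RuntimeError("carrier unavailable")


steps: list = [
    (lambda: log.append("charge"), lambda: log.append("refund")),
    (lambda: log.append("reserve"), lambda: log.append("release")),
    (fail_shipment, lambda: log.append("cancel-shipment")),
]

try:
    run_saga(steps)
except RuntimeError:
    pass  # the saga re-raises after compensating
```

After the shipment step fails, `log` holds the two successful actions followed by their compensations in reverse: inventory is released before the payment is refunded, matching the order used in the workflow.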
Waiting for External Events
Celery
- Poll database, manage state manually
@celery.task
def wait_for_approval(request_id):
    # Poll database every minute for 7 days
    for _ in range(10080):  # 7 days * 24 hours * 60 minutes
        approval = db.get_approval(request_id)
        if approval:
            return approval
        time.sleep(60)
    return {"status": "timeout"}
# Problems:
# - Worker tied up polling for days
# - Server restart loses progress
# - No way to know current state
Flovyn
- Durable promises, sleep for days
class ApprovalWorkflow(WorkflowDefinition[ApprovalRequest, ApprovalResult]):
    kind = "approval-workflow"

    async def execute(self, ctx: WorkflowContext, input: ApprovalRequest) -> ApprovalResult:
        await ctx.run("send-request", lambda: send_approval_request(input))
        # Wait up to 7 days - worker is NOT blocked
        manager_approval = await ctx.promise("manager-approval", timeout=timedelta(days=7))
        try:
            approval = await manager_approval.wait()
        except TimeoutError:
            await ctx.run("escalate", lambda: escalate_to_director(input))
            # Same two-step form as the manager promise: create it, then wait on it
            director_approval = await ctx.promise("director-approval", timeout=timedelta(days=3))
            approval = await director_approval.wait()
        return ApprovalResult(approved=True, approver=approval["approver_id"])
# External system calls: POST /workflows/{id}/signals/manager-approval
# Workflow resumes exactly where it left off
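The difference from polling is that the waiter is suspended until something resolves it, rather than waking up every minute to check. The sketch below shows that shape with plain `asyncio` futures. It is in-memory only, so it does not survive a restart the way a durable promise is described to; `wait_for_signal` and `demo` are hypothetical names.

```python
import asyncio


async def wait_for_signal(signal: asyncio.Future, timeout: float) -> dict:
    """Suspend until the signal resolves or the timeout elapses (no polling loop)."""
    try:
        return await asyncio.wait_for(signal, timeout=timeout)
    except asyncio.TimeoutError:
        return {"status": "timeout"}


async def demo() -> tuple:
    loop = asyncio.get_running_loop()

    # Case 1: an external event resolves the future before the deadline.
    approved = loop.create_future()
    # Stand-in for the external POST that delivers the approval.
    loop.call_later(0.01, approved.set_result, {"approver_id": "manager-1"})
    answered = await wait_for_signal(approved, timeout=1.0)

    # Case 2: nobody ever resolves the future, so the timeout path runs.
    never = loop.create_future()
    expired = await wait_for_signal(never, timeout=0.05)
    return answered, expired


answered, expired = asyncio.run(demo())
```

The timeout branch is where escalation would go, as in the workflow above.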
Scheduled Retries with Backoff
Celery
- Manual retry state management
@celery.task(bind=True)
def sync_with_api(self, resource_id):
    try:
        return api_client.sync(resource_id)
    except RateLimitError:
        # Retry with exponential backoff
        retry_count = self.request.retries
        countdown = min(2 ** retry_count * 60, 3600)  # Max 1 hour
        self.retry(countdown=countdown, max_retries=10)
# Problems:
# - Retry count lost on worker restart
# - No visibility into retry history
# - Hard to debug failed syncs
Flovyn
- State preserved across restarts
class SyncWorkflow(WorkflowDefinition[SyncInput, SyncResult]):
    kind = "sync-workflow"

    async def execute(self, ctx: WorkflowContext, input: SyncInput) -> SyncResult:
        max_retries = 10
        for attempt in range(max_retries):
            await ctx.set("attempt", attempt + 1)
            try:
                return await ctx.schedule("api-sync", input)
            except RateLimitError:
                delay = min(2 ** attempt * 60, 3600)
                await ctx.sleep(timedelta(seconds=delay))
        raise NonRetryableError(f"Failed after {max_retries} attempts")
# Server can restart mid-sleep - workflow resumes at correct point
# Full history: see every attempt, every delay, every error
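Both versions use the same backoff formula, `min(2 ** attempt * 60, 3600)`. Working it through for 10 attempts shows how long a fully rate-limited sync waits in total; `backoff_delays` is a hypothetical helper written only for this calculation.

```python
def backoff_delays(max_retries: int = 10, base: int = 60, cap: int = 3600) -> list:
    """Delay in seconds after each failed attempt, doubling from `base` up to `cap`."""
    return [min(2 ** attempt * base, cap) for attempt in range(max_retries)]


delays = backoff_delays()
# [60, 120, 240, 480, 960, 1920, 3600, 3600, 3600, 3600]

total_seconds = sum(delays)
# 18180 seconds, i.e. 5 hours and 3 minutes of waiting
```

The cap takes effect from the seventh attempt onward, since 2 ** 6 * 60 = 3840 exceeds 3600. A wait of roughly five hours is the window in which a restart has to preserve the attempt number.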
Parallel Execution with Aggregation
Celery
- Chord pattern, complex error handling
@celery.task
def process_document(doc_id):
    return ocr_service.process(doc_id)

@celery.task
def aggregate_results(results):
    return {"processed": len(results), "results": results}

# Fan-out/fan-in with chord
chord(
    [process_document.s(doc_id) for doc_id in document_ids],
    aggregate_results.s()
).apply_async()
# Problems:
# - One failure can break entire chord
# - Partial results lost
# - No way to retry specific documents
Flovyn
- Async execution with partial failure handling
@dataclass
class BatchInput:
    document_ids: list[str]

@dataclass
class BatchResult:
    processed: int
    failed: int
    results: list[dict]
    failures: list[dict]

class BatchProcessWorkflow(WorkflowDefinition[BatchInput, BatchResult]):
    kind = "batch-process"

    async def execute(self, ctx: WorkflowContext, input: BatchInput) -> BatchResult:
        # Fan-out: schedule all in parallel
        tasks = [
            await ctx.schedule_async("process-document", {"doc_id": doc_id})
            for doc_id in input.document_ids
        ]
        # Fan-in: collect results, handle partial failures
        results = []
        failures = []
        for i, task in enumerate(tasks):
            try:
                result = await task
                results.append(result)
            except Exception as e:
                failures.append({"doc_id": input.document_ids[i], "error": str(e)})
        return BatchResult(
            processed=len(results),
            failed=len(failures),
            results=results,
            failures=failures
        )
# Partial failures don't break the workflow
# Can retry individual documents by re-running workflow
# Full audit trail of what succeeded and what failed
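The same fan-out/fan-in shape with partial failures can be tried locally using only the standard library. This sketch uses `asyncio.gather(..., return_exceptions=True)` in place of the workflow context; `process_document` here is a hypothetical stand-in for the OCR task in which ids starting with "bad" fail.

```python
import asyncio


async def process_document(doc_id: str) -> dict:
    # Hypothetical stand-in for an OCR call; ids starting with "bad" fail.
    await asyncio.sleep(0)
    if doc_id.startswith("bad"):
        raise ValueError(f"cannot process {doc_id}")
    return {"doc_id": doc_id, "pages": 1}


async def process_batch(document_ids: list) -> dict:
    # Fan-out: start every document concurrently. return_exceptions=True makes
    # gather hand back exceptions as values instead of raising the first one.
    outcomes = await asyncio.gather(
        *(process_document(doc_id) for doc_id in document_ids),
        return_exceptions=True,
    )
    # Fan-in: split successes from failures, keeping the failing doc_id.
    results, failures = [], []
    for doc_id, outcome in zip(document_ids, outcomes):
        if isinstance(outcome, Exception):
            failures.append({"doc_id": doc_id, "error": str(outcome)})
        else:
            results.append(outcome)
    return {
        "processed": len(results),
        "failed": len(failures),
        "results": results,
        "failures": failures,
    }


summary = asyncio.run(process_batch(["a", "bad-1", "b"]))
```

One bad document ends up in `failures` while the other two still appear in `results`. Unlike the workflow version, nothing here is persisted, so a crash mid-batch loses everything.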
Feature Comparison
| Feature | Celery | Flovyn |
|---|---|---|
| Task execution | Yes | Yes |
| Automatic retries | Yes (per-task) | Yes (per-task and workflow-level) |
| State persistence | No (use Redis/DB manually) | Yes (event sourcing) |
| Long-running waits | Blocks worker | Durable sleep (no worker blocked) |
| External signals | Manual polling | Durable promises |
| Workflow history | Logs only | Complete event history |
| Replay/debug | No | Yes (time-travel debugging) |
| Compensation (saga) | Manual | Built-in pattern |
| Server restart | Tasks may be lost | Workflows resume exactly |
| Multi-step orchestration | Chains/chords (fragile) | Native workflow code |
When to Use What
Use Celery when:
- Simple fire-and-forget tasks
- No need for multi-step coordination
- State doesn't matter (idempotent operations)
- You already have Celery infrastructure
Use Flovyn when:
- Multi-step business processes
- Need compensation/rollback on failures
- Long-running workflows (hours/days/weeks)
- Audit trail required
- External system coordination (webhooks, approvals)
- Debugging production issues matters
Migration Path
Flovyn tasks can wrap existing Celery tasks:
migration.py
# Existing Celery task
@celery.task
def legacy_process(data):
    return process_data(data)

# Flovyn task that calls Celery
class LegacyProcessTask(Task):
    type = "legacy-process"

    async def execute(self, input: dict, context: TaskContext) -> dict:
        # Call existing Celery task synchronously
        result = legacy_process.delay(input["data"]).get()
        return result

# Orchestrate with Flovyn, execute with Celery
class MigrationWorkflow(WorkflowDefinition):
    async def execute(self, ctx: WorkflowContext, input: dict):
        # Step 1: Use existing Celery task
        legacy_result = await ctx.schedule("legacy-process", input)
        # Step 2: New Flovyn-native task
        new_result = await ctx.schedule("new-process", legacy_result)
        return new_result
Migrate incrementally: wrap existing tasks, add new tasks natively, gain workflow orchestration immediately.
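One caveat with the wrapper: `.get()` is a blocking call inside an `async def`, so the event loop cannot do anything else until Celery returns. Assuming the worker runs on a standard asyncio loop (an assumption, not something this page confirms), the blocking call can be moved to a thread with `asyncio.to_thread`. In this sketch `blocking_get` is a hypothetical stand-in for `legacy_process.delay(...).get()`.

```python
import asyncio
import time


def blocking_get() -> dict:
    # Hypothetical stand-in for legacy_process.delay(...).get()
    time.sleep(0.05)
    return {"status": "done"}


async def heartbeat(ticks: list) -> None:
    # Other work on the same event loop; it only advances if the loop is free.
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.01)


async def main() -> tuple:
    ticks: list = []
    # to_thread runs the blocking call in a worker thread, so heartbeat
    # keeps running while the result is pending.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_get),
        heartbeat(ticks),
    )
    return result, ticks


result, ticks = asyncio.run(main())
```

Inside the wrapper task, the equivalent change would be awaiting `asyncio.to_thread(...)` around the `.get()` call instead of calling it directly.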