This document provides a deep dive into Workflow's architecture, design decisions, and how its components work together to form a robust, scalable workflow orchestration system.
Workflow is built around several core principles — loose coupling, reliability, observability, and scalability — which are reflected in the architecture below:
```
┌───────────────────────────────────────────────────────────────────┐
│                        Client Application                         │
└─────────────────┬───────────────────────────────┬─────────────────┘
                  │                               │
                  ▼                               ▼
        ┌───────────────────┐           ┌───────────────────┐
        │   Workflow API    │           │      Web UI       │
        │   - Trigger()     │           │   - Monitor       │
        │   - Await()       │           │   - Debug         │
        │   - Callback()    │           │   - Visualize     │
        └─────────┬─────────┘           └───────────────────┘
                  │
                  ▼
┌───────────────────────────────────────────────────────────────────┐
│                          Workflow Engine                          │
├────────────────┬────────────────┬────────────────┬────────────────┤
│      Step      │    Callback    │    Timeout     │   Connector    │
│   Consumers    │    Handlers    │    Pollers     │   Consumers    │
│                │                │                │                │
│    Process     │     Handle     │    Schedule    │    Consume     │
│    workflow    │    external    │   time-based   │    external    │
│     steps      │     events     │   operations   │    streams     │
└────────┬───────┴────────┬───────┴────────┬───────┴────────┬───────┘
         │                │                │                │
         ▼                ▼                ▼                ▼
┌───────────────────────────────────────────────────────────────────┐
│                          Role Scheduler                           │
│      Coordinates distributed execution of workflow processes      │
│      - Ensures only one instance of each role runs at a time      │
│      - Handles failover and load distribution                     │
└────────────────────────────────┬──────────────────────────────────┘
                                 │
                                 ▼
┌───────────────────────────────────────────────────────────────────┐
│                             Adapters                              │
├─────────────┬─────────────┬─────────────┬─────────────┬───────────┤
│EventStreamer│ RecordStore │TimeoutStore │   Logger    │   WebUI   │
│             │             │             │             │           │
│ - Kafka     │ - SQL DB    │ - SQL DB    │ - Structured│ - HTTP    │
│ - Reflex    │ - NoSQL     │ - Redis     │ - Debug     │ - React   │
│ - Memory    │ - Memory    │ - Memory    │ - Custom    │ - Custom  │
└─────────────┴─────────────┴─────────────┴─────────────┴───────────┘
```
The engine is responsible for running the four process types shown above: step consumers, callback handlers, timeout pollers, and connector consumers.
All components communicate through events, providing:
- **Loose Coupling:** Components don't need to know about each other directly
- **Reliability:** Events are persisted and can be replayed on failure
- **Observability:** All state changes are observable as events
- **Scalability:** Events can be partitioned and processed in parallel
The role scheduler ensures distributed coordination:
```go
type RoleScheduler interface {
	Await(ctx context.Context, role string) (context.Context, context.CancelFunc, error)
}
```
How it works: each process claims a uniquely named role via `Await` before running. The call blocks until the role is free and returns a context that is cancelled if the role is lost, at which point the process must stop.

Role Naming Convention:
```
{workflow-name}-{status}-{process-type}-{shard}-of-{total-shards}
```
Examples:

```
order-processing-1-consumer-1-of-1
user-onboarding-2-timeout-consumer-2-of-3
```

A workflow run is started with `Trigger`:

```go
runID, err := workflow.Trigger(ctx, "order-123",
	workflow.WithInitialValue(order),
	workflow.WithStartAt(OrderCreated),
)
```
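The naming convention above can be sketched as a small helper; the function name and parameters are illustrative, not part of the library's API:

```go
package main

import "fmt"

// roleName assembles a role string following the documented convention:
// {workflow-name}-{status}-{process-type}-{shard}-of-{total-shards}
func roleName(workflowName string, status int, processType string, shard, totalShards int) string {
	return fmt.Sprintf("%s-%d-%s-%d-of-%d", workflowName, status, processType, shard, totalShards)
}

func main() {
	fmt.Println(roleName("order-processing", 1, "consumer", 1, 1))
	fmt.Println(roleName("user-onboarding", 2, "timeout-consumer", 2, 3))
	// → order-processing-1-consumer-1-of-1
	// → user-onboarding-2-timeout-consumer-2-of-3
}
```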
```
EventStreamer → Step Consumer → Business Logic → State Update → New Event
```
To keep state updates and event publication consistent, Workflow uses the transactional outbox pattern:
```go
tx := db.Begin()

// 1. Update run state
recordStore.Store(tx, updatedRun)

// 2. Write event to outbox in the same transaction
recordStore.AddOutboxEvent(tx, event)

tx.Commit()

// 3. Async: publish outbox events to the event streamer
outboxProcessor.PublishPending()
```
This pattern ensures that state changes and events are atomically committed.
Workflow supports several scaling patterns:
**Single Instance:** All roles run on one machine

```go
// Simple setup - everything on one instance
wf := b.Build(memstreamer.New(), memrecordstore.New(), memrolescheduler.New())
```

**Multi-Instance:** Roles distributed across machines

```go
// Production setup - roles distributed via RoleScheduler
wf := b.Build(kafkastreamer.New(), sqlstore.New(), rinkrolescheduler.New())
```

**Sharded Processing:** High-throughput steps split across multiple instances

```go
b.AddStep(OrderCreated, processPayment, PaymentProcessed).
	WithOptions(workflow.ParallelCount(5)) // 5 parallel processors
```
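For sharded processing to preserve per-run ordering, every event for a given run must land on the same consumer. A common way to key this — shown here as an assumed sketch, not the library's exact internals — is to hash the foreign ID modulo the shard count:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a foreignID to one of totalShards consumers. Hashing the
// ID keeps all events for one run on the same shard, so processing for a
// single run stays ordered even with parallel consumers.
func shardFor(foreignID string, totalShards int) int {
	h := fnv.New32a()
	h.Write([]byte(foreignID))
	return int(h.Sum32()) % totalShards
}

func main() {
	for _, id := range []string{"order-123", "order-456", "order-123"} {
		fmt.Printf("%s -> shard %d of 5\n", id, shardFor(id, 5))
	}
	// "order-123" always maps to the same shard.
}
```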
Adapters provide infrastructure abstraction:
```go
type EventStreamer interface {
	NewSender(ctx context.Context, topic string) (EventSender, error)
	NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error)
}

type EventSender interface {
	Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
	Close() error
}

type EventReceiver interface {
	Recv(ctx context.Context) (*Event, Ack, error)
	Close() error
}
```
Implementations:
```go
type RecordStore interface {
	Store(ctx context.Context, record *Record) error
	Lookup(ctx context.Context, runID string) (*Record, error)
	Latest(ctx context.Context, workflowName, foreignID string) (*Record, error)
	List(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)
	ListOutboxEvents(ctx context.Context, workflowName string, limit int64) ([]OutboxEvent, error)
	DeleteOutboxEvent(ctx context.Context, id string) error
}
```
Key Requirements: `Store` must commit the record and its outbox events in a single transaction (see the outbox pattern above), and `ListOutboxEvents`/`DeleteOutboxEvent` must support the asynchronous publisher.
Errors are handled at three levels.

**Step Level:** Individual step errors
```go
func processPayment(ctx context.Context, r *workflow.Run[Order, OrderStatus]) (OrderStatus, error) {
	if err := paymentService.Charge(r.Object.Amount); err != nil {
		if isRetryableError(err) {
			return 0, err // Retry with backoff
		}
		return PaymentFailed, nil // Move to failure state
	}
	return PaymentProcessed, nil
}
```
**Process Level:** Consumer and timeout process errors

**System Level:** Infrastructure failures
Workflow exposes Prometheus metrics and structured logging for observability.
```yaml
# docker-compose.yml
services:
  workflow-app:
    build: .
    environment:
      - WORKFLOW_ADAPTERS=memory
```
```yaml
# kubernetes deployment
apiVersion: apps/v1
kind: Deployment
spec:
  replicas: 3
  template:
    spec:
      containers:
        - name: workflow-app
          env:
            - name: KAFKA_BROKERS
              value: "kafka:9092"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
```
Multi-region deployment:

- Each region has its own deployment
- Shared: database (with replication)
- Regional: Kafka clusters, role schedulers
- Global: workflow definitions
```go
func TestPaymentStep(t *testing.T) {
	ctx := context.Background()
	run := &workflow.Run[Order, OrderStatus]{
		Object: &Order{Amount: 100.00},
	}

	status, err := processPayment(ctx, run)
	assert.NoError(t, err)
	assert.Equal(t, PaymentProcessed, status)
}
```
```go
func TestWorkflowEndToEnd(t *testing.T) {
	ctx := context.Background()
	wf := NewOrderWorkflow()
	defer wf.Stop()
	wf.Run(ctx)

	runID, err := wf.Trigger(ctx, "order-123", workflow.WithInitialValue(order))
	assert.NoError(t, err)

	run, err := wf.Await(ctx, "order-123", runID, OrderCompleted)
	assert.NoError(t, err)
	assert.Equal(t, OrderCompleted, run.Status)
}
```