Architecture

This document describes Workflow’s architecture and design decisions, and explains how its components work together as a robust, scalable workflow orchestration system.

System Overview

Workflow is built around several core principles:

Architecture Diagram

┌─────────────────────────────────────────────────────────────────┐
│                        Client Application                       │
└─────────────────┬───────────────────────┬───────────────────────┘
                  │                       │
                  ▼                       ▼
         ┌─────────────────┐    ┌─────────────────┐
         │   Workflow API  │    │   Web UI        │
         │   - Trigger()   │    │   - Monitor     │
         │   - Await()     │    │   - Debug       │
         │   - Callback()  │    │   - Visualize   │
         └─────────┬───────┘    └─────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Workflow Engine                          │
├─────────────────┬───────────────┬───────────────┬───────────────┤
│   Step          │   Callback    │   Timeout     │   Connector   │
│   Consumers     │   Handlers    │   Pollers     │   Consumers   │
│                 │               │               │               │
│   Process       │   Handle      │   Schedule    │   Consume     │
│   workflow      │   external    │   time-based  │   external    │
│   steps         │   events      │   operations  │   streams     │
└─────────┬───────┴───────┬───────┴───────┬───────┴───────┬───────┘
          │               │               │               │
          ▼               ▼               ▼               ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Role Scheduler                             │
│   Coordinates distributed execution of workflow processes       │
│   - Ensures only one instance of each role runs at a time       │
│   - Handles failover and load distribution                      │
└─────────────────────┬───────────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Adapters                                 │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│EventStreamer│RecordStore  │TimeoutStore │   Logger    │WebUI    │
│             │             │             │             │         │
│ - Kafka     │ - SQL DB    │ - SQL DB    │ - Structured│ - HTTP  │
│ - Reflex    │ - NoSQL     │ - Redis     │ - Debug     │ - React │
│ - Memory    │ - Memory    │ - Memory    │ - Custom    │ - Custom│
└─────────────┴─────────────┴─────────────┴─────────────┴─────────┘

Core Components

Workflow Engine

The engine is responsible for:

Event-Driven Communication

All components communicate through events, providing:

Loose Coupling: Components don’t need to know about each other directly

Reliability: Events are persisted and can be replayed on failure

Observability: All state changes are observable as events

Scalability: Events can be partitioned and processed in parallel

Role-Based Scheduling

The role scheduler ensures distributed coordination:

type RoleScheduler interface {
    Await(ctx context.Context, role string) (context.Context, context.CancelFunc, error)
}

How it works:

  1. Each workflow process defines a unique role
  2. Scheduler ensures only one instance holds each role
  3. If an instance fails, scheduler reassigns roles to healthy instances
  4. Load can be distributed by creating multiple roles for the same logical process

Role Naming Convention:

{workflow-name}-{status}-{process-type}-{shard}-of-{total-shards}

Examples:
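
As an illustration only, the convention can be rendered with a small helper. The function `roleName`, the workflow name, and the process-type strings below are hypothetical, and representing the status as an integer is an assumption.

```go
package main

import "fmt"

// roleName is a hypothetical helper that renders the naming convention
// {workflow-name}-{status}-{process-type}-{shard}-of-{total-shards}.
func roleName(workflowName string, status int, processType string, shard, totalShards int) string {
	return fmt.Sprintf("%s-%d-%s-%d-of-%d", workflowName, status, processType, shard, totalShards)
}

func main() {
	// Unsharded consumer for status 1.
	fmt.Println(roleName("order-processing", 1, "consumer", 1, 1))
	// Second of three shards for a timeout process on status 2.
	fmt.Println(roleName("order-processing", 2, "timeout-consumer", 2, 3))
}
```

This prints `order-processing-1-consumer-1-of-1` and `order-processing-2-timeout-consumer-2-of-3`.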

Data Flow

1. Workflow Trigger

runID, err := wf.Trigger(ctx, "order-123",
    workflow.WithInitialValue(order),
    workflow.WithStartAt(OrderCreated),
)

This call:

  1. Creates a new Run record in RecordStore
  2. Publishes an event to EventStreamer
  3. Returns immediately; processing continues asynchronously

2. Event Processing

EventStreamer → Step Consumer → Business Logic → State Update → New Event

  1. Step consumer receives event for its status
  2. Loads current Run state from RecordStore
  3. Executes step function with business logic
  4. Updates Run state and publishes new event
  5. Process repeats for next step
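
The loop above can be sketched with an in-memory queue in place of the EventStreamer and a map in place of the RecordStore. Every name here (`run`, `event`, `stepFunc`, `drain`) is hypothetical and chosen for illustration; the real consumers run concurrently and persist state.

```go
package main

import (
	"errors"
	"fmt"
)

type Status int

const (
	OrderCreated Status = iota + 1
	PaymentProcessed
	OrderCompleted
)

// run is a hypothetical, trimmed-down run record.
type run struct {
	ForeignID string
	Status    Status
}

// event tells consumers that a run has reached a status.
type event struct {
	ForeignID string
	Status    Status
}

// stepFunc holds the business logic for one status and returns the next one.
type stepFunc func(r *run) (Status, error)

// drain processes events until the queue is empty and returns the statuses visited.
func drain(store map[string]*run, steps map[Status]stepFunc, queue []event) ([]Status, error) {
	var visited []Status
	for len(queue) > 0 {
		ev := queue[0] // 1. receive the next event
		queue = queue[1:]

		r, ok := store[ev.ForeignID] // 2. load the current run state
		if !ok {
			return visited, errors.New("unknown run: " + ev.ForeignID)
		}
		if r.Status != ev.Status {
			continue // stale event: the run has already moved on
		}
		visited = append(visited, r.Status)

		step, ok := steps[ev.Status]
		if !ok {
			continue // no step registered: terminal status
		}
		next, err := step(r) // 3. execute the business logic
		if err != nil {
			return visited, err
		}
		r.Status = next // 4. update state and emit the next event
		queue = append(queue, event{ForeignID: r.ForeignID, Status: next})
	}
	return visited, nil
}

func main() {
	store := map[string]*run{"order-123": {ForeignID: "order-123", Status: OrderCreated}}
	steps := map[Status]stepFunc{
		OrderCreated:     func(*run) (Status, error) { return PaymentProcessed, nil },
		PaymentProcessed: func(*run) (Status, error) { return OrderCompleted, nil },
	}
	visited, err := drain(store, steps, []event{{ForeignID: "order-123", Status: OrderCreated}})
	fmt.Println(visited, err) // prints "[1 2 3] <nil>"
}
```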

3. Transactional Outbox

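To ensure that every committed state change also produces its event (giving at-least-once delivery, so consumers should be idempotent):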

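// Illustrative pseudocode: both writes share one database transaction.
tx := db.Begin()
defer tx.Rollback() // no-op once Commit has succeeded

// 1. Update run state
recordStore.Store(tx, updatedRun)
// 2. Write event to outbox
recordStore.AddOutboxEvent(tx, event)
// Commit makes the state change and the event visible together
tx.Commit()

// 3. Async: a separate process publishes pending outbox events
outboxProcessor.PublishPending()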

This pattern ensures that state changes and events are atomically committed.

Scaling Patterns

Horizontal Scaling

Workflow supports several scaling patterns:

Single Instance: All roles run on one machine

// Simple setup - everything on one instance
wf := b.Build(memstreamer.New(), memrecordstore.New(), memrolescheduler.New())

Multi-Instance: Roles distributed across machines

// Production setup - roles distributed via RoleScheduler
wf := b.Build(kafkastreamer.New(), sqlstore.New(), rinkrolescheduler.New())

Sharded Processing: High-throughput steps split across multiple instances

b.AddStep(OrderCreated, processPayment, PaymentProcessed).
    WithOptions(workflow.ParallelCount(5)) // 5 parallel processors

Auto-Scaling Considerations

Adapter Architecture

Adapters provide infrastructure abstraction:

EventStreamer Interface

type EventStreamer interface {
    NewSender(ctx context.Context, topic string) (EventSender, error)
    NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error)
}

type EventSender interface {
    Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
    Close() error
}

type EventReceiver interface {
    Recv(ctx context.Context) (*Event, Ack, error)
    Close() error
}

Implementations:

RecordStore Interface

type RecordStore interface {
    Store(ctx context.Context, record *Record) error
    Lookup(ctx context.Context, runID string) (*Record, error)
    Latest(ctx context.Context, workflowName, foreignID string) (*Record, error)
    List(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)
    ListOutboxEvents(ctx context.Context, workflowName string, limit int64) ([]OutboxEvent, error)
    DeleteOutboxEvent(ctx context.Context, id string) error
}

Key Requirements:

Error Handling & Resilience

Multi-Level Error Handling

Step Level: Individual step errors

func processPayment(ctx context.Context, r *workflow.Run[Order, OrderStatus]) (OrderStatus, error) {
    if err := paymentService.Charge(r.Object.Amount); err != nil {
        if isRetryableError(err) {
            return 0, err // Retry with backoff
        }
        return PaymentFailed, nil // Move to failure state
    }
    return PaymentProcessed, nil
}

Process Level: Consumer and timeout process errors

System Level: Infrastructure failures

Error Isolation

Performance Characteristics

Throughput

Latency

Resource Usage

Security Considerations

Data Protection

Access Control

Network Security

Monitoring & Observability

Built-in Metrics

Prometheus metrics for:

Distributed Tracing

Logging

Structured logging with:

Deployment Patterns

Development

# docker-compose.yml
services:
  workflow-app:
    build: .
    environment:
      - WORKFLOW_ADAPTERS=memory

Production - Single Region

# kubernetes deployment
apiVersion: apps/v1
kind: Deployment
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: workflow-app
        env:
        - name: KAFKA_BROKERS
          value: "kafka:9092"
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url

Production - Multi-Region

# Each region has its own deployment
# Shared: Database (with replication)
# Regional: Kafka clusters, role schedulers
# Global: Workflow definitions

Testing Strategies

Unit Testing

func TestPaymentStep(t *testing.T) {
    ctx := context.Background()
    run := &workflow.Run[Order, OrderStatus]{
        Object: &Order{Amount: 100.00},
    }

    // paymentService should be a test double here so no real charge is made
    status, err := processPayment(ctx, run)
    assert.NoError(t, err)
    assert.Equal(t, PaymentProcessed, status)
}

Integration Testing

func TestWorkflowEndToEnd(t *testing.T) {
    ctx := context.Background()
    wf := NewOrderWorkflow()
    defer wf.Stop()

    wf.Run(ctx)

    runID, err := wf.Trigger(ctx, "order-123", workflow.WithInitialValue(order))
    assert.NoError(t, err)

    // Await blocks until the run reaches OrderCompleted or ctx is cancelled
    run, err := wf.Await(ctx, "order-123", runID, OrderCompleted)
    assert.NoError(t, err)
    assert.Equal(t, OrderCompleted, run.Status)
}

Load Testing

Next Steps