
Core Concepts

This document explains the fundamental concepts that make Workflow powerful and how they work together to create robust, distributed workflow systems.

Workflows

A Workflow is a definition of how data should flow through a series of steps to accomplish a business process. In code, a workflow is represented by the Workflow interface:

// A workflow processes data of type Order through OrderStatus states
type Workflow[Order, OrderStatus] interface {
    Trigger(ctx context.Context, foreignID string, opts ...TriggerOption) (string, error)
    Await(ctx context.Context, foreignID, runID string, status OrderStatus) (*Run[Order, OrderStatus], error)
    Callback(ctx context.Context, foreignID string, status OrderStatus, payload io.Reader) error
    Run(ctx context.Context)
    Stop()
}
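
As a rough sketch of how these methods fit together (assuming an Order type, OrderStatus constants such as OrderFulfilled, and an already-built workflow wf, none of which are defined here):

// Start a new Run for a business entity, identified by your own foreign ID.
runID, err := wf.Trigger(ctx, "order-1234")
if err != nil {
    return err
}

// Block until that Run reaches the (assumed) OrderFulfilled status.
run, err := wf.Await(ctx, "order-1234", runID, OrderFulfilled)
if err != nil {
    return err
}
fmt.Printf("run %s finished in status %v\n", run.RunID, run.Status)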

Runs

A Run is an individual instance of a workflow processing specific data. When you call Trigger(), you create a new Run.

type Run[Type any, Status StatusType] struct {
    WorkflowName string
    ForeignID    string    // Your unique identifier for this business entity
    RunID        string    // Unique identifier for this specific run
    RunState     RunState  // Lifecycle state (Initiated, Running, Completed, etc.)
    Status       Status    // Your workflow-specific status
    Object       *Type     // Your business data
    CreatedAt    time.Time
    UpdatedAt    time.Time
}

Run Lifecycle (RunState)

Every Run progresses through a finite state machine:

RunState              Value  Description
Unknown               0      Default zero value; has no meaning
Initiated             1      Run created but not yet processed
Running               2      Currently being processed by a workflow step
Paused                3      Temporarily stopped (usually due to errors)
Cancelled             4      Terminated before completion
Completed             5      Successfully finished all steps
DataDeleted           6      Data has been scrubbed/deleted
RequestedDataDeleted  7      Data deletion requested (e.g., for GDPR)

The allowed transitions between these states, shown as a Mermaid state diagram:
stateDiagram-v2
    direction LR

    Initiated-->Running

    Running-->Completed
    Running-->Paused

    Paused-->Running

    Running --> Cancelled
    Paused --> Cancelled

    state Finished {
        Completed --> RequestedDataDeleted
        Cancelled --> RequestedDataDeleted

        DataDeleted-->RequestedDataDeleted
        RequestedDataDeleted-->DataDeleted
    }

Your Custom Status vs RunState

RunState is managed by the library and tracks the lifecycle of the Run itself, while Status is the business state you define for your workflow. The two are orthogonal: a Run can be RunStateRunning with status OrderCreated, meaning the system is actively processing your order, which is currently in the “created” state.
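
For example (assuming a fetched run and the OrderCreated constant used in the examples below):

if run.RunState == workflow.RunStateRunning && run.Status == OrderCreated {
    // The library is actively processing this Run, and its business status
    // is still "order created".
}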

Events

Events are the mechanism by which workflow steps communicate and trigger each other.

type Event struct {
    ID        int64             // Unique event identifier
    ForeignID string            // Links to workflow instance
    Type      int               // Status the workflow moved to
    Headers   map[Header]string // Metadata
    CreatedAt time.Time         // Event timestamp
}

Event Flow

  1. Step completes and returns a new Status
  2. Workflow persists the Run with new Status
  3. Event is published to the EventStreamer
  4. Next step’s consumer receives the event
  5. Next step processes the Run
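
Concretely, a step is just a function that returns the next Status; persisting the Run and publishing the event are handled by the library. A minimal sketch, reusing the status constants from the state machine example below:

func reserveInventory(ctx context.Context, r *workflow.Run[Order, OrderStatus]) (OrderStatus, error) {
    // Do this step's work using r.Object (the Order).
    // Returning InventoryReserved persists the Run with that status and publishes
    // an event, which the consumer for the next step then picks up.
    return InventoryReserved, nil
}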

This event-driven architecture enables:

State Machines

Workflows are state machines where:

Unlike traditional state machines, Workflow state machines:

b := workflow.NewBuilder[Order, OrderStatus]("order-processing")

// Define valid transitions
b.AddStep(OrderCreated, processPayment, PaymentProcessed, PaymentFailed)
b.AddStep(PaymentProcessed, reserveInventory, InventoryReserved, InventoryFailed)
b.AddStep(InventoryReserved, fulfillOrder, OrderFulfilled)

// Cycles are allowed - orders can be retried
b.AddStep(PaymentFailed, retryPayment, PaymentProcessed, OrderCancelled)
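
The step functions and OrderStatus constants above are defined by you. A minimal sketch of the status type (the library constrains Status to an integer-based type; a String() method is a common convention for readable statuses, but check the StatusType definition for the exact requirements):

type OrderStatus int

const (
    OrderStatusUnknown OrderStatus = iota
    OrderCreated
    PaymentProcessed
    PaymentFailed
    InventoryReserved
    InventoryFailed
    OrderFulfilled
    OrderCancelled
)

func (s OrderStatus) String() string {
    switch s {
    case OrderCreated:
        return "Order Created"
    case PaymentProcessed:
        return "Payment Processed"
    // ... remaining cases omitted for brevity
    default:
        return "Unknown"
    }
}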

Role-Based Scheduling

Role Scheduling ensures that only one instance of each process runs at any time, even in a distributed environment.

How It Works

Each workflow step creates a unique role:

workflow-name-status-consumer-shard-of-total-shards

Examples:

The RoleScheduler ensures only one process holds each role at any time, enabling:

Sharding

For high-throughput steps, you can shard across multiple processes:

b.AddStep(OrderCreated, processPayment, PaymentProcessed).
    WithOptions(workflow.ParallelCount(5)) // Run 5 parallel instances

This creates five consumer roles for the step (one per shard, numbered 1-of-5 through 5-of-5), each of which can be held by a different process.

Events are distributed across shards using consistent hashing on the ForeignID.
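
The exact partitioning is internal to the library, but the idea is that a Run's shard is a deterministic function of its ForeignID, so every event for the same Run lands on the same shard. A rough illustration using a simple FNV hash with modulo (not the library's actual hashing scheme):

import "hash/fnv"

// shardFor is a hypothetical helper showing ForeignID-based partitioning.
func shardFor(foreignID string, totalShards int) int {
    h := fnv.New32a()
    h.Write([]byte(foreignID))
    return int(h.Sum32()%uint32(totalShards)) + 1 // shards numbered 1..totalShards
}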

Type Safety

Workflow uses Go generics to provide compile-time safety:

// This workflow processes Orders through OrderStatus states
type OrderWorkflow = workflow.Workflow[Order, OrderStatus]

// Step functions are fully typed
func processPayment(ctx context.Context, r *workflow.Run[Order, OrderStatus]) (OrderStatus, error) {
    // r.Object is typed as *Order, so field access is checked at compile time.
    // The return value must be an OrderStatus; the compiler catches mismatches.
    return PaymentProcessed, nil
}

Benefits:

Adapters

Adapters make Workflow infrastructure-agnostic by providing interfaces for event streaming (EventStreamer), record storage, role scheduling (RoleScheduler), and timeout storage, so the same workflow definition can run against different backends:

// Production setup
wf := b.Build(
    kafkastreamer.New(brokers, config),
    sqlstore.New(db, "workflow_records", "workflow_outbox"),
    rinkrolescheduler.New(rinkConfig),
    workflow.WithTimeoutStore(sqltimeout.New(db)),
)

// Development setup
wf := b.Build(
    memstreamer.New(),
    memrecordstore.New(),
    memrolescheduler.New(),
)

Durability and Consistency

Workflow provides at-least-once delivery with exactly-once processing semantics through two mechanisms:

Transactional Outbox Pattern

  1. Run state change and outbox event are stored in a single transaction
  2. Background process publishes outbox events to EventStreamer
  3. Events are deleted from outbox only after successful publication
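
A rough sketch of step 1, using database/sql and the workflow_records/workflow_outbox table names from the adapter example above (the column names and SQL are illustrative, not the library's actual schema):

// saveWithOutbox is a hypothetical helper showing the transactional outbox idea.
func saveWithOutbox(ctx context.Context, db *sql.DB, runID string, newStatus int) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Persist the Run's new status...
    if _, err := tx.ExecContext(ctx,
        `UPDATE workflow_records SET status = ? WHERE run_id = ?`,
        newStatus, runID,
    ); err != nil {
        return err
    }

    // ...and, in the same transaction, the event that still needs publishing.
    if _, err := tx.ExecContext(ctx,
        `INSERT INTO workflow_outbox (run_id, event_type) VALUES (?, ?)`,
        runID, newStatus,
    ); err != nil {
        return err
    }

    // Only if both writes succeed does the change become visible; a background
    // process later publishes the outbox row and deletes it once acknowledged.
    return tx.Commit()
}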

Idempotency
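
Because delivery is at least once, a step may occasionally be invoked more than once for the same Run, so step functions should be safe to repeat. A minimal sketch, using a hypothetical payments client that accepts an idempotency key (r.RunID is a natural choice):

func processPayment(ctx context.Context, r *workflow.Run[Order, OrderStatus]) (OrderStatus, error) {
    // payments.Charge and payments.IdempotencyKey are hypothetical: repeating the
    // call with the same key re-issues the same charge instead of creating a new one.
    if err := payments.Charge(ctx, r.Object.Amount, payments.IdempotencyKey(r.RunID)); err != nil {
        return 0, err
    }
    return PaymentProcessed, nil
}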

Error Handling

When a step function returns an error, the Run is retried rather than failed outright; per-step options control the retry backoff and when to give up and pause the Run:

b.AddStep(OrderCreated, processPayment, PaymentProcessed).
    WithOptions(
        workflow.PauseAfterErrCount(5),        // Pause after 5 errors
        workflow.ErrBackOff(time.Second * 30), // Wait 30s between retries
    )

Observability

Workflow provides comprehensive observability:

Metrics

Events

Debugging

Next Steps

Now that you understand the core concepts, explore: