Adapters make Workflow infrastructure-agnostic by providing standardized interfaces for different technology stacks. This guide explains how adapters work, which ones are available, and how to choose the right combination for your needs.
Workflow uses the adapter pattern to decouple core workflow logic from infrastructure concerns. Each adapter type serves a specific purpose:
┌───────────────────────────────────────────────────────────────┐
│                         Workflow Core                         │
├───────────────┬───────────────┬───────────────┬───────────────┤
│ EventStreamer │  RecordStore  │ RoleScheduler │ TimeoutStore  │
│   Interface   │   Interface   │   Interface   │   Interface   │
├───────────────┼───────────────┼───────────────┼───────────────┤
│ • Kafka       │ • PostgreSQL  │ • Rink        │ • SQL         │
│ • Reflex      │ • MySQL       │ • etcd        │ • Redis       │
│ • Memory      │ • Memory      │ • Memory      │ • Memory      │
└───────────────┴───────────────┴───────────────┴───────────────┘
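Concretely, all four adapters are injected when the workflow is built, so the core never imports Kafka, SQL, or etcd packages directly. A minimal sketch (the adapter variables are placeholders; the Build call mirrors the examples later in this guide):
// The core only sees the four interfaces; swapping infrastructure
// means swapping these arguments.
wf := b.Build(
	eventStreamer, // EventStreamer
	recordStore,   // RecordStore
	roleScheduler, // RoleScheduler
	workflow.WithTimeoutStore(timeoutStore), // optional TimeoutStore
)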
Purpose: The EventStreamer publishes and consumes workflow events for step coordination.
Interface:
type EventStreamer interface {
	NewSender(ctx context.Context, topic string) (EventSender, error)
	NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error)
}

type EventSender interface {
	Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
	Close() error
}

type EventReceiver interface {
	Recv(ctx context.Context) (*Event, Ack, error)
	Close() error
}
Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| kafkastreamer | Production event streaming | go get github.com/luno/workflow/adapters/kafkastreamer |
| reflexstreamer | Luno's Reflex event sourcing | go get github.com/luno/workflow/adapters/reflexstreamer |
| memstreamer | Development and testing | Built-in |
Example:
// Kafka for production
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
streamer := kafkastreamer.New([]string{"kafka:9092"}, kafkaConfig)
// Memory for development
streamer := memstreamer.New()
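Regardless of the adapter chosen, the receiving side is driven the same way. A minimal consumption sketch, assuming an enclosing function that returns error and a hypothetical process handler (the workflow core runs this loop for you):
receiver, err := streamer.NewReceiver(ctx, "order_workflow", "order-consumer")
if err != nil {
	return err
}
defer receiver.Close()

for ctx.Err() == nil {
	event, ack, err := receiver.Recv(ctx)
	if err != nil {
		return err
	}
	if err := process(event); err != nil { // hypothetical handler
		return err
	}
	if err := ack(); err != nil { // acknowledge only after successful handling
		return err
	}
}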
Purpose: The RecordStore persists workflow run state with transactional guarantees.
Interface:
type RecordStore interface {
	Store(ctx context.Context, record *Record) error
	Lookup(ctx context.Context, runID string) (*Record, error)
	Latest(ctx context.Context, workflowName, foreignID string) (*Record, error)
	List(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)

	// Outbox pattern support
	ListOutboxEvents(ctx context.Context, workflowName string, limit int64) ([]OutboxEvent, error)
	DeleteOutboxEvent(ctx context.Context, id string) error
}
Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| sqlstore | Production with SQL databases | go get github.com/luno/workflow/adapters/sqlstore |
| memrecordstore | Development and testing | Built-in |
Requirements: The store must write the record and its outbox events atomically in a single transaction (the outbox pattern), so that a state change is never persisted without its corresponding event.
Example:
// PostgreSQL for production
db, err := sql.Open("postgres", "postgres://user:pass@host/db")
if err != nil {
	panic(err)
}
store := sqlstore.New(db, "workflow_records", "workflow_outbox")
// Memory for development
store := memrecordstore.New()
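The store is normally driven by the workflow core, but its read methods are handy for inspection. A short sketch (orderID is a placeholder; the RunID field is inferred from the Lookup signature above):
// Fetch the most recent run for a foreign ID.
record, err := store.Latest(ctx, "order_workflow", orderID)
if err != nil {
	return err
}

// Fetch a specific run directly by its run ID.
record, err = store.Lookup(ctx, record.RunID)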
Purpose: The RoleScheduler coordinates distributed execution, ensuring that only one instance of each role runs at a time.
Interface:
type RoleScheduler interface {
	Await(ctx context.Context, role string) (context.Context, context.CancelFunc, error)
}
Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| rinkrolescheduler | Production distributed coordination | go get github.com/luno/workflow/adapters/rinkrolescheduler |
| memrolescheduler | Single-instance development | Built-in |
Example:
// Rink for production distributed systems
rinkConfig := rink.Config{
	Endpoints: []string{"rink-1:8080", "rink-2:8080"},
}
scheduler := rinkrolescheduler.New(rinkConfig)
// Memory for single instance
scheduler := memrolescheduler.New()
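The scheduler is also consumed by the core, but the contract is simple: Await blocks until this instance acquires the role, and the returned context ends when the role should be released. A sketch (runConsumer is a hypothetical worker):
roleCtx, cancel, err := scheduler.Await(ctx, "order_workflow-consumer")
if err != nil {
	return err
}
defer cancel() // releases the role

runConsumer(roleCtx) // hypothetical; must stop when roleCtx is cancelled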
Purpose: The TimeoutStore schedules durable timeouts that survive process restarts.
Interface:
type TimeoutStore interface {
	Store(ctx context.Context, timeout Timeout) error
	List(ctx context.Context, workflowName string, status Status) ([]Timeout, error)
	Complete(ctx context.Context, id string) error
}
Available Adapters:
| Adapter | Use Case | Install |
|---|---|---|
| sqltimeout | Production durable timeouts | go get github.com/luno/workflow/adapters/sqltimeout |
| memtimeoutstore | Development and testing | Built-in |
Example:
// SQL for production
timeoutStore := sqltimeout.New(db)
// Build with timeout support
wf := b.Build(
	eventStreamer, recordStore, roleScheduler,
	workflow.WithTimeoutStore(timeoutStore),
)
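For reference, the core drives the store through a store/poll/complete cycle; a rough sketch of that contract (timeout, status, and the ID field are assumptions for illustration):
// Persist a new timeout (the core does this when a timeout step is reached).
if err := timeoutStore.Store(ctx, timeout); err != nil {
	return err
}

// Periodically list timeouts scheduled at a given status...
pending, err := timeoutStore.List(ctx, "order_workflow", status)
if err != nil {
	return err
}

// ...and mark each one complete after it fires (ID field assumed).
for _, t := range pending {
	if err := timeoutStore.Complete(ctx, t.ID); err != nil {
		return err
	}
}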
Development goal: fast feedback, easy debugging, and minimal setup.
func NewDevelopmentWorkflow() *workflow.Workflow[Order, OrderStatus] {
	return b.Build(
		memstreamer.New(),
		memrecordstore.New(),
		memrolescheduler.New(),
		// No timeout store needed for development
	)
}
Characteristics: everything runs in-process with no external dependencies; state lives in memory and is lost on restart, which is fine for development and tests.
Staging goal: a production-like environment for integration testing.
func NewStagingWorkflow() *workflow.Workflow[Order, OrderStatus] {
	db := setupDatabase()

	return b.Build(
		kafkastreamer.New(kafkaBrokers, kafkaConfig),
		sqlstore.New(db, "workflow_records", "workflow_outbox"),
		rinkrolescheduler.New(rinkConfig),
		workflow.WithTimeoutStore(sqltimeout.New(db)),
	)
}
Characteristics: durable records, real event streaming, and distributed role coordination, mirroring the production stack on smaller infrastructure.
Production goal: maximum reliability, scalability, and observability.
func NewProductionWorkflow() *workflow.Workflow[Order, OrderStatus] {
	// Production database with connection pooling
	db := setupProductionDB()

	// Kafka with optimal configuration
	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
	kafkaConfig.Producer.Retry.Max = 5
	kafkaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

	return b.Build(
		kafkastreamer.New(kafkaBrokers, kafkaConfig),
		sqlstore.New(db, "workflow_records", "workflow_outbox"),
		rinkrolescheduler.New(rinkConfig),
		workflow.WithTimeoutStore(sqltimeout.New(db)),
		workflow.WithDefaultOptions(
			workflow.ParallelCount(5),
			workflow.ErrBackOff(time.Minute),
			workflow.PauseAfterErrCount(3),
		),
	)
}
All adapter implementations should be tested using the provided adapter test suites:
func TestMyEventStreamer(t *testing.T) {
	streamer := myeventstreamer.New(config)
	adaptertest.RunEventStreamerTest(t, streamer)
}

func TestMyRecordStore(t *testing.T) {
	store := myrecordstore.New(config)
	adaptertest.RunRecordStoreTest(t, store)
}

func TestMyRoleScheduler(t *testing.T) {
	scheduler := myrolescheduler.New(config)
	adaptertest.RunRoleSchedulerTest(t, scheduler)
}
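If you implement a TimeoutStore as well, the same pattern applies; the suite function name below follows the convention above and is an assumption:
func TestMyTimeoutStore(t *testing.T) {
	store := mytimeoutstore.New(config)
	adaptertest.RunTimeoutStoreTest(t, store)
}
To build a custom adapter, implement the corresponding interface. For example, a custom EventStreamer: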
type MyEventStreamer struct {
	client MyClient
}

func (s *MyEventStreamer) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) {
	return &MySender{
		client: s.client,
		topic:  topic,
	}, nil
}

func (s *MyEventStreamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) {
	return &MyReceiver{
		client:    s.client,
		topic:     topic,
		groupName: name,
	}, nil
}

type MySender struct {
	client MyClient
	topic  string
}

func (s *MySender) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error {
	return s.client.Publish(ctx, s.topic, foreignID, statusType, headers)
}

func (s *MySender) Close() error {
	return s.client.Close()
}

type MyReceiver struct {
	client    MyClient
	topic     string
	groupName string
}

func (r *MyReceiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) {
	msg, err := r.client.PollMessage(ctx, r.topic, r.groupName)
	if err != nil {
		return nil, nil, err
	}

	event := &workflow.Event{
		ID:        msg.ID,
		ForeignID: msg.ForeignID,
		Type:      msg.Type,
		// ... map other fields
	}

	ack := func() error {
		return r.client.AckMessage(ctx, msg.ID)
	}

	return event, ack, nil
}

func (r *MyReceiver) Close() error {
	return r.client.Close()
}
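Once the implementation passes the adapter tests, it plugs into Build like any built-in adapter (client is a placeholder for your configured client):
wf := b.Build(
	&MyEventStreamer{client: client},
	memrecordstore.New(),
	memrolescheduler.New(),
)
A custom RecordStore follows the same shape, with the addition of transactional outbox writes: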
type MyRecordStore struct {
	db MyDatabase
}

func (s *MyRecordStore) Store(ctx context.Context, record *workflow.Record) error {
	tx, err := s.db.BeginTx(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// Store the record
	if err := s.storeRecord(tx, record); err != nil {
		return err
	}

	// Store outbox events in the same transaction
	if err := s.storeOutboxEvents(tx, record.OutboxEvents); err != nil {
		return err
	}

	return tx.Commit()
}

func (s *MyRecordStore) Lookup(ctx context.Context, runID string) (*workflow.Record, error) {
	// Query record by run ID
	row := s.db.QueryRowContext(ctx, "SELECT ... FROM records WHERE run_id = ?", runID)
	return s.scanRecord(row)
}

// Implement other interface methods...
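The outbox methods complete the interface. A sketch assuming an outbox table with id, workflow_name, data, and created_at columns, and that OutboxEvent exposes ID and Data fields:
func (s *MyRecordStore) ListOutboxEvents(ctx context.Context, workflowName string, limit int64) ([]workflow.OutboxEvent, error) {
	rows, err := s.db.QueryContext(ctx,
		"SELECT id, data FROM outbox WHERE workflow_name = ? ORDER BY created_at LIMIT ?",
		workflowName, limit,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var events []workflow.OutboxEvent
	for rows.Next() {
		var e workflow.OutboxEvent
		if err := rows.Scan(&e.ID, &e.Data); err != nil { // field names assumed
			return nil, err
		}
		events = append(events, e)
	}
	return events, rows.Err()
}

func (s *MyRecordStore) DeleteOutboxEvent(ctx context.Context, id string) error {
	_, err := s.db.ExecContext(ctx, "DELETE FROM outbox WHERE id = ?", id)
	return err
}
Once your adapters work, tune the underlying infrastructure for production load. A reasonable sarama starting point: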
kafkaConfig := sarama.NewConfig()

// Producer settings
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll         // Durability
kafkaConfig.Producer.Retry.Max = 5                            // Retries
kafkaConfig.Producer.Flush.Frequency = 100 * time.Millisecond // Batching
kafkaConfig.Producer.Flush.Messages = 100                     // Batch size
kafkaConfig.Producer.Compression = sarama.CompressionSnappy   // Compression

// Consumer settings
kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest      // Start from beginning
kafkaConfig.Consumer.Fetch.Min = 1024                           // Min fetch size
kafkaConfig.Consumer.Fetch.Max = 1024 * 1024                    // Max fetch size
kafkaConfig.Consumer.Group.Heartbeat.Interval = 3 * time.Second // Heartbeat
kafkaConfig.Consumer.Group.Session.Timeout = 10 * time.Second   // Session timeout
-- Indexes for workflow_records
CREATE INDEX idx_workflow_records_workflow_foreign ON workflow_records(workflow_name, foreign_id);
CREATE INDEX idx_workflow_records_status ON workflow_records(workflow_name, status);
CREATE INDEX idx_workflow_records_run_state ON workflow_records(run_state);
CREATE INDEX idx_workflow_records_updated_at ON workflow_records(updated_at);
-- Indexes for workflow_outbox
CREATE INDEX idx_workflow_outbox_workflow_created ON workflow_outbox(workflow_name, created_at);
-- PostgreSQL server settings (postgresql.conf)
max_connections = 100
shared_buffers = '256MB'
effective_cache_size = '1GB'
// Configure workflow options for memory efficiency
workflow.WithDefaultOptions(
	workflow.ParallelCount(5),                       // Don't over-parallelize
	workflow.PollingFrequency(500*time.Millisecond), // Reduce polling frequency
	workflow.ErrBackOff(time.Minute),                // Longer backoff reduces load
)
Some adapters provide additional monitoring capabilities:
import "github.com/luno/workflow/adapters/webui"
// Add HTTP handlers for workflow monitoring
http.Handle("/", webui.HomeHandlerFunc(webui.Paths{
List: "/api/list",
ObjectData: "/api/object",
}))
http.HandleFunc("/api/list", webui.ListHandlerFunc(recordStore))
http.HandleFunc("/api/object", webui.ObjectDataHandlerFunc(recordStore))
import "github.com/luno/workflow/adapters/jlog"
// Use structured logging
logger := jlog.New()
wf := b.Build(
	eventStreamer, recordStore, roleScheduler,
	workflow.WithLogger(logger),
)
Adapters are the foundation of Workflow's flexibility. Choose the right combination for your needs and scale them as your requirements grow.