This guide covers all configuration options available when building and running workflows, including step-level options, workflow-level build options, and adapter configurations.
Configuration in the workflow system operates at three levels: step-level options applied with WithOptions(), workflow-level options passed to Build(), and adapter-level options passed to each adapter's constructor.
Step-level options are applied using the WithOptions() method and affect individual workflow components.
```go
builder.AddStep(StatusProcessing,
    processOrderStep,
    StatusCompleted,
).WithOptions(
    workflow.ParallelCount(3),                  // Run 3 parallel consumers
    workflow.PollingFrequency(5 * time.Second), // Poll every 5 seconds
    workflow.ErrBackOff(30 * time.Second),      // 30s backoff on errors
)
```
Control the number of parallel consumers for a step:
```go
// Single consumer (default)
builder.AddStep(StatusValidation, validateStep, StatusValidated)

// Multiple parallel consumers
builder.AddStep(StatusProcessing,
    processStep,
    StatusProcessed,
).WithOptions(
    workflow.ParallelCount(5), // Creates 5 consumers: consumer-1-of-5, consumer-2-of-5, etc.
)
```
Configure how frequently consumers check for new events:
```go
builder.AddStep(StatusWaiting,
    waitStep,
    StatusReady,
).WithOptions(
    workflow.PollingFrequency(100 * time.Millisecond), // High frequency for low latency
)

builder.AddStep(StatusBatchProcessing,
    batchStep,
    StatusCompleted,
).WithOptions(
    workflow.PollingFrequency(30 * time.Second), // Lower frequency for batch operations
)
```
Configure error backoff and automatic pausing:
```go
builder.AddStep(StatusUploading,
    uploadStep,
    StatusUploaded,
).WithOptions(
    workflow.ErrBackOff(5 * time.Minute), // Wait 5 minutes after errors
    workflow.PauseAfterErrCount(3),       // Pause after 3 consecutive errors
)
```
Control event processing timing and alerting:
```go
builder.AddStep(StatusNotification,
    notifyStep,
    StatusNotified,
).WithOptions(
    workflow.ConsumeLag(1 * time.Minute), // Only process events older than 1 minute
    workflow.LagAlert(5 * time.Minute),   // Alert if lag exceeds 5 minutes
)
```
Workflow-level options are applied during the Build() call and affect the entire workflow.
```go
wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithClock(customClock),
    workflow.WithLogger(customLogger),
    workflow.WithDebugMode(),
    workflow.WithTimeoutStore(timeoutStore),
)
```
Use custom clocks for testing or special timing requirements:
import "k8s.io/utils/clock/testing"
// For testing with controllable time
fakeClock := testing.NewFakeClock(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
workflow := builder.Build(
eventStreamer,
recordStore,
roleScheduler,
workflow.WithClock(fakeClock),
)
// For production with real time (default)
workflow := builder.Build(
eventStreamer,
recordStore,
roleScheduler,
workflow.WithClock(clock.RealClock{}),
)
Configure custom logging:
```go
type CustomLogger struct{}

func (c CustomLogger) Debug(ctx context.Context, msg string, meta map[string]string) {
    // Custom debug logging, e.g. via the standard library:
    log.Printf("DEBUG: %s %v", msg, meta)
}

func (c CustomLogger) Error(ctx context.Context, err error) {
    // Custom error logging, e.g. via the standard library:
    log.Printf("ERROR: %v", err)
}

wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithLogger(CustomLogger{}),
    workflow.WithDebugMode(), // Enable debug logging
)
```
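If you already use log/slog, the same two-method shape can wrap a *slog.Logger. This is a sketch based only on the interface shown above, not an adapter shipped with the library:

```go
import (
    "context"
    "log/slog"
)

type SlogLogger struct {
    l *slog.Logger
}

func (s SlogLogger) Debug(ctx context.Context, msg string, meta map[string]string) {
    // Flatten the metadata map into slog key/value pairs.
    args := make([]any, 0, len(meta)*2)
    for k, v := range meta {
        args = append(args, k, v)
    }
    s.l.DebugContext(ctx, msg, args...)
}

func (s SlogLogger) Error(ctx context.Context, err error) {
    s.l.ErrorContext(ctx, "workflow error", "error", err)
}
```

It plugs in the same way as the example above: workflow.WithLogger(SlogLogger{l: slog.Default()}).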
A timeout store is required whenever the workflow defines timeouts:
```go
// In-memory store for testing
timeoutStore := memtimeoutstore.New()

// SQL store for production
timeoutStore := sqltimeout.New(db, "timeouts")

wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithTimeoutStore(timeoutStore),
)
```
Set default options that apply to all steps unless overridden:
```go
wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithDefaultOptions(
        workflow.PollingFrequency(1 * time.Second),
        workflow.ErrBackOff(10 * time.Second),
        workflow.PauseAfterErrCount(5),
        workflow.LagAlert(2 * time.Minute),
    ),
)
```
Configure the transactional outbox pattern:
```go
// Enable outbox with custom settings
wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithOutboxOptions(
        workflow.OutboxPollingFrequency(500 * time.Millisecond),
        workflow.OutboxErrBackOff(5 * time.Second),
        workflow.OutboxLagAlert(30 * time.Second),
        workflow.OutboxLookupLimit(100), // Process up to 100 events per batch
    ),
)

// Disable outbox entirely
wf = builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithoutOutbox(),
)
```
Enable verbose logging for development and troubleshooting:
```go
wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithDebugMode(), // Enables detailed logging
)
```
Options follow a hierarchy where more specific settings override general ones:
```go
// 1. Global defaults (lowest priority)
wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithDefaultOptions(
        workflow.PollingFrequency(30 * time.Second), // Applied to all steps
        workflow.ErrBackOff(5 * time.Minute),
    ),
)

// 2. Step-specific overrides (highest priority)
builder.AddStep(StatusUrgent,
    urgentStep,
    StatusCompleted,
).WithOptions(
    workflow.PollingFrequency(1 * time.Second), // Overrides default for this step only
)
```
Adapter-level options are passed directly to each adapter's constructor.

Event streamers:
```go
eventStreamer := memstreamer.New(
    memstreamer.WithClock(customClock),
)

eventStreamer := kafkastreamer.New(
    brokers,
    kafkastreamer.WithProducerConfig(producerConfig),
    kafkastreamer.WithConsumerConfig(consumerConfig),
)

eventStreamer := reflexstreamer.New(
    db,
    eventsTable,
    reflexstreamer.WithCursorsTable(cursorsTable),
)
```

Record stores:
```go
recordStore := memrecordstore.New(
    memrecordstore.WithClock(customClock),
    memrecordstore.WithOutbox(ctx, eventStreamer, logger),
)

recordStore := sqlrecordstore.New(
    db,
    "workflow_records",
    sqlrecordstore.WithOutbox(ctx, eventStreamer, logger),
)
```

Timeout stores:
```go
timeoutStore := memtimeoutstore.New(
    memtimeoutstore.WithClock(customClock),
)

timeoutStore := sqltimeout.New(
    db,
    "workflow_timeouts",
)
```

Role schedulers:
```go
roleScheduler := memrolescheduler.New()

roleScheduler := dbrolescheduler.New(
    db,
    "workflow_roles",
)
```
Complete examples for development, testing, and production setups:
```go
func buildDevelopmentWorkflow() *workflow.Workflow[Order, Status] {
    eventStreamer := memstreamer.New()
    recordStore := memrecordstore.New()
    roleScheduler := memrolescheduler.New()

    return builder.Build(
        eventStreamer,
        recordStore,
        roleScheduler,
        workflow.WithDebugMode(), // Verbose logging
        workflow.WithDefaultOptions(
            workflow.PollingFrequency(100 * time.Millisecond), // Fast polling
            workflow.ErrBackOff(1 * time.Second),              // Quick error recovery
        ),
    )
}
```
```go
func buildTestWorkflow(t *testing.T) *workflow.Workflow[Order, Status] {
    fakeClock := clock_testing.NewFakeClock(time.Now())
    eventStreamer := memstreamer.New(memstreamer.WithClock(fakeClock))
    recordStore := memrecordstore.New()
    roleScheduler := memrolescheduler.New()

    return builder.Build(
        eventStreamer,
        recordStore,
        roleScheduler,
        workflow.WithClock(fakeClock), // Controllable time
        workflow.WithDefaultOptions(
            workflow.PollingFrequency(time.Nanosecond), // Immediate processing
        ),
        workflow.WithoutOutbox(), // Simplified for tests
    )
}
```
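Because the workflow is built with a FakeClock, tests can move time forward explicitly instead of sleeping. The sketch below is plain k8s.io/utils/clock/testing usage, not a workflow API, and assumes the test holds a reference to the same fakeClock instance passed to Build (for example by returning it from buildTestWorkflow):

```go
// Advance relative time so polling intervals and error backoffs elapse
// without real waiting.
fakeClock.Step(10 * time.Second)

// Or jump to an absolute time, for example to make a long timeout due.
fakeClock.SetTime(fakeClock.Now().Add(24 * time.Hour))
```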
```go
func buildProductionWorkflow(db *sql.DB, kafkaBrokers []string) *workflow.Workflow[Order, Status] {
    eventStreamer := kafkastreamer.New(kafkaBrokers)
    recordStore := sqlrecordstore.New(db, "workflow_records")
    roleScheduler := dbrolescheduler.New(db, "workflow_roles")
    timeoutStore := sqltimeout.New(db, "workflow_timeouts")

    return builder.Build(
        eventStreamer,
        recordStore,
        roleScheduler,
        workflow.WithTimeoutStore(timeoutStore),
        workflow.WithLogger(productionLogger),
        workflow.WithDefaultOptions(
            workflow.PollingFrequency(5 * time.Second), // Reasonable polling
            workflow.ErrBackOff(30 * time.Second),      // Conservative backoff
            workflow.PauseAfterErrCount(3),             // Prevent infinite retries
            workflow.LagAlert(5 * time.Minute),         // Monitor lag
        ),
        workflow.WithOutboxOptions(
            workflow.OutboxPollingFrequency(1 * time.Second), // Fast outbox processing
            workflow.OutboxLookupLimit(500),                  // Larger batches
        ),
    )
}
```
For workflows that process many events:
```go
builder.AddStep(StatusProcessing,
    processStep,
    StatusCompleted,
).WithOptions(
    workflow.ParallelCount(10),                        // High parallelism
    workflow.PollingFrequency(100 * time.Millisecond), // Fast polling
    workflow.ErrBackOff(5 * time.Second),              // Quick error recovery
)
```
For workflows that process events in batches:
```go
builder.AddStep(StatusBatching,
    batchStep,
    StatusProcessed,
).WithOptions(
    workflow.ParallelCount(1),                   // Single consumer for batching
    workflow.PollingFrequency(30 * time.Second), // Slower polling for batching
    workflow.ConsumeLag(5 * time.Minute),        // Wait for batch to accumulate
)
```
For steps that call external services:
```go
builder.AddStep(StatusAPICall,
    apiCallStep,
    StatusAPIResponse,
).WithOptions(
    workflow.ParallelCount(3),             // Limited parallelism to avoid overwhelming the service
    workflow.ErrBackOff(60 * time.Second), // Longer backoff for external service issues
    workflow.PauseAfterErrCount(5),        // Pause on repeated failures
)
```
For time-sensitive workflow steps:
```go
builder.AddStep(StatusCritical,
    criticalStep,
    StatusComplete,
).WithOptions(
    workflow.PollingFrequency(100 * time.Millisecond), // High frequency polling
    workflow.LagAlert(30 * time.Second),               // Quick lag alerting
    workflow.ErrBackOff(1 * time.Second),              // Minimal backoff
)
```
The workflow system validates configuration at build time:
```go
// This will panic at build time
builder.AddTimeout(StatusWaiting, timer, handler, StatusNext).WithOptions(
    workflow.ParallelCount(2), // Timeouts don't support parallel processing
)

// This will panic at build time
builder.AddTimeout(StatusWaiting, timer, handler, StatusNext).WithOptions(
    workflow.ConsumeLag(time.Hour), // Timeouts don't support consume lag
)
```
Timeouts do not support ParallelCount() or ConsumeLag().

Configure monitoring and observability:
```go
// Custom metrics configuration would typically be done
// through your metrics collection system (Prometheus, etc.)
wf := builder.Build(
    eventStreamer,
    recordStore,
    roleScheduler,
    workflow.WithDefaultOptions(
        workflow.LagAlert(2 * time.Minute), // Generates lag alerts
    ),
)

// Monitor metrics:
// - workflow_process_duration_seconds
// - workflow_consumer_lag_seconds
// - workflow_process_skipped_events_total
// - workflow_process_errors_total
```
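How these series reach your monitoring stack depends on your setup. As a sketch, assuming the workflow's collectors are registered with the default Prometheus registry (an assumption to verify for your deployment), the standard exposition pattern applies:

```go
import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func serveMetrics() error {
    // Expose everything registered with the default Prometheus registry,
    // which would include the workflow_* series listed above if they are
    // registered there.
    http.Handle("/metrics", promhttp.Handler())
    return http.ListenAndServe(":9090", nil)
}
```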
Use descriptive option configurations:
```go
// Good - clear intent
builder.AddStep(StatusEmailSending,
    sendEmailStep,
    StatusEmailSent,
).WithOptions(
    workflow.ParallelCount(5),            // Max 5 concurrent emails
    workflow.ErrBackOff(2 * time.Minute), // Email service rate limiting
    workflow.PauseAfterErrCount(3),       // Stop on repeated email failures
)

// Avoid - unclear purpose
builder.AddStep(StatusProcessing,
    processStep,
    StatusDone,
).WithOptions(
    workflow.ParallelCount(10),
    workflow.ErrBackOff(30 * time.Second),
)
```
Use different configurations per environment:
```go
type Config struct {
    PollingFreq   time.Duration
    ErrorBackoff  time.Duration
    ParallelCount int
}

func getConfig() Config {
    switch os.Getenv("ENV") {
    case "development":
        return Config{
            PollingFreq:   100 * time.Millisecond,
            ErrorBackoff:  1 * time.Second,
            ParallelCount: 1,
        }
    case "production":
        return Config{
            PollingFreq:   5 * time.Second,
            ErrorBackoff:  30 * time.Second,
            ParallelCount: 5,
        }
    default:
        return Config{
            PollingFreq:   1 * time.Second,
            ErrorBackoff:  10 * time.Second,
            ParallelCount: 2,
        }
    }
}
```
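The values from getConfig() then plug straight into the step options; a minimal sketch reusing the step and handler names from the examples above:

```go
cfg := getConfig()

builder.AddStep(StatusProcessing,
    processStep,
    StatusCompleted,
).WithOptions(
    workflow.ParallelCount(cfg.ParallelCount),  // per-environment parallelism
    workflow.PollingFrequency(cfg.PollingFreq), // per-environment polling
    workflow.ErrBackOff(cfg.ErrorBackoff),      // per-environment error backoff
)
```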
Consider resource implications of configuration:
```go
// Memory usage scales with parallel count
builder.AddStep(StatusProcessing,
    memoryIntensiveStep,
    StatusCompleted,
).WithOptions(
    workflow.ParallelCount(2), // Limited due to memory usage
)

// CPU intensive operations
builder.AddStep(StatusComputing,
    cpuIntensiveStep,
    StatusCompleted,
).WithOptions(
    workflow.ParallelCount(runtime.NumCPU()), // Match CPU cores
)

// I/O bound operations
builder.AddStep(StatusNetworking,
    networkStep,
    StatusCompleted,
).WithOptions(
    workflow.ParallelCount(20), // Higher parallelism for I/O
)
```