This guide covers monitoring and observability features built into the workflow system, including Prometheus metrics, logging, alerting, and troubleshooting workflows in production.
The workflow system provides comprehensive observability through automatically exported Prometheus metrics, structured logging, alerting rules, and production troubleshooting runbooks.
The workflow system automatically exports Prometheus metrics that provide insights into performance, errors, and system health.
workflow_process_latency_seconds
Histogram tracking how long each process takes to handle an event.
Labels:
workflow_name: Name of the workflow
process_name: Name of the process (e.g., "processing-consumer")
Buckets: [0.01, 0.1, 1, 5, 10, 60, 300] seconds
# Average processing time per workflow
rate(workflow_process_latency_seconds_sum[5m]) / rate(workflow_process_latency_seconds_count[5m])
# 95th percentile processing time
histogram_quantile(0.95, rate(workflow_process_latency_seconds_bucket[5m]))
workflow_process_error_count
Counter tracking the number of errors encountered during event processing.
Labels:
workflow_name: Name of the workflow
process_name: Name of the process
# Error rate per workflow
rate(workflow_process_error_count[5m])
# Workflows with highest error rates
topk(5, rate(workflow_process_error_count[5m]))
workflow_process_skipped_events_count
Counter tracking events that were skipped by consumers, with the skip reason recorded as a label.
Labels:
workflow_name: Name of the workflow
process_name: Name of the process
reason: Why the event was skipped
Common skip reasons:
"record not found": Record was deleted before processing
"record status not in expected state": Record status changed before processing
"event record version lower than latest record version": Stale event
"record stopped": Record is in a stopped state
"next value specified skip": Step function returned skip
"filtered out": Event filtered by custom logic
# Events skipped per reason
sum by (reason) (rate(workflow_process_skipped_events_count[5m]))
# Skip rate percentage
(sum by (workflow_name, process_name) (rate(workflow_process_skipped_events_count[5m])) / (sum by (workflow_name, process_name) (rate(workflow_process_skipped_events_count[5m])) + rate(workflow_process_latency_seconds_count[5m]))) * 100
workflow_process_states
Gauge showing the current state of workflow processes.
Labels:
workflow_name: Name of the workflow
process_name: Name of the process
Values:
0: Stopped
1: Running
2: Error
# Processes in error state
workflow_process_states == 2
# Healthy running processes
workflow_process_states == 1
workflow_process_lag_seconds
Gauge showing the lag between the current time and the timestamp of the last consumed event.
Labels:
workflow_name: Name of the workflow
process_name: Name of the process
# Consumers with highest lag
topk(10, workflow_process_lag_seconds)
# Lag trending over time
delta(workflow_process_lag_seconds[1h])
workflow_process_lag_alert
Boolean gauge indicating whether consumer lag exceeds the configured alert threshold.
Labels:
workflow_name: Name of the workflow
process_name: Name of the process
Values:
0: Lag is below alert threshold
1: Lag exceeds alert threshold
# Consumers in lag alert state
workflow_process_lag_alert == 1
# Count of lagging consumers
count(workflow_process_lag_alert == 1)
workflow_run_state_changes
Counter tracking transitions between run states for workflow instances.
Labels:
workflow_name: Name of the workflow
previous_run_state: Previous RunState
current_run_state: Current RunState
Run States:
"Unknown": Initial state (should be minimal)
"Initiated": Workflow started
"Running": Workflow processing
"Paused": Workflow paused due to errors
"Cancelled": Workflow cancelled
"Completed": Workflow completed successfully
"DataDeleted": Workflow data deleted
# Rate of workflow completions
rate(workflow_run_state_changes{current_run_state="Completed"}[5m])
# Rate of workflow failures (paused/cancelled)
rate(workflow_run_state_changes{current_run_state=~"Paused|Cancelled"}[5m])
# Success rate percentage
(sum by (workflow_name) (rate(workflow_run_state_changes{current_run_state="Completed"}[5m])) / sum by (workflow_name) (rate(workflow_run_state_changes{current_run_state=~"Completed|Paused|Cancelled"}[5m]))) * 100
Key metrics for overall workflow system health:
panels:
  - title: "Workflow Throughput"
    query: rate(workflow_process_latency_seconds_count[5m])
  - title: "Error Rate"
    query: rate(workflow_process_error_count[5m])
  - title: "Consumer Lag"
    query: workflow_process_lag_seconds
  - title: "Process States"
    query: workflow_process_states
  - title: "Completion Rate"
    query: rate(workflow_run_state_changes{current_run_state="Completed"}[5m])
Detailed performance metrics:
panels:
  - title: "Processing Latency P95"
    query: histogram_quantile(0.95, rate(workflow_process_latency_seconds_bucket[5m]))
  - title: "Processing Latency P50"
    query: histogram_quantile(0.5, rate(workflow_process_latency_seconds_bucket[5m]))
  - title: "Event Skip Rate"
    query: rate(workflow_process_skipped_events_count[5m])
  - title: "Skip Reasons"
    query: sum by (reason) (rate(workflow_process_skipped_events_count[5m]))
Monitor individual workflows:
variables:
  - name: "workflow"
    query: label_values(workflow_process_latency_seconds, workflow_name)
panels:
  - title: "Throughput for $workflow"
    query: rate(workflow_process_latency_seconds_count{workflow_name="$workflow"}[5m])
  - title: "Errors for $workflow"
    query: rate(workflow_process_error_count{workflow_name="$workflow"}[5m])
  - title: "Lag for $workflow"
    query: workflow_process_lag_seconds{workflow_name="$workflow"}
High-priority alerts for immediate attention:
# High Error Rate
- alert: WorkflowHighErrorRate
  expr: rate(workflow_process_error_count[5m]) > 0.1
  for: 2m
  labels:
    severity: critical
  annotations:
    summary: "High error rate in workflow {{ $labels.workflow_name }}"
    description: "Error rate is {{ $value }} errors/sec for {{ $labels.workflow_name }}"

# Consumer Lag Alert
- alert: WorkflowConsumerLag
  expr: workflow_process_lag_alert == 1
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Consumer lag alert for {{ $labels.workflow_name }}"
    description: "Consumer lag exceeds threshold"

# Process Down
- alert: WorkflowProcessDown
  expr: workflow_process_states == 0
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Workflow process is down"
    description: "Process {{ $labels.process_name }} has been stopped for {{ $labels.workflow_name }}"
Medium-priority alerts for investigation:
# Increased Skip Rate
- alert: WorkflowHighSkipRate
  expr: (sum by (workflow_name, process_name) (rate(workflow_process_skipped_events_count[5m])) / rate(workflow_process_latency_seconds_count[5m])) > 0.2
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High skip rate for {{ $labels.workflow_name }}"
    description: "Skip rate is {{ $value }} for {{ $labels.workflow_name }}"

# Slow Processing
- alert: WorkflowSlowProcessing
  expr: histogram_quantile(0.95, rate(workflow_process_latency_seconds_bucket[5m])) > 10
  for: 3m
  labels:
    severity: warning
  annotations:
    summary: "Slow processing in {{ $labels.workflow_name }}"
    description: "95th percentile latency is {{ $value }}s for {{ $labels.workflow_name }}"

# Low Completion Rate
- alert: WorkflowLowCompletionRate
  expr: (sum by (workflow_name) (rate(workflow_run_state_changes{current_run_state="Completed"}[5m])) / sum by (workflow_name) (rate(workflow_run_state_changes{current_run_state=~"Completed|Paused|Cancelled"}[5m]))) < 0.9
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Low completion rate for {{ $labels.workflow_name }}"
    description: "Completion rate is {{ $value }}"
The workflow system provides structured logging with different levels:
// Configure custom logger
type CustomLogger struct {
	logger *slog.Logger
}

func (c CustomLogger) Debug(ctx context.Context, msg string, meta map[string]string) {
	c.logger.DebugContext(ctx, msg, slog.Any("meta", meta))
}

func (c CustomLogger) Error(ctx context.Context, err error) {
	c.logger.ErrorContext(ctx, "Workflow error", slog.Any("error", err))
}

// Enable debug logging
wf := builder.Build(
	eventStreamer,
	recordStore,
	roleScheduler,
	workflow.WithLogger(CustomLogger{logger: slog.Default()}),
	workflow.WithDebugMode(), // Required for debug logs
)
Enable detailed logging for development and troubleshooting:
wf := builder.Build(
	eventStreamer,
	recordStore,
	roleScheduler,
	workflow.WithDebugMode(),
)
Debug mode produces verbose internal logging that is useful during development and troubleshooting but generally too noisy for production.
Use correlation IDs for tracing workflow execution:
func stepWithLogging(ctx context.Context, r *workflow.Run[Order, Status]) (Status, error) {
	logger := slog.With(
		"workflow", "order-processing",
		"foreign_id", r.ForeignID,
		"run_id", r.RunID,
		"current_status", r.Status.String(),
	)

	logger.InfoContext(ctx, "Processing order", "order_value", r.Object.Total)

	err := processOrder(ctx, r.Object)
	if err != nil {
		logger.ErrorContext(ctx, "Failed to process order", "error", err)
		return 0, err
	}

	logger.InfoContext(ctx, "Order processed successfully")
	return StatusProcessed, nil
}
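If several steps need the same correlation fields, the slog.With boilerplate can be factored into a wrapper that decorates any step function. This is a sketch rather than part of the workflow API; it assumes only the step signature and the Order/Status types used above, and withStepLogging is a hypothetical helper name.

// withStepLogging wraps a step function so that entry, exit, and errors are
// logged with the same correlation fields. Hypothetical helper, not part of
// the workflow package.
func withStepLogging(
	name string,
	step func(ctx context.Context, r *workflow.Run[Order, Status]) (Status, error),
) func(ctx context.Context, r *workflow.Run[Order, Status]) (Status, error) {
	return func(ctx context.Context, r *workflow.Run[Order, Status]) (Status, error) {
		logger := slog.With(
			"step", name,
			"foreign_id", r.ForeignID,
			"run_id", r.RunID,
			"current_status", r.Status.String(),
		)

		logger.InfoContext(ctx, "step started")
		next, err := step(ctx, r)
		if err != nil {
			logger.ErrorContext(ctx, "step failed", "error", err)
			return 0, err
		}

		logger.InfoContext(ctx, "step finished", "next_status", next.String())
		return next, nil
	}
}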
# Check error patterns
sum by (workflow_name, process_name) (rate(workflow_process_error_count[5m]))
# Check if specific processes are failing
workflow_process_states == 2
Investigation steps: identify which workflow and process are failing, inspect the application logs for that process, and check whether it has entered the error state (see the error runbook at the end of this section).
# Identify lagging consumers
workflow_process_lag_seconds > 300
# Check lag trend
delta(workflow_process_lag_seconds[1h])
Investigation steps: compare the processing rate with the incoming event volume, look for slow downstream dependencies, and scale out consumers if throughput is the bottleneck (see the lag runbook below).
# Skip rate by reason
sum by (reason) (rate(workflow_process_skipped_events_count[5m]))
# Skip rate percentage
(sum by (workflow_name, process_name) (rate(workflow_process_skipped_events_count[5m])) / rate(workflow_process_latency_seconds_count[5m])) * 100
Common causes: records deleted or modified before the consumer catches up, stale events from retries or replays, and custom filters rejecting events (see the skip reasons listed earlier).
# Processing rate
rate(workflow_process_latency_seconds_count[5m])
# Processing latency
histogram_quantile(0.95, rate(workflow_process_latency_seconds_bucket[5m]))
Investigation steps:
# 1. Check current lag
curl -s "http://prometheus:9090/api/v1/query?query=workflow_process_lag_seconds" | jq '.data.result'
# 2. Check event processing rate
curl -s "http://prometheus:9090/api/v1/query?query=rate(workflow_process_latency_seconds_count[5m])" | jq '.data.result'
# 3. Scale up consumers if needed
kubectl scale deployment workflow-service --replicas=5
# 4. Monitor lag improvement
watch "curl -s 'http://prometheus:9090/api/v1/query?query=workflow_process_lag_seconds' | jq '.data.result[].value[1]'"
# 1. Identify failing workflow
curl -s "http://prometheus:9090/api/v1/query?query=rate(workflow_process_error_count[5m])" | jq '.data.result'
# 2. Check process states
curl -s "http://prometheus:9090/api/v1/query?query=workflow_process_states" | jq '.data.result'
# 3. Check application logs
kubectl logs -f deployment/workflow-service --tail=100
# 4. Restart if needed
kubectl rollout restart deployment/workflow-service
Add your own business metrics alongside workflow metrics:
import "github.com/prometheus/client_golang/prometheus"
var (
	orderValue = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "order_processing_value_dollars",
			Help:    "Value of orders being processed",
			Buckets: []float64{10, 50, 100, 500, 1000, 5000},
		},
		[]string{"workflow_name"},
	)

	paymentFailures = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "order_payment_failures_total",
			Help: "Number of payment failures",
		},
		[]string{"workflow_name", "payment_provider"},
	)
)

func init() {
	prometheus.MustRegister(orderValue, paymentFailures)
}

func processPayment(ctx context.Context, r *workflow.Run[Order, Status]) (Status, error) {
	// Record business metrics
	orderValue.WithLabelValues("order-processing").Observe(float64(r.Object.Total))

	err := chargePayment(ctx, r.Object)
	if err != nil {
		// Track payment failures by provider
		paymentFailures.WithLabelValues("order-processing", r.Object.PaymentProvider).Inc()
		return 0, err
	}

	return StatusPaymentProcessed, nil
}
Expose metrics endpoint for Prometheus scraping:
import (
	"context"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Start metrics server
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		http.ListenAndServe(":8080", nil)
	}()

	// Start the workflow built with builder.Build (see above)
	wf.Run(context.Background())
}
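The default mux and a bare ListenAndServe are fine for getting started; for production you may prefer a dedicated mux with timeouts and an explicit shutdown hook. A sketch, assuming the same promhttp handler plus the standard net/http, time, and log packages:

// startMetricsServer serves /metrics on its own mux with a header timeout.
// Call Shutdown on the returned server during graceful shutdown.
func startMetricsServer(addr string) *http.Server {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())

	srv := &http.Server{
		Addr:              addr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
	}

	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("metrics server stopped: %v", err)
		}
	}()
	return srv
}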
Configure Prometheus to scrape metrics:
scrape_configs:
  - job_name: 'workflow-service'
    static_configs:
      - targets: ['workflow-service:8080']
    metrics_path: '/metrics'
    scrape_interval: 15s
Monitor these KPIs for workflow system health: event throughput, error rate, consumer lag, 95th percentile processing latency, and workflow completion rate.
Use metrics to plan scaling:
# Throughput (requests/sec):
rate(workflow_process_latency_seconds_count[5m])
# CPU Usage (CPU-seconds/sec):
rate(cpu_usage_seconds[5m])
# Concurrent Workflows (approximation: runs that have entered Running):
count(workflow_run_state_changes{current_run_state="Running"})
# Memory Usage (bytes):
memory_usage_bytes
# Processing Lag (seconds):
workflow_process_lag_seconds
# Processing Rate (events/sec):
rate(workflow_process_latency_seconds_count[5m])
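These numbers can be combined into a rough replica estimate: divide the incoming event rate by the measured per-replica processing rate and add headroom. The helper below is a hypothetical planning aid, not part of the workflow library; it assumes the standard math package, and the rates are the values read from the queries above.

// estimateReplicas returns a rough consumer replica count.
// eventsPerSec: incoming event rate; perReplicaPerSec: measured processing
// rate of one replica; headroom: capacity factor, e.g. 1.5 for 50% headroom.
func estimateReplicas(eventsPerSec, perReplicaPerSec, headroom float64) int {
	if perReplicaPerSec <= 0 {
		return 1
	}
	replicas := int(math.Ceil(eventsPerSec * headroom / perReplicaPerSec))
	if replicas < 1 {
		replicas = 1
	}
	return replicas
}

// Example: 120 events/sec, 30 events/sec per replica, 50% headroom -> 6 replicas.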
Follow Prometheus naming conventions:
// Good
workflow_process_duration_seconds_total
order_payment_attempts_total
customer_notification_sent_total
// Avoid
ProcessingTime
paymentAttempts
NotificationsSent
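Following these conventions in code is straightforward with client_golang; promauto registers the metric on the default registry, so no explicit MustRegister call is needed. The sketch below reuses a name from the list above; the channel label is illustrative.

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Conventional name: snake_case, base unit in the name where relevant,
// and a _total suffix for counters.
var notificationsSent = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "customer_notification_sent_total",
		Help: "Number of customer notifications sent.",
	},
	[]string{"workflow_name", "channel"},
)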
Set appropriate thresholds based on SLAs:
# For critical workflows (< 1 minute SLA)
- expr: workflow_process_lag_seconds > 30
# For batch workflows (< 1 hour SLA)
- expr: workflow_process_lag_seconds > 1800
# Error rate threshold (tune to expected event volume)
- expr: rate(workflow_process_error_count[5m]) > 0.1  # more than 0.1 errors/sec
Structure dashboards by audience: a system health overview for on-call and operations, a performance dashboard for engineers, and per-workflow dashboards for the teams that own each workflow.
Configure appropriate retention for different log levels:
# Structured logging configuration
debug_logs:
  retention: 7 days
  volume: high
error_logs:
  retention: 90 days
  volume: medium
audit_logs:
  retention: 365 days
  volume: low