Timeouts in the workflow system allow you to execute logic after a specified duration has elapsed. This is essential for implementing delays, SLA monitoring, automatic escalations, and time-based business rules.
The timeout system consists of two main components:
Add a timeout to your workflow using the AddTimeout method:
b.AddTimeout(StatusWaiting,
func(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
// Set timeout for 1 hour from now
return now.Add(time.Hour), nil
},
func(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (Status, error) {
// Handle timeout - escalate the order
r.Object.Priority = "HIGH"
return StatusEscalated, nil
},
StatusEscalated,
)
Timer functions determine when a timeout should expire. They return a time.Time indicating the expiration time.
Use the built-in DurationTimerFunc for fixed durations:
// Timeout after 30 minutes
timer := workflow.DurationTimerFunc[Order, Status](30 * time.Minute)
b.AddTimeout(StatusPending, timer, timeoutHandler, StatusExpired)
Use TimeTimerFunc to timeout at a specific time:
// Timeout at end of business day
endOfDay := time.Date(2024, 1, 1, 17, 0, 0, 0, time.UTC)
timer := workflow.TimeTimerFunc[Order, Status](endOfDay)
b.AddTimeout(StatusProcessing, timer, timeoutHandler, StatusExpired)
Create custom timer functions for dynamic behavior:
func dynamicTimer(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
switch r.Object.Priority {
case "HIGH":
return now.Add(15 * time.Minute), nil
case "MEDIUM":
return now.Add(1 * time.Hour), nil
case "LOW":
return now.Add(24 * time.Hour), nil
default:
// Return zero time to skip timeout
return time.Time{}, nil
}
}
Skip timeout creation by returning a zero time:
func conditionalTimer(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
if r.Object.RequiresTimeout {
return now.Add(time.Hour), nil
}
// No timeout needed
return time.Time{}, nil
}
Timeout functions execute when a timeout expires. They can modify the workflow object and transition to a new status.
func handleTimeout(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (Status, error) {
// Log timeout
log.Printf("Order %s timed out at %v", r.Object.ID, now)
// Update object
r.Object.TimeoutCount++
r.Object.LastTimeout = now
// Transition to new status
return StatusTimedOut, nil
}
func escalateOrder(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (Status, error) {
// Escalate priority
if r.Object.Priority == "LOW" {
r.Object.Priority = "MEDIUM"
} else if r.Object.Priority == "MEDIUM" {
r.Object.Priority = "HIGH"
}
// Notify management
err := notifyManagement(ctx, r.Object)
if err != nil {
return 0, fmt.Errorf("failed to notify management: %w", err)
}
return StatusEscalated, nil
}
func retryWithBackoff(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (Status, error) {
r.Object.RetryCount++
if r.Object.RetryCount > 3 {
// Max retries exceeded
return StatusFailed, nil
}
// Exponential backoff for next retry
backoff := time.Duration(r.Object.RetryCount) * time.Duration(r.Object.RetryCount) * time.Minute
r.Object.NextRetryAt = now.Add(backoff)
return StatusRetrying, nil
}
Configure timeout behavior using builder options:
b.AddTimeout(StatusWaiting, timer, handler, StatusNext).WithOptions(
workflow.PollFrequency(30 * time.Second),
)
b.AddTimeout(StatusWaiting, timer, handler, StatusNext).WithOptions(
workflow.ErrBackOff(5 * time.Minute),
)
b.AddTimeout(StatusWaiting, timer, handler, StatusNext).WithOptions(
workflow.PauseAfterErrCount(3),
)
The workflow system requires a timeout store implementation to persist timeout records:
// Memory store for testing
timeoutStore := memtimeoutstore.New()
// SQL store for production
timeoutStore := sqltimeout.New(db, "timeouts_table")
workflow := b.Build(
eventStreamer,
recordStore,
roleScheduler,
workflow.WithTimeoutStore(timeoutStore),
)
If a timer function returns an error, the timeout is not created:
func errorProneTimer(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
if r.Object.InvalidData {
return time.Time{}, fmt.Errorf("cannot create timeout: invalid data")
}
return now.Add(time.Hour), nil
}
If a timeout function returns an error, it will be retried with backoff:
func retriableTimeout(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (Status, error) {
err := externalServiceCall(ctx, r.Object)
if err != nil {
// Will be retried
return 0, fmt.Errorf("external service failed: %w", err)
}
return StatusProcessed, nil
}
Always use the provided now parameter instead of time.Now() for testability:
// Good - testable
func timer(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
return now.Add(time.Hour), nil
}
// Bad - hard to test
func timer(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
return time.Now().Add(time.Hour), nil // Don't do this
}
Very long timeouts can accumulate in the timeout store:
// Consider using shorter timeouts with re-scheduling
func reasonableTimeout(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
// Instead of 30 days, use 1 day and re-schedule
return now.Add(24 * time.Hour), nil
}
Be explicit about timezones for business hour calculations:
func businessHoursTimeout(ctx context.Context, r *workflow.Run[Order, Status], now time.Time) (time.Time, error) {
// Convert to business timezone
businessTZ, _ := time.LoadLocation("America/New_York")
businessTime := now.In(businessTZ)
// Set timeout for next business day 9 AM
nextDay := businessTime.Add(24 * time.Hour)
businessHourStart := time.Date(
nextDay.Year(), nextDay.Month(), nextDay.Day(),
9, 0, 0, 0, businessTZ,
)
return businessHourStart.UTC(), nil
}
func slaTimeout(ctx context.Context, r *workflow.Run[Ticket, Status], now time.Time) (time.Time, error) {
slaHours := getSLAHours(r.Object.Priority)
return r.Object.CreatedAt.Add(time.Duration(slaHours) * time.Hour), nil
}
func slaViolation(ctx context.Context, r *workflow.Run[Ticket, Status], now time.Time) (Status, error) {
r.Object.SLAViolated = true
r.Object.ViolationTime = now
// Notify stakeholders
err := notifySLAViolation(ctx, r.Object)
if err != nil {
return 0, err
}
return StatusSLAViolated, nil
}
func reminderTimeout(ctx context.Context, r *workflow.Run[Task, Status], now time.Time) (time.Time, error) {
return r.Object.DueDate.Add(-24 * time.Hour), nil // Remind 1 day before
}
func sendReminder(ctx context.Context, r *workflow.Run[Task, Status], now time.Time) (Status, error) {
err := sendEmailReminder(ctx, r.Object.AssigneeEmail, r.Object)
if err != nil {
return 0, err
}
// Don't change status, just send reminder
return StatusInProgress, nil
}
func circuitBreakerTimeout(ctx context.Context, r *workflow.Run[Request, Status], now time.Time) (time.Time, error) {
// Reset circuit after 5 minutes
return now.Add(5 * time.Minute), nil
}
func resetCircuit(ctx context.Context, r *workflow.Run[Request, Status], now time.Time) (Status, error) {
r.Object.FailureCount = 0
r.Object.CircuitOpen = false
return StatusRetryReady, nil
}
Use the clock interface for deterministic testing:
func TestTimeout(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
workflow := buildWorkflow(t, workflow.WithClock(fakeClock))
// Create record
recordID := insertRecord(t, workflow, StatusWaiting)
// Advance clock to trigger timeout
fakeClock.Step(2 * time.Hour)
// Wait for timeout processing
time.Sleep(100 * time.Millisecond)
// Verify timeout was processed
record := getRecord(t, workflow, recordID)
assert.Equal(t, StatusTimedOut, record.Status)
}
Monitor timeout processing with metrics:
workflow_timeout_created_total: Number of timeouts createdworkflow_timeout_processed_total: Number of timeouts processedworkflow_timeout_errors_total: Number of timeout processing errorsworkflow_timeout_processing_duration_seconds: Time to process timeouts