This guide will walk you through installing Workflow and creating your first workflow in just a few minutes.
Add the Workflow library to your Go project:
go get github.com/luno/workflow
Letβs build a simple task processing workflow to understand the core concepts.
First, define the states your workflow can be in and the data it processes:
package main
import (
"context"
"fmt"
"time"
"github.com/luno/workflow"
"github.com/luno/workflow/adapters/memstreamer"
"github.com/luno/workflow/adapters/memrecordstore"
"github.com/luno/workflow/adapters/memrolescheduler"
)
// TaskStatus represents the possible states in our workflow
type TaskStatus int
const (
TaskStatusUnknown TaskStatus = 0
TaskStatusCreated TaskStatus = 1
TaskStatusValidated TaskStatus = 2
TaskStatusProcessed TaskStatus = 3
TaskStatusCompleted TaskStatus = 4
)
func (s TaskStatus) String() string {
switch s {
case TaskStatusCreated: return "Created"
case TaskStatusValidated: return "Validated"
case TaskStatusProcessed: return "Processed"
case TaskStatusCompleted: return "Completed"
default: return "Unknown"
}
}
// Task represents the data flowing through our workflow
type Task struct {
ID string
Name string
Data string
ProcessedAt *time.Time
Valid bool
}
Create a workflow with steps that transform your data:
func NewTaskWorkflow() *workflow.Workflow[Task, TaskStatus] {
b := workflow.NewBuilder[Task, TaskStatus]("task-processor")
// Step 1: Validate the task
b.AddStep(TaskStatusCreated, func(ctx context.Context, r *workflow.Run[Task, TaskStatus]) (TaskStatus, error) {
// Validate the task
if r.Object.Name == "" {
r.Object.Valid = false
return TaskStatusValidated, fmt.Errorf("task name cannot be empty")
}
r.Object.Valid = true
fmt.Printf("β Validated task: %s\n", r.Object.Name)
return TaskStatusValidated, nil
}, TaskStatusValidated)
// Step 2: Process the task
b.AddStep(TaskStatusValidated, func(ctx context.Context, r *workflow.Run[Task, TaskStatus]) (TaskStatus, error) {
if !r.Object.Valid {
return TaskStatusCompleted, nil // Skip processing invalid tasks
}
// Simulate some processing work
time.Sleep(100 * time.Millisecond)
now := time.Now()
r.Object.ProcessedAt = &now
r.Object.Data = fmt.Sprintf("processed-%s", r.Object.Data)
fmt.Printf("β Processed task: %s\n", r.Object.Name)
return TaskStatusProcessed, nil
}, TaskStatusProcessed, TaskStatusCompleted)
// Step 3: Complete the task
b.AddStep(TaskStatusProcessed, func(ctx context.Context, r *workflow.Run[Task, TaskStatus]) (TaskStatus, error) {
fmt.Printf("β Completed task: %s at %s\n", r.Object.Name, r.Object.ProcessedAt.Format(time.RFC3339))
return TaskStatusCompleted, nil
}, TaskStatusCompleted)
// Build with in-memory adapters for simplicity
return b.Build(
memstreamer.New(),
memrecordstore.New(),
memrolescheduler.New(),
)
}
Start the workflow engine and process some tasks:
func main() {
// Create and start the workflow
wf := NewTaskWorkflow()
ctx := context.Background()
wf.Run(ctx)
defer wf.Stop()
// Create some tasks
tasks := []Task{
{ID: "1", Name: "Process Invoice", Data: "invoice-data"},
{ID: "2", Name: "Send Email", Data: "email-content"},
{ID: "3", Name: "", Data: "invalid-task"}, // This will fail validation
}
// Process each task
for _, task := range tasks {
fmt.Printf("\nπ Starting workflow for task: %s\n", task.Name)
runID, err := wf.Trigger(ctx, task.ID, workflow.WithInitialValue(task))
if err != nil {
fmt.Printf("β Failed to start workflow: %v\n", err)
continue
}
// Wait for completion (with timeout)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
run, err := wf.Await(ctx, task.ID, runID, TaskStatusCompleted)
cancel()
if err != nil {
fmt.Printf("β Workflow failed or timed out: %v\n", err)
} else {
fmt.Printf("β
Workflow completed! Final state: %+v\n", run.Object)
}
}
fmt.Println("\nπ All workflows completed!")
}
Save this code to main.go and run:
go mod init workflow-example
go get github.com/luno/workflow
go run main.go
Youβll see output like:
π Starting workflow for task: Process Invoice
β Validated task: Process Invoice
β Processed task: Process Invoice
β Completed task: Process Invoice at 2024-01-15T10:30:45Z
β
Workflow completed! Final state: {ID:1 Name:Process Invoice Data:processed-invoice-data ProcessedAt:2024-01-15 10:30:45.123 +0000 UTC Valid:true}
π Starting workflow for task: Send Email
β Validated task: Send Email
β Processed task: Send Email
β Completed task: Send Email at 2024-01-15T10:30:45Z
β
Workflow completed! Final state: {ID:2 Name:Send Email Data:processed-email-content ProcessedAt:2024-01-15 10:30:45.234 +0000 UTC Valid:true}
π Starting workflow for task:
β Workflow failed or timed out: task name cannot be empty
π All workflows completed!
Task and TaskStatusTask object is modified as it flows through stepsNow that you have a basic workflow running, explore these topics:
b.AddStep(TaskValidated, func(ctx context.Context, r *workflow.Run[Task, TaskStatus]) (TaskStatus, error) {
if err := processTask(r.Object); err != nil {
if isRetryableError(err) {
return 0, err // Will retry with backoff
}
return TaskCompleted, nil // Skip to completion for non-retryable errors
}
return TaskProcessed, nil
}).WithOptions(
workflow.PauseAfterErrCount(3), // Pause after 3 consecutive errors
workflow.ErrBackOff(time.Second * 5), // Wait 5 seconds between retries
)
b.AddStep(TaskValidated, func(ctx context.Context, r *workflow.Run[Task, TaskStatus]) (TaskStatus, error) {
if r.Object.Priority == "high" {
return TaskExpedited, nil
}
return TaskProcessed, nil
}, TaskExpedited, TaskProcessed)
b.AddStep(TaskProcessed, func(ctx context.Context, r *workflow.Run[Task, TaskStatus]) (TaskStatus, error) {
r.Object.ProcessCount++
if r.Object.ProcessCount < 3 {
return TaskProcessed, nil // Repeat this step
}
return TaskCompleted, nil
}, TaskProcessed, TaskCompleted)
Youβre now ready to build powerful, type-safe workflows with Workflow!