Shared Nothing Mode
In shared nothing mode, workers operate without any shared filesystem access. All status updates and logs are transmitted to the coordinator via gRPC.
Overview
┌────────────────────────────────────────────────────────┐
│                     Dagu Instance                      │
│           (Scheduler + Web UI + Coordinator)           │
│                                                        │
│  ┌──────────────────────────────────────────────────┐  │
│  │                  Local Storage                   │  │
│  │ ┌───────────────┬─────────────────┬────────────┐ │  │
│  │ │   dag-runs/   │      logs/      │   dags/    │ │  │
│  │ │   (status)    │ (execution logs)│            │ │  │
│  │ └───────────────┴─────────────────┴────────────┘ │  │
│  └──────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────┘
          ▲                                  │
          │ ReportStatus + StreamLogs        │ Task Dispatch
          │ (gRPC)                           │ (gRPC)
          │                                  ▼
┌─────────┴──────────┐                ┌────────────────────┐
│      Worker 1      │                │      Worker N      │
│  (No local state)  │                │  (No local state)  │
└────────────────────┘                └────────────────────┘

How It Works
Static Discovery
Workers connect directly to coordinators using explicit addresses:
dagu worker --worker.coordinators=coordinator-1:50055,coordinator-2:50055

No service registry or shared storage is required.
Status Pushing
Workers send execution status to the coordinator via the ReportStatus gRPC call:
- Worker executes a DAG step
- Worker calls `ReportStatus` with the full `DAGRunStatus`
- Coordinator persists status to its local `DAGRunStore`
- Web UI reads status from the coordinator's local storage
Log Streaming
Workers stream stdout/stderr to the coordinator via the StreamLogs gRPC call:
- Worker buffers log output in 32KB chunks
- Worker sends `LogChunk` messages with sequence numbers
- Coordinator writes to local log files, flushing every 64KB
- Worker sends final marker when execution completes
Log streaming supports:
- Separate stdout and stderr streams
- Sequence numbers for ordering
- Automatic reconnection on network failures
Zombie Detection
The coordinator monitors worker heartbeats and automatically marks tasks as failed when workers become unresponsive:
| Parameter | Value |
|---|---|
| Heartbeat interval | 1 second |
| Stale threshold | 30 seconds |
| Detector interval | 45 seconds |
When a worker stops sending heartbeats:
- Coordinator detects stale heartbeat (> 30 seconds old)
- Coordinator marks all running tasks from that worker as `FAILED`
- Error message: `"worker {workerID} became unresponsive"`
- All running nodes within the task are also marked as `FAILED`
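The detection rule itself is simple. The sketch below only illustrates the thresholds above; the type and function names are hypothetical, not Dagu's internal API:

```go
// Illustrative sketch only; names are hypothetical, not Dagu's internal code.
// On each detector tick (every 45s), any worker whose last heartbeat is older
// than 30s is treated as unresponsive and its running tasks are marked FAILED.
package zombie

import (
	"fmt"
	"time"
)

const (
	staleThreshold   = 30 * time.Second // heartbeat older than this => stale
	detectorInterval = 45 * time.Second // how often the coordinator runs the check
)

type workerState struct {
	ID            string
	LastHeartbeat time.Time
}

// markStale returns the error message recorded for each unresponsive worker.
func markStale(workers []workerState, now time.Time) []string {
	var msgs []string
	for _, w := range workers {
		if now.Sub(w.LastHeartbeat) > staleThreshold {
			msgs = append(msgs, fmt.Sprintf("worker %s became unresponsive", w.ID))
		}
	}
	return msgs
}
```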
Configuration
Coordinator
# Bind to all interfaces
dagu coordinator --coordinator.host=0.0.0.0 --coordinator.port=50055
# With advertise address for Kubernetes/Docker
dagu coordinator \
--coordinator.host=0.0.0.0 \
--coordinator.advertise=dagu-coordinator.default.svc.cluster.local \
--coordinator.port=50055

Workers
# Connect to specific coordinators (no service registry)
dagu worker \
--worker.coordinators=coordinator-1:50055,coordinator-2:50055 \
--worker.labels=gpu=true,region=us-east-1

Configuration File
# Coordinator config.yaml
coordinator:
  host: 0.0.0.0
  port: 50055
  advertise: dagu-coordinator.default.svc.cluster.local
paths:
  dataDir: "/var/lib/dagu/data"  # Local storage for status
  logDir: "/var/lib/dagu/logs"   # Local storage for logs
---
# Worker config.yaml
worker:
  id: "worker-gpu-01"
  coordinators:
    - "coordinator-1:50055"
    - "coordinator-2:50055"
  labels:
    gpu: "true"
    region: "us-east-1"
  postgresPool:
    maxOpenConns: 25      # Total connections across ALL PostgreSQL DSNs
    maxIdleConns: 5       # Per-DSN idle connections
    connMaxLifetime: 300  # Seconds
    connMaxIdleTime: 60   # Seconds

Environment Variables
# Worker
export DAGU_WORKER_COORDINATORS="coordinator-1:50055,coordinator-2:50055"
export DAGU_WORKER_ID=worker-01
export DAGU_WORKER_LABELS="gpu=true,region=us-east-1"
# PostgreSQL connection pool (optional, defaults shown)
export DAGU_WORKER_POSTGRES_POOL_MAX_OPEN_CONNS=25
export DAGU_WORKER_POSTGRES_POOL_MAX_IDLE_CONNS=5
export DAGU_WORKER_POSTGRES_POOL_CONN_MAX_LIFETIME=300
export DAGU_WORKER_POSTGRES_POOL_CONN_MAX_IDLE_TIME=60

PostgreSQL Connection Pool Management
In shared-nothing mode, multiple DAGs run concurrently within a single worker process. Without global connection pool management, each DAG's PostgreSQL steps could create unlimited connections, leading to connection exhaustion.
How It Works
The global PostgreSQL connection pool:
- Limits total connections across ALL databases and DAG executions
- Shares connections between concurrent DAG runs
- Reuses connections across sequential DAG executions
- Manages per-DSN pools while enforcing a global limit
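Dagu's implementation is internal, but the idea can be sketched as one `database/sql` pool per DSN gated by a single global semaphore. The Go sketch below is illustrative only; the type names, the `lib/pq` driver choice, and the `query` helper are assumptions, not Dagu's code:

```go
// Illustrative sketch of the shared-pool idea, not Dagu's internal code.
// Each DSN gets its own *sql.DB (with per-DSN idle limits), while a single
// weighted semaphore enforces the global maxOpenConns cap across all DSNs.
package pgpool

import (
	"context"
	"database/sql"
	"sync"
	"time"

	_ "github.com/lib/pq" // assumed PostgreSQL driver; any driver works
	"golang.org/x/sync/semaphore"
)

type globalPool struct {
	mu     sync.Mutex
	byDSN  map[string]*sql.DB
	global *semaphore.Weighted // caps total in-use connections across all DSNs
}

func newGlobalPool(maxOpenConns int64) *globalPool {
	return &globalPool{
		byDSN:  make(map[string]*sql.DB),
		global: semaphore.NewWeighted(maxOpenConns), // e.g. 25
	}
}

func (p *globalPool) db(dsn string) (*sql.DB, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if db, ok := p.byDSN[dsn]; ok {
		return db, nil // reuse the pool for this DSN across DAG runs
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}
	db.SetMaxIdleConns(5)                    // per-DSN idle limit (maxIdleConns)
	db.SetConnMaxLifetime(300 * time.Second) // connMaxLifetime
	db.SetConnMaxIdleTime(60 * time.Second)  // connMaxIdleTime
	p.byDSN[dsn] = db
	return db, nil
}

// query runs a statement while holding one slot of the global budget, so
// concurrent DAG runs cannot exceed maxOpenConns in total.
func (p *globalPool) query(ctx context.Context, dsn, stmt string) error {
	if err := p.global.Acquire(ctx, 1); err != nil {
		return err
	}
	defer p.global.Release(1)
	db, err := p.db(dsn)
	if err != nil {
		return err
	}
	_, err = db.ExecContext(ctx, stmt)
	return err
}
```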
Configuration
worker:
  postgresPool:
    maxOpenConns: 25      # Hard limit across ALL PostgreSQL DSNs
    maxIdleConns: 5       # Per-DSN idle connection limit
    connMaxLifetime: 300  # Max connection age (seconds)
    connMaxIdleTime: 60   # Max idle time before closure (seconds)

Example Scenario
Consider a worker with maxOpenConns: 25 running 10 concurrent DAGs:
- Same DSN: If all 10 DAGs connect to the same PostgreSQL server, they share the 25-connection pool
- Different DSNs: If DAGs connect to 3 different PostgreSQL servers, connections are distributed among them, but the total across all servers is still limited to 25
- Connection reuse: When a step completes, its connection returns to the pool for reuse by other steps
Best Practices
1. Size based on PostgreSQL limits
Set maxOpenConns based on your PostgreSQL server's max_connections:
worker.postgresPool.maxOpenConns = PostgreSQL max_connections / number_of_workers / 2

Example: PostgreSQL with max_connections: 100 and 4 workers:
- Per-worker limit: `100 / 4 / 2 = 12` (leaving headroom)
2. Consider concurrent DAGs
Calculate maximum concurrent DAG executions:
max_concurrent_dags = worker.maxActiveRuns (default: 100)

If many DAGs use PostgreSQL simultaneously, ensure `maxOpenConns` is sufficient.
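For example, with the defaults above (`maxActiveRuns: 100` and `maxOpenConns: 25`), at most 25 PostgreSQL connections can be open at any moment; if more steps need the database at the same time, their queries typically queue for a free connection rather than opening new ones.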
3. Idle connection management
- `maxIdleConns: 5` balances connection reuse vs resource usage
- `connMaxIdleTime: 60` ensures idle connections don't persist indefinitely
- `connMaxLifetime: 300` prevents stale connections
4. Monitor connection usage
Check PostgreSQL connection counts:
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'dagu%';

Expected: ≤ `maxOpenConns` × number_of_workers
Important Notes
Applies Only to PostgreSQL
Global pool management applies only to PostgreSQL. SQLite steps always use 1 connection per step, regardless of worker mode.
Non-Worker Mode
When running DAGs directly (not via workers), PostgreSQL steps use fixed defaults: 1 max connection, 1 idle connection. The global pool is not used.
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-coordinator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dagu-coordinator
  template:
    metadata:
      labels:
        app: dagu-coordinator
    spec:
      containers:
        - name: dagu
          image: dagu:latest
          args:
            - "start-all"
            - "--host=0.0.0.0"
            - "--coordinator.host=0.0.0.0"
            - "--coordinator.advertise=dagu-coordinator.default.svc.cluster.local"
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 50055
              name: grpc
          volumeMounts:
            - name: data
              mountPath: /var/lib/dagu
            - name: dags
              mountPath: /etc/dagu/dags
      volumes:
        - name: data
          emptyDir: {} # Local ephemeral storage
        - name: dags
          configMap:
            name: dagu-dags
---
apiVersion: v1
kind: Service
metadata:
  name: dagu-coordinator
spec:
  ports:
    - port: 8080
      name: http
    - port: 50055
      name: grpc
  selector:
    app: dagu-coordinator
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: dagu-worker
  template:
    metadata:
      labels:
        app: dagu-worker
    spec:
      containers:
        - name: worker
          image: dagu:latest
          args:
            - "worker"
            - "--worker.coordinators=dagu-coordinator.default.svc.cluster.local:50055"
            - "--worker.labels=region=us-east-1"
          # No volume mounts needed - all state via gRPC

Multi-Cluster Deployment
Workers can connect to coordinators across different clusters or clouds:
# Cluster A - Coordinator
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-coordinator
spec:
  template:
    spec:
      containers:
        - name: dagu
          args:
            - "start-all"
            - "--coordinator.host=0.0.0.0"
            - "--coordinator.advertise=coordinator.cluster-a.example.com"
---
# Cluster B - Workers
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-worker
spec:
  template:
    spec:
      containers:
        - name: worker
          args:
            - "worker"
            - "--worker.coordinators=coordinator.cluster-a.example.com:50055"
            - "--worker.labels=region=us-west-2,cluster=cluster-b"

TLS Configuration
For production deployments, enable TLS for gRPC communication:
# Coordinator with TLS
dagu coordinator \
--coordinator.host=0.0.0.0 \
--peer.insecure=false \
--peer.cert-file=/certs/server.crt \
--peer.key-file=/certs/server.key
# Worker with TLS
dagu worker \
--worker.coordinators=coordinator:50055 \
--peer.insecure=false \
--peer.cert-file=/certs/client.crt \
--peer.key-file=/certs/client.key \
--peer.client-ca-file=/certs/ca.crt

Technical Details
Log Streaming Protocol
| Parameter | Value |
|---|---|
| Worker buffer size | 32KB |
| Coordinator flush threshold | 64KB |
| Stream type | Bidirectional gRPC stream |
Log chunks include:
- `dag_name`: Name of the DAG
- `dag_run_id`: Unique run identifier
- `step_name`: Name of the step producing logs
- `stream_type`: `STDOUT` or `STDERR`
- `sequence`: Ordering sequence number
- `data`: Log content bytes
- `final`: Marker for stream completion
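Worker-side chunking can be pictured with the fields listed above. This is an illustrative Go sketch, not the actual generated gRPC types; the struct and helper names are assumptions:

```go
// Illustrative sketch of worker-side chunking; the struct mirrors the fields
// listed above but is not Dagu's generated protobuf type.
package logstream

const chunkSize = 32 * 1024 // worker buffer size (32KB)

type StreamType int

const (
	Stdout StreamType = iota
	Stderr
)

type LogChunk struct {
	DAGName    string
	DAGRunID   string
	StepName   string
	StreamType StreamType
	Sequence   uint64
	Data       []byte
	Final      bool
}

// chunks splits buffered output into 32KB LogChunk messages with increasing
// sequence numbers; the last message carries the final marker.
func chunks(dag, runID, step string, st StreamType, out []byte) []LogChunk {
	var msgs []LogChunk
	var seq uint64
	for len(out) > 0 {
		n := chunkSize
		if len(out) < n {
			n = len(out)
		}
		msgs = append(msgs, LogChunk{
			DAGName: dag, DAGRunID: runID, StepName: step,
			StreamType: st, Sequence: seq, Data: out[:n],
		})
		out = out[n:]
		seq++
	}
	// Final marker sent when execution completes.
	msgs = append(msgs, LogChunk{
		DAGName: dag, DAGRunID: runID, StepName: step,
		StreamType: st, Sequence: seq, Final: true,
	})
	return msgs
}
```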
Status Pushing Protocol
Workers send `ReportStatusRequest` containing:
- Full `DAGRunStatus` protobuf message
- Status updates for all nodes
- Error messages and timestamps
The coordinator:
- Finds or opens the `DAGRunAttempt` for the run
- Writes status to the local `DAGRunStore`
- Returns acceptance confirmation
Queue Dispatch with Previous Status
When the scheduler dispatches queued DAGs to workers:
- Scheduler reads the current status from `DAGRunStore`
- Status is included in the task as the `previous_status` field
- Worker receives status with the task (no local store access needed)
- Worker uses `previous_status` for retry operations
This enables workers to perform retries without requiring:
- Access to shared filesystem
- Local `DAGRunStore` instance
- Previous execution history on the worker
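A rough sketch of the dispatch side, using illustrative names (not the actual task or store types):

```go
// Illustrative sketch only: the scheduler attaches the last known status to
// the dispatched task so the worker never needs a local DAGRunStore.
package dispatch

type DAGRunStatus struct {
	RunID string
	Nodes map[string]string // step name -> last status, e.g. "failed"
}

type Task struct {
	DAGName        string
	DAGRunID       string
	PreviousStatus *DAGRunStatus // nil on first run; set for retries
}

type DAGRunStore interface {
	Latest(runID string) (*DAGRunStatus, error)
}

// buildTask reads the current status from the store and embeds it in the
// task before dispatching it to a worker over gRPC.
func buildTask(store DAGRunStore, dagName, runID string) (Task, error) {
	prev, err := store.Latest(runID)
	if err != nil {
		return Task{}, err
	}
	return Task{DAGName: dagName, DAGRunID: runID, PreviousStatus: prev}, nil
}
```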
Temporary File Cleanup
Workers automatically clean up temporary files after each execution:
| File Type | Location | Cleaned After |
|---|---|---|
| DAG files | /tmp/dagu/worker-dags/ | Each execution |
| Log directories | /tmp/dagu/worker-logs/ | Each execution |
Workers are safe to run on ephemeral nodes without risk of disk accumulation.
Advantages
- No shared storage: Works in any environment
- Multi-cloud ready: Workers can run anywhere with network access
- Simple infrastructure: No NFS, EFS, or shared volumes needed
- Fault isolation: Worker failures don't affect storage
Limitations
Network Dependency
- Status updates and logs require network connectivity to the coordinator
- If the coordinator is unreachable, status updates are lost (not queued)
- Log streaming failures are non-fatal: steps succeed even if logs cannot be streamed
Cancellation Latency
- Cancellation signals are delivered via heartbeat responses
- Worst-case latency: 1 second (worker heartbeat interval)
- Workers check for cancelled runs in each heartbeat response from coordinator
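Conceptually, the worker-side loop looks like the sketch below. The `Heartbeat` call and `CancelledRunIDs` field are hypothetical names used only to show the mechanism, not the real API:

```go
// Illustrative sketch of cancellation delivery via heartbeats: with a 1-second
// interval, a cancellation issued on the coordinator reaches the worker within
// roughly one second.
package worker

import (
	"context"
	"time"
)

type HeartbeatResponse struct {
	CancelledRunIDs []string // runs the coordinator wants this worker to stop
}

type CoordinatorClient interface {
	Heartbeat(ctx context.Context, workerID string) (HeartbeatResponse, error)
}

func heartbeatLoop(ctx context.Context, c CoordinatorClient, workerID string, cancelRun func(string)) {
	ticker := time.NewTicker(1 * time.Second) // heartbeat interval
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			resp, err := c.Heartbeat(ctx, workerID)
			if err != nil {
				continue // transient failures tolerated; coordinator marks us stale after 30s
			}
			for _, id := range resp.CancelledRunIDs {
				cancelRun(id) // stop the local execution for this run
			}
		}
	}
}
```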
Log Streaming Behavior
- Log streaming is best-effort: failures don't fail the step execution
- Some logs may be lost if network issues occur during streaming
- From `output.go`: "Log streaming failures are non-fatal - they shouldn't fail an otherwise successful step execution. Lost logs are unfortunate but acceptable."
Coordinator Availability
- Single point for status persistence
- If coordinator is down, workers continue executing but cannot report status
- Zombie detection requires coordinator to be running
When to Use
Use shared nothing mode when:
- Kubernetes without `ReadWriteMany` storage
- Multi-cloud or multi-cluster deployments
- Containerized workloads in dynamic environments
- Infrastructure without shared filesystem capability
- Workers need to run on ephemeral nodes
