
Shared Nothing Mode

In shared nothing mode, workers operate without any shared filesystem access. All status updates and logs are transmitted to the coordinator via gRPC.

Overview

┌─────────────────────────────────────────────────────────────┐
│                     Dagu Instance                           │
│  (Scheduler + Web UI + Coordinator)                         │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Local Storage                          │    │
│  │  ┌───────────────┬─────────────────┬────────────┐   │    │
│  │  │  dag-runs/    │     logs/       │   dags/    │   │    │
│  │  │  (status)     │ (execution logs)│            │   │    │
│  │  └───────────────┴─────────────────┴────────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
         ▲                              │
         │ ReportStatus + StreamLogs    │ Task Dispatch
         │ (gRPC)                       │ (gRPC)
         │                              ▼
┌────────┴───────────┐        ┌────────────────────┐
│     Worker 1       │        │     Worker N       │
│  (No local state)  │        │  (No local state)  │
└────────────────────┘        └────────────────────┘

How It Works

Static Discovery

Workers connect directly to coordinators using explicit addresses:

bash
dagu worker --worker.coordinators=coordinator-1:50055,coordinator-2:50055

No service registry or shared storage is required.

Status Pushing

Workers send execution status to the coordinator via the ReportStatus gRPC call:

  1. Worker executes a DAG step
  2. Worker calls ReportStatus with full DAGRunStatus
  3. Coordinator persists status to its local DAGRunStore
  4. Web UI reads status from coordinator's local storage
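
A minimal worker-side sketch of this push in Go is shown below. The CoordinatorClient interface and the DAGRunStatus fields are illustrative stand-ins, not Dagu's actual generated gRPC types.

go
// Hypothetical sketch: the client interface and status struct are stand-ins
// for Dagu's generated gRPC types.
package worker

import (
	"context"
	"log"
	"time"
)

// DAGRunStatus stands in for the full status message sent to the coordinator.
type DAGRunStatus struct {
	DAGRunID string
	Status   string            // e.g. "running", "succeeded", "failed"
	Nodes    map[string]string // per-node states
}

// CoordinatorClient stands in for the generated ReportStatus client.
type CoordinatorClient interface {
	ReportStatus(ctx context.Context, status *DAGRunStatus) error
}

// reportStatus pushes the full run status after each step; persistence
// happens on the coordinator, the worker keeps no local state.
func reportStatus(client CoordinatorClient, status *DAGRunStatus) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := client.ReportStatus(ctx, status); err != nil {
		// If the coordinator is unreachable the update is lost, not queued
		// (see Limitations below).
		log.Printf("report status failed: %v", err)
	}
}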

Log Streaming

Workers stream stdout/stderr to the coordinator via the StreamLogs gRPC call:

  1. Worker buffers log output in 32KB chunks
  2. Worker sends LogChunk messages with sequence numbers
  3. Coordinator writes to local log files, flushing every 64KB
  4. Worker sends final marker when execution completes

Log streaming supports:

  • Separate stdout and stderr streams
  • Sequence numbers for ordering
  • Automatic reconnection on network failures
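
The chunking behavior can be pictured with the short Go sketch below. The LogChunk struct mirrors the fields documented under "Log Streaming Protocol" later in this page; the chunkSender interface and the streamLogs function are assumptions for illustration, not Dagu's implementation.

go
// Hypothetical sketch of the worker-side chunking loop; the sender interface
// and function names are assumed, the fields follow the documented protocol.
package worker

import "io"

const workerBufferSize = 32 * 1024 // 32KB worker buffer

// LogChunk carries one piece of step output to the coordinator.
type LogChunk struct {
	DAGName    string // dag_name
	DAGRunID   string // dag_run_id
	StepName   string // step_name
	StreamType string // stream_type: "STDOUT" or "STDERR"
	Sequence   int64  // sequence: ordering number
	Data       []byte // data: log content bytes
	Final      bool   // final: marker for stream completion
}

type chunkSender interface {
	Send(*LogChunk) error
}

// streamLogs forwards a step's output as sequenced 32KB chunks and ends the
// stream with a final marker. Failures here are treated as non-fatal by the
// caller: the step itself still succeeds even if chunks are lost.
func streamLogs(r io.Reader, meta LogChunk, out chunkSender) error {
	buf := make([]byte, workerBufferSize)
	var seq int64
	for {
		n, err := r.Read(buf)
		if n > 0 {
			chunk := meta // copy identifiers (dag_name, dag_run_id, step_name, stream_type)
			chunk.Sequence = seq
			chunk.Data = append([]byte(nil), buf[:n]...)
			if sendErr := out.Send(&chunk); sendErr != nil {
				return sendErr
			}
			seq++
		}
		if err == io.EOF {
			final := meta
			final.Sequence = seq
			final.Final = true
			return out.Send(&final)
		}
		if err != nil {
			return err
		}
	}
}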

Zombie Detection

The coordinator monitors worker heartbeats and automatically marks tasks as failed when workers become unresponsive:

Parameter            Value
Heartbeat interval   1 second
Stale threshold      30 seconds
Detector interval    45 seconds

When a worker stops sending heartbeats:

  1. Coordinator detects stale heartbeat (> 30 seconds old)
  2. Coordinator marks all running tasks from that worker as FAILED
  3. Error message: "worker {workerID} became unresponsive"
  4. All running nodes within the task are also marked as FAILED
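
A simplified Go sketch of the detection loop follows, using the intervals and error message above; workerState, taskStore, and the surrounding wiring are assumptions, not Dagu's internal types.

go
// Coordinator-side sketch of the zombie detector; type and method names are
// illustrative, only the intervals and error message come from the docs above.
package coordinator

import (
	"fmt"
	"time"
)

const (
	staleThreshold   = 30 * time.Second // heartbeat older than this is stale
	detectorInterval = 45 * time.Second // how often the detector runs
)

type workerState struct {
	ID            string
	LastHeartbeat time.Time
	RunningTasks  []string // dag-run IDs currently executing on this worker
}

type taskStore interface {
	MarkFailed(dagRunID, reason string) error
}

// detectZombies fails every running task owned by a worker whose heartbeat
// has gone stale; nodes inside those runs are failed as well.
func detectZombies(workers []*workerState, store taskStore) {
	now := time.Now()
	for _, w := range workers {
		if now.Sub(w.LastHeartbeat) <= staleThreshold {
			continue
		}
		reason := fmt.Sprintf("worker %s became unresponsive", w.ID)
		for _, runID := range w.RunningTasks {
			_ = store.MarkFailed(runID, reason)
		}
	}
}

// runDetector would be started once at coordinator startup.
func runDetector(workers func() []*workerState, store taskStore) {
	ticker := time.NewTicker(detectorInterval)
	defer ticker.Stop()
	for range ticker.C {
		detectZombies(workers(), store)
	}
}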

Configuration

Coordinator

bash
# Bind to all interfaces
dagu coordinator --coordinator.host=0.0.0.0 --coordinator.port=50055

# With advertise address for Kubernetes/Docker
dagu coordinator \
  --coordinator.host=0.0.0.0 \
  --coordinator.advertise=dagu-coordinator.default.svc.cluster.local \
  --coordinator.port=50055

Workers

bash
# Connect to specific coordinators (no service registry)
dagu worker \
  --worker.coordinators=coordinator-1:50055,coordinator-2:50055 \
  --worker.labels=gpu=true,region=us-east-1

Configuration File

yaml
# Coordinator config.yaml
coordinator:
  host: 0.0.0.0
  port: 50055
  advertise: dagu-coordinator.default.svc.cluster.local

paths:
  dataDir: "/var/lib/dagu/data"   # Local storage for status
  logDir: "/var/lib/dagu/logs"    # Local storage for logs

---
# Worker config.yaml
worker:
  id: "worker-gpu-01"
  coordinators:
    - "coordinator-1:50055"
    - "coordinator-2:50055"
  labels:
    gpu: "true"
    region: "us-east-1"
  postgresPool:
    maxOpenConns: 25       # Total connections across ALL PostgreSQL DSNs
    maxIdleConns: 5        # Per-DSN idle connections
    connMaxLifetime: 300   # Seconds
    connMaxIdleTime: 60    # Seconds

Environment Variables

bash
# Worker
export DAGU_WORKER_COORDINATORS="coordinator-1:50055,coordinator-2:50055"
export DAGU_WORKER_ID=worker-01
export DAGU_WORKER_LABELS="gpu=true,region=us-east-1"

# PostgreSQL connection pool (optional, defaults shown)
export DAGU_WORKER_POSTGRES_POOL_MAX_OPEN_CONNS=25
export DAGU_WORKER_POSTGRES_POOL_MAX_IDLE_CONNS=5
export DAGU_WORKER_POSTGRES_POOL_CONN_MAX_LIFETIME=300
export DAGU_WORKER_POSTGRES_POOL_CONN_MAX_IDLE_TIME=60

PostgreSQL Connection Pool Management

In shared nothing mode, multiple DAGs run concurrently within a single worker process. Without global connection pool management, each DAG's PostgreSQL steps could create unlimited connections, leading to connection exhaustion.

How It Works

The global PostgreSQL connection pool:

  1. Limits total connections across ALL databases and DAG executions
  2. Shares connections between concurrent DAG runs
  3. Reuses connections across sequential DAG executions
  4. Manages per-DSN pools while enforcing a global limit
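
For a single DSN, the four settings map directly onto Go's standard database/sql pool knobs, as the sketch below shows; the cross-DSN global cap is additional bookkeeping inside the worker and is not shown here. The PostgresPoolConfig type, openPool function, and driver choice are assumptions for illustration.

go
// Sketch: applying the configured limits to one DSN via database/sql.
package worker

import (
	"database/sql"
	"time"

	_ "github.com/lib/pq" // assumed PostgreSQL driver for this sketch
)

type PostgresPoolConfig struct {
	MaxOpenConns    int           // global limit; applied per handle here for illustration
	MaxIdleConns    int           // per-DSN idle connections
	ConnMaxLifetime time.Duration // max connection age
	ConnMaxIdleTime time.Duration // max idle time before closure
}

// openPool opens a pooled handle for one DSN with the configured limits.
func openPool(dsn string, cfg PostgresPoolConfig) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}
	db.SetMaxOpenConns(cfg.MaxOpenConns)
	db.SetMaxIdleConns(cfg.MaxIdleConns)
	db.SetConnMaxLifetime(cfg.ConnMaxLifetime)
	db.SetConnMaxIdleTime(cfg.ConnMaxIdleTime)
	return db, nil
}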

Configuration

yaml
worker:
  postgresPool:
    maxOpenConns: 25       # Hard limit across ALL PostgreSQL DSNs
    maxIdleConns: 5        # Per-DSN idle connection limit
    connMaxLifetime: 300   # Max connection age (seconds)
    connMaxIdleTime: 60    # Max idle time before closure (seconds)

Example Scenario

Consider a worker with maxOpenConns: 25 running 10 concurrent DAGs:

  • Same DSN: If all 10 DAGs connect to the same PostgreSQL server, they share the 25-connection pool
  • Different DSNs: If DAGs connect to 3 different PostgreSQL servers, connections are distributed among them, but the total across all servers is still limited to 25
  • Connection reuse: When a step completes, its connection returns to the pool for reuse by other steps

Best Practices

1. Size based on PostgreSQL limits

Set maxOpenConns based on your PostgreSQL server's max_connections:

worker.postgresPool.maxOpenConns = PostgreSQL max_connections / number_of_workers / 2

Example: PostgreSQL with max_connections: 100, 4 workers:

  • Per-worker limit: 100 / 4 / 2 = 12 (leaving headroom)

2. Consider concurrent DAGs

Calculate maximum concurrent DAG executions:

max_concurrent_dags = worker.maxActiveRuns (default: 100)

If many DAGs use PostgreSQL simultaneously, ensure maxOpenConns is sufficient.

3. Idle connection management

  • maxIdleConns: 5 balances connection reuse vs resource usage
  • connMaxIdleTime: 60 ensures idle connections don't persist indefinitely
  • connMaxLifetime: 300 prevents stale connections

4. Monitor connection usage

Check PostgreSQL connection counts:

sql
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'dagu%';

Expected: ≤ maxOpenConns × number_of_workers

Important Notes

Applies Only to PostgreSQL

Global pool management applies only to PostgreSQL. SQLite steps always use 1 connection per step, regardless of worker mode.

Non-Worker Mode

When running DAGs directly (not via workers), PostgreSQL steps use fixed defaults: 1 max connection, 1 idle connection. The global pool is not used.

Kubernetes Deployment

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-coordinator
spec:
  replicas: 1
  template:
    spec:
      containers:
        - name: dagu
          image: dagu:latest
          args:
            - "start-all"
            - "--host=0.0.0.0"
            - "--coordinator.host=0.0.0.0"
            - "--coordinator.advertise=dagu-coordinator.default.svc.cluster.local"
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 50055
              name: grpc
          volumeMounts:
            - name: data
              mountPath: /var/lib/dagu
            - name: dags
              mountPath: /etc/dagu/dags
      volumes:
        - name: data
          emptyDir: {}  # Local ephemeral storage
        - name: dags
          configMap:
            name: dagu-dags
---
apiVersion: v1
kind: Service
metadata:
  name: dagu-coordinator
spec:
  ports:
    - port: 8080
      name: http
    - port: 50055
      name: grpc
  selector:
    app: dagu-coordinator
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-worker
spec:
  replicas: 5
  template:
    spec:
      containers:
        - name: worker
          image: dagu:latest
          args:
            - "worker"
            - "--worker.coordinators=dagu-coordinator.default.svc.cluster.local:50055"
            - "--worker.labels=region=us-east-1"
          # No volume mounts needed - all state via gRPC

Multi-Cluster Deployment

Workers can connect to coordinators across different clusters or clouds:

yaml
# Cluster A - Coordinator
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-coordinator
spec:
  template:
    spec:
      containers:
        - name: dagu
          args:
            - "start-all"
            - "--coordinator.host=0.0.0.0"
            - "--coordinator.advertise=coordinator.cluster-a.example.com"

---
# Cluster B - Workers
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dagu-worker
spec:
  template:
    spec:
      containers:
        - name: worker
          args:
            - "worker"
            - "--worker.coordinators=coordinator.cluster-a.example.com:50055"
            - "--worker.labels=region=us-west-2,cluster=cluster-b"

TLS Configuration

For production deployments, enable TLS for gRPC communication:

bash
# Coordinator with TLS
dagu coordinator \
  --coordinator.host=0.0.0.0 \
  --peer.insecure=false \
  --peer.cert-file=/certs/server.crt \
  --peer.key-file=/certs/server.key

# Worker with TLS
dagu worker \
  --worker.coordinators=coordinator:50055 \
  --peer.insecure=false \
  --peer.cert-file=/certs/client.crt \
  --peer.key-file=/certs/client.key \
  --peer.client-ca-file=/certs/ca.crt

Technical Details

Log Streaming Protocol

Parameter                     Value
Worker buffer size            32KB
Coordinator flush threshold   64KB
Stream type                   Bidirectional gRPC stream

Log chunks include:

  • dag_name: Name of the DAG
  • dag_run_id: Unique run identifier
  • step_name: Name of the step producing logs
  • stream_type: STDOUT or STDERR
  • sequence: Ordering sequence number
  • data: Log content bytes
  • final: Marker for stream completion

Status Pushing Protocol

Workers send ReportStatusRequest containing:

  • Full DAGRunStatus protobuf message
  • Status updates for all nodes
  • Error messages and timestamps

The coordinator:

  1. Finds or opens the DAGRunAttempt for the run
  2. Writes status to the local DAGRunStore
  3. Returns acceptance confirmation
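
The coordinator side can be pictured as a small handler, sketched below; DAGRunStore, Attempt, and the request/response shapes are illustrative stand-ins for Dagu's internals.

go
// Hypothetical coordinator-side handler for the three steps above.
package coordinator

import "context"

type ReportStatusRequest struct {
	DAGRunID string
	Status   []byte // serialized DAGRunStatus
}

type ReportStatusResponse struct {
	Accepted bool
}

type Attempt interface {
	WriteStatus(status []byte) error
}

type DAGRunStore interface {
	FindOrOpenAttempt(ctx context.Context, dagRunID string) (Attempt, error)
}

// handleReportStatus persists the pushed status into the coordinator's local store.
func handleReportStatus(ctx context.Context, store DAGRunStore, req *ReportStatusRequest) (*ReportStatusResponse, error) {
	attempt, err := store.FindOrOpenAttempt(ctx, req.DAGRunID) // step 1
	if err != nil {
		return nil, err
	}
	if err := attempt.WriteStatus(req.Status); err != nil { // step 2
		return nil, err
	}
	return &ReportStatusResponse{Accepted: true}, nil // step 3
}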

Queue Dispatch with Previous Status

When the scheduler dispatches queued DAGs to workers:

  1. Scheduler reads the current status from DAGRunStore
  2. Status is included in the task as previous_status field
  3. Worker receives status with the task (no local store access needed)
  4. Worker uses previous_status for retry operations

This enables workers to perform retries without requiring:

  • Access to shared filesystem
  • Local DAGRunStore instance
  • Previous execution history on the worker
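
A rough Go sketch of the idea follows: the task carries the prior status, so retry selection needs nothing beyond the dispatched payload. All type and field names here are hypothetical.

go
// Hypothetical task shape carrying the previous status for retries.
package worker

type Task struct {
	DAGRunID       string
	DAG            []byte  // DAG definition shipped with the task
	PreviousStatus *Status // populated by the scheduler from its DAGRunStore
}

type Status struct {
	Nodes map[string]string // node name -> last known state
}

// nodesToRetry returns the nodes that did not succeed in the previous attempt.
// With PreviousStatus delivered in the task, no filesystem access is needed.
func nodesToRetry(t *Task) []string {
	if t.PreviousStatus == nil {
		return nil // first run: nothing to retry
	}
	var retry []string
	for name, state := range t.PreviousStatus.Nodes {
		if state != "succeeded" {
			retry = append(retry, name)
		}
	}
	return retry
}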

Temporary File Cleanup

Workers automatically clean up temporary files after each execution:

File Type         Location                  Cleaned After
DAG files         /tmp/dagu/worker-dags/    Each execution
Log directories   /tmp/dagu/worker-logs/    Each execution

Workers are safe to run on ephemeral nodes without risk of disk accumulation.

Advantages

  • No shared storage: Works in any environment
  • Multi-cloud ready: Workers can run anywhere with network access
  • Simple infrastructure: No NFS, EFS, or shared volumes needed
  • Fault isolation: Worker failures don't affect storage

Limitations

Network Dependency

  • Status updates and logs require network connectivity to the coordinator
  • If the coordinator is unreachable, status updates are lost (not queued)
  • Log streaming failures are non-fatal: steps succeed even if logs cannot be streamed

Cancellation Latency

  • Cancellation signals are delivered via heartbeat responses
  • Worst-case latency: 1 second (worker heartbeat interval)
  • Workers check for cancelled runs in each heartbeat response from coordinator
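
Conceptually, the worker-side loop looks like the Go sketch below; the heartbeat client and response shape are assumptions, not Dagu's actual API.

go
// Hypothetical worker heartbeat loop: report liveness every second and cancel
// any runs named in the response, giving roughly one-second worst-case latency.
package worker

import (
	"context"
	"time"
)

type HeartbeatResponse struct {
	CancelledRunIDs []string // runs the coordinator wants stopped
}

type heartbeatClient interface {
	Heartbeat(ctx context.Context, workerID string) (*HeartbeatResponse, error)
}

func heartbeatLoop(ctx context.Context, c heartbeatClient, workerID string, cancelRun func(runID string)) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			resp, err := c.Heartbeat(ctx, workerID)
			if err != nil {
				continue // missed heartbeats eventually trigger zombie detection
			}
			for _, id := range resp.CancelledRunIDs {
				cancelRun(id)
			}
		}
	}
}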

Log Streaming Behavior

  • Log streaming is best-effort: failures don't fail the step execution
  • Some logs may be lost if network issues occur during streaming
  • From output.go: "Log streaming failures are non-fatal - they shouldn't fail an otherwise successful step execution. Lost logs are unfortunate but acceptable."

Coordinator Availability

  • Single point for status persistence
  • If coordinator is down, workers continue executing but cannot report status
  • Zombie detection requires coordinator to be running

When to Use

Use shared nothing mode when:

  • Running on Kubernetes without ReadWriteMany storage
  • Deploying across multiple clouds or clusters
  • Running containerized workloads in dynamic environments
  • Operating infrastructure without shared filesystem capability
  • Running workers on ephemeral nodes
