Managing Coordinators
The coordinator service in Alluxio acts as a distributed task scheduler and cluster manager. It is responsible for orchestrating background operations such as data loading and freeing, ensuring these tasks are executed reliably and efficiently across the cluster.
Unlike the traditional Alluxio Master, the coordinator is designed to be stateless and highly available by leveraging etcd for all state management and coordination.
1. Deployment Architecture
The coordinator is stateless, delegating all state management and coordination to etcd. This architecture supports both single-node and multi-node (High Availability) deployments.
Single Node Coordinator
For a single coordinator node deployment, you can store job metadata and status using either local storage or etcd. Both options ensure that job history is persisted across coordinator restarts.
RocksDB (Default): The coordinator uses a local RocksDB store by default. This is suitable for standalone deployments where high availability is not required.
alluxio.coordinator.job.meta.store.type=ROCKS
Etcd (Optional): You can also connect a single coordinator to an etcd instance. This is recommended if you plan to transition to a High Availability (HA) setup in the future.
alluxio.coordinator.job.meta.store.type=ETCD
High Availability (HA) Architecture
The coordinator service achieves High Availability (HA) through a stateless design backed by etcd.
Stateless Coordinators: You can deploy multiple coordinator instances (nodes) in your cluster. Since they do not store local state regarding job queues or worker assignments, any coordinator can handle any task.
Etcd as the Source of Truth: All job metadata, status (Waiting, Running, Finished), and worker registry information are stored in etcd.
Fault Tolerance: If a coordinator instance fails, other running instances will continue to process the shared job queue in etcd. There is no "leader" election for the scheduling function itself; all coordinators actively participate in processing jobs.
Failover: Coordinator failover completes within 10 seconds. If the coordinator handling a job fails, the surviving coordinators select one among themselves to take over that job.
Configuring HA
To enable HA, simply deploy multiple coordinator pods or processes and ensure they all point to the same etcd cluster.
Key configuration properties for etcd connection:
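For example, a minimal alluxio-site.properties sketch for each coordinator node might look like the following. The etcd endpoints property name and hostnames are shown for illustration only; verify the exact property names against your Alluxio release.

```
# Share job metadata and status through etcd instead of local RocksDB
alluxio.coordinator.job.meta.store.type=ETCD
# Point every coordinator at the same etcd cluster
# (property name and endpoints are illustrative; confirm for your release)
alluxio.etcd.endpoints=http://etcd-0:2379,http://etcd-1:2379,http://etcd-2:2379
```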
2. Job Scheduling and Lifecycle
The coordinator manages long-running operations like load (cache warming) and free (cache release). The lifecycle of a job is managed entirely through etcd keys.
The Job Queue Mechanism
Submission: When a user submits a job (e.g., via the CLI command alluxio load ...), the request is translated into a job entry and added to the waiting queue.
Scheduling: Active coordinators pick up jobs from the queue. Only one coordinator can claim a specific job at a time, preventing duplicate execution.
Execution: The coordinator breaks the job into tasks and dispatches them to available workers.
Completion: Upon success or failure, the coordinator updates the job status to FINISHED (or FAILED).
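Because every stage of this lifecycle is recorded in etcd, you can inspect it directly with etcdctl. The sketch below uses the default job prefix from the configuration table in section 5; the key layout and value encoding under that prefix are internal details and may differ between releases.

```
# List the keys the scheduler has written under the job metadata prefix
etcdctl get --prefix /alluxio/jobs/ --keys-only
```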
Reliability
The system guarantees consistency even if multiple coordinators try to pick up the same job simultaneously. Operations to change a job's state are atomic.
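Conceptually, claiming a job works like a compare-and-swap: the state transition is applied only if the job is still in the expected state. The etcdctl transaction below is purely an illustration of that pattern with made-up keys and values; it is not Alluxio's actual key schema.

```
# Hypothetical keys: the claim succeeds only if the job is still WAITING
etcdctl txn <<'EOF'
value("/alluxio/jobs/job-123/state") = "WAITING"

put /alluxio/jobs/job-123/state "RUNNING"
put /alluxio/jobs/job-123/owner "coordinator-1"

get /alluxio/jobs/job-123/state

EOF
```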
3. Failure Recovery
The system is designed to handle coordinator failures gracefully.
Automatic Recovery
Since job states are persisted in etcd, a coordinator crash does not mean job data is lost. However, jobs that were actively being managed by the crashed coordinator need to be recovered.
Detection: Active coordinators can detect "stale" jobs in the RUNNING state that belong to a coordinator that is no longer sending heartbeats or updating its status.
Re-queueing: Mechanisms exist to revert these "orphaned" running jobs back to the WAITING state so they can be picked up by another healthy coordinator.
Manual Recovery (job rerun)
In scenarios where a job gets stuck due to a complex failure (e.g., a network partition or a complete cluster restart during a job), or after repeated or cluster-wide coordinator failures, administrators can manually intervene to reclaim failed jobs.
The job rerun command allows you to reset a job's status in etcd, effectively moving it back to the queue to be retried.
Note: This command interacts directly with etcd to modify job states.
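A sketch of an invocation is shown below. The argument form is an assumption; the command may take the job identifier differently in your release, so check its help output before running it against a production cluster.

```
# Reset a stuck job's state in etcd so it can be picked up and retried
# (the job ID argument form is a placeholder)
alluxio job rerun <job-id>
```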
4. Load Balancing
Load balancing is inherent to the decentralized architecture of the coordinators.
Shared Queue: All coordinators consume from the same shared global queue in etcd.
Randomized Polling: To prevent "thundering herd" issues where all coordinators poll etcd at the exact same millisecond, the polling intervals include a random jitter.
Worker Capacity: When scheduling tasks, coordinators respect limits on how many tasks a single worker can handle (alluxio.coordinator.scheduler.max.tasks.per.worker). This prevents overwhelming specific workers even if multiple coordinators are active.
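If your workers have headroom, you can raise this limit, keeping it aligned with the worker thread pool settings listed in the next section. The value below is only an illustration, not a recommendation.

```
# Allow more concurrent scheduler tasks per worker (default is 3)
alluxio.coordinator.scheduler.max.tasks.per.worker=6
```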
5. Configuration and Tuning
You can fine-tune the coordinator's behavior for your specific workload and etcd capacity.
| Property | Default | Description |
| --- | --- | --- |
| alluxio.coordinator.scheduler.etcd.max.retries | (varies) | Maximum number of retries for etcd operations before giving up. |
| alluxio.coordinator.scheduler.etcd.read.timeout | (varies) | Timeout for reading from etcd. |
| alluxio.coordinator.scheduler.etcd.write.timeout | (varies) | Timeout for writing to etcd. |
| alluxio.coordinator.scheduler.interval.time | 2s | Scheduling interval. A shorter interval allows faster scheduling of small jobs but increases CPU overhead. |
| alluxio.coordinator.scheduler.job.etcd.path | /alluxio/jobs/ | The prefix directory in etcd used for storing job metadata. |
| alluxio.coordinator.scheduler.job.etcd.poll.interval | 1s | How often coordinators check etcd for new waiting jobs. Lower values mean faster response but higher load on etcd. |
| alluxio.coordinator.scheduler.max.tasks.per.worker | 3 | The per-worker task capacity, corresponding to the worker's thread pool limit. Determines how many tasks each coordinator can schedule simultaneously on the same worker. |
| alluxio.coordinator.scheduler.waiting.job.capacity | 2000 | The capacity of the scheduler's waiting-job queue. Prevents coordinators from being overloaded with queued jobs. |
| alluxio.job.batch.size | 200 | The number of files per task dispatched to a worker. Larger batches increase worker concurrency but may exceed the worker's thread pool size. Can be overridden via the CLI (--batch-size). |
| alluxio.job.cleanup.interval | 1H | The interval for the periodic cleanup of finished jobs from etcd. |
| alluxio.job.retention.time | 7d | How long finished job history is retained. |
| alluxio.worker.load.file.partition.size | 256MiB | Shard size for splitting large files during loading. Smaller shards increase concurrency but may create more overhead. |
| alluxio.worker.network.grpc.reader.threads.max | 2048 | Maximum number of threads in the worker's pool for parallel UFS data loading. |
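As an illustration, a workload with many small files might tolerate faster polling and larger batches in exchange for higher etcd and worker load. The values below are examples only, not recommendations; tune them against your own workload and etcd capacity.

```
# Poll etcd for new jobs more often (default 1s); increases etcd load
alluxio.coordinator.scheduler.job.etcd.poll.interval=500ms
# Dispatch more files per task (default 200); keep this within the worker's thread pool size
alluxio.job.batch.size=500
# Keep finished job history for a shorter window (default 7d)
alluxio.job.retention.time=3d
```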
Monitoring
To see a list of all registered workers and their current status (online or offline), as well as the list of coordinators:
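The exact subcommand differs across versions; the following is a sketch based on the info nodes command available in recent Alluxio releases, and the output it shows for coordinators may vary.

```
# List registered nodes and whether each one is currently online or offline
alluxio info nodes
```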
6. Metrics
Monitoring the coordinator's interaction with etcd is vital for ensuring cluster health.
Note: These metrics are low-level indicators of internal scheduler operations. For general cluster health and job progress, please refer to the default Grafana dashboard.
| Metric | Description |
| --- | --- |
| alluxio_scheduler_etcd_poll | Counter of poll operations to etcd. |
| alluxio_scheduler_etcd_claim | Counter of jobs successfully claimed from the queue. |
| alluxio_scheduler_etcd_reclaim | Counter of jobs reclaimed/recovered from failed coordinators. |
| alluxio_scheduler_etcd_latency_ms | Latency distribution of etcd operations performed by the scheduler. |
| alluxio_scheduler_job_access_etcd | Counter of job-related access types (list, get, transaction) to etcd. |
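If your deployment exposes these metrics in Prometheus format (as the default Grafana dashboard assumes), you can spot-check them directly. The host and endpoint below are placeholders for whatever your Prometheus scrape configuration actually targets.

```
# Dump the scheduler's etcd-related counters from the coordinator's metrics endpoint
curl -s http://<coordinator-host>:<metrics-port>/metrics | grep alluxio_scheduler
```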