Yige

Yige

Build

Scheduling System - Airflow

Scheduling System - Airflow#

Selection of Scheduling Systems#

Current open-source scheduling systems are divided into two categories:

  • Time-based scheduling systems, where the core design is to run tasks on a schedule, data sharding, and elastic scaling, but they are not very friendly in terms of dependency support, making them more suitable for backend business development. Representatives include XXL-JOB and Elastic-Job.

  • Workflow scheduling systems centered around DAG, which emphasize task dependencies. Common examples include:

    1. Oozie: Oozie is developed based on XML format, and can be visually configured after integration into Hue. However, its drawbacks are quite obvious, with poor version management and log collection, low development flexibility, and a limited number of schedulable tasks. Additionally, its definition is overly complex, leading to high maintenance costs. The core issue is still the lack of the concept of shared variables and shared connection information.
    2. Azkaban: Similar to Oozie, the core issue is still the lack of the concept of shared variables and shared connection information.
    3. Airflow
    4. Dolphinscheduler: A recently open-sourced Apache incubator project, developed and contributed by Chinese developers.

Basic Usage of Airflow#

Basic Concepts#

  • DAG: Directed Acyclic Graph, which defines the entire job in Airflow. All tasks within the same DAG share the same scheduling time.

  • Task: A specific job task within a DAG, which must exist within a certain DAG. Tasks in a DAG configure dependencies, and cross-DAG dependencies are possible but not recommended. Cross-DAG dependencies can reduce the intuitiveness of the DAG graph and complicate dependency management.

  • DAG Run: A DAG Run occurs when a DAG meets its scheduling time or is triggered externally. It can be understood as an instance instantiated by the DAG.

  • Task Instance: A Task Instance is created when a Task is scheduled to start. It can be understood as an instance instantiated by the Task.

Basic Architecture#

Airflow is a queue system built on a metadata database. The database stores the status of queued tasks, and the scheduler uses these statuses to determine how to prioritize adding other tasks to the queue. This functionality is orchestrated by four main components:

  • Metadata Database: This database stores information about task statuses. The database performs updates using an abstraction layer implemented in SQLAlchemy. This abstraction layer cleanly separates the functionality of the remaining Airflow components from the database.

  • Scheduler: The scheduler is a process that uses the DAG definitions combined with task statuses in the metadata to decide which tasks need to be executed and their execution priorities. The scheduler typically runs as a service.

  • Executor: The Executor is a message queue process that is bound to the scheduler, used to determine the actual worker processes that execute each scheduled task. There are different types of executors, each using a specified worker process class to execute tasks. For example, LocalExecutor executes tasks using parallel processes running on the same machine as the scheduler process. Other executors, like CeleryExecutor, use worker processes that exist in a separate cluster of worker machines.

  • Workers: These are the processes that actually execute the task logic, determined by the executor in use.

The operation of Airflow is built on the metadata database that stores task statuses and workflows (i.e., DAGs). The scheduler and executor send tasks to the queue for Worker processes to execute. The WebServer runs (often on the same machine as the scheduler) and communicates with the database to present task statuses and execution logs in the Web UI. Each colored box indicates that each component can exist independently of the others, depending on the type of deployment configuration.

Installation and Deployment#

  1. Install via pip: pip install apache-airflow

  2. Modify environment variables to create a directory specified as AIRFLOW_HOME.

  3. The first execution of the airflow command will initialize and generate the airflow.cfg file in the AIRFLOW_HOME directory.

[root@server ~]# airflow
  1. Create a new MySQL database airflow and configure permissions.
  mysql > create database airflow default character set utf8;
  mysql > grant all on *.* to airflow@localhost identified by 'airflow_test';
  1. Initialize the airflow database.
[root@server ~]# airflow initdb
  1. Modify airflow.cfg.
web_server_host = IP
web_server_port = HOST
executor = LocalExecutor
sql_alchemy_conn = mysql database address
  1. Start the daemon (using supervisord to run in the background).
[root@server ~]# airflow webserver
[root@server ~]# airflow scheduler

Basic Usage#

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'test',
    default_args=default_args,
    description='my first DAG',
    schedule_interval='50 * * * *')
)

# examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

# Setting up Dependencies
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Pitfalls#

Time Zone Issues#

The system startup time and the time written to the metadata database can be configured for time zones, which can be changed by modifying the airflow configuration file airflow.cfg. However, the time displayed on the Airflow-provided web server, i.e., the web interface, defaults to UTC+0. Therefore, there can be discrepancies between the actual scheduling time and the time displayed on the web page. A solution can be found in this blog post airflow modifies Chinese time zone (modifies airflow source code).

Backfill#

Backfill generally refers to the automatic execution of tasks that were not scheduled due to server downtime or other reasons, and after restarting, Airflow automatically backfills and executes these tasks. This behavior can be manually executed:

[root@server ~]# airflow backfill sensors -s 2015-06-01 -e 2015-06-07

It can be seen as a kind of checkpoint recovery mechanism, which is actually a very good feature. However, when triggered automatically, Airflow will default to backfilling tasks from the current system time to the specified start_date (a parameter specified when configuring the DAG above), which can sometimes lead to unexpected issues.
For example:
I set up a batch of tasks to execute scheduling every half hour, and then due to server downtime for a period, after recovery, Airflow automatically triggered Backfill. The time from the current system time to our specified start_date is still quite long, resulting in a backlog of many tasks. Then, as the backfill execution starts, if I haven't finished executing the first batch of tasks in half an hour, a new batch of tasks may come in due to the half-hour interval, which could ultimately lead to issues with the server running the tasks due to task accumulation.

Concurrent Scheduling Issues#

Once, due to the scheduler process crashing, upon restarting, I found that all tasks were executed at once. Even tasks that had not run for several days in each DAG were executed, causing a sudden increase in server pressure.
Modify the global parameters in airflow.cfg to reduce concurrency:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

Supplement#

The working mechanism of Airflow's daemon processes:

  • The scheduler will periodically poll the metadata database (Metastore) to check if the registered DAGs (Directed Acyclic Graphs, which can be understood as job flows) need to be executed. If a specific DAG needs to be executed according to its scheduling plan, the scheduler daemon will first create an instance of DagRun in the metadata database and trigger the specific tasks within the DAG (which can be understood as: a DAG contains one or more tasks). The triggering does not actually execute the tasks but pushes task messages to the message queue (i.e., broker). Each task message contains the DAG ID, task ID, and the specific function to be executed. If the task is to execute a bash script, the task message will also contain the code of the bash script.

  • Users may control the DAG on the web server, such as manually triggering a DAG to execute. When users do this, an instance of DagRun will be created in the metadata database, and the scheduler will trigger the specific tasks within the DAG in the same way as described above.

  • The worker daemon will listen to the message queue. If there are messages, it will retrieve them from the message queue. When it retrieves a task message, it will update the status of the DagRun instance in the metadata to running and attempt to execute the tasks in the DAG. If the DAG execution is successful, it will update the status of the DagRun instance to success; otherwise, it will update the status to failure.

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.