Yige

Yige

Build

調度システム - Airflow

スケジューリングシステム - Airflow#

スケジューリングシステムの選定#

現在のオープンソーススケジューリングシステムは二つのカテゴリに分かれます:

  • 定時型スケジューリングシステム:設計の核心は定時実行、データの分割、弾力的な拡張ですが、依存関係のサポートがあまり友好的ではなく、バックエンドビジネス開発により適しています。代表的なものは XXL-JOBElastic-Job です。

  • DAG を中心としたワークフロースケジューリングシステム:タスクの依存関係を強調し、一般的なものには:

    1. Oozie:Oozie は XML 形式で開発されており、後に Hue に統合されて視覚的に設定できますが、欠点も明らかで、バージョン管理やログ収集があまり友好的ではなく、開発の柔軟性が非常に低く、スケジュール可能なタスクも少なく、定義が複雑すぎてメンテナンスコストが高いです。もちろん、最も重要なのは共用変数と共用接続情報の概念がないことです。
    2. Azkaban:Oozie とほぼ同様で、最も核心的な問題は共用変数と共用接続情報の概念がないことです。
    3. Airflow
    4. Dolphinscheduler:最近オープンソース化された Apache のインキュベーションプロジェクトで、中国人が開発し貢献しています。

Airflow 基本使用#

基本概念#

  • DAG :有向非循環グラフを意味し、Airflow では全体の作業を定義します。同じ DAG 内のすべてのタスクは同じスケジュール時間を持ちます。

  • Task:DAG 内の具体的な作業タスクであり、特定の DAG 内に存在する必要があります。タスクは DAG 内で依存関係を設定し、DAG を超えた依存は可能ですが推奨されません。DAG を超えた依存は DAG グラフの直感性を低下させ、依存管理に問題を引き起こします。

  • DAG Run:DAG がそのスケジュール時間を満たすか、外部からトリガーされると、DAG Run が生成されます。DAG によってインスタンス化されたインスタンスと理解できます。

  • Task Instance:タスクがスケジュールされて起動されると、タスクインスタンスが生成されます。タスクによってインスタンス化されたインスタンスと理解できます。

基本アーキテクチャ#

Airflow はメタデータベース上に構築されたキューシステムです。データベースはキュータスクの状態を保存し、スケジューラはこれらの状態を使用して他のタスクをキューに追加する優先順位を決定します。この機能は 4 つの主要コンポーネントによって編成されています:

  • メタデータベース:このデータベースはタスク状態に関する情報を保存します。データベースは SQLAlchemy で実装された抽象層を使用して更新を実行します。この抽象層は Airflow の残りのコンポーネント機能をデータベースからきれいに分離します。

  • スケジューラ:スケジューラは DAG 定義とメタデータ内のタスク状態を組み合わせて、どのタスクを実行する必要があるか、タスク実行の優先順位を決定するプロセスです。スケジューラは通常サービスとして実行されます。

  • 実行器:Executor はメッセージキュープロセスで、スケジューラにバインドされており、各タスクスケジュールの実際の実行を決定します。異なるタイプの実行器があり、各実行器は指定されたワーカープロセスのクラスを使用してタスクを実行します。たとえば、LocalExecutor はスケジューラプロセスと同じマシン上で実行される並行プロセスを使用してタスクを実行します。他の CeleryExecutor のような実行器は、独立したワーカー機械クラスターに存在するワーカープロセスを使用してタスクを実行します。

  • ワーカー:これらは実際にタスクロジックを実行するプロセスで、使用されている実行器によって決定されます。

Airflow の操作はタスク状態とワークフローのメタデータベース(すなわち DAG)に基づいています。スケジューラと実行器はタスクをキューに送信し、ワーカープロセスが実行します。WebServer は(しばしばスケジューラと同じマシン上で実行され)データベースと通信し、Web UI にタスク状態とタスク実行ログを表示します。各色付きボックスは、各コンポーネントが他のコンポーネントから独立して存在できることを示しており、これはデプロイ構成のタイプによって異なります。

インストールとデプロイ#

  1. pip インストール: pip install apache-airflow

  2. 環境変数を変更して新しいディレクトリをAIRFLOW_HOMEとして指定します。

  3. 初回実行時にairflowコマンドを実行すると、AIRFLOW_HOME ディレクトリに airflow.cfg ファイルが初期化生成されます。

[root@server ~]# airflow
  1. mysql で新しいデータベースairflowを作成し、権限を設定します。
  mysql > create database airflow default character set utf8;
  mysql > grant all on *.* to airflow@localhost identified by 'airflow_test';
  1. airflow データベースを初期化します。
[root@server ~]# airflow initdb
  1. airflow.cfg を変更します。
web_server_host = IP
web_server_port = HOST
executor = LocalExecutor
sql_alchemy_conn = mysqlデータベースアドレス
  1. デーモンプロセスを起動します(supervisord を利用してバックグラウンドで実行)。
[root@server ~]# airflow webserver
[root@server ~]# airflow sheduler

基本使用#

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'test',
    default_args=default_args,
    description='my first DAG',
    schedule_interval='50 * * * *')
)

# オペレーターをインスタンス化して作成されたタスクの例
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': '渡したパラメータ'},
    dag=dag,
)

# 依存関係の設定
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

注意点#

タイムゾーンの問題#

システム起動の時間やデータベースメタデータへの書き込み時間はタイムゾーンを設定できますが、airflow が提供する webserver 上、すなわち web インターフェースで表示される時間はデフォルトで UTC+0 の時間です。したがって、実際のスケジュール時間と web ページに表示される時間が異なることがあります。解決策は、以下のブログを参考にできますairflow 中国のタイムゾーンを変更する(airflow ソースコードを変更)

Backfill#

Backfill とは、サーバーのダウンやその他の理由で一部のタスクがスケジュールされなかった場合に、再起動後に airflow が自動的にこれらのタスクを補完して実行することを指します。この動作を手動で実行することもできます:

[root@server ~]# airflow backfill sensors -s 2015-06-01 -e 2015-06-07

これは、類似のチェックポイント復元メカニズムと見なすことができ、実際には非常に良い機能ですが、自動トリガー時に、airflowはシステムの現在の時間から指定した start_date(上記の dag 設定時に指定したパラメータ)までのタスクを自動的に補完して実行しようとするため、予期しない問題が発生することがあります。
例を挙げると:
私はこのタスクのセットを設定しました。30 分ごとにスケジュールを実行し、サーバーがしばらくダウンした後、airflow が自動的に Backfill をトリガーしました。システムの現在の時間から指定した start_date までの時間がまだ長く、多くのタスクが積み重なり、最初のタスクの補完実行が終わる前に新しいタスクが追加される可能性があり、最終的にタスクの積み重ねによりタスクを実行するサーバーに問題が発生する可能性があります。

同時スケジューリングの問題#

ある時、scheduler プロセスがクラッシュし、再起動するとすべてのタスクが一度に実行されることがわかりました。さらに、各 dag で数日間実行されなかったタスクが直接実行され、サーバーに大きな負荷がかかりました。
グローバルファイル airflow.cfg のパラメータを変更して同時実行を減らす:

# Executorへの設定としての並列性の量。これにより、
# このairflowインストールで同時に実行されるべきタスクインスタンスの最大数が定義されます。
parallelism = 32

# スケジューラによって同時に実行されることが許可されているタスクインスタンスの数
dag_concurrency = 16

# DAGは作成時にデフォルトで一時停止されますか
dags_are_paused_at_creation = True

# プールを使用しない場合、タスクは「デフォルトプール」で実行されます。
# そのサイズはこの設定要素によってガイドされます。
non_pooled_task_slot_count = 128

# 各DAGごとのアクティブなDAG実行の最大数
max_active_runs_per_dag = 16

補足#

airflow のデーモンプロセスの動作メカニズム:

  • スケジューラは定期的にメタデータベース(Metastore)に登録された DAG(有向非循環グラフ、作業フローと理解できます)を実行する必要があるかどうかをポーリングします。特定の DAG がそのスケジュールに基づいて実行する必要がある場合、スケジューラデーモンはまずメタデータベースに DAG Run のインスタンスを作成し、DAG 内部の具体的なタスク(タスクは DAG 内に 1 つ以上含まれると理解できます)をトリガーします。トリガーは実際にタスクを実行するのではなく、タスクメッセージをメッセージキュー(ブローカー)にプッシュします。各タスクメッセージには、このタスクの DAG ID、タスク ID、および実行する必要がある関数が含まれます。タスクが bash スクリプトを実行する場合、タスクメッセージには bash スクリプトのコードも含まれます。

  • ユーザーは webserver 上で DAG を制御することができ、手動で DAG をトリガーして実行することができます。ユーザーがこのように行うと、メタデータベースに DAG Run のインスタンスが作成され、スケジューラは #1 と同様の方法で DAG 内の具体的なタスクをトリガーします。

  • ワーカーデーモンはメッセージキューをリッスンし、メッセージがあればメッセージキューからメッセージを取得します。タスクメッセージを取得すると、メタデータ内の DAG Run インスタンスの状態を実行中に更新し、DAG 内のタスクを実行しようとします。DAG の実行が成功した場合、DAG Run インスタンスの状態を成功に更新し、そうでない場合は失敗に更新します。

参考リンク#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。