Airflow notes

- 13 mins

Airflow là công cụ mã nguồn mở sử dụng để lập lịch, quản lí và giám sát quy trình xử lý dữ liệu.

Airflow glossary

DAG

Direct acyclic graph (DAG) là một đồ thị có hướng, không chu trình, biểu diễn tất cả các bước xử lý dữ liệu. Mỗi luồng xử lý được khởi tạo bên trong một file DAG, bên trong file này, ta định nghĩa một quy trình. Quy trình này được hiểu là một đồ thị với các node (đỉnh) tương ứng một task chịu trách nhiệm thực hiện một nhiệm vụ riêng. Các task có chạy tuần tự hoặc song song theo một thứ tự được cấu hình sẵn.

Task

Task là đơn vị cơ bản trong airflow dùng để định nghĩa một công việc cần thực hiện bên trong DAG. Các task được sắp xếp theo thứ tự thực hiện bên trong DAG, upstream/downstream dùng để chỉ các task trước và sau của một task. Có 3 loại task cơ bản bên trong airflow:

Task, Operator, Sensor là các lớp con của một lớp gọi là BaseOperator, các khái niệm task và operator có thể dùng để thay thế cho nhau.

Để khai báo mối quan hệ giữa các task bên trong một file DAG, có 2 cách để cấu hình:

Control flow

Mặc định, các task sẽ được chạy khi các task phụ thuộc nó chạy thành công. Tuy nhiên, airflow cung cấp thêm các method để điều khiển quá trình task đang chạy:

Branching

Branching là tính năng phân nhánh các task, quy trình không phải đi theo một luồng cố định mà tùy điều kiện sẽ thực hiện các nhánh task khác nhau bên trong một DAG. Branching sử dụng decorator @task.branch đánh dấu trong file DAG để thông báo cho airflow.

@task.branch(task_id="branch_task")
def branch_func(ti=None):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "continue_task"
    elif xcom_value >= 3:
        return "stop_task"
    else:
        return None


start_op = BashOperator(
    task_id="start_task",
    bash_command="echo 5",
    do_xcom_push=True,
    dag=dag,
)

branch_op = branch_func()

continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

Trigger rules

Airflow sẽ chờ tất cả các task phía trước hoàn thành để tiếp tục chạy task hiện tại. Tuy nhiên, airflow cung cấp thêm trigger dùng để điều khiển luồng chạy của một task trong DAG thông qua tham số trigger_rule nhận một trong các giá trị:

Setup and Teardown

Setup and teardown là tính năng cung cấp tài nguyên để chạy một task, sau đó thu hồi khi task chạy xong. Một số đặc điểm của tính năng này:

Để cấu hình task setups and teardown, ta sử dụng method as_setupas_teardown. Giả sử ta có 3 task create_cluster, run_query, delete_cluster, chạy theo thứ tự tạo cluster cấp tài nguyên, chạy query lấy kết quả và xóa cluster vừa tạo khi chạy xong. DAG được cấu hình như sau:

create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()

hoặc

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)

Trường hợp có nhiều task cần chạy trước khi teardown, dùng thêm context manager trong python như sau:

with delete_cluster().as_teardown(setups=create_cluster()):
    [RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
    WorkOne() >> [do_this_stuff(), do_other_stuff()]

Ngoài ra có thể chạy nhiều cặp ràng buộc setup and teardown trong DAG:

with TaskGroup("setup") as tg_s:
    create_cluster = create_cluster()
    create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
    delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
    delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t

Latest only

Latest only là một trường hợp đặc biệt của branching. Giả sử trong một DAG có 3 task được cấu hình như sau:

task_1 >> task_2 >> task_3

Trường hợp khi chạy lại DAG của những ngày trước, không phải ngày hiện tại, bạn muốn chỉ thực hiện task_1 mà không cần chạy task_2, task_3 thì có thể sử dụng tính năng này.

import datetime

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="latest_only_with_trigger",
    schedule=datetime.timedelta(hours=4),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]

Trong DAG trên:

Depends On Past

Depends on past là điều kiện thực hiện một task dựa vào điều kiện của task này ở lần chạy trước thành công hay không. Trước hợp task được lập lịch chạy và dừng độc lập mỗi lần thì tính năng này không khả dụng.

Vòng đời task

Một task trong DAG có thể trải qua các trạng thái sau:

Airflow task states

Luồng cơ bản của một task sẽ đi từ các trạng thái: none -> scheduled -> queued -> running -> success

Operator

Operator là một template khai báo sẵn cấu hình của một task, operator được khai báo bên trong file DAG. Airflow cung cấp nhiều loại operator tích hợp sẵn để thực hiện các task với yêu cầu khác nhau và giao tiếp với một số hệ thống khác như HDFS, MySQL, AWS. Một số loại operator thông dụng như bash operator, python operator, email operator, decorator @task

BashOperator

# The start of the data interval as YYYY-MM-DD
date = ""
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

PythonOperator

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp//my_file")],
    dag=dag,
)

EmailOperator

Decorator @task

dag = DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
)


@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


@task(task_id="transform")
def transform(order_data):
    print(type(order_data))
    for value in order_data.values():
        total_order_value += value
    return {"total_order_value": total_order_value}


extract_task = extract()

transform_task = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": ""},
    python_callable=transform,
)

extract_task >> transform_task

Sensor

Sensor là một loại operator đặc biệt, thực hiện một task khi có một sự kiện nào đó xảy ra. Sự kiện này dựa vào một thời gian cụ thể, một file được tạo hoặc một event từ bên ngoài. Có 2 trạng thái của sensor:

comments powered by Disqus
rss facebook twitter github gitlab youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora quora