Apache Airflow XCOMs - Sharing Information Between Tasks

الـ Apache Airflow بيقوم على مبدأ مهم جدًا وهو الـ DAGs أو الـ Direct Acyclic Graphs ، وعشان دورنا كـ Data Engineers اننا نطور ونحافظ على الـ Data Pipelines ونخليها Automated محتاجين نعرف ازاي ممكن نشارك المعلومات بين الـ Tasks وبعضهم بشكل فعال.
Apache Airflow XCOMs - Sharing Information Between Tasks
Apache Airflow XCOMs - Sharing Information Between Tasks

في هذه الصفحة

المقدمة

كـ Data Engineers، مش بس مهمتنا أننا نطور أو نحافظ على الـ Data Pipelines من أي مشاكل ممكن تحصل فيها، لأ ده كمان من ضمن مهامنا الأساسية أننا نقدر نخليها تتنفذ بشكل دوري دون أي تدخل مننا.

ومن أشهر الـ Tools المستخدمة لفكرة الـ Automation دي هي Apache Airflow ، وهي أداة مفتوحة المصدر "Open-Source Tool" نقدر من خلالها أننا نحدد وقت معين لتنفيذ الـ Pipelines، ونراقب حالتها ونفهم سبب المشاكل في حالة حدوثها، وكمان نقدر بسهولة نحدد الاعتمادية أو الـ Dependencies بتاعة أجزاء الـ Pipelines المختلفة على بعضها البعض.

زي أن يكون فيه جزء من الـ Pipeline ماينفعش يتنفذ غير لما جزء أو مجموعة أجزاء تانية تتنفذ الأول ، وكمان من خلاله نقدر نخلي جزء من الـ Pipeline يتنفذ أول ما حدث معين يحصل ، وكتير من العمليات اللي ممكن ننفذها من خلال Apache Airflow.


Apache Airflow

الـ Apache Airflow بيقوم على مبدأ مهم جدًا وهو الـ DAGs أو الـ Direct Acyclic Graphs.

وهي عبارة عن طريقة معينة لتنظيم أجزاء الـ pipeline الواحد واللي بتُسمَى Tasks ، فكل Task بينفذ مهمة معينة زي أنه يقرأ من database، يعمل data cleaning، ينفذ spark job، أو يكتب file على HDFS.

كل Task من دول بيشارك في الـ pipeline علشان في الآخر نحصل على النتايج اللي محتاجينها، وكل Task من دول ممكن جدًا يبقى شرط من شروط تنفيذه هو تنفيذ Task تاني معين قبله، وتنفيذه بنجاح كمان.

وممكن برضه يبقى فيه Task منهم مستنية معلومة من Task تانية قبلها علشان تستخدمها في العملية اللي هتعملها على الـ data.


Sharing Information Between Tasks

فكرة مشاركة المعلومات ما بين الـ Tasks وبعضها فكرة مهمة جدًا لإن المعلومات دي ممكن تبقى Lookup data ناتجة عن Task هحتاجها في Task بعدها، وممكن يبقى Timestamp بيعبر عن الوقت اللي اتنفذت فيه الـ Task دي ومهم باقي الـ Tasks تبقى واعية بالقيمة دي.

ممكن برضه تبقى parameters زي مكان file معين أو اسمه ومحتاجاه علشان أعمل عليه عمليات تانية في Tasks قادمة، وأمثلة وuse cases كتير جدًا ممكن نحتاج فيها نشارك معلومات زي دي.


فـ Apache Airflow بيقدم مجموعة من الحلول لمشكلة زي دي ومنها الـ Variables والـ XCOMs ، والإتنين هما طرق علشان نخزن المعلومات اللي محتاجين ننشرها من Task في مكانٍ ما، وتقدر أي Task تانية تستخدم المعلومة دي من خلال أنها تقراها فقط من المكان ده.

وفي المقال ده هنتكلم عن الXCOMs، هنعرف معناها، بتخزن المعلومات دي ازاي ؟ وازاي نقدر نحصل على المعلومات دي، وفي النهاية هنوضح بمثال كامل ازاي مشاركة المعلومات بتتم بين الـ Tasks وبعضها.


XCOMs

الـ XCOM هي اختصار لـ cross-communication، وهو بالظبط المبدأ اللي بندور عليه علشان يبقى فيه طريقة تواصل بين الـ Tasks وبعضها ، فأي Task يقدر بإستخدام الـ XCOM أنه يشارك بمعلومة، أو يقرأ معلومة كان بالفعل فيه Task تاني قام بمشاركتها قبل كده.

المعلومات اللي الـ Tasks بتشاركها بتكون متخزنة في الـ Metadata Database الخاصة بـ Airflow، فَبالتالي أي Task في الـ DAG أو حتى أي DAG تاني يقدر يقرأ المعلومات دي.

طيب ازاي نقدر نشارك أو نقرأ المعلومات دي ؟ عن طريق 2 Functions:

  1. مشاركة المعلومة عن طريق ()xcom_push واللي بتاخد 2 parameters أساسية وهي الـ key والـ value ، والـ key بيكون قيمة unique لكل value بيتم مشاركتها عن طريق الـ XCOM، والـ value هي القيمة اللي محتاجين نشاركها نفسها.
xcom_push(key="random_number", value=number)
  1. قراءة المعلومة عن طريق ()xcom_pull واللي برضه بتاخد 2 parameters أساسية وهي الـ key المرتبط بالـ value اللي محتاجي نقراها، والـ Task ID بتاع الـ Task اللي شارك المعلومة دي.
xcom_pull(key="random_number", task_ids="generate_random_number")

XCOMs Practical Example

دلوقتي هنشوف مثال كامل لإستخدام الـ XCOMs في Airflow.

في المثال عندنا 2 Tasks، واحدة فيهم بتـ publish معلومة والتانية بتـ access المعلومة دي ، والـ 2 Tasks عبارة عن Python Operators ، وده هيكون شكل الDAG بتاعنا:


تقدروا دلوقتي تشتركوا في النشرة الأسبوعية لاقرأ-تِك بشكل مجاني تمامًا عشان يجيلكوا كل جديد بشكل أسبوعي فيما يخص مواضيع متنوعة وبشروحات بسيطة وسهلة وبجودة عالية 🚀

النشرة هيكون ليها شكل جديد ومختلف عن شكلها القديم وهنحاول انها تكون مميزة ومختلفة وخليط بين المحتوى الأساسي اللي بينزل ومفاجآت تانية كتير 🎉

Eqraatech Newsletter | Eqraatech - اقرأ-تِك | Substack
محتوى تقني متميز في مختلف مجالات هندسة البرمجيات باللغة العربية عن طريق تبسيط المفاهيم البرمجية المعقدة بشكل سلس وباستخدام صور توضيحية مذهلة. Click to read Eqraatech Newsletter, a Substack publication with hundreds of subscribers.

بفضل الله قمنا بإطلاق قناة اقرأ-تِك على التليجرام مجانًا للجميع 🚀

آملين بده اننا نفتح باب تاني لتحقيق رؤيتنا نحو إثراء المحتوى التقني باللغة العربية ، ومساعدة لكل متابعينا في انهم يوصلوا لجميع أخبار اقرأ-تِك من حيث المقالات ومحتوى ورقة وقلم والنشرة الأسبوعية وكل جديد بطريقة سريعة وسهلة

مستنينكوا تنورونا , وده رابط القناة 👇

https://t.me/eqraatechcom


with DAG(
    dag_id='xcom_example_dag',
    schedule_interval=None,
    start_date=datetime(2023, 10, 1),
    catchup=False,
) as dag:

    # Task to push value
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
        provide_context=True,
    )

    # Task to pull value
    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function,
        provide_context=True,
    )

والـ Functions اللي من خلالها هنــ push ونــ pull المعلومة اللي محتاجين نشاركها هتكون بالشكل ده:

def push_function(**kwargs):
    value_to_pass = "Hello, XCom!"
    # Pushing value to XCom
    kwargs['ti'].xcom_push(key='greeting', value=value_to_pass)

def pull_function(**kwargs):
    # Pulling value from XCom
    value = kwargs['ti'].xcom_pull(key='greeting', task_ids='push_task')
    print(f"Received value from XCom: {value}")

لو لاحظنا أنه في الـ push function حطينا الkey اللي بيعرف المعلومة دي، والمعلومة نفسها كـ value. وفي الـ pull function حطينا الـ key علشان نقدر نقرأ المعلومة وكمان معاها الـ Task ID المرتبط بالـ Task اللي شارك المعلومة دي وهو push_task.

وكمان نقدر نستخدم الـ XCOMs في Operators تانية زي Bash Operator في صورة Template زي ما هو واضح في الكود:

pull_task = BashOperator(
    task_id='pull_task',
    bash_command='echo "Received value from XCom: {{ task_instance.xcom_pull(task_ids=\'push_task\', key=\'greeting\') }}"',
)

في الختام

استخدامات الـ XCOMs في Airflow كتير زي ما ذكرنا، وكمان مش مقتصرة على نوع معين من الـ Operators هي ممكن تستخدم في Python Operator، Bash Operator، وكمان File Sensor وغيرهم كتير من الـ Operators.

وبكده نبقى فهمنا بشكل مبسط أهمية الـ XCOMs في Airflow، ازاي نقدر نستخدمها سواء أننا نــ publish أو نـ access معلومات من الـ Tasks المختلفة في الـ DAG بتاعنا.

اشترك الآن بنشرة اقرأ-تِك الأسبوعية

لا تدع أي شيء يفوتك. واحصل على أحدث المقالات المميزة مباشرة إلى بريدك الإلكتروني وبشكل مجاني!