المقدمة
كـ Data Engineers، مش بس مهمتنا أننا نطور أو نحافظ على الـ Data Pipelines من أي مشاكل ممكن تحصل فيها، لأ ده كمان من ضمن مهامنا الأساسية أننا نقدر نخليها تتنفذ بشكل دوري دون أي تدخل مننا.
ومن أشهر الـ Tools المستخدمة لفكرة الـ Automation دي هي Apache Airflow ، وهي أداة مفتوحة المصدر "Open-Source Tool" نقدر من خلالها أننا نحدد وقت معين لتنفيذ الـ Pipelines، ونراقب حالتها ونفهم سبب المشاكل في حالة حدوثها، وكمان نقدر بسهولة نحدد الاعتمادية أو الـ Dependencies بتاعة أجزاء الـ Pipelines المختلفة على بعضها البعض.
زي أن يكون فيه جزء من الـ Pipeline ماينفعش يتنفذ غير لما جزء أو مجموعة أجزاء تانية تتنفذ الأول ، وكمان من خلاله نقدر نخلي جزء من الـ Pipeline يتنفذ أول ما حدث معين يحصل ، وكتير من العمليات اللي ممكن ننفذها من خلال Apache Airflow.
Apache Airflow
الـ Apache Airflow بيقوم على مبدأ مهم جدًا وهو الـ DAGs أو الـ Direct Acyclic Graphs.
وهي عبارة عن طريقة معينة لتنظيم أجزاء الـ pipeline الواحد واللي بتُسمَى Tasks ، فكل Task بينفذ مهمة معينة زي أنه يقرأ من database، يعمل data cleaning، ينفذ spark job، أو يكتب file على HDFS.
كل Task من دول بيشارك في الـ pipeline علشان في الآخر نحصل على النتايج اللي محتاجينها، وكل Task من دول ممكن جدًا يبقى شرط من شروط تنفيذه هو تنفيذ Task تاني معين قبله، وتنفيذه بنجاح كمان.
وممكن برضه يبقى فيه Task منهم مستنية معلومة من Task تانية قبلها علشان تستخدمها في العملية اللي هتعملها على الـ data.
Sharing Information Between Tasks
فكرة مشاركة المعلومات ما بين الـ Tasks وبعضها فكرة مهمة جدًا لإن المعلومات دي ممكن تبقى Lookup data ناتجة عن Task هحتاجها في Task بعدها، وممكن يبقى Timestamp بيعبر عن الوقت اللي اتنفذت فيه الـ Task دي ومهم باقي الـ Tasks تبقى واعية بالقيمة دي.
ممكن برضه تبقى parameters زي مكان file معين أو اسمه ومحتاجاه علشان أعمل عليه عمليات تانية في Tasks قادمة، وأمثلة وuse cases كتير جدًا ممكن نحتاج فيها نشارك معلومات زي دي.
فـ Apache Airflow بيقدم مجموعة من الحلول لمشكلة زي دي ومنها الـ Variables والـ XCOMs ، والإتنين هما طرق علشان نخزن المعلومات اللي محتاجين ننشرها من Task في مكانٍ ما، وتقدر أي Task تانية تستخدم المعلومة دي من خلال أنها تقراها فقط من المكان ده.
وفي المقال ده هنتكلم عن الXCOMs، هنعرف معناها، بتخزن المعلومات دي ازاي ؟ وازاي نقدر نحصل على المعلومات دي، وفي النهاية هنوضح بمثال كامل ازاي مشاركة المعلومات بتتم بين الـ Tasks وبعضها.
XCOMs
الـ XCOM هي اختصار لـ cross-communication، وهو بالظبط المبدأ اللي بندور عليه علشان يبقى فيه طريقة تواصل بين الـ Tasks وبعضها ، فأي Task يقدر بإستخدام الـ XCOM أنه يشارك بمعلومة، أو يقرأ معلومة كان بالفعل فيه Task تاني قام بمشاركتها قبل كده.
المعلومات اللي الـ Tasks بتشاركها بتكون متخزنة في الـ Metadata Database الخاصة بـ Airflow، فَبالتالي أي Task في الـ DAG أو حتى أي DAG تاني يقدر يقرأ المعلومات دي.
طيب ازاي نقدر نشارك أو نقرأ المعلومات دي ؟ عن طريق 2 Functions:
- مشاركة المعلومة عن طريق
()xcom_push
واللي بتاخد 2 parameters أساسية وهي الـ key والـ value ، والـ key بيكون قيمة unique لكل value بيتم مشاركتها عن طريق الـ XCOM، والـ value هي القيمة اللي محتاجين نشاركها نفسها.
xcom_push(key="random_number", value=number)
- قراءة المعلومة عن طريق
()xcom_pull
واللي برضه بتاخد 2 parameters أساسية وهي الـ key المرتبط بالـ value اللي محتاجي نقراها، والـ Task ID بتاع الـ Task اللي شارك المعلومة دي.
xcom_pull(key="random_number", task_ids="generate_random_number")
XCOMs Practical Example
دلوقتي هنشوف مثال كامل لإستخدام الـ XCOMs في Airflow.
في المثال عندنا 2 Tasks، واحدة فيهم بتـ publish معلومة والتانية بتـ access المعلومة دي ، والـ 2 Tasks عبارة عن Python Operators ، وده هيكون شكل الDAG بتاعنا:
with DAG(
dag_id='xcom_example_dag',
schedule_interval=None,
start_date=datetime(2023, 10, 1),
catchup=False,
) as dag:
# Task to push value
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
)
# Task to pull value
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
)
والـ Functions اللي من خلالها هنــ push ونــ pull المعلومة اللي محتاجين نشاركها هتكون بالشكل ده:
def push_function(**kwargs):
value_to_pass = "Hello, XCom!"
# Pushing value to XCom
kwargs['ti'].xcom_push(key='greeting', value=value_to_pass)
def pull_function(**kwargs):
# Pulling value from XCom
value = kwargs['ti'].xcom_pull(key='greeting', task_ids='push_task')
print(f"Received value from XCom: {value}")
لو لاحظنا أنه في الـ push function حطينا الkey اللي بيعرف المعلومة دي، والمعلومة نفسها كـ value. وفي الـ pull function حطينا الـ key علشان نقدر نقرأ المعلومة وكمان معاها الـ Task ID المرتبط بالـ Task اللي شارك المعلومة دي وهو push_task
.
وكمان نقدر نستخدم الـ XCOMs في Operators تانية زي Bash Operator في صورة Template زي ما هو واضح في الكود:
pull_task = BashOperator(
task_id='pull_task',
bash_command='echo "Received value from XCom: {{ task_instance.xcom_pull(task_ids=\'push_task\', key=\'greeting\') }}"',
)
في الختام
استخدامات الـ XCOMs في Airflow كتير زي ما ذكرنا، وكمان مش مقتصرة على نوع معين من الـ Operators هي ممكن تستخدم في Python Operator، Bash Operator، وكمان File Sensor وغيرهم كتير من الـ Operators.
وبكده نبقى فهمنا بشكل مبسط أهمية الـ XCOMs في Airflow، ازاي نقدر نستخدمها سواء أننا نــ publish أو نـ access معلومات من الـ Tasks المختلفة في الـ DAG بتاعنا.
Discussion