Building and Managing Production-Ready Apache Airflow: From Setup to Troubleshooting


Overview

Apache Airflow is an open-source platform designed to run virtually any kind of workflow using Python. Its flexibility lets users define pipelines as Python scripts, using loops, bash commands, and external modules such as pandas, scikit-learn, and cloud service libraries (GCP, AWS).

Many companies trust Airflow for its reliability:

Pinterest: Overcame performance and scalability issues, reducing maintenance costs.

GoDaddy: Supports batch analytics and data teams with an orchestration tool and pre-built operators for ETL pipelines.

DXC Technology: Implemented Airflow to manage a project with massive data storage needs, providing a stable orchestration engine.

These examples highlight Airflow’s ability to handle complex data processing challenges when deployed properly.

Apache Airflow Use Cases

Key Use Cases:

  • ETL Pipelines: Extracting data from multiple sources, running Spark jobs, and performing data transformations.
  • Machine Learning Models: Training and deploying models.
  • Report Generation: Automating document creation.
  • Backups and DevOps Operations: Automating backup methods and comparable tasks.

Additionally, Airflow supports ad hoc workloads and can be triggered manually via the REST API, demonstrating its flexibility and programmability in Python.
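For example, a DAG run can be triggered through the stable REST API introduced in Airflow 2.0. A minimal sketch using the requests library, assuming a local webserver with the basic-auth backend enabled and illustrative credentials:

import requests

# Trigger a run of the "etl_sales_daily" DAG through the stable REST API.
# Host, credentials, and DAG id below are assumptions for illustration.
response = requests.post(
    "http://localhost:8080/api/v1/dags/etl_sales_daily/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {}},  # optional run configuration passed to the DAG
)
response.raise_for_status()
print(response.json())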

Core Concepts of Apache Airflow 2.0

Airflow DAG (Directed Acyclic Graph)

  • Definition: Workflows in Airflow are defined using DAGs, which are written as Python files.
  • Unique Identification: Each DAG is identified by a unique dag_id.
  • Scheduling:
    • schedule_interval: Defines when the DAG should run (e.g. timedelta(days=2), a cron expression, or None for manual/external triggers).
    • start_date: The date from which the DAG starts running (using days_ago is common).
from airflow.models import DAG
from airflow.utils.dates import days_ago

with DAG(
    "etl_sales_daily",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    ...

Adding Tasks to a DAG

  • Operators: Tasks are defined using operators, and each task must have a unique task_id within the DAG.
  • Dependencies:
    • Upstream tasks: Tasks executed before the current task.
    • Downstream tasks: Tasks executed after the current task.
  • Example:
from airflow.operators.dummy_operator import DummyOperator

task_a = DummyOperator(task_id="task_a")
task_b = DummyOperator(task_id="task_b")
task_c = DummyOperator(task_id="task_c")
task_d = DummyOperator(task_id="task_d")

# task_a runs first; task_b and task_c run after it, and task_d runs after task_c.
task_a >> [task_b, task_c]
task_c >> task_d

Graph view: the Airflow UI renders this DAG visually, showing the dependencies between the tasks.

Trigger rules

  • all_success: All upstream tasks must succeed (the default).
  • one_success: At least one upstream task must succeed.
  • none_failed: No upstream task may have failed (all succeeded or were skipped).

Together, operators, dependencies, and trigger rules let Airflow express even complex business processes in a simple, readable way.
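For instance, continuing the task example above, a trigger rule can be set directly on a task; the cleanup task here is a hypothetical addition for illustration:

from airflow.utils.trigger_rule import TriggerRule

# Hypothetical cleanup task: it runs as long as no upstream task failed,
# even if some upstream tasks were skipped.
cleanup = DummyOperator(
    task_id="cleanup",
    trigger_rule=TriggerRule.NONE_FAILED,
)
[task_b, task_d] >> cleanup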

Airflow Operators, Sensors, and Hooks

  • Action operators: Perform an action, e.g. BashOperator, PythonOperator, TriggerDagRunOperator.
  • Transfer operators: Move data between systems, e.g. GCSToGCSOperator.
  • Sensors: Wait for an event or condition to occur; they inherit from BaseSensorOperator and implement the poke method (see the sketch below).
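As a minimal sketch, a custom sensor only needs to subclass BaseSensorOperator and return True from poke once its condition is met; the FileExistsSensor below is a hypothetical example:

import os

from airflow.sensors.base import BaseSensorOperator


class FileExistsSensor(BaseSensorOperator):
    """Hypothetical sensor that waits until a local file exists."""

    def __init__(self, *, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context) -> bool:
        # Airflow calls poke() repeatedly (every poke_interval) until it
        # returns True or the sensor times out.
        return os.path.exists(self.filepath)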

Built-in and custom operators

  • Airflow has built-in operators like BashOperator.
  • New operators can be installed with provider packages.
  • Connectivity to external services (GCP, AWS, Apache Spark) is easy.

Hooks

  • Purpose: Facilitate communication with external services, manage connections, and handle errors.
  • Use: Reusable building blocks for operators.
  • For example:
    • GCSHook: provides create_bucket and other methods.
    • GCSCreateBucketOperator: Uses GCSHook to create the bucket and adds additional logic.

Code Example for Hooks and Operators

from airflow.hooks.base_hook import BaseHook
from airflow.models.baseoperator import BaseOperator


class GCSHook(BaseHook):
    def __init__(self, *args, **kwargs):
        super().__init__()

    def create_bucket(self, bucket_name: str):
        # Logic for creating GCS buckets
        ...


class GCSCreateBucketOperator(BaseOperator):
    def __init__(self, *, bucket_name: str, **kwargs):
        super().__init__(**kwargs)
        self.bucket_name = bucket_name

    def execute(self, context):
        # The operator delegates the actual work to the hook.
        hook = GCSHook()
        hook.create_bucket(self.bucket_name)
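The custom operator can then be used inside a DAG like any built-in operator; the task_id and bucket name below are illustrative:

create_bucket = GCSCreateBucketOperator(
    task_id="create_bucket",
    bucket_name="my-example-bucket",
)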

XCom (Cross-Communication)

  • Purpose: Allow small pieces of information to be passed between tasks in a DAG.
  • Storage: A table in the metadata database acts as key-value storage.
  • Keys: Each entry is identified by dag_id, task_id, execution_date, and a key (default: return_value).
  • Value: Must be JSON-serializable and under 48 KB.
  • Use: Transfer metadata, not big data, between tasks.
  • Custom XCom backends: Automate serialization and deserialization of more complex data types.

Example Use Case

Task A uploads data to storage and writes its URI to XCom. Task B retrieves the URI from XCom and reads the data from storage.
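A minimal sketch of this pattern, with hypothetical callables and an illustrative storage URI:

from airflow.operators.python import PythonOperator

def upload_data(**context):
    # ... upload the file to storage (details omitted) ...
    return "gs://example-bucket/data/2021-01-01.csv"  # pushed to XCom as "return_value"

def process_data(**context):
    uri = context["ti"].xcom_pull(task_ids="task_a")
    # ... read and process the data found at the URI ...

task_a = PythonOperator(task_id="task_a", python_callable=upload_data)
task_b = PythonOperator(task_id="task_b", python_callable=process_data)
task_a >> task_b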

Tips and tricks

Reuse hooks to reduce logic duplication. Use custom XCom backends for robust data serialization and deserialization.

Deploying Airflow

  • Components:
    • Scheduler: Parses DAG files, keeps workflow state in the database, and schedules tasks. Multiple schedulers can be run for high availability.
    • Webserver: Provides a web interface for monitoring and managing DAGs.
    • Worker: Executes tasks assigned by the scheduler, typically via an executor such as Celery. Many workers can run on separate VMs or Kubernetes pods.
  • Deployment: All components can be started with the airflow CLI and need access to the Airflow metadata database and the same DAG files.

Executors

An executor defines how and where Airflow tasks are executed. Types of executors (a configuration example follows the list):

  • LocalExecutor:
    • Executes tasks in separate processes on a single machine.
    • Suitable for small deployments.
    • Non-distributed but production-ready.
  • CeleryExecutor:
    • Uses Celery queue system (Redis or RabbitMQ).
    • Supports multiple workers on different machines.
    • Allows routing tasks to workers with appropriate resources via queues.
    • Most popular production executor.
  • KubernetesExecutor:
    • Requires a Kubernetes cluster.
    • Spawns a new pod for each task, suitable for long-running tasks.
    • Can be problematic for short-running tasks due to overhead.
  • CeleryKubernetesExecutor:
    • Combines CeleryExecutor and KubernetesExecutor.
    • Uses Kubernetes queue for specific tasks, defaulting to Celery workers for others.
    • Enables horizontal auto-scaling of worker pods.
  • DebugExecutor:
    • Used for local debugging.
    • Executes all tasks in a single process.
    • Simplifies debugging from an IDE.
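The executor is chosen in airflow.cfg (or via the AIRFLOW__CORE__EXECUTOR environment variable), for example:

[core]
executor = CeleryExecutor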

Docker Image and Helm Chart for Airflow

Docker Image

  • Starting Point: The official Apache Airflow Docker image allows local setup and experimentation.
  • Customization:
    • Use the official Airflow image as a base in your Dockerfile.
    • Use the Airflow Breeze tool to build optimized custom images.
  • Production Deployment: Custom Docker images can be deployed using Helm charts.

Helm Chart

  • Purpose: Simplifies deployment of Airflow on Kubernetes.
  • Features:
    • Deploys all Airflow components.
    • Supports horizontal auto-scaling of workers using KEDA.
    • Recommended for Kubernetes deployments.

Airflow DAG Distribution

  • Requirements: Both scheduler and worker need access to the DAG files.
    • Scheduler: Needs DAG files to create a database representation and schedule them.
    • Worker: Needs DAG files to execute tasks.
  • Challenges: Manually uploading DAGs is inconvenient and inefficient.

DAG Distribution Process

  • Repository: Store DAGs in a repository for version control and collaboration.
  • CI/CD: Use CI/CD systems to automate uploading DAGs to the deployment.

Methods of Distribution

  • Shared File System:
    • Example: AWS Elastic File System.
    • Suitable for small deployments.
    • Known to impact performance negatively.
  • Component Volumes:
    • Tools: git-sync, gcs rsync, gcs fuse.
    • Synchronizes local repositories with remote ones.
  • Custom Docker Image:
    • Bake DAGs into a custom Docker image.
    • Requires rebuilding the image for every change or on schedule.
    • Requires an easy way to roll out the updated image to every Airflow component.

Please find the GitHub repo link for a production-ready Airflow setup: https://github.com/apache/airflow

With the help of GitHub Actions, by setting the required GitHub environment variables and secrets, we can install a production-ready Airflow.

Troubleshooting Apache Airflow: Stuck Tasks and DAG Failures

Among the most common problems are tasks getting stuck in the “queued” or “scheduled” states and DAGs failing unexpectedly.

1. Tasks Stuck in the “Queued” or “Scheduled” State

When a task is stuck in the “queued” or “scheduled” state, it means that the task has been identified for execution but hasn’t yet started. This can happen due to several reasons:

Possible Causes:
  1. Insufficient Resources:
    • Description: Airflow may not have enough resources (CPU, memory, etc.) to run the tasks.
    • Solution: Monitor your resource usage and scale your environment appropriately. If using Kubernetes, consider adding more nodes or increasing the resource allocation for Airflow components.
  2. Misconfiguration of Executors:
    • Description: The executor (Celery, Kubernetes, Local, etc.) might be misconfigured, preventing tasks from being picked up.
    • Solution: Check the executor configuration in your airflow.cfg file. Ensure that the executor is correctly set and that workers are properly configured to pick up tasks.
  3. Database Issues:
    • Description: Issues with the Airflow metadata database can prevent tasks from transitioning out of the queued state.
    • Solution: Ensure your database is running smoothly. Check for any deadlocks or slow queries. Regular maintenance, such as vacuuming and analyzing tables, can help keep the database performant.
  4. Scheduler Problems:
    • Description: The Airflow scheduler might not be processing tasks correctly.
    • Solution: Restart the scheduler and check its logs for any errors or warnings. Increasing the number of scheduler instances can also help with load balancing. In Airflow 2.0 and above, running multiple schedulers can enhance reliability and availability.
  5. Configuration Limits:
    • Description: Default settings for task concurrency and parallelism might be too low, causing bottlenecks.
    • Solution: Increase the default task concurrency and parallelism in your airflow.cfg file:

[core]
parallelism = 256
max_active_tasks_per_dag = 256

Steps to Diagnose and Resolve:
  1. Check Airflow Logs:
    • Inspect the logs for the scheduler and workers to identify any errors or warnings.
    • Look for specific messages that indicate resource shortages, misconfigurations, or connectivity issues.
  2. Monitor Resource Usage:
    • Use monitoring tools to track CPU, memory, and network usage on your Airflow nodes.
    • Scale your resources if you notice high usage or bottlenecks.
  3. Verify Configuration:
    • Review the airflow.cfg file and ensure that executors and other settings are properly configured.
    • Confirm that your database connection settings are correct and that the database is responsive.
  4. Restart Components:
    • Sometimes a simple restart of the Airflow scheduler, web server, and workers can resolve transient issues.
    • Ensure that your environment is set up for high availability to minimize downtime.

2. DAG Failures

DAG failures can disrupt workflows and impact the reliability of your data processing. Identifying the root cause of these failures is crucial for maintaining a stable Airflow environment.

Common Causes of DAG Failures:
  1. Code Errors:
    • Description: Bugs in the Python code used to define tasks can cause DAGs to fail.
    • Solution: Carefully review your DAG code for syntax errors, logical mistakes, or runtime exceptions. Implement proper error handling and logging to catch and debug issues.
  2. Dependency Failures:
    • Description: Upstream dependencies may fail, causing downstream tasks to fail as well.
    • Solution: Use Airflow’s dependency management features to ensure tasks are retried or skipped appropriately. Check the logs of upstream tasks to diagnose their failures.
  3. Resource Limits:
    • Description: Tasks may exceed resource limits (CPU, memory, disk space) and fail.
    • Solution: Monitor resource usage and adjust resource limits as needed. Consider using Kubernetes or Celery executors to distribute the workload more effectively.
  4. External System Failures:
    • Description: Failures in external systems or services (e.g. databases, APIs) that tasks depend on.
    • Solution: Implement retries with exponential back-off for tasks that depend on external systems. Ensure robust error handling and logging for these tasks.
Steps to Diagnose and Resolve:
  1. Review Task Logs:
    • Examine the logs for failed tasks to identify the specific error messages and stack traces.
    • Look for patterns in the failures to determine if they are related to specific resources, dependencies, or external systems.
  2. Check Dependencies:
    • Ensure that all dependencies (e.g. files, databases, APIs) are available and responsive.
    • Use Airflow’s built-in sensors to check the status of dependencies before proceeding with tasks.
  3. Implement Error Handling:
    • Use try-except blocks in your Python code to handle exceptions gracefully.
    • Implement custom error handlers to log detailed error information and perform cleanup tasks.
  4. Resource Monitoring:
    • Monitor resource usage for your tasks and adjust resource limits as needed.
    • Consider using a resource manager like Kubernetes to automatically scale resources based on demand.
  5. Retries and Timeouts:
    • Configure task retries and timeouts to handle transient failures.
    • Use Airflow’s retry and timeout settings to ensure tasks are retried appropriately and do not hang indefinitely (a sketch follows this list).
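A minimal sketch of such settings on a task; the callable and the specific values are illustrative assumptions:

from datetime import timedelta
from airflow.operators.python import PythonOperator

def call_external_api():
    # Hypothetical call to an external system that may fail transiently.
    ...

call_api = PythonOperator(
    task_id="call_external_api",
    python_callable=call_external_api,
    retries=3,                             # retry transient failures
    retry_delay=timedelta(minutes=5),      # base wait between retries
    retry_exponential_backoff=True,        # grow the wait exponentially
    execution_timeout=timedelta(hours=1),  # fail the task if it hangs
)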

Apache Airflow is a powerful tool, but like any complex system, it can encounter issues that disrupt workflows. By understanding the common causes of tasks getting stuck and DAG failures, and by following the steps outlined above, you can diagnose and resolve these issues effectively. Regular monitoring, proper configuration, and robust error handling are key to maintaining a reliable Airflow deployment. Increasing task concurrency and parallelism settings, along with running multiple schedulers, can further enhance the stability and performance of your Airflow environment.

Blog Pundits: Deepak Sood and Sandeep Rawat

OpsTree is an End-to-End DevOps Solution Provider.
