Airflow


Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows, commonly used to manage complex data pipelines.

A workflow in Airflow is a DAG (Directed Acyclic Graph), which defines a set of tasks and their execution order, dependencies, and scheduling.

A DAG (Directed Acyclic Graph) represents a workflow as a collection of tasks together with the dependencies between them.

In Apache Airflow, a task is the smallest unit of work within a workflow (DAG). Each task represents a single operation or action, such as running a Python function, executing a SQL query, or triggering a bash command. Each task is identified by a task ID, and its behavior is defined by an operator.

Apache Airflow Scheduler is a core component responsible for triggering task instances to run in accordance with the defined Directed Acyclic Graphs (DAGs) and their schedules.

In Apache Airflow, an Executor is the component responsible for actually running the tasks defined in your workflows (DAGs). It takes task instances that the Scheduler determines are ready and orchestrates their execution either locally or on remote workers.
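
A minimal sketch of a DAG with two tasks and a dependency (the DAG ID, schedule, and callable are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def say_hello():
    # Simple Python callable run by the PythonOperator task
    print("hello from Airflow")


with DAG(
    dag_id="example_etl",              # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    transform = PythonOperator(task_id="transform", python_callable=say_hello)

    # extract must finish before transform starts
    extract >> transform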




=======================================================================

πŸ”Ή What is Docker?

Docker is a platform used to:

  • Package an application and its dependencies into a container

  • Ensure the application runs the same across all environments

A Docker container is a lightweight, standalone, and executable package that includes everything needed to run a piece of software: code, libraries, environment variables, and config files.

🐳 What is a Dockerfile?

  • A Dockerfile is a text file that contains all the instructions to build a Docker image.

  • It defines the environment, dependencies, and commands your application needs to run consistently on any machine.

  • Think of it as a recipe for your container.

# Step 1: Base image
FROM python:3.11-slim

# Step 2: Set working directory inside container
WORKDIR /app

# Step 3: Copy your project files into container
COPY requirements.txt .

# Step 4: Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Step 5: Copy all code
COPY . .

# Step 6: Set environment variables (optional)
ENV PYTHONUNBUFFERED=1

# Step 7: Define default command to run
CMD ["python", "main.py"]

πŸ”Ή Step-by-Step Explanation

  1. FROM python:3.11-slim

    • Base image with Python installed. Slim version = smaller image.

  2. WORKDIR /app

    • Sets working directory inside container.

  3. COPY requirements.txt .

    • Copies dependency file into container.

  4. RUN pip install ...

    • Installs Python packages inside container.

  5. COPY . .

    • Copies your ETL or Airflow scripts into container.

  6. ENV PYTHONUNBUFFERED=1

    • Makes Python logs visible immediately (useful for debugging).

  7. CMD ["python", "main.py"]

    • Default command when container starts. Can be your ETL job or Airflow task script.


πŸ”Ή Useful Commands

  1. Build Docker image

docker build -t my-data-engineer-image .

  2. Run a container

docker run -it --rm my-data-engineer-image

  3. Run with a mounted volume (edit locally, changes reflect inside the container)

docker run -v $(pwd):/app -it my-data-engineer-image

  4. Push to Docker Hub / registry

docker tag my-data-engineer-image username/my-image:latest
docker push username/my-image:latest

---------------------------------------------------

🐳 What is Docker Compose?

  • Docker Compose is a tool to define and run multi-container Docker applications.

  • Instead of running each container individually, you define all services in a single docker-compose.yml file.

  • You can spin up the whole environment with one command:

    docker-compose up

version: '3.8'

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  airflow-webserver:
    image: apache/airflow:2.7.1-python3.11
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver

volumes:
  postgres_data:

πŸ”Ή Run Commands

  1. Build & start all services:

docker-compose up --build

  2. Run in detached mode (background):

docker-compose up -d

  3. Stop all containers:

docker-compose down

  4. View logs of a service:

docker-compose logs airflow-webserver
----------------------------------------------------------------------------

πŸ“„ What is requirements.txt?

  • It’s a text file listing all the Python packages your project needs.

  • Used by pip to install dependencies: pip install -r requirements.txt



=======================================================================

⭐ 1. Airflow Connections

Connections = saved credentials for external systems.

Examples:

  • AWS

  • Snowflake

  • Postgres

  • MySQL

  • BigQuery

  • S3

  • Kafka

  • Redshift

πŸ”Ή How to Set Connections

A) Using Airflow UI

  1. Go to Admin → Connections

  2. Click + Add

  3. Fill:

    • Conn ID → aws_default

    • Conn Type → Amazon Web Services

    • Extra → JSON (keys, region, endpoint)

  4. Save


B) Using CLI

airflow connections add my_postgres \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login user \
    --conn-password pass \
    --conn-schema mydb \
    --conn-port 5432

C) Using Environment Variables

Format:

AIRFLOW_CONN_<CONN_ID>=<connection_uri>

Example:

export AIRFLOW_CONN_MY_PG="postgresql://user:pass@host:5432/db"

This is very common in Docker/Kubernetes.
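
However the connection is created (UI, CLI, or environment variable), task code can look it up through a hook or BaseHook. A minimal sketch, assuming a connection with ID my_pg exists:

from airflow.hooks.base import BaseHook

# Fetch the connection stored under conn_id "my_pg" (hypothetical ID)
conn = BaseHook.get_connection("my_pg")
print(conn.host, conn.port, conn.schema)  # credentials stay out of DAG code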


⭐ 2. Airflow Variables

Variables = key–value store for configuration.

Example:

  • file_path

  • S3 bucket name

  • threshold value

  • list of emails


πŸ”Ή How to Set Variables

A) Using UI

Admin → Variables → Add

B) Using CLI

airflow variables set file_path /data/raw/
airflow variables get file_path

C) Using JSON IMPORT

airflow variables import variables.json

D) Using Environment Variables

export AIRFLOW_VAR_BUCKET="my_bucket"

Usage inside DAG:

from airflow.models import Variable

bucket = Variable.get("bucket")
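
Variable.get also supports default values and JSON deserialization; a short sketch (the variable names are illustrative):

from airflow.models import Variable

# Fall back to a default if the variable is not set
file_path = Variable.get("file_path", default_var="/data/raw/")

# Store a JSON list in one variable and parse it on read
emails = Variable.get("alert_emails", default_var=["a@example.com"], deserialize_json=True)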

⭐ 3. Airflow Secret Backends (Very Important for Data Engineers)

Airflow supports managing secrets securely using external systems.

πŸ”Ή Supported Secret Backends:

  1. AWS Secrets Manager

  2. GCP Secret Manager

  3. Hashicorp Vault

  4. Azure Key Vault

  5. Custom secret backends


Why use secret backends?

  • Secrets are not stored in Airflow DB

  • Rotated automatically

  • Secure & centralized

  • Avoid plaintext passwords in Airflow UI


πŸ”Ή Example: Using AWS Secrets Manager

Add to airflow.cfg:

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

AWS Secret Format:

airflow/connections/my_postgres

Used automatically in DAG:

conn = BaseHook.get_connection("my_postgres")

⭐ 4. Best Practices for Storing Credentials (MOST IMPORTANT)

πŸ” 1. NEVER store passwords in code

password="abcd1234"

✔ Use:

conn = BaseHook.get_connection("my_db")

πŸ” 2. Avoid storing secrets in Airflow Variables

Variables are NOT encrypted by default.


πŸ” 3. Use Secret Backends for all production credentials

  • AWS Secrets Manager

  • GCP Secret Manager

  • Hashicorp Vault


πŸ” 4. Use environment variables for local development

Safe and temporary.


πŸ” 5. Do not store credentials in GitHub / repo

Always use:

  • .env

  • Kubernetes Secrets

  • Docker Secrets


πŸ” 6. Use different connection IDs for dev/stage/prod

Example:

  • aws_dev

  • aws_stage

  • aws_prod


πŸ” 7. Use JSON "extra" field for complex configs

Example Extra field in UI:

{
  "region_name": "ap-south-1",
  "role_arn": "arn:aws:iam::12345:role/my-role"
}

=========================================================================================================

Operators

Operators are Python classes that define a template for a specific unit of work (task) in a workflow. When you instantiate an operator in a DAG, it becomes a task that Airflow executes. Operators encapsulate the logic required to perform a defined action or job.

Each operator represents a single task in the workflow, such as running a script, moving data, or checking if a file exists.


Operators = do something
Sensors = wait for something
Hooks = connection to systems (S3Hook, PostgresHook, etc.)
Executors = how tasks run (Local, Celery, Kubernetes)
Scheduler = creates DAG Runs + task instances

Types of Operators

In Apache Airflow, Operators are the building blocks of your workflows (DAGs). Each operator defines a single task to be executed. There are different types of operators based on the type of work they perform.

Operators fall into three broad categories:

  1. Action Operators:
    Perform an action like running code or sending an email.

    Examples:

  • PythonOperator to run a Python function
  • BashOperator to run shell commands
  • EmailOperator to send emails
  • SimpleHttpOperator to interact with APIs


  2. Transfer Operators:
    Move data between systems or different storage locations.

    Examples:

  • S3ToRedshiftOperator
  • MySqlToGoogleCloudStorageOperator

  3. Sensor Operators:
    Wait for a certain event or external condition before proceeding.

    Examples:

  • FileSensor waits for a file to appear
  • ExternalTaskSensor waits for another task to complete
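
A quick sketch of how instantiating operators creates tasks (it assumes a dag object already exists; IDs and the callable are placeholders):

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def clean_data():
    print("cleaning")


# Each operator instance becomes one task, identified by its task_id
download = BashOperator(task_id="download_file", bash_command="echo downloading", dag=dag)
clean = PythonOperator(task_id="clean_data", python_callable=clean_data, dag=dag)

download >> clean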






=======================================================================

Apache Airflow commands


==================================================================
Dependencies

πŸ”— chain() Function

The chain() function lives in airflow.models.baseoperator (previously in airflow.utils.helpers) and helps you connect multiple tasks or task groups in a sequence without writing task_1 >> task_2 >> task_3 manually.
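
A minimal sketch of chain() (the task objects are assumed to already exist):

from airflow.models.baseoperator import chain

# Equivalent to: extract >> transform >> load >> notify
chain(extract, transform, load, notify)

# Lists create parallel branches: t1 >> [t2, t3] >> t4
chain(t1, [t2, t3], t4)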


=======================================================================

⭐ 1. Airflow Scheduling Basics

Airflow schedules based on:

  • cron expressions

  • timetables

  • logical date

  • catchup

  • backfill

A DAG run does NOT start at the exact cron time—it starts after the logical interval finishes.


🟦 2. Cron Expressions in Airflow

Cron = when to run the DAG.

Examples:

  • 0 0 * * * → Every midnight
  • 0 */2 * * * → Every 2 hours
  • 0 6 * * 1 → Every Monday at 6 AM
  • */5 * * * * → Every 5 minutes

Airflow uses cron to define the start of the schedule interval, but the DAG runs after the interval finishes.


🟦 3. Timetables (Airflow 2.2+)

Timetables = new, flexible scheduling system.

Useful when cron is not enough.

Examples:

  • Run DAG every business day except holidays

  • Run every 3 hours between 9–5

  • Run based on dataset dependencies

  • Run after an upstream dataset is updated

Timetables replace schedule_interval for advanced cases.


🟦 4. Catchup vs No Catchup

  • catchup=True → Airflow creates DAG Runs for all past dates since the start date
  • catchup=False → Airflow only runs the latest DAG run and skips historical dates

Example:

DAG start date = Jan 1
Today = Jan 5
Schedule = daily

  • catchup=True → runs created for Jan 1, 2, 3, 4, 5 (5 runs)
  • catchup=False → only Jan 5 (latest run)

🟦 5. Backfill (Manual Catchup)

Backfill = you manually run past dates even if catchup=False.

Command:

airflow dags backfill -s 2024-01-01 -e 2024-01-05 my_dag

Purpose:

  • Re-run historical data

  • Fix missed data loads

  • Reprocess partitions


🟦 6. Logical Date (MOST IMPORTANT)

Logical date = the data interval the DAG run is processing.

It is not the actual time the run starts.

Example:

Schedule: Daily
Cron: 0 0 * * * (midnight, i.e., start of interval)

DAG Run with logical date: 2024-10-10 00:00
Run actually starts: after the data interval ends, i.e., around 2024-10-11 00:00

Why this is important?

  • All tasks use logical_date for:

    • file paths

    • S3 partitions

    • SQL date parameters

    • templated variables ({{ ds }} etc.)

Think of it like:

Logical date = data date
Execution date = same as logical date (Airflow 2.2+)
NOT the real-time the task runs


🟣 Logical Date Example (Simple)

Schedule = daily
Interval = 2024-01-01 00:00 → 2024-01-02 00:00

The DAG run for this interval actually starts around 2024-01-02 00:01, but:

{{ ds }} = 2024-01-01

Because that’s the interval start (logical date).
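
For example, templated fields receive the logical date; a hedged sketch (the bucket path is made up and the task is assumed to live inside a DAG):

from airflow.operators.bash import BashOperator

# {{ ds }} renders to the logical date (interval start), e.g. 2024-01-01
copy_partition = BashOperator(
    task_id="copy_partition",
    bash_command="echo processing s3://my-bucket/raw/dt={{ ds }}/",
)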

=======================================================

Cron

cron expressions are used in the schedule_interval parameter of a DAG to define when the DAG should run.
A cron expression is a string representing a schedule; it tells Airflow how often to run a DAG.

Other schedule values:

None → never scheduled; the DAG only runs when triggered manually
@once → run the DAG only once





=======================================================================

πŸ”Ή What Are Hooks in Apache Airflow?

Hooks in Airflow are interfaces to external platforms, like databases, cloud storage, APIs, and more. They abstract the connection and authentication logic, allowing operators to use these services easily.

Hooks are mostly used behind the scenes by Operators, but you can also call them directly in Python functions.

πŸ”Έ Why Use Hooks?

  • Reusable connection logic

  • Securely use Airflow's connection system (Airflow Connections UI)

  • Simplifies integrating with external systems (e.g., MySQL, S3, BigQuery, Snowflake)

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

def get_data():
    hook = PostgresHook(postgres_conn_id='my_postgres')
    connection = hook.get_conn()
    cursor = connection.cursor()
    cursor.execute("SELECT COUNT(*) FROM my_table;")
    result = cursor.fetchone()
    print(f"Row count: {result[0]}")

with DAG('postgres_hook_example',
         start_date=days_ago(1),
         schedule_interval=None,
         catchup=False) as dag:

    t1 = PythonOperator(
        task_id='count_rows',
        python_callable=get_data
    )

=======================================================================

☁️ What is S3Hook?

  • S3Hook is a helper class in Airflow to interact with Amazon S3.

  • It abstracts the boto3 (AWS SDK for Python) operations so you can read/write files, list buckets, check if objects exist, etc., directly in your DAGs.

  • Comes from: from airflow.providers.amazon.aws.hooks.s3 import S3Hook (Airflow 2+)

πŸ”Ή When to Use S3Hook

  1. You want to upload a file to S3 from Airflow.

  2. You want to download a file from S3 for processing.

  3. You want to check if a key/object exists before running a task.

  4. You want to list files in a bucket dynamically.
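
A rough sketch of those operations, assuming an aws_default connection and a bucket called my-bucket exist:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def move_report():
    hook = S3Hook(aws_conn_id="aws_default")

    # Check whether an object exists before doing anything else
    if not hook.check_for_key(key="raw/input.csv", bucket_name="my-bucket"):
        raise ValueError("input file missing")

    # Upload a local file, then list what is under the prefix
    hook.load_file(
        filename="/tmp/report.csv",
        key="reports/report.csv",
        bucket_name="my-bucket",
        replace=True,
    )
    print(hook.list_keys(bucket_name="my-bucket", prefix="reports/"))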

=======================================================================


Defining dags





=======================================================================

🧠 What Does an Executor Do?

It communicates with the Scheduler and runs the tasks defined in your DAGs—either locally, in parallel, or on distributed systems like Celery or Kubernetes.

→ airflow info shows which executor is currently configured.




=======================================================================

In Apache Airflow, an SLA (Service Level Agreement) is a time-based constraint that you can apply to a task to ensure it finishes within a defined timeframe. If it misses the deadline, Airflow can alert or log the SLA miss.

🧠 Why Use SLAs?

To ensure:

  • Timely data availability

  • Reliable pipeline performance

  • Alerting for delays or failures


🧩 How SLA Works in Airflow

  • SLA is defined per task, not per DAG.

  • If a task has not finished within the SLA window, it's marked as an SLA miss.

  • Airflow triggers an SLA miss callback and logs the event.

  • Email alerts can be sent if configured.
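
A hedged sketch of attaching an SLA to a task (the 30-minute window and names are illustrative; the task is assumed to sit inside a DAG):

from datetime import timedelta

from airflow.operators.bash import BashOperator

# The task should finish within 30 minutes of the DAG run's scheduled start
load_warehouse = BashOperator(
    task_id="load_warehouse",
    bash_command="echo loading",
    sla=timedelta(minutes=30),
)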


πŸ“Š Monitoring SLA Misses

  • Go to Airflow UI > DAGs> Browse > SLA Misses

  • Or check the Task Instance Details


⚠️ Notes

  • SLA misses are evaluated by the scheduler, so they may only be reported after the affected tasks have finished.

  • The SLA window is measured from the DAG run's scheduled start, not from when the task actually starts.

  • An SLA miss doesn't retry or fail the task; it only logs the violation and triggers any configured callback or email alert.

=======================================================================

πŸ”§ What is a Template?

A template is a string that contains placeholders which are evaluated at runtime using the Jinja2 engine.

In Apache Airflow, templates allow you to dynamically generate values at runtime using Jinja templating (similar to templating in Flask or Django). They are useful when you want task parameters to depend on the execution context, such as the date, DAG ID, or other dynamic values.



=======================================================================

Jinja is a templating engine for Python used heavily in Apache Airflow to dynamically render strings using runtime context. It lets you inject variables, logic, and macros into your task parameters.

πŸ” What is Jinja?

Jinja is a mini-language similar to Django or Liquid templates. In Airflow, it's used for:

  • Creating dynamic file paths

  • Modifying behavior based on execution date

  • Using control structures like loops and conditions

🧾 DAG Example (Manual filename via params)
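
A minimal sketch of such a DAG (the filename, IDs, and command are assumptions):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="jinja_params_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    params={"filename": "input.csv"},  # default, can be overridden when triggering
) as dag:
    # {{ params.filename }} and {{ ds }} are rendered by Jinja at runtime
    process = BashOperator(
        task_id="process_file",
        bash_command="echo processing {{ params.filename }} for {{ ds }}",
    )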

=======================================================================

✅ What is catchup?

When catchup=True (default), Airflow will "catch up" by running all the DAG runs from the start_date to the current date.

When catchup=False, it only runs the latest scheduled DAG run from the time it is triggered.


=======================================================================

πŸ” What is Backfill?

Backfill is the process of running a DAG for past scheduled intervals that have not yet been run.

When a DAG is created or modified with a start_date in the past, Airflow can "backfill" to ensure that all scheduled intervals between the start_date and now are executed.




=======================================================================

πŸ”§ Components of Apache Airflow

Apache Airflow is made up of several core components that work together to orchestrate workflows:

  • Scheduler: The brain of Airflow that monitors DAGs and tasks, triggers DAG runs based on schedules or events, and submits tasks to the executor for execution. It continuously checks dependencies and task states to decide what to run next. By default it can take up to 5 minutes for a new DAG file in the DAGs folder to be detected (dag_dir_list_interval), and the scheduler loop looks for runnable tasks every few seconds.
  • Executor: Executes task instances assigned by the scheduler. It can run tasks locally, via distributed workers, or on containerized environments depending on the executor type (LocalExecutor, CeleryExecutor, KubernetesExecutor, etc.).
  • Workers: Machines or processes (depending on executor) that actually run the task code. For distributed executors like Celery or Kubernetes, multiple workers run tasks in parallel, scaling out capacity.
  • Metadata Database: A relational database (e.g., PostgreSQL, MySQL) that stores all Airflow metadata: DAG definitions, task states, execution history, logs, connection info, and more. The scheduler, workers, and webserver interact with it constantly.
  • Webserver (UI): Provides a user interface to monitor DAG runs, task status, logs, and overall workflow health. In Airflow 2.x it is a Flask-based application that also serves the REST API for external clients.
  • DAGs Folder: Directory or location where DAG definition Python files live. These files describe the workflows and are parsed by the scheduler or DAG processor.

🟦 What is Airflow Scheduler?

The Airflow Scheduler is the component responsible for triggering DAG runs and executing tasks at the right time based on the DAG’s schedule, dependencies, and state.

πŸ“Œ It is the “brain” of Airflow.


🟣 What does the Airflow Scheduler do?

The scheduler continuously:

  • Monitors DAGs: Watches all DAG files for new/updated DAGs.
  • Creates DAG Runs: Starts DAG runs at the scheduled intervals.
  • Checks Dependencies: Ensures upstream tasks are finished before running the next task.
  • Queues Tasks: Decides which tasks are ready to run.
  • Sends tasks to Executor: Hands tasks to workers (Local/Celery/K8s).
  • Handles retries: If a task fails, the scheduler triggers retries.
  • Manages SLAs: Detects SLA misses.

🟦 How the Scheduler Works (Simple Flow)

Parse DAG → Check schedule → Create DAG Run → Check dependencies → Queue tasks → Executor runs tasks

The scheduler loops continuously, making decisions every few seconds.


🟣 Important Concepts for Interviews

1. Scheduling interval

Scheduler respects:

  • schedule_interval

  • start_date

  • end_date

  • catchup


2. Logical Date (Very important!)

Scheduler runs DAGs based on logical execution date, not current time.


3. Executor

Scheduler just queues tasks, but does NOT execute them.
Executor runs the task.

Example executors:

  • LocalExecutor

  • CeleryExecutor

  • KubernetesExecutor


4. Concurrency Controls

Scheduler respects:

  • DAG concurrency

  • Task concurrency

  • Pools

  • Parallelism

These prevent overload.


5. Heartbeats

Scheduler sends a “heartbeat” every few seconds.
If heartbeat stops → scheduler is down.


🟦 Example: Scheduler in Action

If a DAG has:

schedule_interval='@daily'
start_date=datetime(2024, 1, 1)

The scheduler will create DAG runs:

  • 2024-01-01 (logical date)

  • 2024-01-02

  • 2024-01-03

Each run → scheduler checks tasks → queues ready ones.

======================================================

🟦 What is an Executor in Airflow?

An Executor is the Airflow component responsible for actually running the tasks.
While the Scheduler decides what to run,
the Executor decides how and where to run it.

πŸ“Œ Executor = Task runner
πŸ“Œ Scheduler = Task coordinator


🟣 Why Executor is Important?

Executors decide:

  • How many tasks run in parallel

  • Where tasks get executed

  • Whether tasks run locally or on workers or on Kubernetes pods

The choice of executor determines Airflow’s scalability.


🟦 Types of Executors (Must Know)


1. SequentialExecutor

  • ✔ What it is:

    • Runs ONE task at a time

    • Single-threaded

    • No parallelism

    • Default for quick testing

    ✔ Use Cases:

    • Local testing

    • Development / laptop

    • Very small DAGs

    ❌ Not for production.


2. LocalExecutor

  • ✔ What it is:

    • Runs tasks in parallel on the same machine

    • Uses multiple processes/threads

    • Good performance for small pipelines

    ✔ Use Cases:

    • Small teams

    • Single-server Airflow deployments

    • Use case: 10–20 parallel tasks

    ❌ Not suitable for distributed workloads

    ❌ Cannot scale beyond one machine


3. CeleryExecutor

  • ✔ What it is:

    • Distributed task execution

    • Multiple worker machines

    • Uses a message broker:

      • Redis

      • RabbitMQ

    ✔ Use Cases:

    • Medium to large teams

    • Many DAGs running at same time

    • Need dozens or hundreds of parallel tasks

    • On-prem or AWS EC2 deployments

    πŸ‘ Pros

    • Highly scalable

    • Fault-tolerant

    • Good for data engineering teams

    πŸ‘Ž Cons

    • Complex setup (workers + broker + DB)

    • Higher maintenance


4. KubernetesExecutor (Most modern)

  • ✔ What it is:

    • Each task runs in its own Kubernetes pod

    • True elastic scaling

    • Perfect isolation of tasks

    • Clean environment per task

    ✔ Use Cases:

    • Cloud-native setups

    • Very large workloads

    • Need per-task compute scaling

    • Mixed workloads (Python, Spark, Java, Bash, etc.)

    πŸ‘ Pros:

    • Auto-scaling

    • GPU/High-memory pods

    • Per-task docker image support

    πŸ‘Ž Cons:

    • Requires Kubernetes knowledge

    • Complex to manage for small teams


(Bonus) — LocalKubernetesExecutor (Hybrid)

  • LocalExecutor for small tasks

  • KubernetesExecutor for heavy tasks


🟦 How Scheduler and Executor Work Together

Scheduler → Sends task to Executor → Executor launches worker/pod → Task runs → Updates state

🟣 Comparison Table

  • SequentialExecutor: no parallelism, not distributed → testing only
  • LocalExecutor: parallel, not distributed → medium workloads on one machine
  • CeleryExecutor: parallel, distributed → large-scale pipelines
  • KubernetesExecutor: parallel, distributed → cloud-native, scalable workloads

🟦 Best Executor for Data Engineering?

  • Small team, single VM → LocalExecutor
  • Distributed on-prem cluster → CeleryExecutor
  • Cloud environments (AWS/GCP/Azure) → KubernetesExecutor

======================================================

🟦 What is the Airflow Webserver?

The Airflow Webserver is the component that provides the UI (User Interface) for Airflow.
It lets you view, monitor, trigger, pause, and manage DAGs through a browser.

πŸ“Œ Webserver = Airflow UI
πŸ“Œ It shows everything happening inside Airflow.


🟣 What Webserver Does

  • Displays DAGs: Shows all DAGs in the UI
  • Trigger DAGs: You can manually run a DAG
  • Pause/Unpause DAGs: Enable or disable scheduling
  • Graph View: Shows DAG structure (dependencies)
  • Task Logs: View task execution logs
  • Monitor status: Success / Failed / Queued / Running
  • View XCom: See data passed between tasks
  • Manage Connections: Add/edit database or API credentials
  • Variables: Store global values for DAGs
  • Admin Panel: DAG runs, task instances, users, roles

🟦 How Webserver Works (Simple Explanation)

  1. Webserver reads DAG files

  2. Displays DAGs in the UI

  3. Shows scheduler and executor status

  4. Allows user actions (trigger, clear, rerun tasks)

It runs using Flask (Python web framework) behind the scenes.


🟦 Important Ports

Default port:

http://localhost:8080

In production you may use Nginx/HTTPS.


🟣 Webserver vs Scheduler

  • Webserver: UI to view/manage pipelines
  • Scheduler: Decides when tasks should run
  • Executor: Actually runs the tasks

🟦 How to Start the Webserver

airflow webserver

In Docker:

docker-compose up airflow-webserver

=======================================================================


🟦 1. max_active_runs (at DAG level)

Definition

max_active_runs = maximum number of DAG Runs that can run at the same time for a specific DAG.

πŸ“Œ Think:

"How many full pipeline runs can run in parallel?"

Example:

DAG(
    dag_id='etl_pipeline',
    max_active_runs=1,
)

✔ Only one DAG run will run at a time
✖ A new scheduled run will wait until the previous run finishes

πŸ“˜ Why important?

  • Prevents overlapping runs

  • Useful for pipelines that update the same tables

  • Avoids data corruption


🟦 2. concurrency (at DAG level)

Definition

concurrency = maximum number of task instances from the SAME DAG that can run in parallel.

πŸ“Œ Think:

"How many tasks inside this DAG can run at the same time?"

Example:

DAG(
    dag_id='etl_pipeline',
    concurrency=5,
)

✔ Maximum 5 tasks from this DAG can run at once
✖ The 6th task waits in the queue

πŸ“˜ Why important?

  • Controls the load on your system

  • Prevents overwhelming the database, Spark cluster, APIs, etc.
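
Both settings together, as a sketch (the values are arbitrary):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="etl_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    max_active_runs=1,  # only one pipeline run at a time
    concurrency=5,      # at most 5 tasks of this DAG in parallel (newer Airflow calls this max_active_tasks)
) as dag:
    ...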


=======================================================================

⭐ 1. Dynamic DAGs (Airflow)

Dynamic DAGs = DAGs that are generated programmatically instead of hardcoding tasks.

Example:

for table in ["customers", "orders", "sales"]:
    PythonOperator(
        task_id=f"load_{table}",
        python_callable=load_table,
        op_kwargs={"table": table},
        dag=dag,
    )

✔ Why use Dynamic DAGs?

  • Automatically create tasks for multiple tables/files

  • Avoid writing duplicate code

  • Perfect for pipelines with 20–500 tables


⭐ 2. Dynamic Tasks (Task Mapping in Airflow 2.3+)

Task mapping = Airflow automatically creates multiple task instances at runtime.

Example (Best Interview Answer):

@task
def load_table(table):
    ...

tables = ["customers", "orders", "sales"]
load_table.expand(table=tables)

✔ Why Task Mapping is powerful:

  • Dynamically generates tasks at runtime

  • No DAG parsing overhead (unlike old dynamic DAGs)

  • Much cleaner & more scalable

✔ Example Use Cases:

  • Load 100 S3 files

  • Process N partitions

  • Trigger N API calls

  • Run ML jobs for each model


⭐ 3. Avoiding DAG Explosion

DAG Explosion = too many tasks or too many DAGs, causing:

  • Slow UI

  • Scheduler overload

  • Metadata DB pressure

  • DAG parsing delays

Causes:

  • Generating thousands of tasks in the DAG file

  • Creating DAGs dynamically for each table (e.g., 100 tables → 100 DAGs)

Solution:

  • Use Task Mapping

  • Use TaskGroup

  • Batch tasks

  • Push dynamic behavior to runtime, not DAG file parse time


⭐ 4. TaskGroup (Organizing Large DAGs)

TaskGroup = visual and logical grouping of tasks.

Example:

with TaskGroup("load_all_tables") as tg:
    for table in tables:
        PythonOperator(
            task_id=f"load_{table}",
            python_callable=load,
            op_kwargs={"table": table},
        )

✔ Why TaskGroup is used:

  • Organize DAGs with 50+ tasks

  • Avoid clutter in Airflow UI

  • Easier debugging

  • Logical grouping like:

    • extract group

    • transform group

    • load group

Not for isolation — only for visual and logical grouping.


=======================================================================

DAG View


=======================================================================

πŸ”„ XCom (Cross-Communication)

XCom (short for “Cross-communication”) allows tasks to exchange small amounts of data between each other in a DAG.

πŸ”§ How XCom Works

  1. Push → Send data to XCom from one task

  2. Pull → Retrieve that data in another task


πŸ”₯ What XCom Should NOT be Used For

Very important for interviews:

❌ Do NOT pass large datasets

❌ Not meant for files
❌ Not used for DataFrames
❌ Not used for binary data

Use XCom only for small metadata, like:

  • file paths

  • S3 keys

  • table names

  • row counts


🟦 Types of XCom in Airflow

There are 3 main types of XCom you must know:


1. Default / Implicit XCom (PythonOperator return value)

  • When a PythonOperator function returns a value, Airflow automatically pushes it to XCom.

  • No need to write xcom_push() manually.

Example:

def task1():
    return "hello"

✔ Automatically becomes an XCom value
✔ Most commonly used type


2. Manual XCom (Explicit push & pull)

Used when you want full control.

Push:

def push_func(**context):
    context['ti'].xcom_push(key='mykey', value='mydata')

Pull:

def pull_func(**context):
    value = context['ti'].xcom_pull(key='mykey', task_ids='push_task')

✔ Used when you need custom key names
✔ Useful when returning multiple values


3. TaskFlow API XCom (@task decorator)

This works like implicit XCom but with cleaner syntax, using the TaskFlow API.

Example:

from airflow.decorators import task

@task
def t1():
    return "hi"

@task
def t2(msg):
    print(msg)

t2(t1())

✔ Return values automatically become XCom
✔ Passing function outputs becomes easier
✔ Preferred in modern Airflow (2.x)

=======================================================================

🧩 Airflow Variables

Airflow Variables are key-value pairs used to store and retrieve dynamic configurations in your DAGs and tasks.


=======================================================================

πŸ›°️ Apache Airflow Sensors

Sensors are special types of operators in Airflow that wait for a condition to be true before allowing downstream tasks to proceed.
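
A small sketch using FileSensor (the path and timings are placeholders; FileSensor uses the fs_default connection unless told otherwise):

from airflow.sensors.filesystem import FileSensor

# Poke every 60 seconds, give up after 1 hour if the file never appears
wait_for_file = FileSensor(
    task_id="wait_for_input_file",
    filepath="/data/incoming/input.csv",
    poke_interval=60,
    timeout=60 * 60,
)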



=======================================================================

🌿 Branching in Apache Airflow

Branching allows you to dynamically choose one (or more) downstream paths from a set of tasks based on logic. This is done using the BranchPythonOperator.

🧠 Why Use Branching?

Branching is useful when:

  • You want to run different tasks based on a condition

  • You need to skip certain tasks

  • You want "if/else" logic in your DAG




✅ Notes:

  • Tasks not returned by BranchPythonOperator will be skipped.

  • You can return a single task ID or a list of task IDs.

  • Ensure your downstream tasks can handle being skipped, or use appropriate trigger_rule.
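
A minimal branching sketch (the condition and task IDs are made up; full_load and incremental_load are assumed to be tasks defined elsewhere in the same DAG):

from airflow.operators.python import BranchPythonOperator


def choose_path(**context):
    # Return the task_id (or list of task_ids) to follow; everything else is skipped
    if context["ds"] == "2024-01-01":
        return "full_load"
    return "incremental_load"


branch = BranchPythonOperator(
    task_id="branch_on_date",
    python_callable=choose_path,
)

branch >> [full_load, incremental_load]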

=======================================================================


Subdag

πŸ”„ What is a SubDAG in Apache Airflow?

A SubDAG is a DAG within a DAG — essentially, a child DAG defined inside a parent DAG. It's used to logically group related tasks together and reuse workflow patterns, making complex DAGs easier to manage.

πŸ“Œ Think of a SubDAG as a modular block that can be reused or organized separately.



🧩 TaskGroup in Apache Airflow

A TaskGroup in Airflow is a way to visually and logically group tasks together in the UI without creating a separate DAG like SubDagOperator. It's lightweight, easier to use, and the recommended approach in Airflow 2.x+.



=======================================================================

πŸ”— Edge Labels in Apache Airflow

Edge labels in Airflow are annotations you can add to the edges (arrows) between tasks in the DAG graph view. They help clarify why one task depends on another, especially when using complex branching, conditionals, or TriggerRules.
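
A small sketch using Label (the tasks are assumed to already exist):

from airflow.utils.edgemodifier import Label

# The label text appears on the arrow between tasks in the Graph view
validate >> Label("rows OK") >> load
validate >> Label("rows bad") >> alert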



==========================================================

1. Catchup

Definition:
Airflow automatically creates DAG Runs for all missed schedule intervals since the DAG’s start_date.

  • Parameter: catchup=True/False
  • Default: True
  • Behavior: Creates runs for all past intervals until today
  • Use case: When you want to process historical data automatically

Example:

DAG start date = Jan 1, today = Jan 5, daily DAG, catchup=True → DAG runs for Jan 1,2,3,4,5


2. Backfill

Definition:
Manually run DAG runs for specific past dates, regardless of catchup setting.

  • Command: airflow dags backfill -s 2024-01-01 -e 2024-01-05 my_dag
  • Behavior: Forces the DAG to run historical intervals
  • Use case: Missed runs, reprocessing, fixing failed jobs

✅ Backfill is manual and selective, unlike catchup which is automatic.


3. Manual Run

Definition:
Trigger a DAG run manually at any time, usually for testing or ad-hoc runs.

  • Method: Airflow UI → Trigger DAG, or CLI → airflow dags trigger my_dag
  • Behavior: Creates a single DAG run immediately
  • Use case: Test DAG, ad-hoc execution, debugging

Comparison Table

  • Catchup: automatic; runs all missed DAG runs (catchup=True → process Jan 1–5 automatically)
  • Backfill: manual; runs specific historical DAG runs (airflow dags backfill -s Jan 1 -e Jan 5)
  • Manual Run: manual; triggers a DAG on demand (click “Trigger DAG” in the UI or use the CLI command)

Logical Date vs These Runs (Important!)

  • Catchup → generates DAG runs using logical dates for past intervals

  • Backfill → same, but manually specified dates

  • Manual Run → logical date can be specified manually or default to current timestamp


=======================================================








