est. 2017
Sun, 27 Apr 2026
vol. IX · no. 117
PapersAdda
placement intelligence, since 2017
640+ briefs · 24 campuses · by reservation
verified offers · sourced from r/developersIndia
razorpay · ₹65.00 LPA · iit-d · sde-1
google · ₹54.00 LPA · iiit-h · swe-i
microsoft · ₹49.50 LPA · iit-b · sde
atlassian · ₹38.00 LPA · nit-w · sde-1
amazon · ₹44.20 LPA · bits-p · sde-1
uber · ₹42.00 LPA · iit-kgp · sde-1

Airflow Interview Questions 2026: 25 Answers with Code

27 min read
Interview Questions
Updated: 8 Jun 2026
Aditya Sharma
Aditya's Edit

PapersAdda 2026 Placement Cycle

By Aditya Sharma·Founder & Editor, PapersAdda

What changed in 2026 drives

Mass-recruiter offer letters are flatter for the 2026 batch: the 4-5 LPA ASE band has barely budged in three years while inflation eats real wages. Premium tracks (Digital, Pro, Elite, Specialist) are still where the differential lives, and they are entirely test-driven. If you are aiming higher than the default offer, the coding round is not optional pageantry; it is the entire interview.

What I'd actually study for this

  • 01 Two solid coding-round answers (1 medium-hard DSA each, with edge-case discussion) > five half-baked ones
  • 02 One real project you can defend end-to-end - file paths, design decisions, and what you would change
  • 03 One DBMS schema you actually built (not a textbook ER diagram), with at least 3 join-heavy queries written from memory
  • 04 Three behavioural STAR stories: failure recovered, conflict handled, ownership taken

Where most candidates trip up

The single biggest mistake is treating company-specific guides as primary prep and DSA as secondary. It is the opposite. Mass recruiters use the test as a filter, but premium tracks at every IT services company use coding to allocate offer band. Spend 70% of prep time on DSA + system fundamentals, 20% on company-specific patterns, 10% on HR rehearsal. Reverse that ratio and you collect the default offer.

Editorial commentary by Aditya Sharma · written for PapersAdda · not generated, not aggregated.

Apache Airflow is one of the most widely used workflow orchestration platforms in data engineering, and Airflow proficiency is commonly expected in data engineering interviews in 2026. This guide covers 25 Apache Airflow interview questions with full code answers, from DAG fundamentals to production patterns.

PapersAdda's take: Candidates report that DAG idempotency and XCom design questions are the most common Airflow interview topics. The most common elimination question: "What happens when you re-run a DAG run for a past date?" (answer: it should produce the same output -- idempotency). Confirm the specific Airflow version and deployment (MWAA, Cloud Composer, self-hosted) expected on the official company careers portal before you prepare.

Related articles: Apache Spark Interview Questions 2026 | Kafka Interview Questions 2026 | Data Engineering Interview Questions 2026 | MLOps Interview Questions 2026 | Hadoop Interview Questions 2026


EASY: DAG Fundamentals (Questions 1-8)

Q1. What is Apache Airflow? What problems does it solve?

Airflow is a platform to programmatically author, schedule, and monitor data workflows. It solves the problem of orchestrating complex multi-step data pipelines where:

  • Steps have dependencies (Task B must run after Task A completes).
  • Steps need to run on a schedule (daily, hourly, weekly).
  • Failures need retry logic and alerting.
  • Historical re-runs (backfills) are needed.
  • Visibility into pipeline status is required.

Key concepts:

  • DAG (Directed Acyclic Graph): a collection of tasks with dependencies, no cycles.
  • Task: a unit of work (run a SQL query, call an API, run a Spark job).
  • Operator: defines what a task does (PythonOperator, BashOperator, SparkSubmitOperator).
  • Scheduler: monitors DAGs and triggers task instances when dependencies are met.
  • Executor: actually runs the tasks (SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor).
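
The "dependencies met" idea behind the scheduler can be illustrated without Airflow at all. The sketch below is a toy model using only Python's standard library (graphlib, Python 3.9+); the task names are hypothetical and this is not how Airflow's scheduler is implemented, only the ordering principle it relies on.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "quality_check": {"extract"},
    "load": {"transform", "quality_check"},
}

def run_in_dependency_order(deps):
    """Return batches of tasks; tasks in the same batch could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
        batches.append(ready)
        sorter.done(*ready)  # mark finished so downstream tasks become ready
    return batches

batches = run_in_dependency_order(dependencies)
print(batches)
```

The "acyclic" requirement is what makes this ordering possible: a graph where two tasks depend on each other has no valid first task, which is why the sorter rejects it.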

Q2. What is a DAG in Airflow? What are the key properties?

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# DAG definition
dag = DAG(
    dag_id='daily_sales_pipeline',
    description='Process and aggregate daily sales data',
    schedule_interval='0 6 * * *',           # 6 AM UTC daily (cron syntax)
    start_date=datetime(2026, 1, 1),         # first run date
    end_date=None,                           # run indefinitely
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': True,
        'email': ['[email protected]'],
    },
    catchup=False,    # IMPORTANT: don't backfill all missed runs since start_date
    max_active_runs=1,  # prevent concurrent DAG runs
    tags=['sales', 'daily', 'production'],
)

# Tasks
def extract_data(**context):
    # context['ds'] = execution date string (YYYY-MM-DD)
    # context['execution_date'] = pendulum datetime
    ds = context['ds']
    print(f"Extracting data for {ds}")

extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform = BashOperator(
    task_id='transform_data',
    bash_command='spark-submit /jobs/transform.py --date {{ ds }}',
    dag=dag,
)

# Set dependencies: extract must complete before transform
extract >> transform
# Equivalent: transform.set_upstream(extract)

Q3. What is the difference between execution_date, logical_date, and schedule interval?

# Airflow scheduling model: confusing but important to understand correctly
# execution_date (now called logical_date in Airflow 2.2+):
#   The start of the SCHEDULING PERIOD the DAG run covers
#   NOT the time the task actually executes

# Example: schedule_interval='@daily', start_date=2026-01-01
# First run: logical_date = 2026-01-01, actually runs on 2026-01-02 00:00:00
# Second run: logical_date = 2026-01-02, actually runs on 2026-01-03 00:00:00
# Airflow is designed to process COMPLETED intervals: a run covers the half-open window [logical_date, next_logical_date)

# This means: the DAG for 2026-01-01 data runs AFTER midnight 2026-01-02
# Design your pipelines around this: use ds ({{ ds }}) to query yesterday's data

def process_data(**context):
    logical_date = context['logical_date']  # or context['ds'] for date string
    next_ds = context['next_ds']            # end of the interval

    # CORRECT: query data for the logical_date period
    sql = f"SELECT * FROM events WHERE event_date = '{context['ds']}'"

    # Common mistake: using datetime.now() inside tasks -- NOT idempotent!
    # import datetime; today = datetime.date.today()  # WRONG
    # Use context['ds'] instead -- consistent regardless of when task runs

    print(f"Processing period: {logical_date} to {next_ds}")

# data_interval_start, data_interval_end (Airflow 2.2+): explicit period boundaries
from airflow.decorators import task

@task
def process_with_taskflow(data_interval_start=None, data_interval_end=None):
    print(f"Processing: {data_interval_start} to {data_interval_end}")
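
The timing rule above is easy to get wrong under interview pressure, so here is a minimal standard-library sketch of the arithmetic for a daily schedule. The helper name daily_run_window is hypothetical, and the model ignores scheduler latency; it only reproduces the "run starts after the interval ends" rule described in the comments.

```python
from datetime import datetime, timedelta, timezone

def daily_run_window(logical_date):
    """For a daily schedule, return (interval_start, interval_end, earliest_run_time)."""
    interval_start = logical_date
    interval_end = logical_date + timedelta(days=1)
    # A run is created only once its data interval has fully elapsed.
    earliest_run_time = interval_end
    return interval_start, interval_end, earliest_run_time

start, end, runs_at = daily_run_window(datetime(2026, 1, 1, tzinfo=timezone.utc))
print(f"covers {start} to {end}, starts no earlier than {runs_at}")
```

For logical_date 2026-01-01 this gives an interval ending at 2026-01-02 00:00 UTC, which is also the earliest moment the run can start.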

Q4. What is the TaskFlow API? How does it simplify Airflow DAGs?

from airflow.decorators import dag, task
from datetime import datetime, timedelta

# Before TaskFlow (Airflow 1.x style): verbose, XCom explicit
# from airflow.operators.python import PythonOperator
# def my_func(**context):
#     ...
# op = PythonOperator(task_id='my_task', python_callable=my_func, dag=dag)

# TaskFlow API (Airflow 2.0+): decorators, implicit XCom, cleaner code
@dag(
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={'retries': 2, 'retry_delay': timedelta(seconds=60)},
)
def etl_pipeline():
    """Daily ETL pipeline for sales data."""

    @task
    def extract(ds=None) -> dict:
        """Extract raw sales data for the given date."""
        # ds is automatically injected from Airflow context
        raw_data = {"date": ds, "rows": 1000, "source": "sales_db"}
        return raw_data  # return value automatically pushed to XCom

    @task
    def transform(raw_data: dict) -> dict:
        """Transform and clean extracted data."""
        processed = {**raw_data, "cleaned": True, "rows_processed": raw_data["rows"]}
        return processed  # returned values are XCom-pushed automatically

    @task
    def load(processed_data: dict, ds=None) -> None:
        """Load processed data to warehouse."""
        print(f"Loading {processed_data['rows_processed']} rows for {ds}")

    # Dependencies inferred from function arguments -- automatic
    raw = extract()
    processed = transform(raw)
    load(processed)

# Instantiate the DAG
etl_dag = etl_pipeline()

Q5. What is XCom in Airflow? What are its limitations?

from airflow.decorators import dag, task
from airflow.models import XCom
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def xcom_demo():

    @task
    def produce_data(ds=None) -> dict:
        """XCom push: return value is stored in Airflow metadata DB."""
        return {"row_count": 50000, "status": "success", "date": ds}
        # Stored as: XCom(dag_id, run_id, task_id, key='return_value', value=json(result))

    @task
    def consume_data(data: dict) -> None:
        """XCom pull: injected automatically via TaskFlow."""
        print(f"Got {data['row_count']} rows from {data['date']}")

    # Manual XCom push/pull (non-TaskFlow style)
    def manual_push(**context):
        context['ti'].xcom_push(key='my_custom_key', value={'important': 'data'})

    def manual_pull(**context):
        data = context['ti'].xcom_pull(task_ids='manual_push_task', key='my_custom_key')
        print(data)

    data = produce_data()
    consume_data(data)

# LIMITATIONS of XCom:
# 1. Size: values are stored in the Airflow metadata DB (SQLite/MySQL/PostgreSQL)
#    PostgreSQL can technically hold large values (bytea up to ~1GB), but keep XComs well under 1MB
# 2. Serialization: only JSON-serializable objects (default)
#    Custom objects require custom XCom backends
# 3. Large payloads: write the data to S3/GCS/HDFS and pass only the path as the XCom,
#    or configure a custom XCom backend that offloads values to object storage

# Best practice: XCom should carry metadata (file paths, row counts, statuses)
# NOT the actual data itself
@task
def good_xcom_usage(ds=None) -> str:
    """Push an S3 path, not the data."""
    output_path = f"s3://my-bucket/processed/{ds}/output.parquet"
    return output_path  # tiny, JSON-serializable

xcom_dag = xcom_demo()
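
One way to enforce the "metadata only" rule is a small guard that checks a value before a task returns it. This is a sketch under assumptions: the 1 MB budget simply mirrors the guidance above, and the helper names xcom_size_bytes and safe_for_xcom are hypothetical, not part of Airflow.

```python
import json

MAX_XCOM_BYTES = 1_000_000  # hypothetical budget mirroring the "< 1MB" guidance

def xcom_size_bytes(value):
    """Size of the value once JSON-encoded as UTF-8."""
    return len(json.dumps(value).encode("utf-8"))

def safe_for_xcom(value, limit=MAX_XCOM_BYTES):
    """True if the value is JSON-serializable and fits within the budget."""
    try:
        return xcom_size_bytes(value) <= limit
    except TypeError:  # json.dumps raises TypeError for non-serializable objects
        return False

path_ok = safe_for_xcom("s3://my-bucket/processed/2026-01-01/output.parquet")
rows_ok = safe_for_xcom({"rows": list(range(500_000))})  # a raw dataset, not metadata
object_ok = safe_for_xcom(object())                      # not JSON-serializable
print(path_ok, rows_ok, object_ok)
```

A path string passes easily, while half a million integers or an arbitrary Python object does not, which is the distinction interviewers are usually probing for.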

Q6. What are Airflow sensors? When do you use them?

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor  # Airflow 2.x provider path
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.base import BaseSensorOperator
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def sensor_examples():

    # S3 Sensor: wait for a file to appear in S3
    wait_for_s3_file = S3KeySensor(
        task_id='wait_for_upstream_data',
        bucket_name='my-data-bucket',
        bucket_key='raw/sales/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        poke_interval=300,       # check every 5 minutes
        timeout=3600,            # fail after 1 hour
        mode='reschedule',       # IMPORTANT: releases worker slot between pokes
        soft_fail=False,         # hard fail if timeout reached
    )

    # ExternalTaskSensor: wait for another DAG's task to complete
    wait_for_upstream_dag = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl_pipeline',
        external_task_id='final_load_task',
        execution_date_fn=lambda dt: dt,  # same execution date
        poke_interval=120,
        timeout=7200,
        mode='reschedule',
    )

    @task
    def process_data(ds=None):
        print(f"Both dependencies met, processing {ds}")

    wait_for_s3_file >> wait_for_upstream_dag >> process_data()

# Custom sensor
class DataQualityGateSensor(BaseSensorOperator):
    def __init__(self, table_name: str, min_row_count: int, **kwargs):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.min_row_count = min_row_count

    def poke(self, context) -> bool:
        """Return True when condition is met."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')
        result = hook.get_first(
            f"SELECT COUNT(*) FROM {self.table_name} WHERE partition_date = '{context['ds']}'"
        )
        row_count = result[0]
        self.log.info(f"Current row count: {row_count}, required: {self.min_row_count}")
        return row_count >= self.min_row_count

sensor_dag = sensor_examples()

Q7. What is the difference between poke and reschedule mode in sensors?

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor  # Airflow 2.x provider path

# POKE mode (default): sensor holds worker slot continuously
# - Worker is blocked between pokes (sleeping)
# - Fast response when condition met
# - BAD for long waits: wastes worker slots
# - Use only if max wait is < 10 minutes
sensor_poke = S3KeySensor(
    task_id='wait_poke',
    bucket_key='s3://bucket/file.txt',
    poke_interval=60,    # check every 60 seconds
    timeout=600,         # max 10 minutes
    mode='poke',         # blocks worker continuously
)

# RESCHEDULE mode: sensor releases worker between pokes
# - Worker is freed after each poke attempt
# - Airflow reschedules sensor for next check
# - Scales well: 1000 sensors can wait without using 1000 workers
# - Slight delay: rescheduling overhead (a few seconds)
# - Use for all sensors that wait > a few minutes
sensor_reschedule = S3KeySensor(
    task_id='wait_reschedule',
    bucket_key='s3://bucket/file.txt',
    poke_interval=300,   # check every 5 minutes
    timeout=86400,       # max 24 hours
    mode='reschedule',   # releases worker between pokes -- PREFERRED
)

# DEFERRABLE operators (Airflow 2.2+): even more efficient
# Uses Airflow's Triggerer component
# No worker slot is used while waiting; the wait runs as an async trigger inside the Triggerer
# Recent Amazon provider releases expose this via deferrable=True on S3KeySensor
# (S3KeySensorAsync is a separate class from the astronomer-providers package)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

sensor_defer = S3KeySensor(
    task_id='wait_deferred',
    bucket_key='s3://bucket/file.txt',
    poke_interval=300,
    deferrable=True,     # releases worker entirely; Triggerer handles the wait
)

# Summary:
# poke: fast but inefficient (use only for short waits, under ~10 minutes)
# reschedule: efficient for longer waits (several minutes up to 24 hours)
# deferrable: most efficient, requires Triggerer component
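
To make the trade-off concrete, the following toy model estimates how long one sensor occupies a worker slot in each mode. It is a back-of-the-envelope sketch, not Airflow code: worker_slot_seconds is a hypothetical helper, each check is assumed to take about one second, and rescheduling overhead is ignored.

```python
def worker_slot_seconds(mode, total_wait_s, poke_interval_s, poke_duration_s=1):
    """Approximate seconds a worker slot is held by a single sensor."""
    if mode == "poke":
        # The slot is held for the entire wait, including the sleeps between checks.
        return total_wait_s
    if mode == "reschedule":
        # The slot is held only while each check runs; +1 counts the final successful check.
        pokes = total_wait_s // poke_interval_s + 1
        return pokes * poke_duration_s
    raise ValueError(f"unknown mode: {mode}")

poke_cost = worker_slot_seconds("poke", total_wait_s=3600, poke_interval_s=300)
reschedule_cost = worker_slot_seconds("reschedule", total_wait_s=3600, poke_interval_s=300)
print(poke_cost, reschedule_cost)
```

Under these assumptions, a one-hour wait costs a full hour of slot time in poke mode and only a few seconds in reschedule mode, which is why the difference matters once hundreds of sensors are waiting at once.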

Q8. What is an Airflow hook? How does it differ from an operator?

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
from airflow.decorators import task

# HOOK: thin wrapper around an external service connection
# Reads credentials from an Airflow Connection and handles authentication
# Provides raw access: execute SQL, read/write S3, call HTTP endpoints

# OPERATOR: higher-level abstraction that USES hooks internally
# Defines what a task DOES (the workflow step)
# In short: operator = orchestration logic, hook = connection management

# Using hooks directly (more control)
@task
def fetch_from_postgres(ds=None) -> int:
    hook = PostgresHook(postgres_conn_id='warehouse_prod')
    # Execute query
    result = hook.get_first(
        f"SELECT COUNT(*) FROM orders WHERE order_date = '{ds}'"
    )
    return result[0]

@task
def write_to_s3(data: str, ds=None):
    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.load_string(
        string_data=data,
        key=f"output/{ds}/result.json",
        bucket_name='my-bucket',
        replace=True,
    )

@task
def call_rest_api(ds=None) -> dict:
    import json
    http_hook = HttpHook(http_conn_id='my_api', method='POST')
    response = http_hook.run(
        endpoint='/api/v1/trigger',
        data=json.dumps({'date': ds}),   # serialize so the body matches the JSON Content-Type
        headers={'Content-Type': 'application/json'},
    )
    return response.json()

# Custom hook example
from airflow.hooks.base import BaseHook
import requests

class MyApiHook(BaseHook):
    def __init__(self, conn_id: str):
        super().__init__()
        self.conn_id = conn_id

    def get_client(self):
        conn = self.get_connection(self.conn_id)
        session = requests.Session()
        session.headers.update({'Authorization': f'Bearer {conn.password}'})
        return session

MEDIUM: Scheduling and Pipeline Design (Questions 9-18)

Q9. What is idempotency in Airflow? Why is it critical?

from airflow.decorators import dag, task
from datetime import datetime

# Idempotency: running the same DAG run multiple times produces the same result
# WHY: Airflow may re-run tasks due to failures, manual re-triggers, or backfills

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def idempotent_pipeline():

    # NON-IDEMPOTENT: WRONG
    @task
    def bad_insert(ds=None):
        # Running this twice inserts duplicate rows!
        sql = f"INSERT INTO daily_sales VALUES ('{ds}', 100)"
        # If this task reruns, you get duplicate data

    # IDEMPOTENT: CORRECT -- delete-then-insert pattern
    @task
    def good_upsert(ds=None):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')
        # Pass both statements in one run() call: with the default autocommit=False
        # they commit together, so a failure cannot leave the date deleted but not reloaded
        hook.run([
            f"DELETE FROM daily_sales WHERE sales_date = '{ds}'",
            f"INSERT INTO daily_sales SELECT * FROM stg_sales WHERE dt = '{ds}'",
        ])

    # IDEMPOTENT: even better -- UPSERT / MERGE
    # (ON CONFLICT needs a unique constraint or primary key on sales_date)
    @task
    def best_upsert(ds=None):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')
        hook.run(f"""
            INSERT INTO daily_sales (sales_date, revenue)
            SELECT '{ds}', SUM(amount) FROM orders WHERE order_date = '{ds}'
            ON CONFLICT (sales_date) DO UPDATE SET revenue = EXCLUDED.revenue
        """)

    # IDEMPOTENT: writing to partitioned storage (overwrite same partition)
    @task
    def write_to_s3_idempotent(ds=None):
        # S3 key includes execution date -- re-running overwrites same file
        output_path = f"s3://bucket/processed/date={ds}/data.parquet"
        # Write data to this path (overwriting if exists)
        return output_path

    best_upsert()

dag_instance = idempotent_pipeline()
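
The difference between the two patterns can be demonstrated end to end with an in-memory SQLite database, no Airflow or PostgreSQL required. This is a minimal sketch: the table layout and helper names are assumptions chosen to resemble the example above, and each load is run twice to simulate a task re-run.

```python
import sqlite3

def make_conn():
    """In-memory database with a staging table holding two rows for one date."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE stg_sales (dt TEXT, amount REAL)")
    conn.execute("CREATE TABLE daily_sales (sales_date TEXT, amount REAL)")
    conn.executemany(
        "INSERT INTO stg_sales VALUES (?, ?)",
        [("2026-01-01", 100.0), ("2026-01-01", 50.0)],
    )
    conn.commit()
    return conn

def naive_load(conn, ds):
    """Non-idempotent: every run appends the same rows again."""
    conn.execute(
        "INSERT INTO daily_sales SELECT dt, amount FROM stg_sales WHERE dt = ?", (ds,)
    )
    conn.commit()

def idempotent_load(conn, ds):
    """Delete-then-insert in one transaction: re-runs converge to the same state."""
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute("DELETE FROM daily_sales WHERE sales_date = ?", (ds,))
        conn.execute(
            "INSERT INTO daily_sales SELECT dt, amount FROM stg_sales WHERE dt = ?", (ds,)
        )

def row_count(conn):
    return conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]

naive_conn, safe_conn = make_conn(), make_conn()
for _ in range(2):  # simulate the task running twice for the same date
    naive_load(naive_conn, "2026-01-01")
    idempotent_load(safe_conn, "2026-01-01")

naive_rows, safe_rows = row_count(naive_conn), row_count(safe_conn)
print(naive_rows, safe_rows)
```

The naive load ends with duplicated rows after the second run, while the delete-then-insert load holds the same two rows no matter how many times it runs. The sketch also uses bound parameters (?) instead of string formatting, which is the safer habit whenever values come from outside the DAG.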

Q10. How does Airflow backfilling work?

# Backfill: run a DAG for historical dates
# Happens automatically when catchup=True, or manually via the CLI backfill command

# CLI backfill:
# airflow dags backfill -s 2026-01-01 -e 2026-01-31 my_dag_id

# In-code catchup (not recommended for most production DAGs)
from airflow import DAG
from datetime import datetime

dag = DAG(
    'sales_pipeline',
    start_date=datetime(2026, 1, 1),
    schedule_interval='@daily',
    catchup=True,   # Will create runs for all dates from start_date to now
    max_active_runs=5,  # Max concurrent backfill runs
)

# With catchup=False (recommended):
dag_no_catchup = DAG(
    'sales_pipeline_safe',
    start_date=datetime(2026, 1, 1),
    schedule_interval='@daily',
    catchup=False,  # Only future scheduled runs; backfill manually if needed
)

# Backfill considerations:
# 1. Idempotency: tasks must be idempotent (running twice = same result)
# 2. Rate limiting: max_active_runs prevents overloading downstream systems
# 3. Concurrency: backfill runs in parallel up to max_active_runs
# 4. Dependencies: ExternalTaskSensor may cause issues if upstream hasn't backfilled

# Partial backfill (specific tasks only):
# airflow tasks run sales_pipeline extract_task 2026-01-15 --force

# Airflow 2.x: "Clear" vs "Backfill"
# Clear: re-run existing DAG run instances (resets task state so the scheduler picks the tasks up again)
# Backfill: create new DAG run instances for dates without existing runs

Q11. How do you create dynamic DAGs in Airflow?

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from datetime import datetime

# Method 1: Dynamic task mapping (Airflow 2.3+) -- RECOMMENDED
@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def dynamic_task_mapping():

    @task
    def get_tables() -> list:
        """Return list of tables to process."""
        return ['orders', 'users', 'products', 'events']

    @task
    def process_table(table_name: str, ds=None):
        """Process one table -- runs once per table."""
        print(f"Processing {table_name} for {ds}")
        return {"table": table_name, "status": "done"}

    tables = get_tables()
    # expand() creates one task instance per element
    results = process_table.expand(table_name=tables)

    @task
    def summarize(results: list):
        print(f"Processed {len(results)} tables")

    summarize(results)

# Method 2: Dynamic DAG generation (factory pattern)
# Generate multiple DAGs from config
import yaml

def create_pipeline_dag(config: dict):
    """Factory function: create one DAG per pipeline config."""

    @dag(
        dag_id=f"pipeline_{config['name']}",
        schedule_interval=config.get('schedule', '@daily'),
        start_date=datetime(2026, 1, 1),
        catchup=False,
        tags=[config['team']],
    )
    def pipeline():
        @task
        def extract():
            return f"Extracted from {config['source']}"

        @task
        def load(data: str):
            print(f"Loaded to {config['destination']}: {data}")

        load(extract())

    return pipeline()

# Configs are inlined here for brevity; in practice they would be loaded from a YAML file
configs = [
    {'name': 'sales', 'source': 'sales_db', 'destination': 'warehouse', 'team': 'analytics'},
    {'name': 'events', 'source': 'kafka', 'destination': 's3', 'team': 'data-eng'},
]

# Create DAG objects -- Airflow discovers them as module-level variables
for cfg in configs:
    globals()[f"pipeline_{cfg['name']}"] = create_pipeline_dag(cfg)

dag_instance = dynamic_task_mapping()

Q12. What are the different Airflow executors? Which to use when?

Executor: component that determines HOW tasks are actually run

1. SequentialExecutor (default for local dev):
   - Runs one task at a time
   - Uses SQLite as metadata DB
   - NEVER use in production
   - Good for: development, testing

2. LocalExecutor:
   - Runs tasks in parallel local processes
   - Requires PostgreSQL or MySQL metadata DB
   - Good for: single-machine production setups, small teams
   - Limit: bounded by single machine's CPU/memory

3. CeleryExecutor:
   - Distributed task queue (Celery + Redis/RabbitMQ)
   - Multiple worker nodes, each running Celery workers
   - Good for: medium-large scale (10s of workers)
   - Complex: requires Celery, Redis, monitoring

4. KubernetesExecutor:
   - Each task runs in its own Kubernetes pod
   - Pods created on task start, destroyed on finish
   - Perfect isolation, no idle worker cost
   - Good for: variable workloads, large scale, diverse task requirements
   - Slight overhead: pod startup time (~30s)

5. CeleryKubernetesExecutor:
   - Hybrid: short/fast tasks via Celery, long/heavy tasks via Kubernetes

6. LocalKubernetesExecutor (Airflow 2.3+):
   - Mix of local and Kubernetes tasks per DAG

# airflow.cfg / environment configuration
# executor = KubernetesExecutor

# Per-task Kubernetes configuration
from airflow.decorators import task
from kubernetes.client import models as k8s

@task(
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "2", "memory": "8Gi"},
                            limits={"cpu": "4", "memory": "16Gi"}
                        )
                    )
                ]
            )
        )
    }
)
def heavy_computation_task():
    """On Kubernetes this task requests 2 CPUs / 8Gi and is capped at 4 CPUs / 16Gi."""
    pass

Q13. How do you pass parameters to a DAG at runtime?

from airflow.decorators import dag, task
from airflow.models.param import Param
from datetime import datetime

@dag(
    schedule_interval=None,  # triggered manually or via API
    start_date=datetime(2026, 1, 1),
    catchup=False,
    params={
        'environment': Param(
            default='staging',
            type='string',
            enum=['staging', 'production'],
            description='Target environment for the pipeline',
        ),
        'source_table': Param(
            default='orders',
            type='string',
        ),
        'max_rows': Param(
            default=1000,
            type='integer',
            minimum=1,
        ),
    }
)
def parameterized_pipeline():

    @task
    def process(params=None):
        env = params['environment']
        table = params['source_table']
        max_rows = params['max_rows']
        print(f"Processing {table} in {env}, max {max_rows} rows")

    process()

# Trigger with custom params via CLI:
# airflow dags trigger parameterized_pipeline \
#   --conf '{"environment": "production", "source_table": "users", "max_rows": 50000}'

# Via REST API:
# POST /api/v1/dags/parameterized_pipeline/dagRuns
# {"conf": {"environment": "production", "source_table": "users"}}

dag_instance = parameterized_pipeline()

Q14. How do you handle failures and retries in Airflow?

from airflow.decorators import dag, task
from airflow.utils.email import send_email
from datetime import datetime, timedelta

def alert_on_failure(context):
    """Custom failure callback."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    execution_date = context['execution_date']
    log_url = context['task_instance'].log_url

    send_email(
        to=['[email protected]'],
        subject=f'Airflow FAILURE: {dag_id}.{task_id}',
        html_content=f"""
        <b>DAG:</b> {dag_id}<br>
        <b>Task:</b> {task_id}<br>
        <b>Execution date:</b> {execution_date}<br>
        <b>Log URL:</b> <a href="{log_url}">View logs</a>
        """
    )

@dag(
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,   # 5min, 10min, 20min
        'max_retry_delay': timedelta(hours=1),
        'on_failure_callback': alert_on_failure,
        'on_retry_callback': alert_on_failure,
        'execution_timeout': timedelta(hours=2),  # kill task if it takes > 2 hours
    }
)
def robust_pipeline():

    @task(
        retries=5,              # override DAG-level default for this task
        retry_delay=timedelta(seconds=30),
    )
    def flaky_api_call():
        import requests
        response = requests.get('https://api.example.com/data', timeout=30)
        response.raise_for_status()
        return response.json()

    @task(
        on_failure_callback=alert_on_failure,
        trigger_rule='all_done',  # run even if upstream failed
    )
    def cleanup(ds=None):
        """Always runs: cleanup temp files regardless of pipeline success."""
        print(f"Cleaning up temp files for {ds}")

    data = flaky_api_call()
    data >> cleanup()  # make cleanup downstream of the API call; all_done lets it run even on failure

dag_instance = robust_pipeline()
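
The "5min, 10min, 20min" comment next to retry_exponential_backoff describes a doubling schedule capped by max_retry_delay. The sketch below reproduces only that simplified arithmetic; backoff_delays is a hypothetical helper, and the delays Airflow actually applies may not match these values exactly, so treat them as an approximation.

```python
from datetime import timedelta

def backoff_delays(base, retries, max_delay):
    """Simplified doubling schedule: base * 2**attempt, capped at max_delay."""
    return [min(base * (2 ** attempt), max_delay) for attempt in range(retries)]

delays = backoff_delays(timedelta(minutes=5), retries=3, max_delay=timedelta(hours=1))
capped = backoff_delays(timedelta(minutes=5), retries=6, max_delay=timedelta(hours=1))
print([int(d.total_seconds() // 60) for d in delays])
print([int(d.total_seconds() // 60) for d in capped])
```

With six retries the fifth delay would be 80 minutes uncapped, so the cap is what keeps a long retry chain from stretching past the task's useful window.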

Q15. What are trigger rules in Airflow?

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def trigger_rule_demo():

    @task
    def task_a(): return "A done"

    @task
    def task_b(): return "B done"

    @task(trigger_rule=TriggerRule.ALL_SUCCESS)  # DEFAULT: all upstream must succeed
    def all_success(a, b):
        print("Runs only if both A and B succeeded")

    @task(trigger_rule=TriggerRule.ONE_SUCCESS)
    def one_success(ds=None):
        print("Runs if at least one upstream succeeded")

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def all_done(ds=None):
        print("Runs when all upstream tasks are done (regardless of success/fail)")

    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def on_failure_notification(ds=None):
        print("Runs ONLY IF all upstream tasks failed -- use for failure-path cleanup")

    @task(trigger_rule=TriggerRule.NONE_FAILED)
    def none_failed(ds=None):
        print("Runs if no upstream tasks failed (succeeds or were skipped)")

    # Common patterns:
    # ALL_SUCCESS: default, most common -- all must succeed
    # ALL_DONE: cleanup tasks that should always run (even after failure)
    # NONE_FAILED: for optional branches -- allows skipped tasks to pass through
    # ONE_SUCCESS: fan-in after parallel tasks where any one passing is enough

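    a = task_a()
    b = task_b()
    all_success(a, b)
    # Trigger rules only matter for tasks that have upstreams, so wire them explicitly
    [a, b] >> all_done()
    [a, b] >> on_failure_notification()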

dag_instance = trigger_rule_demo()
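
A compact way to rehearse these rules is to evaluate them against lists of upstream states in plain Python. This is a toy evaluator, not Airflow's implementation: should_run is a hypothetical helper, and it assumes every upstream task has already finished (Airflow can fire rules such as one_success earlier, as soon as the condition is met).

```python
def should_run(rule, upstream_states):
    """Decide whether a task runs, given the final states of its direct upstreams."""
    failed = sum(state in ("failed", "upstream_failed") for state in upstream_states)
    succeeded = upstream_states.count("success")
    total = len(upstream_states)
    if rule == "all_success":
        return succeeded == total
    if rule == "all_failed":
        return failed == total
    if rule == "all_done":
        return True  # every upstream has finished, whatever its state
    if rule == "one_success":
        return succeeded >= 1
    if rule == "none_failed":
        return failed == 0  # successes and skips both pass
    if rule == "none_failed_min_one_success":
        return failed == 0 and succeeded >= 1
    raise ValueError(f"unsupported rule: {rule}")

branch_join = ["success", "skipped"]  # typical state after a branch: one path skipped
print(should_run("all_success", branch_join))                  # default rule blocks the join
print(should_run("none_failed_min_one_success", branch_join))  # join after branching runs
```

The branch-join case is the one worth memorising: with the default all_success, a join task after a branch gets skipped because one upstream was skipped, which is why branching examples set a more permissive rule on the join.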

Q16. How do you implement branching in Airflow DAGs?

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def branching_pipeline():

    # Method 1: BranchPythonOperator (returns task_id(s) to follow)
    def choose_path(**context):
        """Return task_id to execute based on day of week."""
        day_of_week = context['execution_date'].weekday()  # stdlib convention: 0=Monday, 6=Sunday (pendulum 2.x's day_of_week uses 0=Sunday)
        if day_of_week == 0:  # Monday
            return ['weekly_full_load', 'send_weekly_report']  # can return list
        else:
            return 'daily_incremental_load'

    branch = BranchPythonOperator(
        task_id='choose_load_strategy',
        python_callable=choose_path,
    )

    full_load = EmptyOperator(task_id='weekly_full_load')
    weekly_report = EmptyOperator(task_id='send_weekly_report')
    incremental = EmptyOperator(task_id='daily_incremental_load')

    # Join branches -- none_failed_min_one_success lets the join run when one branch is skipped
    join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch >> [full_load, incremental]
    full_load >> weekly_report
    [full_load, weekly_report, incremental] >> join

    # Method 2: TaskFlow with @task.branch decorator
    @task.branch
    def choose_environment(ds=None):
        import pendulum
        date = pendulum.parse(ds)
        return 'load_to_prod' if date.weekday() == 0 else 'load_to_staging'  # weekday(): 0=Monday

    @task
    def load_to_prod():
        print("Loading to production")

    @task
    def load_to_staging():
        print("Loading to staging")

    choice = choose_environment()
    choice >> [load_to_prod(), load_to_staging()]

dag_instance = branching_pipeline()
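
Because a branch callable is just a function that returns task_ids, its decision logic can be unit-tested outside Airflow. The sketch below is one way to do that, assuming the day-of-week rule from the example; choose_load_strategy is a hypothetical standalone version that takes a date instead of the Airflow context, and it uses the standard library's weekday(), where Monday is 0.

```python
from datetime import date

def choose_load_strategy(run_date):
    """Return the task_id(s) a branch callable would hand back for this date."""
    if run_date.weekday() == 0:  # standard library convention: Monday is 0, Sunday is 6
        return ["weekly_full_load", "send_weekly_report"]
    return "daily_incremental_load"

monday_choice = choose_load_strategy(date(2026, 1, 5))   # a Monday
tuesday_choice = choose_load_strategy(date(2026, 1, 6))  # a Tuesday
print(monday_choice, tuesday_choice)
```

Keeping the rule in a plain function like this also makes off-by-one weekday mistakes visible in a test instead of in a production run.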

Q17. How do you set up cross-DAG dependencies?

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id='upstream_data_pipeline',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def upstream_pipeline():
    @task
    def final_task(ds=None):
        print(f"Upstream done for {ds}")
    final_task()

@dag(
    dag_id='downstream_analytics',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def downstream_pipeline():

    # Method 1: ExternalTaskSensor -- wait for upstream task to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_data_pipeline',
        external_dag_id='upstream_data_pipeline',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # same execution_date
        timeout=3600,
        poke_interval=120,
        mode='reschedule',
    )

    @task
    def run_analytics(ds=None):
        print(f"Running analytics for {ds}")

    wait_for_upstream >> run_analytics()

@dag(
    dag_id='orchestrator_dag',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def orchestrator():
    # Method 2: TriggerDagRunOperator -- actively trigger downstream DAG
    trigger_downstream = TriggerDagRunOperator(
        task_id='trigger_downstream_analytics',
        trigger_dag_id='downstream_analytics',
        conf={'triggered_by': 'orchestrator_dag'},
        wait_for_completion=True,  # block until triggered DAG completes
        reset_dag_run=True,        # re-trigger even if DAG run already exists
    )

    @task
    def upstream_work():
        print("Upstream complete, triggering downstream")

    upstream_work() >> trigger_downstream

up_dag = upstream_pipeline()
down_dag = downstream_pipeline()
orch_dag = orchestrator()

Q18. How do you implement SLA monitoring in Airflow?

from airflow import DAG
from airflow.decorators import task
from airflow.utils.email import send_email
from datetime import datetime, timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when an SLA is missed."""
    missed_tasks = [str(sla.task_id) for sla in slas]
    send_email(
        to=['[email protected]', '[email protected]'],
        subject=f'SLA MISS: {dag.dag_id} - {", ".join(missed_tasks)}',
        html_content=f"""
        <h2>SLA Missed</h2>
        <p><b>DAG:</b> {dag.dag_id}</p>
        <p><b>Missed tasks:</b> {", ".join(missed_tasks)}</p>
        <p><b>SLA defined as:</b> task must complete within configured time</p>
        """
    )

with DAG(
    'critical_pipeline',
    schedule_interval='0 8 * * *',  # 8 AM UTC daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    sla_miss_callback=sla_miss_callback,
    default_args={'owner': 'data-team'},
) as dag:

    from airflow.operators.python import PythonOperator

    def extract():
        print("Extracting data")

    def transform():
        print("Transforming data")

    def load():
        print("Loading data")

    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
        sla=timedelta(minutes=30),  # must complete within 30 min of dag scheduled time
    )

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
        sla=timedelta(hours=1),   # must complete within 1 hour
    )

    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
        sla=timedelta(hours=2),   # final result within 2 hours of 8 AM = by 10 AM
    )

    extract_task >> transform_task >> load_task
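
The SLA comments above are easier to check with the arithmetic written out. This is a minimal sketch, assuming the SLA clock starts when the DAG run is expected to start (08:00 UTC here) rather than when each task starts. `sla_deadlines` is a hypothetical helper for illustration, not an Airflow API.

```python
from datetime import datetime, timedelta, timezone


def sla_deadlines(expected_start: datetime, slas: dict[str, timedelta]) -> dict[str, datetime]:
    """Map each task_id to the wall-clock time by which it should have finished."""
    return {task_id: expected_start + sla for task_id, sla in slas.items()}


# Assumption: the run is expected to start at 08:00 UTC, matching the cron schedule above.
run_start = datetime(2026, 1, 2, 8, 0, tzinfo=timezone.utc)
deadlines = sla_deadlines(
    run_start,
    {
        "extract": timedelta(minutes=30),
        "transform": timedelta(hours=1),
        "load": timedelta(hours=2),
    },
)
for task_id, deadline in deadlines.items():
    print(f"{task_id}: {deadline:%H:%M} UTC")
```

Under that assumption the three values are cumulative deadlines measured from the same starting point, not per-task durations.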

HARD: Production Patterns (Questions 19-25)

Q19. How do you design an ML pipeline DAG in Airflow?

from airflow.decorators import dag, task
# For managed training jobs, SageMakerProcessingOperator and SageMakerTrainingOperator
# (airflow.providers.amazon.aws.operators.sagemaker) can replace the placeholder tasks below.
from datetime import datetime, timedelta

@dag(
    dag_id='ml_training_pipeline',
    schedule_interval='@weekly',  # retrain weekly
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={'retries': 1, 'retry_delay': timedelta(minutes=10)},
    tags=['ml', 'weekly'],
)
def ml_training_pipeline():

    @task
    def check_data_quality(ds=None) -> dict:
        """Gate: ensure training data meets quality standards."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook('warehouse')
        # Bind ds as a query parameter instead of formatting it into the SQL string
        row_count = hook.get_first(
            "SELECT COUNT(*) FROM features WHERE week_ending = %s",
            parameters=(ds,),
        )[0]
        if row_count < 10_000:
            raise ValueError(f"Insufficient training data: {row_count} rows (min: 10,000)")
        return {"rows": row_count, "quality_passed": True}

    @task
    def prepare_training_data(quality_check: dict, ds=None) -> str:
        """Extract and split data for training."""
        output_path = f"s3://ml-bucket/training-data/{ds}/"
        # Run Spark job to prepare features
        return output_path

    @task
    def train_model(data_path: str, ds=None) -> str:
        """Submit training job to SageMaker / custom training cluster."""
        # Example: trigger a training job
        model_artifact_path = f"s3://ml-bucket/models/{ds}/model.pkl"
        # ... training logic ...
        return model_artifact_path

    @task
    def evaluate_model(model_path: str, ds=None) -> dict:
        """Evaluate model on holdout set."""
        # Run evaluation and return metrics
        metrics = {"auc": 0.85, "f1": 0.82}
        if metrics["auc"] < 0.80:
            raise ValueError(f"Model AUC {metrics['auc']} below threshold 0.80")
        return metrics

    @task
    def promote_model(metrics: dict, model_path: str, ds=None):
        """Deploy model to production if evaluation passes."""
        print(f"Promoting model from {model_path} with AUC {metrics['auc']}")
        # Register in MLflow model registry
        # Update serving endpoint
        pass

    # Pipeline
    quality = check_data_quality()
    data = prepare_training_data(quality)
    model = train_model(data)
    metrics = evaluate_model(model)
    promote_model(metrics, model)

dag_instance = ml_training_pipeline()

Q20. How do you optimize Airflow performance at scale?

# Airflow performance tuning for 1000+ DAGs and 10,000+ task instances/day

# 1. Scheduler tuning (airflow.cfg)
scheduler_config = """
[scheduler]
# Minimum seconds between re-parses of the same DAG file
min_file_process_interval = 30    # default 30s -- increase for stable environments

[core]
dag_discovery_safe_mode = True    # default; skips .py files that mention neither "dag" nor "airflow"
parallelism = 256                 # total concurrent tasks across all DAGs
max_active_tasks_per_dag = 16     # limit parallelism per DAG
max_active_runs_per_dag = 3       # limit concurrent DAG runs
"""

# 2. Database performance
# - Use PostgreSQL (never SQLite in production)
# - Enable connection pooling (PgBouncer recommended)
# - Regular cleanup: airflow db clean --clean-before-timestamp 2025-01-01

# 3. DAG design for performance
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    max_active_tasks=32,      # parallelism within this DAG (replaces the deprecated `concurrency` argument)
)
def optimized_dag():
    # Keep DAG file lean: no I/O or compute in DAG file (only task definitions)
    # BAD: network calls at DAG parse time
    # import requests; api_data = requests.get('https://api.com').json()

    # GOOD: all I/O inside tasks
    @task
    def task_with_io():
        import requests
        return requests.get('https://api.com').json()

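    # Avoid large Python objects in XCom -- pass file paths instead
    @task
    def good_xcom() -> str:
        return "s3://bucket/result.parquet"  # small metadata, not data

    # Tasks must be called inside the @dag function to be registered on the DAG
    task_with_io()
    good_xcom()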

dag_instance = optimized_dag()

# 4. KubernetesExecutor: eliminates worker idle time
# 5. Pool management: prevent any one team from exhausting all slots
# airflow pools create external_api_pool 5 "Max 5 concurrent API calls"
# @task(pool='external_api_pool', pool_slots=1)

# 6. Trigger rules: use NONE_FAILED for fan-in to avoid skipped task blocking

Q21. How do you implement data quality checks in Airflow?

from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLCheckOperator, SQLValueCheckOperator
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def pipeline_with_dq_checks():

    @task
    def load_raw_data(ds=None) -> str:
        return f"s3://bucket/raw/{ds}/data.parquet"

    # Great Expectations integration
    @task
    def run_ge_checkpoint(data_path: str, ds=None) -> bool:
        import great_expectations as ge
        context = ge.get_context()
        result = context.run_checkpoint(
            checkpoint_name="orders_checkpoint",
            batch_request={"batch_data": data_path},
            run_name_template=f"airflow_{ds}"
        )
        if not result["success"]:
            failed_expectations = [
                exp["expectation_config"]["expectation_type"]
                for exp in result["results"]
                if not exp["success"]
            ]
            raise ValueError(f"Data quality failed: {failed_expectations}")
        return True

    # SQL-based checks
    check_row_count = SQLCheckOperator(
        task_id='check_row_count',
        conn_id='warehouse',
        sql="""
            SELECT COUNT(*) > 0
            FROM orders
            WHERE order_date = '{{ ds }}'
        """,
    )

    check_revenue = SQLValueCheckOperator(
        task_id='check_revenue_range',
        conn_id='warehouse',
        sql="SELECT AVG(amount) FROM orders WHERE order_date = '{{ ds }}'",
        pass_value=100.0,    # expected value
        tolerance=0.5,       # allow 50% deviation
    )

    # Custom assertion task
    @task
    def assert_no_nulls(ds=None):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook('warehouse')
        # Bind ds as a query parameter instead of formatting it into the SQL string
        null_count = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE order_date = %s AND user_id IS NULL",
            parameters=(ds,),
        )[0]
        if null_count > 0:
            raise AssertionError(f"Found {null_count} orders with NULL user_id")

    raw = load_raw_data()
    ge_ok = run_ge_checkpoint(raw)
    check_row_count >> check_revenue >> assert_no_nulls()

dag_instance = pipeline_with_dq_checks()

Q22. What is the Airflow REST API? How do you use it for automation?

import requests
import json
from datetime import datetime

# Airflow 2.0+ includes a stable REST API (OpenAPI spec)
AIRFLOW_BASE_URL = "http://airflow-webserver:8080/api/v1"
AUTH = ("admin", "admin")  # use service account in production

# List DAGs
def list_dags():
    response = requests.get(f"{AIRFLOW_BASE_URL}/dags", auth=AUTH)
    return response.json()

# Trigger a DAG run
def trigger_dag(dag_id: str, conf: dict = None, logical_date: str = None):
    payload = {"conf": conf or {}}
    if logical_date:
        payload["logical_date"] = logical_date
    response = requests.post(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns",
        auth=AUTH,
        json=payload,
        headers={"Content-Type": "application/json"},
    )
    response.raise_for_status()
    return response.json()

# Get DAG run status
def get_dag_run_status(dag_id: str, dag_run_id: str) -> str:
    response = requests.get(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns/{dag_run_id}",
        auth=AUTH,
    )
    response.raise_for_status()  # fail loudly on auth errors or an unknown run ID
    return response.json()["state"]

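# Clear a task instance so the scheduler re-runs it
def clear_task(dag_id: str, task_id: str, dag_run_id: str):
    response = requests.post(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/clearTaskInstances",
        auth=AUTH,
        json={
            "dry_run": False,       # the API defaults to a dry run
            "only_failed": False,   # clear the task regardless of its current state
            "task_ids": [task_id],
            "dag_run_id": dag_run_id,
        },
    )
    response.raise_for_status()
    return response.json()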

# Example: poll until DAG completes (for triggering from CI/CD)
import time

run_info = trigger_dag('ml_training_pipeline', conf={'environment': 'production'})
run_id = run_info['dag_run_id']

while True:
    status = get_dag_run_status('ml_training_pipeline', run_id)
    print(f"Status: {status}")
    if status in ['success', 'failed']:
        break
    time.sleep(30)

if status == 'failed':
    raise RuntimeError("ML training pipeline failed!")
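
The polling loop above never exits if the run does not reach success or failed (for example, if it stays queued). A minimal sketch of a bounded variant follows. `wait_for_terminal_state`, its parameters, and the injected `get_state`, `sleep`, and `clock` callables are hypothetical names chosen for illustration, so the helper can be exercised without a live Airflow server.

```python
import time
from typing import Callable

TERMINAL_STATES = {"success", "failed"}


def wait_for_terminal_state(
    get_state: Callable[[], str],
    timeout_s: float = 3600,
    poll_interval_s: float = 30,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_state() until it returns a terminal state or the timeout expires."""
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"DAG run still '{state}' after {timeout_s}s")
        sleep(poll_interval_s)


# A fake state source stands in for get_dag_run_status(...); sleeping is disabled.
states = iter(["queued", "running", "running", "success"])
final = wait_for_terminal_state(lambda: next(states), sleep=lambda _: None)
print(final)
```

With the functions defined earlier, get_state would be something like `lambda: get_dag_run_status('ml_training_pipeline', run_id)`.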

Q23. How do you handle secrets in Airflow?

# Airflow Connections: store credentials in Connections (encrypted in DB)
# Access via hooks with conn_id

# Option 1: Airflow Connections (default)
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task
def use_connection():
    hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    # Credentials loaded from Airflow Connections -- never hardcoded

# Option 2: HashiCorp Vault (production recommended)
# In airflow.cfg:
# [secrets]
# backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
# backend_kwargs = {"connections_path": "connections", "variables_path": "variables",
#                   "url": "http://vault:8200", "auth_type": "approle"}

# Option 3: AWS Secrets Manager
# backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs = {"connections_prefix": "airflow/connections",
#                   "variables_prefix": "airflow/variables"}

# Option 4: Airflow Variables (for non-sensitive config)
from airflow.models import Variable

@task
def use_variable():
    # NOT for secrets -- Variable values are readable in the UI unless the key name
    # looks sensitive (e.g. contains 'password' or 'secret')
    batch_size = int(Variable.get("pipeline_batch_size", default_var=1000))
    return batch_size

# Best practices:
# - Never hardcode credentials in DAG files
# - Use Connections for DB/API credentials
# - Use Vault/Secrets Manager for production
# - Rotate credentials via Connection update (CLI or UI)
# - airflow connections add my_conn --conn-type postgres --conn-host db --conn-login user

Q24. What is Airflow's best practice for DAG file organization?

Project structure for production Airflow:

airflow_dags/
  dags/                          # DAG files (Airflow watches this directory)
    sales/
      daily_sales_pipeline.py
      weekly_sales_report.py
    ml/
      model_training.py
      model_serving.py
    ingestion/
      source_a_ingestion.py
  plugins/                       # Custom operators, hooks, sensors
    operators/
      custom_s3_operator.py
    hooks/
      custom_api_hook.py
    sensors/
      data_quality_sensor.py
  dags/utils/                    # Shared utilities (imported by DAGs)
    data_quality.py
    notification.py
    constants.py
  tests/                         # Unit tests for DAGs
    test_sales_pipeline.py
  requirements.txt
  Dockerfile

# DAG file best practices:
from airflow import DAG
from airflow.decorators import dag, task

# 1. No heavy computation at module level (slows DAG parsing)
# BAD:
# expensive_config = requests.get('https://api.com/config').json()  # parsed every 30s!

# GOOD: lazy load inside tasks
@task
def get_config():
    import requests
    return requests.get('https://api.com/config').json()

# 2. Use TYPE_CHECKING for imports that are needed only for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from airflow.utils.context import Context

# 3. Keep dag file imports minimal
# Use lazy imports inside tasks for heavy libraries
@task
def run_spark():
    from pyspark.sql import SparkSession  # imported only when task runs
    spark = SparkSession.builder.getOrCreate()

# 4. Version DAGs properly
dag = DAG(
    'sales_pipeline_v3',  # version in ID when breaking changes
    tags=['sales', 'v3', 'production'],
)

# 5. Document DAGs
@dag(doc_md="""
# Sales Pipeline

Processes daily sales data from Salesforce to BigQuery.

## Schedule
Runs at 6 AM UTC daily.

## SLA
Data must be in BigQuery by 9 AM UTC.
""")
def well_documented_pipeline():
    pass
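
Practice 1 can be checked mechanically before a DAG file reaches the scheduler. What follows is a rough sketch of such a check using only the standard library's `ast` module. The function name `module_level_calls` and the set of flagged call names are assumptions for illustration. The heuristic treats every function body as deferred, so it would not catch calls placed directly in an @dag-decorated function body, which also runs at parse time.

```python
import ast

# Assumption: these dotted call names are considered too expensive for parse time.
FLAGGED_CALLS = {"requests.get", "requests.post"}


def _dotted_name(node: ast.AST) -> str:
    """Turn a Name/Attribute chain such as requests.get into 'requests.get'."""
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        parts.append(node.id)
    return ".".join(reversed(parts))


def module_level_calls(source: str) -> list[str]:
    """Return flagged calls that would execute at import time (outside any def)."""
    found = []

    def visit(node: ast.AST) -> None:
        for child in ast.iter_child_nodes(node):
            # Function and lambda bodies run later, so they are skipped.
            if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)):
                continue
            if isinstance(child, ast.Call):
                name = _dotted_name(child.func)
                if name in FLAGGED_CALLS:
                    found.append(name)
            visit(child)

    visit(ast.parse(source))
    return found


bad = "import requests\nconfig = requests.get('https://api.com/config').json()\n"
good = (
    "import requests\n"
    "def get_config():\n"
    "    return requests.get('https://api.com/config').json()\n"
)
print(module_level_calls(bad))
print(module_level_calls(good))
```

A check like this could run in the tests/ directory shown in the project structure, alongside the unit tests for DAGs.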

Q25. Design a complete production Airflow pipeline for a daily ML feature store update.

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

@dag(
    dag_id='feature_store_daily_update',
    schedule_interval='0 4 * * *',  # 4 AM UTC (after upstream pipelines finish)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,  # prevent concurrent runs
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=3),
    },
    tags=['ml', 'features', 'daily'],
)
def feature_store_update():

    # Gate 1: Wait for upstream data
    wait_for_orders = S3KeySensor(
        task_id='wait_for_orders',
        bucket_key='s3://data-lake/orders/date={{ ds }}/success',
        mode='reschedule',
        poke_interval=300,
        timeout=7200,
    )

    wait_for_events = S3KeySensor(
        task_id='wait_for_events',
        bucket_key='s3://data-lake/events/date={{ ds }}/success',
        mode='reschedule',
        poke_interval=300,
        timeout=7200,
    )

    # Gate 2: Data quality validation
    @task
    def validate_inputs(ds=None) -> bool:
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook('warehouse')
        counts = hook.get_first(
            f"SELECT COUNT(*) FROM orders WHERE order_date = '{ds}'"
        )[0]
        if counts < 100:
            raise ValueError(f"Only {counts} orders found for {ds}")
        return True

    # Feature computation
    @task
    def compute_user_features(validated: bool, ds=None) -> str:
        output = f"s3://feature-store/user_features/date={ds}/"
        # Spark job for feature computation
        print(f"Computing user features for {ds} -> {output}")
        return output

    @task
    def compute_item_features(validated: bool, ds=None) -> str:
        output = f"s3://feature-store/item_features/date={ds}/"
        print(f"Computing item features for {ds} -> {output}")
        return output

    # Load to online feature store (Redis)
    @task
    def load_to_online_store(user_features: str, item_features: str, ds=None):
        print(f"Loading {user_features} and {item_features} to Redis")
        # Load latest features to Redis for real-time serving
        pass

    # Validate loaded features
    @task
    def validate_online_store(ds=None) -> bool:
        print("Validating online feature store completeness")
        return True

    # Cleanup and notification
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup(ds=None):
        print(f"Cleanup temp files for {ds}")

    # Wire up the pipeline
    # Call validate_inputs() once: every call creates a separate task in the DAG
    v = validate_inputs()
    [wait_for_orders, wait_for_events] >> v
    uf = compute_user_features(v)
    imf = compute_item_features(v)
    load = load_to_online_store(uf, imf)
    load >> validate_online_store() >> cleanup()

dag_instance = feature_store_update()

FAQ

Q: What is the difference between Airflow 1.x and 2.x in interviews? A: Airflow 2.x introduced the TaskFlow API (@task decorator, automatic XCom), a highly available scheduler, a stable REST API, dynamic task mapping, and deferrable operators. Interviews in 2026 expect Airflow 2.x knowledge. Key concepts unique to 2.x: @dag/@task decorators, .expand() for dynamic tasks, and the Triggerer component for deferrable operators. Check the job description or the company's careers portal for the version the team actually runs.

Q: What is the most common Airflow anti-pattern in interviews? A: Two come up most often: using datetime.now() inside tasks (not idempotent, so it breaks backfills) and performing heavy I/O or computation at module level in the DAG file (slows scheduler parsing). Interviewers specifically probe whether candidates understand why tasks must derive dates from the run's context (ds or logical_date, called execution_date before Airflow 2.2) instead of the current time. Candidate reports in public preparation resources consistently list these among the most-asked design principles.
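
To make the idempotency point concrete, here is a small sketch in plain Python. `partition_path_for`, `partition_path_now`, and the bucket layout are made up for illustration; in a real task the `ds` argument would come from the Airflow context.

```python
from datetime import datetime


def partition_path_for(ds: str) -> str:
    """Idempotent: the output depends only on the run's logical date."""
    return f"s3://example-bucket/orders/date={ds}/"


def partition_path_now() -> str:
    """Anti-pattern: the output depends on when the task happens to execute."""
    return f"s3://example-bucket/orders/date={datetime.now():%Y-%m-%d}/"


# A backfill for 2026-01-01 targets the 2026-01-01 partition no matter when it runs.
print(partition_path_for("2026-01-01"))
# The wall-clock version targets today's partition instead.
print(partition_path_now())
```

Re-running the first function for the same date always returns the same path, which is what makes retries and backfills safe.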

Q: What managed Airflow offerings should I know about? A: MWAA (AWS Managed Workflows for Apache Airflow), Cloud Composer (GCP), and Astronomer are the three major managed Airflow platforms. Core Airflow concepts are identical; managed platforms differ in scaling, networking, and integration with cloud services. Check the company's job description or interview prep guide for the platform the team uses.

Methodology applied to this article · last verified 8 Jun 2026
Sources used
Public exam-pattern documents, official recruiter pages, and verified candidate reports on r/developersIndia and LinkedIn.
Verification window
Page last edited 8 Jun 2026 by Aditya Sharma. Numbers and patterns sanity-checked against the most recent 2026 cycle drives we tracked.
What we did NOT do
  • No fabricated salary numbers or success rates. If we quote a range, it's sourced.
  • No noun-substituted templates. This article was not generated by swapping company names in a stock prompt.
  • No paid placements, sponsored coaching links, or affiliate-shilled course pushes.
Verification policy: /editorial-standards/. Found something incorrect? Submit a correction - we respond within 48 hours.
