Airflow Interview Questions 2026: 25 Answers with Code

What changed in 2026 drives
Mass-recruiter offer letters are flatter for the 2026 batch - the 4-5 LPA ASE band has barely budged in three years while inflation eats real wages. Premium tracks (Digital, Pro, Elite, Specialist) are still where the differential lives, and they are entirely test-driven. If you are aiming higher than the default offer, the coding round is not optional pageantry - it is the entire interview.
What I'd actually study for this
1. Two solid coding-round answers (1 medium-hard DSA each, with edge-case discussion) > five half-baked ones
2. One real project you can defend end-to-end - file paths, design decisions, and what you would change
3. One DBMS schema you actually built (not a textbook ER diagram), with at least 3 join-heavy queries written from memory
4. Three behavioural STAR stories: failure recovered, conflict handled, ownership taken
Where most candidates trip up
The single biggest mistake is treating company-specific guides as primary prep and DSA as secondary. It is the opposite. Mass recruiters use the test as a filter, but premium tracks at every IT services company use coding to allocate offer band. Spend 70% of prep time on DSA + system fundamentals, 20% on company-specific patterns, 10% on HR rehearsal. Reverse that ratio and you collect the default offer.
Editorial commentary by Aditya Sharma · written for PapersAdda · not generated, not aggregated.
Apache Airflow is one of the most widely used workflow orchestration platforms in data engineering, and working Airflow knowledge is commonly expected in data engineering interviews in 2026. This guide covers 25 Apache Airflow interview questions with full code answers, from DAG fundamentals to production patterns.
PapersAdda's take: Candidates report that DAG idempotency and XCom design questions are the most common Airflow interview topics. The most common elimination question: "What happens when you re-run a DAG run for a past date?" (answer: it should produce the same output -- idempotency). Confirm the specific Airflow version and deployment (MWAA, Cloud Composer, self-hosted) expected on the official company careers portal before you prepare.
EASY: DAG Fundamentals (Questions 1-8)
Q1. What is Apache Airflow? What problems does it solve?
Airflow is a platform to programmatically author, schedule, and monitor data workflows. It solves the problem of orchestrating complex multi-step data pipelines where:
- Steps have dependencies (Task B must run after Task A completes).
- Steps need to run on a schedule (daily, hourly, weekly).
- Failures need retry logic and alerting.
- Historical re-runs (backfills) are needed.
- Visibility into pipeline status is required.
Key concepts:
- DAG (Directed Acyclic Graph): a collection of tasks with dependencies, no cycles.
- Task: a unit of work (run a SQL query, call an API, run a Spark job).
- Operator: defines what a task does (PythonOperator, BashOperator, SparkSubmitOperator).
- Scheduler: monitors DAGs and triggers task instances when dependencies are met.
- Executor: actually runs the tasks (SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor).
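The dependency rule in the list above (a DAG has no cycles, so a valid run order always exists) can be illustrated without Airflow. The following is a minimal sketch using Python's standard-library `graphlib`; the task names are hypothetical, and this is not how the Airflow scheduler is implemented.

```python
from graphlib import TopologicalSorter, CycleError

# Hypothetical pipeline: each key maps to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

def run_order(deps: dict) -> list:
    """Return one valid execution order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(deps).static_order())

order = run_order(dependencies)
print(order)

# Adding a back-edge (extract depends on report) creates a cycle, so no order exists.
cyclic = {**dependencies, "extract": {"report"}}
try:
    run_order(cyclic)
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

Airflow performs a comparable cycle check when it parses a DAG file, which is why a circular dependency surfaces as an import error rather than a runtime failure.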
Q2. What is a DAG in Airflow? What are the key properties?
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# DAG definition
dag = DAG(
dag_id='daily_sales_pipeline',
description='Process and aggregate daily sales data',
schedule_interval='0 6 * * *', # 6 AM UTC daily (cron syntax)
start_date=datetime(2026, 1, 1), # start of the first data interval; the first run fires after that interval ends
end_date=None, # run indefinitely
default_args={
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['[email protected]'],
},
catchup=False, # IMPORTANT: don't backfill all missed runs since start_date
max_active_runs=1, # prevent concurrent DAG runs
tags=['sales', 'daily', 'production'],
)
# Tasks
def extract_data(**context):
# context['ds'] = execution date string (YYYY-MM-DD)
# context['execution_date'] = pendulum datetime
ds = context['ds']
print(f"Extracting data for {ds}")
extract = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
dag=dag,
)
transform = BashOperator(
task_id='transform_data',
bash_command='spark-submit /jobs/transform.py --date {{ ds }}',
dag=dag,
)
# Set dependencies: extract must complete before transform
extract >> transform
# Equivalent: transform.set_upstream(extract)
Q3. What is the difference between execution_date, logical_date, and schedule interval?
# Airflow scheduling model: confusing but important to understand correctly
# execution_date (now called logical_date in Airflow 2.2+):
# The start of the SCHEDULING PERIOD the DAG run covers
# NOT the time the task actually executes
# Example: schedule_interval='@daily', start_date=2026-01-01
# First run: logical_date = 2026-01-01, actually runs on 2026-01-02 00:00:00
# Second run: logical_date = 2026-01-02, actually runs on 2026-01-03 00:00:00
# Airflow is designed to process COMPLETED intervals: a run covers the half-open interval [logical_date, next_logical_date)
# This means: the DAG for 2026-01-01 data runs AFTER midnight 2026-01-02
# Design your pipelines around this: use ds ({{ ds }}) to query yesterday's data
def process_data(**context):
logical_date = context['logical_date'] # or context['ds'] for date string
next_ds = context['next_ds'] # end of the interval
# CORRECT: query data for the logical_date period
sql = f"SELECT * FROM events WHERE event_date = '{context['ds']}'"
# Common mistake: using datetime.now() inside tasks -- NOT idempotent!
# import datetime; today = datetime.date.today() # WRONG
# Use context['ds'] instead -- consistent regardless of when task runs
print(f"Processing period: {logical_date} to {next_ds}")
# data_interval_start, data_interval_end (Airflow 2.2+): explicit period boundaries
from airflow.decorators import task
@task
def process_with_taskflow(data_interval_start=None, data_interval_end=None):
print(f"Processing: {data_interval_start} to {data_interval_end}")
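The timing rule described above can be checked with plain `datetime` arithmetic. This is a minimal sketch for a daily schedule only; `daily_run_window` is a hypothetical helper, not an Airflow function, and real timetables (cron expressions, custom timetables) need more care.

```python
from datetime import datetime, timedelta, timezone

def daily_run_window(logical_date: datetime) -> dict:
    """For a daily schedule, map a logical date to its data interval and earliest run time."""
    interval_start = logical_date
    interval_end = logical_date + timedelta(days=1)
    return {
        "data_interval_start": interval_start,
        "data_interval_end": interval_end,   # exclusive upper bound of the interval
        "earliest_run_time": interval_end,   # the run is created once the interval has ended
        "ds": logical_date.strftime("%Y-%m-%d"),
    }

window = daily_run_window(datetime(2026, 1, 1, tzinfo=timezone.utc))
print(window["ds"], "runs no earlier than", window["earliest_run_time"].isoformat())
```

The point to take into an interview: `ds` names the data being processed, not the wall-clock day on which the task executes.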
Q4. What is the TaskFlow API? How does it simplify Airflow DAGs?
from airflow.decorators import dag, task
from datetime import datetime
# Before TaskFlow (Airflow 1.x style): verbose, XCom explicit
# from airflow.operators.python import PythonOperator
# def my_func(**context):
# ...
# op = PythonOperator(task_id='my_task', python_callable=my_func, dag=dag)
# TaskFlow API (Airflow 2.0+): decorators, implicit XCom, cleaner code
@dag(
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
default_args={'retries': 2, 'retry_delay': 60}, # a plain number is read as seconds; a timedelta is the clearer form
)
def etl_pipeline():
"""Daily ETL pipeline for sales data."""
@task
def extract(ds=None) -> dict:
"""Extract raw sales data for the given date."""
# ds is automatically injected from Airflow context
raw_data = {"date": ds, "rows": 1000, "source": "sales_db"}
return raw_data # return value automatically pushed to XCom
@task
def transform(raw_data: dict) -> dict:
"""Transform and clean extracted data."""
processed = {**raw_data, "cleaned": True, "rows_processed": raw_data["rows"]}
return processed # returned values are XCom-pushed automatically
@task
def load(processed_data: dict, ds=None) -> None:
"""Load processed data to warehouse."""
print(f"Loading {processed_data['rows_processed']} rows for {ds}")
# Dependencies inferred from function arguments -- automatic
raw = extract()
processed = transform(raw)
load(processed)
# Instantiate the DAG
etl_dag = etl_pipeline()
Q5. What is XCom in Airflow? What are its limitations?
from airflow.decorators import dag, task
from airflow.models import XCom
from datetime import datetime
@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def xcom_demo():
@task
def produce_data(ds=None) -> dict:
"""XCom push: return value is stored in Airflow metadata DB."""
return {"row_count": 50000, "status": "success", "date": ds}
# Stored as: XCom(dag_id, run_id, task_id, key='return_value', value=json(result))
@task
def consume_data(data: dict) -> None:
"""XCom pull: injected automatically via TaskFlow."""
print(f"Got {data['row_count']} rows from {data['date']}")
# Manual XCom push/pull (non-TaskFlow style)
def manual_push(**context):
context['ti'].xcom_push(key='my_custom_key', value={'important': 'data'})
def manual_pull(**context):
data = context['ti'].xcom_pull(task_ids='manual_push_task', key='my_custom_key')
print(data)
data = produce_data()
consume_data(data)
# LIMITATIONS of XCom:
# 1. Size limit: stored in Airflow metadata DB (SQLite/PostgreSQL)
# PostgreSQL bytea limit: typically 1GB, but recommended < 1MB
# For large data: use XCom backend (S3, GCS) or pass file paths instead
# 2. Serialization: only JSON-serializable objects (default)
# Custom objects require custom XCom backends
# 3. Not for large data transfer: use S3/GCS/HDFS paths as XComs
# Best practice: XCom should carry metadata (file paths, row counts, statuses)
# NOT the actual data itself
@task
def good_xcom_usage(ds=None) -> str:
"""Push an S3 path, not the data."""
output_path = f"s3://my-bucket/processed/{ds}/output.parquet"
return output_path # tiny, JSON-serializable
xcom_dag = xcom_demo()
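One way to enforce the "metadata only" guidance is a small size check before returning a value from a task. The sketch below uses only the standard library; `MAX_XCOM_BYTES` and `xcom_safe` are hypothetical names, and the 1 MB threshold simply mirrors the recommendation in the comments above rather than any Airflow setting.

```python
import json

MAX_XCOM_BYTES = 1_000_000  # assumed team limit, mirroring the "< 1MB" guidance

def xcom_safe(value, limit: int = MAX_XCOM_BYTES) -> bool:
    """Return True if value is JSON-serializable and its encoded size is within limit."""
    try:
        encoded = json.dumps(value).encode("utf-8")
    except (TypeError, ValueError):
        return False  # not JSON-serializable, so default XCom serialization would normally fail
    return len(encoded) <= limit

small = {"path": "s3://my-bucket/processed/2026-01-01/output.parquet", "row_count": 50000}
large = {"rows": ["x" * 100] * 20_000}   # roughly 2 MB once encoded

print(xcom_safe(small), xcom_safe(large))
```

A task that fails this check should write its data to object storage and return the path instead.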
Q6. What are Airflow sensors? When do you use them?
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor # Airflow 2.x path; needs apache-airflow-providers-amazon
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.base import BaseSensorOperator
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def sensor_examples():
# S3 Sensor: wait for a file to appear in S3
wait_for_s3_file = S3KeySensor(
task_id='wait_for_upstream_data',
bucket_name='my-data-bucket',
bucket_key='raw/sales/{{ ds }}/data.parquet',
aws_conn_id='aws_default',
poke_interval=300, # check every 5 minutes
timeout=3600, # fail after 1 hour
mode='reschedule', # IMPORTANT: releases worker slot between pokes
soft_fail=False, # hard fail if timeout reached
)
# ExternalTaskSensor: wait for another DAG's task to complete
wait_for_upstream_dag = ExternalTaskSensor(
task_id='wait_for_upstream_dag',
external_dag_id='upstream_etl_pipeline',
external_task_id='final_load_task',
execution_date_fn=lambda dt: dt, # same execution date
poke_interval=120,
timeout=7200,
mode='reschedule',
)
@task
def process_data(ds=None):
print(f"Both dependencies met, processing {ds}")
wait_for_s3_file >> wait_for_upstream_dag >> process_data()
# Custom sensor
class DataQualityGateSensor(BaseSensorOperator):
def __init__(self, table_name: str, min_row_count: int, **kwargs):
super().__init__(**kwargs)
self.table_name = table_name
self.min_row_count = min_row_count
def poke(self, context) -> bool:
"""Return True when condition is met."""
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id='warehouse')
result = hook.get_first(
f"SELECT COUNT(*) FROM {self.table_name} WHERE partition_date = '{context['ds']}'"
)
row_count = result[0]
self.log.info(f"Current row count: {row_count}, required: {self.min_row_count}")
return row_count >= self.min_row_count
sensor_dag = sensor_examples()
Q7. What is the difference between poke and reschedule mode in sensors?
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor # Airflow 2.x path; needs apache-airflow-providers-amazon
# POKE mode (default): sensor holds worker slot continuously
# - Worker is blocked between pokes (sleeping)
# - Fast response when condition met
# - BAD for long waits: wastes worker slots
# - Use only if max wait is < 10 minutes
sensor_poke = S3KeySensor(
task_id='wait_poke',
bucket_key='s3://bucket/file.txt',
poke_interval=60, # check every 60 seconds
timeout=600, # max 10 minutes
mode='poke', # blocks worker continuously
)
# RESCHEDULE mode: sensor releases worker between pokes
# - Worker is freed after each poke attempt
# - Airflow reschedules sensor for next check
# - Scales well: 1000 sensors can wait without using 1000 workers
# - Slight delay: rescheduling overhead (a few seconds)
# - Use for all sensors that wait > a few minutes
sensor_reschedule = S3KeySensor(
task_id='wait_reschedule',
bucket_key='s3://bucket/file.txt',
poke_interval=300, # check every 5 minutes
timeout=86400, # max 24 hours
mode='reschedule', # releases worker between pokes -- PREFERRED
)
# DEFERRABLE operators (Airflow 2.2+): even more efficient
# Uses Airflow's Triggerer component
# No worker slot is used while waiting (the wait runs as an async trigger inside the Triggerer)
# Recent versions of the Amazon provider expose this on S3KeySensor via deferrable=True
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
sensor_defer = S3KeySensor(
task_id='wait_deferred',
bucket_key='s3://bucket/file.txt',
poke_interval=300,
deferrable=True, # releases the worker entirely; the Triggerer handles the wait
)
# Summary:
# poke: fast but holds a worker slot (use only for short waits, up to about 10 minutes)
# reschedule: efficient for longer waits (minutes to 24 hours)
# deferrable: most efficient, requires Triggerer component
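The cost difference between poke and reschedule mode comes down to how long a worker slot is held. The sketch below is a simplified accounting model, not a measurement: `slot_seconds` is a hypothetical helper, and `poke_cost_s` (how long a single check occupies a slot) is an assumed figure.

```python
def slot_seconds(total_wait_s: int, poke_interval_s: int, poke_cost_s: float, mode: str) -> float:
    """Estimate worker-slot seconds consumed by one sensor under a simplified model."""
    if mode == "poke":
        return float(total_wait_s)                    # slot is held for the whole wait
    if mode == "reschedule":
        pokes = total_wait_s // poke_interval_s + 1   # one check at t=0, then one per interval
        return pokes * poke_cost_s                    # slot is held only while a check runs
    raise ValueError(f"unknown mode: {mode}")

wait = 6 * 3600  # the file arrives after 6 hours
poke_usage = slot_seconds(wait, 300, 2.0, "poke")
resched_usage = slot_seconds(wait, 300, 2.0, "reschedule")
print(poke_usage, resched_usage)
```

Under these assumptions a six-hour wait costs 21,600 slot-seconds in poke mode and 146 in reschedule mode, which is why long waits should not use poke.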
Q8. What is an Airflow hook? How does it differ from an operator?
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
from airflow.decorators import task
# HOOK: thin wrapper around an external service connection
# Looks up the Airflow Connection and handles authentication and client setup
# Provides raw access: execute SQL, read/write S3, call HTTP endpoints
# OPERATOR: higher-level abstraction that USES hooks internally
# Defines what a task DOES (the workflow step)
# Operator = orchestration logic; Hook = connection management
# Using hooks directly (more control)
@task
def fetch_from_postgres(ds=None) -> int:
hook = PostgresHook(postgres_conn_id='warehouse_prod')
# Execute query
result = hook.get_first(
f"SELECT COUNT(*) FROM orders WHERE order_date = '{ds}'"
)
return result[0]
@task
def write_to_s3(data: str, ds=None):
s3_hook = S3Hook(aws_conn_id='aws_default')
s3_hook.load_string(
string_data=data,
key=f"output/{ds}/result.json",
bucket_name='my-bucket',
replace=True,
)
@task
def call_rest_api(ds=None) -> dict:
http_hook = HttpHook(http_conn_id='my_api', method='POST')
response = http_hook.run(
endpoint='/api/v1/trigger',
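json={'date': ds}, # sent as a JSON body to match the Content-Type header; a dict passed as data= would be form-encoded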
headers={'Content-Type': 'application/json'},
)
return response.json()
# Custom hook example
from airflow.hooks.base import BaseHook
import requests
class MyApiHook(BaseHook):
def __init__(self, conn_id: str):
super().__init__()
self.conn_id = conn_id
def get_client(self):
conn = self.get_connection(self.conn_id)
session = requests.Session()
session.headers.update({'Authorization': f'Bearer {conn.password}'})
return session
MEDIUM: Scheduling and Pipeline Design (Questions 9-18)
Q9. What is idempotency in Airflow? Why is it critical?
from airflow.decorators import dag, task
from datetime import datetime
# Idempotency: running the same DAG run multiple times produces the same result
# WHY: Airflow may re-run tasks due to failures, manual re-triggers, or backfills
@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def idempotent_pipeline():
# NON-IDEMPOTENT: WRONG
@task
def bad_insert(ds=None):
# Running this twice inserts duplicate rows!
sql = f"INSERT INTO daily_sales VALUES ('{ds}', 100)"
# If this task reruns, you get duplicate data
# IDEMPOTENT: CORRECT -- delete-then-insert pattern
@task
def good_upsert(ds=None):
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook('warehouse')
# First delete existing data for this date
hook.run(f"DELETE FROM daily_sales WHERE sales_date = '{ds}'")
# Then insert fresh data
hook.run(f"INSERT INTO daily_sales SELECT * FROM stg_sales WHERE dt = '{ds}'")
# IDEMPOTENT: even better -- UPSERT / MERGE
@task
def best_upsert(ds=None):
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook('warehouse')
hook.run(f"""
INSERT INTO daily_sales (date, revenue)
SELECT '{ds}', SUM(amount) FROM orders WHERE order_date = '{ds}'
ON CONFLICT (date) DO UPDATE SET revenue = EXCLUDED.revenue
""")
# IDEMPOTENT: writing to partitioned storage (overwrite same partition)
@task
def write_to_s3_idempotent(ds=None):
# S3 key includes execution date -- re-running overwrites same file
output_path = f"s3://bucket/processed/date={ds}/data.parquet"
# Write data to this path (overwriting if exists)
return output_path
best_upsert()
dag_instance = idempotent_pipeline()
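The delete-then-insert pattern can be verified locally with an in-memory SQLite database. This is a minimal sketch under stated assumptions: the table layout and the `load_day` helper are hypothetical, and SQLite stands in for the warehouse. It also wraps both statements in one transaction, so a failure between the delete and the insert cannot leave the date empty.

```python
import sqlite3

def load_day(conn: sqlite3.Connection, ds: str, amounts: list) -> None:
    """Replace all rows for one date inside a single transaction (delete-then-insert)."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_sales WHERE sales_date = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_sales (sales_date, amount) VALUES (?, ?)",
            [(ds, amount) for amount in amounts],
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (sales_date TEXT, amount REAL)")

load_day(conn, "2026-01-01", [100.0, 250.0])
load_day(conn, "2026-01-01", [100.0, 250.0])   # simulated retry or backfill of the same run

count, total = conn.execute(
    "SELECT COUNT(*), SUM(amount) FROM daily_sales WHERE sales_date = ?", ("2026-01-01",)
).fetchone()
print(count, total)
```

Running the load twice leaves two rows, not four. Note the parameterized queries (`?` placeholders) in place of f-strings, which is the safer habit for SQL built from runtime values.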
Q10. How does Airflow backfilling work?
# Backfill: run a DAG for historical dates
# Happens automatically when catchup=True, or manually through a CLI backfill
# CLI backfill:
# airflow dags backfill -s 2026-01-01 -e 2026-01-31 my_dag_id
# In-code catchup (not recommended for most production DAGs)
from airflow import DAG
from datetime import datetime
dag = DAG(
'sales_pipeline',
start_date=datetime(2026, 1, 1),
schedule_interval='@daily',
catchup=True, # Will create runs for all dates from start_date to now
max_active_runs=5, # Max concurrent backfill runs
)
# With catchup=False (recommended):
dag_no_catchup = DAG(
'sales_pipeline_safe',
start_date=datetime(2026, 1, 1),
schedule_interval='@daily',
catchup=False, # Only future scheduled runs; backfill manually if needed
)
# Backfill considerations:
# 1. Idempotency: tasks must be idempotent (running twice = same result)
# 2. Rate limiting: max_active_runs prevents overloading downstream systems
# 3. Concurrency: backfill runs in parallel up to max_active_runs
# 4. Dependencies: ExternalTaskSensor may cause issues if upstream hasn't backfilled
# Partial backfill (specific tasks only):
# airflow tasks run sales_pipeline extract_task 2026-01-15 --force
# Airflow 2.x: "Clear" vs "Backfill"
# Clear: re-run existing DAG run instances (resets the selected task instances to "no status" so the scheduler runs them again)
# Backfill: create new DAG run instances for dates without existing runs
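The "dates without existing runs" rule can be sketched with plain date arithmetic. The helper below is hypothetical and models only the description above for a daily schedule; the real `airflow dags backfill` command has further options that this ignores.

```python
from datetime import date, timedelta

def backfill_logical_dates(start: date, end: date, existing: set) -> list:
    """List daily logical dates in [start, end] that do not already have a run."""
    days = (end - start).days
    wanted = [start + timedelta(days=i) for i in range(days + 1)]
    return [d for d in wanted if d not in existing]

already_run = {date(2026, 1, 2)}
to_create = backfill_logical_dates(date(2026, 1, 1), date(2026, 1, 4), already_run)
print(to_create)
```

Each date in the result corresponds to one DAG run, and `max_active_runs` caps how many of them execute at the same time.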
Q11. How do you create dynamic DAGs in Airflow?
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from datetime import datetime
# Method 1: Dynamic task mapping (Airflow 2.3+) -- RECOMMENDED
@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def dynamic_task_mapping():
@task
def get_tables() -> list:
"""Return list of tables to process."""
return ['orders', 'users', 'products', 'events']
@task
def process_table(table_name: str, ds=None):
"""Process one table -- runs once per table."""
print(f"Processing {table_name} for {ds}")
return {"table": table_name, "status": "done"}
tables = get_tables()
# expand() creates one task instance per element
results = process_table.expand(table_name=tables)
@task
def summarize(results: list):
print(f"Processed {len(results)} tables")
summarize(results)
# Method 2: Dynamic DAG generation (factory pattern)
# Generate multiple DAGs from config
import yaml
def create_pipeline_dag(config: dict):
"""Factory function: create one DAG per pipeline config."""
@dag(
dag_id=f"pipeline_{config['name']}",
schedule_interval=config.get('schedule', '@daily'),
start_date=datetime(2026, 1, 1),
catchup=False,
tags=[config['team']],
)
def pipeline():
@task
def extract():
return f"Extracted from {config['source']}"
@task
def load(data: str):
print(f"Loaded to {config['destination']}: {data}")
load(extract())
return pipeline()
# Configs are hard-coded here for brevity; in practice, load them from a YAML file with yaml.safe_load()
configs = [
{'name': 'sales', 'source': 'sales_db', 'destination': 'warehouse', 'team': 'analytics'},
{'name': 'events', 'source': 'kafka', 'destination': 's3', 'team': 'data-eng'},
]
# Create DAG objects -- Airflow discovers them as module-level variables
for cfg in configs:
globals()[f"pipeline_{cfg['name']}"] = create_pipeline_dag(cfg)
dag_instance = dynamic_task_mapping()
Q12. What are the different Airflow executors? Which to use when?
Executor: component that determines HOW tasks are actually run
1. SequentialExecutor (default for local dev):
- Runs one task at a time
- Uses SQLite as metadata DB
- NEVER use in production
- Good for: development, testing
2. LocalExecutor:
- Runs tasks in parallel local processes
- Requires PostgreSQL or MySQL metadata DB
- Good for: single-machine production setups, small teams
- Limit: bounded by single machine's CPU/memory
3. CeleryExecutor:
- Distributed task queue (Celery + Redis/RabbitMQ)
- Multiple worker nodes, each running Celery workers
- Good for: medium-large scale (10s of workers)
- Complex: requires Celery, Redis, monitoring
4. KubernetesExecutor:
- Each task runs in its own Kubernetes pod
- Pods created on task start, destroyed on finish
- Perfect isolation, no idle worker cost
- Good for: variable workloads, large scale, diverse task requirements
- Slight overhead: pod startup time (~30s)
5. CeleryKubernetesExecutor:
- Hybrid: short/fast tasks via Celery, long/heavy tasks via Kubernetes
6. LocalKubernetesExecutor (Airflow 2.3+):
- Mix of local and Kubernetes tasks per DAG
# airflow.cfg / environment configuration
# executor = KubernetesExecutor
# Per-task Kubernetes configuration
from airflow.decorators import task
from kubernetes.client import models as k8s
@task(
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
requests={"cpu": "2", "memory": "8Gi"},
limits={"cpu": "4", "memory": "16Gi"}
)
)
]
)
)
}
)
def heavy_computation_task():
"""Guaranteed 2 CPUs / 8Gi (requests); may use up to 4 CPUs / 16Gi (limits) on Kubernetes."""
pass
Q13. How do you pass parameters to a DAG at runtime?
from airflow.decorators import dag, task
from airflow.models.param import Param
from datetime import datetime
@dag(
schedule_interval=None, # triggered manually or via API
start_date=datetime(2026, 1, 1),
catchup=False,
params={
'environment': Param(
default='staging',
type='string',
enum=['staging', 'production'],
description='Target environment for the pipeline',
),
'source_table': Param(
default='orders',
type='string',
),
'max_rows': Param(
default=1000,
type='integer',
minimum=1,
),
}
)
def parameterized_pipeline():
@task
def process(params=None):
env = params['environment']
table = params['source_table']
max_rows = params['max_rows']
print(f"Processing {table} in {env}, max {max_rows} rows")
process()
# Trigger with custom params via CLI:
# airflow dags trigger parameterized_pipeline \
# --conf '{"environment": "production", "source_table": "users", "max_rows": 50000}'
# Via REST API:
# POST /api/v1/dags/parameterized_pipeline/dagRuns
# {"conf": {"environment": "production", "source_table": "users"}}
dag_instance = parameterized_pipeline()
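Airflow validates `Param` values against a JSON Schema. The sketch below is a simplified standard-library stand-in that shows the same idea (defaults merged with trigger-time conf, then checked against type, enum, and minimum rules). The `SCHEMA` format and `resolve_params` are hypothetical and are not Airflow's implementation.

```python
# Simplified stand-in for the Param rules in the DAG above (hypothetical schema format).
SCHEMA = {
    "environment": {"type": str, "default": "staging", "enum": ["staging", "production"]},
    "source_table": {"type": str, "default": "orders"},
    "max_rows": {"type": int, "default": 1000, "minimum": 1},
}

def resolve_params(conf: dict, schema: dict = SCHEMA) -> dict:
    """Merge trigger-time conf over defaults and reject values that break the rules."""
    resolved = {}
    for name, rule in schema.items():
        value = conf.get(name, rule["default"])
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(value, bool) or not isinstance(value, rule["type"]):
            raise ValueError(f"{name}: expected {rule['type'].__name__}")
        if "enum" in rule and value not in rule["enum"]:
            raise ValueError(f"{name}: must be one of {rule['enum']}")
        if "minimum" in rule and value < rule["minimum"]:
            raise ValueError(f"{name}: must be >= {rule['minimum']}")
        resolved[name] = value
    return resolved

params = resolve_params({"environment": "production", "max_rows": 50000})
print(params)
```

Validation at trigger time means a bad value such as `max_rows: 0` is rejected before any task starts, rather than failing partway through the pipeline.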
Q14. How do you handle failures and retries in Airflow?
from airflow.decorators import dag, task
from airflow.utils.email import send_email
from datetime import datetime, timedelta
def alert_on_failure(context):
"""Custom failure callback."""
dag_id = context['dag'].dag_id
task_id = context['task_instance'].task_id
execution_date = context['execution_date']
log_url = context['task_instance'].log_url
send_email(
to=['[email protected]'],
subject=f'Airflow FAILURE: {dag_id}.{task_id}',
html_content=f"""
<b>DAG:</b> {dag_id}<br>
<b>Task:</b> {task_id}<br>
<b>Execution date:</b> {execution_date}<br>
<b>Log URL:</b> <a href="{log_url}">View logs</a>
"""
)
@dag(
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
default_args={
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True, # delay roughly doubles on each retry (with jitter), capped by max_retry_delay
'max_retry_delay': timedelta(hours=1),
'on_failure_callback': alert_on_failure,
'on_retry_callback': alert_on_failure,
'execution_timeout': timedelta(hours=2), # kill task if it takes > 2 hours
}
)
def robust_pipeline():
@task(
retries=5, # override DAG-level default for this task
retry_delay=timedelta(seconds=30),
)
def flaky_api_call():
import requests
response = requests.get('https://api.example.com/data', timeout=30)
response.raise_for_status()
return response.json()
@task(
on_failure_callback=alert_on_failure,
trigger_rule='all_done', # run even if upstream failed
)
def cleanup(ds=None):
"""Always runs: cleanup temp files regardless of pipeline success."""
print(f"Cleaning up temp files for {ds}")
data = flaky_api_call()
data >> cleanup() # make cleanup downstream, so trigger_rule='all_done' applies to flaky_api_call's outcome
dag_instance = robust_pipeline()
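The effect of exponential backoff with a cap can be sketched as a plain doubling rule. This is an illustration of the idea only: `backoff_delays` is a hypothetical helper, and Airflow's own calculation adds jitter, so the delays observed in practice will differ from these exact values.

```python
from datetime import timedelta

def backoff_delays(retries: int, base: timedelta, max_delay: timedelta) -> list:
    """Return the wait before each retry: base * 2^n, capped at max_delay (no jitter)."""
    return [min(base * (2 ** attempt), max_delay) for attempt in range(retries)]

delays = backoff_delays(5, timedelta(minutes=5), timedelta(hours=1))
minutes = [d.total_seconds() / 60 for d in delays]
print(minutes)
```

With a 5-minute base and a 1-hour cap, the fifth retry would wait 80 minutes under pure doubling, so the cap holds it at 60.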
Q15. What are trigger rules in Airflow?
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def trigger_rule_demo():
@task
def task_a(): return "A done"
@task
def task_b(): return "B done"
@task(trigger_rule=TriggerRule.ALL_SUCCESS) # DEFAULT: all upstream must succeed
def all_success(a, b):
print("Runs only if both A and B succeeded")
@task(trigger_rule=TriggerRule.ONE_SUCCESS)
def one_success(ds=None):
print("Runs if at least one upstream succeeded")
@task(trigger_rule=TriggerRule.ALL_DONE)
def all_done(ds=None):
print("Runs when all upstream tasks are done (regardless of success/fail)")
@task(trigger_rule=TriggerRule.ALL_FAILED)
def on_failure_notification(ds=None):
print("Runs ONLY IF all upstream tasks failed -- use for failure-path cleanup")
@task(trigger_rule=TriggerRule.NONE_FAILED)
def none_failed(ds=None):
print("Runs if no upstream tasks failed (all succeeded or were skipped)")
# Common patterns:
# ALL_SUCCESS: default, most common -- all must succeed
# ALL_DONE: cleanup tasks that should always run (even after failure)
# NONE_FAILED: for optional branches -- allows skipped tasks to pass through
# ONE_SUCCESS: fan-in after parallel tasks where any one passing is enough
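a = task_a()
b = task_b()
all_success(a, b)
# Give the remaining tasks upstream dependencies: a trigger rule has no effect on a task with no upstream
[a, b] >> one_success()
[a, b] >> all_done()
[a, b] >> on_failure_notification()
[a, b] >> none_failed()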
dag_instance = trigger_rule_demo()
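The rules above reduce to simple counts over upstream task states. The following is a simplified model, assuming every upstream task has already finished. `should_run` is a hypothetical helper rather than Airflow's scheduler logic, which also handles in-progress states and more rules.

```python
FAILED_STATES = {"failed", "upstream_failed"}

def should_run(rule: str, upstream_states: list) -> bool:
    """Decide whether a task runs, assuming every upstream task has already finished."""
    total = len(upstream_states)
    succeeded = sum(state == "success" for state in upstream_states)
    failed = sum(state in FAILED_STATES for state in upstream_states)
    checks = {
        "all_success": succeeded == total,
        "one_success": succeeded >= 1,
        "all_done": True,
        "all_failed": failed == total,
        "none_failed": failed == 0,
        "none_failed_min_one_success": failed == 0 and succeeded >= 1,
    }
    return checks[rule]

after_branch = ["success", "skipped"]   # one branch ran, the other was skipped
after_error = ["success", "failed"]

print(should_run("all_success", after_branch), should_run("none_failed", after_branch))
```

The `after_branch` case shows why a join task after a branch needs `none_failed` or `none_failed_min_one_success`: under the default `all_success`, the skipped branch prevents the join from running.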
Q16. How do you implement branching in Airflow DAGs?
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def branching_pipeline():
# Method 1: BranchPythonOperator (returns task_id(s) to follow)
def choose_path(**context):
"""Return task_id to execute based on day of week."""
day_of_week = context['logical_date'].weekday() # datetime.weekday(): 0=Monday, 6=Sunday (pendulum's day_of_week numbering varies by version)
if day_of_week == 0: # Monday
return ['weekly_full_load', 'send_weekly_report'] # can return list
else:
return 'daily_incremental_load'
branch = BranchPythonOperator(
task_id='choose_load_strategy',
python_callable=choose_path,
)
full_load = EmptyOperator(task_id='weekly_full_load')
weekly_report = EmptyOperator(task_id='send_weekly_report')
incremental = EmptyOperator(task_id='daily_incremental_load')
# Join branches -- trigger_rule='none_failed_min_one_success' lets the join run when the other branch was skipped
join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')
branch >> [full_load, incremental]
full_load >> weekly_report
[full_load, weekly_report, incremental] >> join
# Method 2: TaskFlow with @task.branch decorator
@task.branch
def choose_environment(ds=None):
import pendulum
date = pendulum.parse(ds)
return 'load_to_prod' if date.weekday() == 0 else 'load_to_staging' # weekday(): 0=Monday
@task
def load_to_prod():
print("Loading to production")
@task
def load_to_staging():
print("Loading to staging")
choice = choose_environment()
choice >> [load_to_prod(), load_to_staging()]
dag_instance = branching_pipeline()
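Branch callables are easiest to get right when the decision is a pure function that can be tested without a scheduler. A minimal sketch, assuming the task ids from the first method above; `choose_load_strategy` is a hypothetical helper that takes the `ds` string directly.

```python
from datetime import date

def choose_load_strategy(ds: str) -> str:
    """Return the task_id to follow: full load on Mondays, incremental otherwise."""
    run_date = date.fromisoformat(ds)
    # date.weekday() is defined by the standard library: 0 = Monday, 6 = Sunday
    return "weekly_full_load" if run_date.weekday() == 0 else "daily_incremental_load"

print(choose_load_strategy("2026-01-05"))  # 2026-01-05 is a Monday
print(choose_load_strategy("2026-01-06"))
```

The branch operator's callable can then delegate to this function, keeping the Airflow-specific wiring thin.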
Q17. How do you set up cross-DAG dependencies?
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
dag_id='upstream_data_pipeline',
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
)
def upstream_pipeline():
@task
def final_task(ds=None):
print(f"Upstream done for {ds}")
final_task()
@dag(
dag_id='downstream_analytics',
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
)
def downstream_pipeline():
# Method 1: ExternalTaskSensor -- wait for upstream task to complete
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_data_pipeline',
external_dag_id='upstream_data_pipeline',
external_task_id='final_task',
execution_date_fn=lambda dt: dt, # same execution_date
timeout=3600,
poke_interval=120,
mode='reschedule',
)
@task
def run_analytics(ds=None):
print(f"Running analytics for {ds}")
wait_for_upstream >> run_analytics()
@dag(
dag_id='orchestrator_dag',
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
)
def orchestrator():
# Method 2: TriggerDagRunOperator -- actively trigger downstream DAG
trigger_downstream = TriggerDagRunOperator(
task_id='trigger_downstream_analytics',
trigger_dag_id='downstream_analytics',
conf={'triggered_by': 'orchestrator_dag'},
wait_for_completion=True, # block until triggered DAG completes
reset_dag_run=True, # re-trigger even if DAG run already exists
)
@task
def upstream_work():
print("Upstream complete, triggering downstream")
upstream_work() >> trigger_downstream
up_dag = upstream_pipeline()
down_dag = downstream_pipeline()
orch_dag = orchestrator()
Q18. How do you implement SLA monitoring in Airflow?
from airflow import DAG
from airflow.decorators import task
from airflow.utils.email import send_email
from datetime import datetime, timedelta
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Called when an SLA is missed."""
missed_tasks = [str(sla.task_id) for sla in slas]
send_email(
to=['[email protected]', '[email protected]'],
subject=f'SLA MISS: {dag.dag_id} - {", ".join(missed_tasks)}',
html_content=f"""
<h2>SLA Missed</h2>
<p><b>DAG:</b> {dag.dag_id}</p>
<p><b>Missed tasks:</b> {", ".join(missed_tasks)}</p>
<p><b>SLA defined as:</b> task must complete within configured time</p>
"""
)
with DAG(
'critical_pipeline',
schedule_interval='0 8 * * *', # 8 AM UTC daily
start_date=datetime(2026, 1, 1),
catchup=False,
sla_miss_callback=sla_miss_callback,
default_args={'owner': 'data-team'},
) as dag:
from airflow.operators.python import PythonOperator
def extract():
print("Extracting data")
def transform():
print("Transforming data")
def load():
print("Loading data")
extract_task = PythonOperator(
task_id='extract',
python_callable=extract,
sla=timedelta(minutes=30), # must complete within 30 min of dag scheduled time
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform,
sla=timedelta(hours=1), # must complete within 1 hour
)
load_task = PythonOperator(
task_id='load',
python_callable=load,
sla=timedelta(hours=2), # final result within 2 hours of 8 AM = by 10 AM
)
extract_task >> transform_task >> load_task
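The SLA comments above describe a deadline measured from the DAG run's scheduled time. That check can be sketched with plain `datetime` arithmetic; `sla_missed` is a hypothetical helper that models the rule as described here, not Airflow's internal SLA bookkeeping.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def sla_missed(scheduled_start: datetime, sla: timedelta,
               finished_at: Optional[datetime], now: datetime) -> bool:
    """True if the task finished after the deadline, or is still unfinished past it."""
    deadline = scheduled_start + sla
    reference = finished_at if finished_at is not None else now
    return reference > deadline

start = datetime(2026, 1, 1, 8, 0, tzinfo=timezone.utc)   # the 8 AM UTC scheduled time
on_time = sla_missed(start, timedelta(minutes=30),
                     finished_at=start + timedelta(minutes=25),
                     now=start + timedelta(hours=1))
late = sla_missed(start, timedelta(minutes=30),
                  finished_at=None,
                  now=start + timedelta(minutes=45))
print(on_time, late)
```

Note that an SLA miss only raises an alert; it does not stop or fail the task, which is the job of `execution_timeout`.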
HARD: Production Patterns (Questions 19-25)
Q19. How do you design an ML pipeline DAG in Airflow?
from airflow.decorators import dag, task
from datetime import datetime, timedelta


@dag(
    dag_id='ml_training_pipeline',
    schedule='@weekly',  # retrain weekly (Airflow 2.4+; older 2.x releases use schedule_interval)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={'retries': 1, 'retry_delay': timedelta(minutes=10)},
    tags=['ml', 'weekly'],
)
def ml_training_pipeline():
    @task
    def check_data_quality(ds=None) -> dict:
        """Gate: ensure training data meets quality standards."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')
        # Bind ds as a query parameter instead of formatting it into the SQL string
        row_count = hook.get_first(
            "SELECT COUNT(*) FROM features WHERE week_ending = %(ds)s",
            parameters={"ds": ds},
        )[0]
        if row_count < 10_000:
            raise ValueError(f"Insufficient training data: {row_count} rows (min: 10,000)")
        return {"rows": row_count, "quality_passed": True}

    @task
    def prepare_training_data(quality_check: dict, ds=None) -> str:
        """Extract and split data for training."""
        output_path = f"s3://ml-bucket/training-data/{ds}/"
        # Run Spark job to prepare features
        return output_path

    @task
    def train_model(data_path: str, ds=None) -> str:
        """Submit training job to SageMaker / custom training cluster."""
        # Example: trigger a training job
        model_artifact_path = f"s3://ml-bucket/models/{ds}/model.pkl"
        # ... training logic ...
        return model_artifact_path

    @task
    def evaluate_model(model_path: str, ds=None) -> dict:
        """Evaluate model on holdout set."""
        # Run evaluation and return metrics (placeholder values shown here)
        metrics = {"auc": 0.85, "f1": 0.82}
        if metrics["auc"] < 0.80:
            raise ValueError(f"Model AUC {metrics['auc']} below threshold 0.80")
        return metrics

    @task
    def promote_model(metrics: dict, model_path: str, ds=None):
        """Deploy model to production if evaluation passes."""
        print(f"Promoting model from {model_path} with AUC {metrics['auc']}")
        # Register in MLflow model registry
        # Update serving endpoint

    # Pipeline: passing return values wires both the XCom and the dependency
    quality = check_data_quality()
    data = prepare_training_data(quality)
    model = train_model(data)
    metrics = evaluate_model(model)
    promote_model(metrics, model)


dag_instance = ml_training_pipeline()
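The two gates in this pipeline (minimum row count, minimum AUC) are plain threshold checks. One way to keep them unit-testable without an Airflow installation is to factor them into pure functions that the `@task` bodies call. The sketch below assumes that refactoring; `data_gate` and `model_gate` are hypothetical names, and the thresholds mirror the ones used above:

```python
MIN_ROWS = 10_000  # same minimum as check_data_quality above
MIN_AUC = 0.80     # same threshold as evaluate_model above


def data_gate(row_count: int, min_rows: int = MIN_ROWS) -> dict:
    """Raise if there is too little training data; otherwise return a small summary."""
    if row_count < min_rows:
        raise ValueError(f"Insufficient training data: {row_count} rows (min: {min_rows})")
    return {"rows": row_count, "quality_passed": True}


def model_gate(metrics: dict, min_auc: float = MIN_AUC) -> dict:
    """Raise if the model is below the AUC threshold; otherwise pass the metrics through."""
    if metrics["auc"] < min_auc:
        raise ValueError(f"Model AUC {metrics['auc']} below threshold {min_auc}")
    return metrics


print(data_gate(12_500))
print(model_gate({"auc": 0.85, "f1": 0.82}))
```

Raising inside the gate is what stops the pipeline: a failed task leaves its downstream tasks unscheduled under the default trigger rule.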
Q20. How do you optimize Airflow performance at scale?
# Airflow performance tuning for 1000+ DAGs and 10,000+ task instances/day

# 1. Scheduler tuning (airflow.cfg) -- comments kept on their own lines
scheduler_config = """
[scheduler]
# Minimum seconds between re-parses of the same DAG file (default 30);
# increase for stable environments to reduce parsing load
min_file_process_interval = 30

[core]
# Total concurrent tasks across all DAGs
parallelism = 256
# Default limit on concurrent tasks per DAG
max_active_tasks_per_dag = 16
# Default limit on concurrent runs per DAG
max_active_runs_per_dag = 3
# Safe mode only parses files that mention both "airflow" and "dag";
# turning it off makes the parser read every .py file
dag_discovery_safe_mode = True
"""

# 2. Database performance
# - Use PostgreSQL (never SQLite in production)
# - Enable connection pooling (PgBouncer recommended)
# - Regular cleanup: airflow db clean --clean-before-timestamp 2025-01-01

# 3. DAG design for performance
from airflow.decorators import dag, task
from datetime import datetime


@dag(
    schedule='@daily',  # Airflow 2.4+; older 2.x releases use schedule_interval
    start_date=datetime(2026, 1, 1),
    max_active_tasks=32,  # parallelism within this DAG (replaces the deprecated `concurrency`)
)
def optimized_dag():
    # Keep DAG file lean: no I/O or compute in DAG file (only task definitions)
    # BAD: network calls at DAG parse time
    # import requests; api_data = requests.get('https://api.com').json()

    # GOOD: all I/O inside tasks
    @task
    def task_with_io():
        import requests
        return requests.get('https://api.com').json()

    # Avoid large Python objects in XCom -- pass file paths instead
    @task
    def good_xcom() -> str:
        return "s3://bucket/result.parquet"  # small metadata, not data

    # Tasks only join the DAG when they are called
    task_with_io()
    good_xcom()


dag_instance = optimized_dag()

# 4. KubernetesExecutor: eliminates worker idle time
# 5. Pool management: prevent any one team from exhausting all slots
#    airflow pools create external_api_pool 5 "Max 5 concurrent API calls"
#    @task(pool='external_api_pool', pool_slots=1)
# 6. Trigger rules: use NONE_FAILED for fan-in to avoid skipped task blocking
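The "pass file paths, not data" rule for XCom can be enforced rather than just recommended. The sketch below is one possible guard, not an Airflow feature: `xcom_safe` is a hypothetical helper, and the 48 KB budget is an arbitrary number picked for illustration.

```python
import json

MAX_XCOM_BYTES = 48 * 1024  # arbitrary budget chosen for this sketch


def xcom_safe(value, max_bytes: int = MAX_XCOM_BYTES):
    """Return value unchanged if its JSON form fits the budget, else raise."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes (limit {max_bytes}); "
            "write it to object storage and return the path instead"
        )
    return value


# Small metadata passes through untouched
print(xcom_safe("s3://bucket/result.parquet"))
```

A task would end with `return xcom_safe(result)`, so an oversized payload fails loudly in the task instead of bloating the metadata database.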
Q21. How do you implement data quality checks in Airflow?
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLCheckOperator, SQLValueCheckOperator
from datetime import datetime


# schedule= needs Airflow 2.4+; older 2.x releases use schedule_interval
@dag(schedule='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def pipeline_with_dq_checks():
    @task
    def load_raw_data(ds=None) -> str:
        return f"s3://bucket/raw/{ds}/data.parquet"

    # Great Expectations integration
    # (assumes the pre-1.0 Checkpoint API; GX 1.x runs checkpoints differently)
    @task
    def run_ge_checkpoint(data_path: str, ds=None) -> bool:
        import great_expectations as ge
        context = ge.get_context()
        result = context.run_checkpoint(
            checkpoint_name="orders_checkpoint",
            batch_request={"batch_data": data_path},
            run_name_template=f"airflow_{ds}",
        )
        if not result["success"]:
            # A checkpoint can hold several validations; collect failures from each
            failed_expectations = [
                exp.expectation_config.expectation_type
                for validation in result.list_validation_results()
                for exp in validation.results
                if not exp.success
            ]
            raise ValueError(f"Data quality failed: {failed_expectations}")
        return True

    # SQL-based checks
    check_row_count = SQLCheckOperator(
        task_id='check_row_count',
        conn_id='warehouse',
        sql="""
            SELECT COUNT(*) > 0
            FROM orders
            WHERE order_date = '{{ ds }}'
        """,
    )
    check_revenue = SQLValueCheckOperator(
        task_id='check_revenue_range',
        conn_id='warehouse',
        sql="SELECT AVG(amount) FROM orders WHERE order_date = '{{ ds }}'",
        pass_value=100.0,  # expected value
        tolerance=0.5,  # allow 50% deviation
    )

    # Custom assertion task
    @task
    def assert_no_nulls(ds=None):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')
        # Bind ds as a query parameter instead of formatting it into the SQL string
        null_count = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE order_date = %(ds)s AND user_id IS NULL",
            parameters={"ds": ds},
        )[0]
        if null_count > 0:
            raise AssertionError(f"Found {null_count} orders with NULL user_id")

    raw = load_raw_data()
    ge_ok = run_ge_checkpoint(raw)
    # Chain the SQL checks after the GE checkpoint so they do not run in parallel with the load
    ge_ok >> check_row_count >> check_revenue >> assert_no_nulls()


dag_instance = pipeline_with_dq_checks()
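The `pass_value=100.0, tolerance=0.5` pair on the revenue check is easy to misread in an interview. Assuming a symmetric relative tolerance, the band it describes can be sketched in plain Python; `within_tolerance` is a hypothetical helper for illustration, not the operator's own code:

```python
def within_tolerance(actual: float, pass_value: float, tolerance: float) -> bool:
    """True if actual lies within pass_value +/- (tolerance * pass_value)."""
    lower = pass_value * (1 - tolerance)
    upper = pass_value * (1 + tolerance)
    return lower <= actual <= upper


# With pass_value=100.0 and tolerance=0.5 the accepted band is 50.0 to 150.0
for avg_amount in (49.0, 50.0, 100.0, 150.0, 151.0):
    print(avg_amount, within_tolerance(avg_amount, pass_value=100.0, tolerance=0.5))
```

A 50% band is wide enough to catch a broken load (zero or exploded amounts) while tolerating ordinary day-to-day swings; tighten it once you know the metric's real variance.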
Q22. What is the Airflow REST API? How do you use it for automation?
import time
import requests

# Airflow 2.0+ includes a stable REST API (OpenAPI spec)
AIRFLOW_BASE_URL = "http://airflow-webserver:8080/api/v1"
AUTH = ("admin", "admin")  # use a service account in production
# Basic auth only works if the [api] auth_backends setting enables
# airflow.api.auth.backend.basic_auth


# List DAGs
def list_dags():
    response = requests.get(f"{AIRFLOW_BASE_URL}/dags", auth=AUTH, timeout=30)
    response.raise_for_status()
    return response.json()


# Trigger a DAG run
def trigger_dag(dag_id: str, conf: dict = None, logical_date: str = None):
    payload = {"conf": conf or {}}
    if logical_date:
        payload["logical_date"] = logical_date
    response = requests.post(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns",
        auth=AUTH,
        json=payload,  # json= also sets the Content-Type header
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


# Get DAG run status
def get_dag_run_status(dag_id: str, dag_run_id: str) -> str:
    response = requests.get(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/dagRuns/{dag_run_id}",
        auth=AUTH,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["state"]


# Clear a task (re-run it) via the clearTaskInstances endpoint
def clear_task(dag_id: str, task_id: str, dag_run_id: str):
    response = requests.post(
        f"{AIRFLOW_BASE_URL}/dags/{dag_id}/clearTaskInstances",
        auth=AUTH,
        json={
            "dry_run": False,  # the endpoint defaults to a dry run
            "task_ids": [task_id],
            "dag_run_id": dag_run_id,
            "only_failed": False,
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


# Example: poll until DAG completes (for triggering from CI/CD)
run_info = trigger_dag('ml_training_pipeline', conf={'environment': 'production'})
run_id = run_info['dag_run_id']
deadline = time.monotonic() + 4 * 60 * 60  # stop polling after 4 hours
while True:
    status = get_dag_run_status('ml_training_pipeline', run_id)
    print(f"Status: {status}")
    if status in ['success', 'failed']:
        break
    if time.monotonic() > deadline:
        raise TimeoutError("ML training pipeline did not finish in time")
    time.sleep(30)
if status == 'failed':
    raise RuntimeError("ML training pipeline failed!")
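A bare `while True` polling loop can hang a CI job indefinitely if the run never reaches a terminal state. The sketch below shows one way to bound it; `wait_for_run` is a hypothetical helper, and the state lookup is injected as a callable so the loop can be tested without a live Airflow instance:

```python
import time
from typing import Callable

TERMINAL_STATES = {"success", "failed"}


def wait_for_run(
    get_state: Callable[[], str],
    timeout_s: float = 3600,
    interval_s: float = 30,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_state until it returns a terminal state or timeout_s elapses."""
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"DAG run still '{state}' after {timeout_s}s")
        sleep(interval_s)


# Simulated run: queued -> running -> success, with sleeping disabled
states = iter(["queued", "running", "success"])
print(wait_for_run(lambda: next(states), sleep=lambda _: None))
```

Against the REST helpers above, the call would look like `wait_for_run(lambda: get_dag_run_status('ml_training_pipeline', run_id))`.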
Q23. How do you handle secrets in Airflow?
# Airflow Connections: store credentials in Connections (encrypted in DB)
# Access via hooks with conn_id
from airflow.decorators import task

# Option 1: Airflow Connections (default)
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def use_connection():
    hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    # Credentials loaded from Airflow Connections -- never hardcoded


# Option 2: HashiCorp Vault (production recommended)
# In airflow.cfg:
# [secrets]
# backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
# backend_kwargs = {"connections_path": "connections", "variables_path": "variables",
#                   "url": "http://vault:8200", "auth_type": "approle"}
# (approle auth also needs role_id and secret_id, supplied outside the DAG repo)

# Option 3: AWS Secrets Manager
# backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs = {"connections_prefix": "airflow/connections",
#                   "variables_prefix": "airflow/variables"}

# Option 4: Airflow Variables (for non-sensitive config)
from airflow.models import Variable


@task
def use_variable():
    # NOT for credentials: Variable values are Fernet-encrypted at rest, but the UI
    # shows them in plain text unless the key contains a sensitive word
    # such as "password" or "secret"
    batch_size = int(Variable.get("pipeline_batch_size", default_var=1000))
    return batch_size


# Best practices:
# - Never hardcode credentials in DAG files
# - Use Connections for DB/API credentials
# - Use Vault/Secrets Manager for production
# - Rotate credentials via Connection update (CLI or UI)
# - airflow connections add my_conn --conn-type postgres --conn-host db --conn-login user
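When a secrets backend is configured, Airflow looks there first, then in environment variables, then in the metadata database. A minimal sketch of that first-match-wins chain, using plain dictionaries as stand-ins for the three sources; `resolve_secret` and the sample keys are hypothetical and only illustrate the lookup order:

```python
from typing import Callable, Optional

Lookup = Callable[[str], Optional[str]]


def resolve_secret(key: str, backends: list[Lookup]) -> Optional[str]:
    """Return the value from the first backend that knows the key, else None."""
    for lookup in backends:
        value = lookup(key)
        if value is not None:
            return value
    return None


# Stand-ins for: configured secrets backend, environment variables, metadata DB
vault = {"db_password": "from-vault"}
env = {"db_password": "from-env", "api_token": "from-env"}
metastore = {"batch_size": "1000"}
backends = [vault.get, env.get, metastore.get]

for key in ("db_password", "api_token", "batch_size", "missing"):
    print(key, "->", resolve_secret(key, backends))
```

The practical consequence is that a stale copy of a credential in the metadata database is harmless while the external backend has the key, but silently takes over if the backend entry is deleted.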
Q24. What is Airflow's best practice for DAG file organization?
Project structure for production Airflow:
airflow_dags/
    dags/                          # DAG files (Airflow watches this directory)
        sales/
            daily_sales_pipeline.py
            weekly_sales_report.py
        ml/
            model_training.py
            model_serving.py
        ingestion/
            source_a_ingestion.py
    plugins/                       # Custom operators, hooks, sensors
        operators/
            custom_s3_operator.py
        hooks/
            custom_api_hook.py
        sensors/
            data_quality_sensor.py
    dags/utils/                    # Shared utilities (imported by DAGs)
        data_quality.py
        notification.py
        constants.py
    tests/                         # Unit tests for DAGs
        test_sales_pipeline.py
    requirements.txt
    Dockerfile
# DAG file best practices:
from datetime import datetime
from airflow import DAG
from airflow.decorators import dag, task

# 1. No heavy computation at module level (slows DAG parsing)
# BAD:
# expensive_config = requests.get('https://api.com/config').json()  # parsed every 30s!
# GOOD: lazy load inside tasks
@task
def get_config():
    import requests
    return requests.get('https://api.com/config').json()


# 2. Use TYPE_CHECKING for imports needed only by type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    import pandas as pd  # illustrative heavy import; skipped at runtime


# 3. Keep dag file imports minimal
# Use lazy imports inside tasks for heavy libraries
@task
def run_spark():
    from pyspark.sql import SparkSession  # imported only when task runs
    spark = SparkSession.builder.getOrCreate()


# 4. Version DAGs properly
# (named sales_dag so it does not shadow the @dag decorator used below)
sales_dag = DAG(
    'sales_pipeline_v3',  # version in ID when breaking changes
    start_date=datetime(2026, 1, 1),
    tags=['sales', 'v3', 'production'],
)


# 5. Document DAGs
@dag(doc_md="""
# Sales Pipeline
Processes daily sales data from Salesforce to BigQuery.
## Schedule
Runs at 6 AM UTC daily.
## SLA
Data must be in BigQuery by 9 AM UTC.
""")
def well_documented_pipeline():
    pass
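The "no heavy work at module level" rule can be checked automatically in the `tests/` directory. The sketch below uses the standard-library `ast` module to list calls that would run when a DAG file is imported; `module_level_calls` is a hypothetical helper, and a real project would probably pair it with Airflow's own DAG import test:

```python
import ast


def module_level_calls(source: str) -> list[str]:
    """Return the callee text of every call that runs at import time.

    Function and lambda bodies are skipped because they only run when invoked.
    Simplification: decorators and default arguments on functions are skipped too.
    """
    found = []

    def visit(node):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)):
            return
        if isinstance(node, ast.Call):
            found.append(ast.unparse(node.func))
        for child in ast.iter_child_nodes(node):
            visit(child)

    visit(ast.parse(source))
    return found


bad_dag = "import requests\nconfig = requests.get('https://api.com/config').json()\n"
good_dag = (
    "def get_config():\n"
    "    import requests\n"
    "    return requests.get('https://api.com/config').json()\n"
)

print("bad :", module_level_calls(bad_dag))
print("good:", module_level_calls(good_dag))
```

A test can then fail the build whenever a banned name such as `requests.get` shows up in the list for any file under `dags/`.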
Q25. Design a complete production Airflow pipeline for a daily ML feature store update.
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta


@dag(
    dag_id='feature_store_daily_update',
    schedule='0 4 * * *',  # 4 AM UTC (after upstream pipelines finish); Airflow 2.4+ argument name
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,  # prevent concurrent runs
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=3),
    },
    tags=['ml', 'features', 'daily'],
)
def feature_store_update():
    # Gate 1: Wait for upstream data
    wait_for_orders = S3KeySensor(
        task_id='wait_for_orders',
        bucket_key='s3://data-lake/orders/date={{ ds }}/success',
        mode='reschedule',  # free the worker slot between pokes
        poke_interval=300,
        timeout=7200,
    )
    wait_for_events = S3KeySensor(
        task_id='wait_for_events',
        bucket_key='s3://data-lake/events/date={{ ds }}/success',
        mode='reschedule',
        poke_interval=300,
        timeout=7200,
    )

    # Gate 2: Data quality validation
    @task
    def validate_inputs(ds=None) -> bool:
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse')
        # Bind ds as a query parameter instead of formatting it into the SQL string
        counts = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE order_date = %(ds)s",
            parameters={"ds": ds},
        )[0]
        if counts < 100:
            raise ValueError(f"Only {counts} orders found for {ds}")
        return True

    # Feature computation
    @task
    def compute_user_features(validated: bool, ds=None) -> str:
        output = f"s3://feature-store/user_features/date={ds}/"
        # Spark job for feature computation
        print(f"Computing user features for {ds} -> {output}")
        return output

    @task
    def compute_item_features(validated: bool, ds=None) -> str:
        output = f"s3://feature-store/item_features/date={ds}/"
        print(f"Computing item features for {ds} -> {output}")
        return output

    # Load to online feature store (Redis)
    @task
    def load_to_online_store(user_features: str, item_features: str, ds=None):
        print(f"Loading {user_features} and {item_features} to Redis")
        # Load latest features to Redis for real-time serving

    # Validate loaded features
    @task
    def validate_online_store(ds=None) -> bool:
        print("Validating online feature store completeness")
        return True

    # Cleanup and notification
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup(ds=None):
        print(f"Cleanup temp files for {ds}")

    # Wire up the pipeline.
    # Call validate_inputs() once: each call creates a separate task, so calling it
    # twice would leave the feature tasks downstream of an ungated copy.
    v = validate_inputs()
    [wait_for_orders, wait_for_events] >> v
    uf = compute_user_features(v)
    imf = compute_item_features(v)
    load = load_to_online_store(uf, imf)
    load >> validate_online_store() >> cleanup()


dag_instance = feature_store_update()
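Interviewers often ask you to state the execution order of a design like this. A quick way to sanity-check the intended wiring outside Airflow is to write the dependencies as a plain mapping and sort it with the standard library. The `upstream` dictionary below is a hand-written stand-in for the DAG (task name mapped to its upstream tasks), not something Airflow produces:

```python
from graphlib import TopologicalSorter

# Hand-written model of the intended wiring: task -> set of upstream tasks
upstream = {
    "validate_inputs": {"wait_for_orders", "wait_for_events"},
    "compute_user_features": {"validate_inputs"},
    "compute_item_features": {"validate_inputs"},
    "load_to_online_store": {"compute_user_features", "compute_item_features"},
    "validate_online_store": {"load_to_online_store"},
    "cleanup": {"validate_online_store"},
}

# static_order() yields one valid execution order and raises CycleError on a cycle
order = list(TopologicalSorter(upstream).static_order())
print(order)
```

Both sensors come before `validate_inputs`, the two feature tasks can run in parallel, and `cleanup` is always last. An unexpected extra node in the mapping (for example a second `validate_inputs`) is the kind of wiring mistake this check makes visible.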
FAQ
Q: What is the difference between Airflow 1.x and 2.x in interviews? A: Airflow 2.x introduced the TaskFlow API (@task decorator, automatic XCom), the new highly available scheduler, improved REST API, dynamic task mapping, and deferrable operators. Interviews in 2026 expect Airflow 2.x knowledge. Key concepts unique to 2.x: @dag/@task decorators, .expand() for dynamic tasks, and the Triggerer component for deferrable operators. Confirm the specific version on the official company careers portal.
Q: What is the most common Airflow anti-pattern in interviews? A: Two come up most often: calling datetime.now() inside tasks (not idempotent, so backfills and re-runs write to the wrong partition) and performing heavy I/O or computation at module level in the DAG file (the code runs on every scheduler parse and slows parsing). Interviewers specifically probe whether candidates understand why tasks must read the run's logical date from the context (ds, logical_date, data_interval_start; execution_date is the deprecated name) instead of the current time. Candidate reports in public preparation resources consistently list these as the most-asked design principles.
Q: What managed Airflow offerings should I know about? A: MWAA (AWS Managed Workflows for Apache Airflow), Cloud Composer (GCP), and Astronomer are the three major managed Airflow platforms. Core Airflow concepts are identical; managed platforms differ in scaling, networking, and integration with cloud services. Confirm the specific platform on the official company job description or interview prep guide.
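The datetime.now() anti-pattern from the FAQ is easiest to explain with two versions of the same path builder. This is a minimal sketch; the function names and bucket are hypothetical, and `ds` stands for the logical date string that Airflow passes into a task:

```python
from datetime import date, datetime


def output_path_bad() -> str:
    # Anti-pattern: depends on wall-clock time, so a backfill for a past date
    # writes into today's partition
    return f"s3://bucket/orders/date={datetime.now():%Y-%m-%d}/"


def output_path_good(ds: str) -> str:
    # ds is the run's logical date (YYYY-MM-DD) taken from the task context
    date.fromisoformat(ds)  # fail fast on a malformed date
    return f"s3://bucket/orders/date={ds}/"


# Re-running the same logical date always targets the same partition
first = output_path_good("2026-01-15")
second = output_path_good("2026-01-15")
print(first, first == second)
```

Paired with an overwrite (rather than append) write to that partition, the second version gives the "same output on re-run" behaviour that the idempotency question is looking for.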
Methodology applied to this article · last verified 8 Jun 2026
- No fabricated salary numbers or success rates. If we quote a range, it's sourced.
- No noun-substituted templates. This article was not generated by swapping company names in a stock prompt.
- No paid placements, sponsored coaching links, or affiliate-shilled course pushes.