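Install via Docker Compose (the official quick start)

The Airflow docs offer this compose file for trying Airflow locally; they explicitly don't recommend it for production.
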
# Get the official compose file
curl -LO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'

# On Linux, set AIRFLOW_UID so files written to the mounted folders are owned by your user, not root
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize the database
docker compose up airflow-init

# Start the full stack: scheduler, webserver (api-server plus dag-processor in 3.x), triggerer, Celery worker, Postgres, Redis
docker compose up -d

# Web UI at http://localhost:8080
# Default login: airflow / airflow (change immediately)
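After "docker compose up -d" returns, the containers can still be starting. A minimal readiness check, assuming the Airflow 2.x webserver's /health endpoint, which returns JSON with a status per component (for example metadatabase and scheduler). Airflow 3.x serves health checks from the API server under a different path, so treat the URL as an assumption to verify against your version's docs; all_healthy and wait_until_healthy are hypothetical helper names.

```python
import json
import time
import urllib.request


def all_healthy(payload: dict) -> bool:
    """True if every component in the health JSON reports status 'healthy'.

    Assumes a shape like {"metadatabase": {"status": "healthy"}, "scheduler": {...}}.
    """
    statuses = [v.get("status") for v in payload.values() if isinstance(v, dict)]
    return bool(statuses) and all(s == "healthy" for s in statuses)


def wait_until_healthy(url: str = "http://localhost:8080/health",
                       timeout_s: float = 300, interval_s: float = 5) -> dict:
    """Poll the health endpoint until all components are healthy or time runs out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=10) as resp:
                payload = json.load(resp)
            if all_healthy(payload):
                return payload
        except (OSError, ValueError):
            pass  # connection refused, HTTP error, or non-JSON body: still starting
        time.sleep(interval_s)
    raise TimeoutError(f"Airflow not healthy at {url} after {timeout_s}s")
```

Call wait_until_healthy() from a setup script before opening the UI or running smoke tests.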

The architecture

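  • Scheduler — decides which task instances are ready (schedule reached, upstream tasks done) and queues them for the executor. In 2.x it also parses DAG files by default; 3.x moves parsing into a separate DAG processor.
  • Worker — pulls queued tasks and executes them. With the CeleryExecutor these are long-running Celery workers fed from a queue; with the KubernetesExecutor each task runs in its own pod.
  • Webserver — the UI; shows DAGs, runs, logs, configuration. In 3.x this role moves to the API server.
  • Triggerer — runs the async triggers of deferred tasks (deferrable operators and sensors waiting on an event), so a waiting task doesn't hold a worker slot.
  • Metadata DB (Postgres) — state of every DAG, task instance, variable, connection.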
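To make the division of labour concrete, here is a toy, single-process model of the loop. It is not Airflow's implementation: a state dict stands in for the metadata DB, a deque for the executor queue (Celery's broker in the compose stack), the "scheduler" pass queues tasks whose upstreams succeeded (and marks tasks downstream of a failure as upstream_failed), and the "worker" pops and runs one task at a time. run_dag and the Tasks shape are hypothetical.

```python
from collections import deque
from typing import Callable

# Toy DAG: task_id -> (function to run, list of upstream task_ids)
Tasks = dict[str, tuple[Callable[[], None], list[str]]]


def run_dag(tasks: Tasks) -> dict[str, str]:
    """Run a toy DAG to completion; return the final state of every task."""
    state = {tid: "none" for tid in tasks}   # stand-in for the metadata DB
    queue: deque = deque()                  # stand-in for the executor queue

    while any(s in ("none", "queued") for s in state.values()):
        # Scheduler pass: queue tasks whose upstreams all succeeded,
        # and mark tasks downstream of a failure as upstream_failed.
        changed = False
        for tid, (_, upstream) in tasks.items():
            if state[tid] != "none":
                continue
            ups = [state[u] for u in upstream]
            if any(s in ("failed", "upstream_failed") for s in ups):
                state[tid] = "upstream_failed"
                changed = True
            elif all(s == "success" for s in ups):
                state[tid] = "queued"
                queue.append(tid)
                changed = True
        if not queue:
            if changed:
                continue
            raise RuntimeError("nothing runnable: the graph has a cycle")
        # Worker: pull one queued task, execute it, record the outcome.
        tid = queue.popleft()
        state[tid] = "running"
        try:
            tasks[tid][0]()
            state[tid] = "success"
        except Exception:
            state[tid] = "failed"
    return state
```

The real components run concurrently and talk through the database and broker, but the contract is the same: the scheduler only decides and queues, workers only execute, and the DB holds the state both sides read.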

The first DAG

# dags/etl_pipeline.py
from airflow import DAG
from airflow.decorators import task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="daily_etl",
    description="Daily extract + transform + load",
    schedule="0 3 * * *",                   # 03:00 every day, UTC by default
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["etl", "production"],
    on_failure_callback=[                   # fires when a DAG run fails
        SlackWebhookNotifier(slack_webhook_conn_id="slack_default", text="ETL failed: {{ run_id }}")
    ],
) as dag:

    @task
    def extract():
        import requests  # imported inside the task to keep DAG-file parsing fast
        resp = requests.get("https://api.example.com/users", timeout=30)
        resp.raise_for_status()
        return [u["id"] for u in resp.json() if u["active"]]

    @task
    def transform(user_ids: list):
        # ... process the IDs
        return {"count": len(user_ids), "ids": user_ids}

    # SQLExecuteQueryOperator (common-sql provider) replaces the deprecated PostgresOperator
    load = SQLExecuteQueryOperator(
        task_id="load",
        conn_id="warehouse",
        sql="INSERT INTO daily_users (run_date, active_count) VALUES ('{{ ds }}', {{ ti.xcom_pull(task_ids='transform')['count'] }})",
    )

    ids = extract()
    processed = transform(ids)
    processed >> load

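Drop this file in ./dags/. New files are picked up on the next scan of the DAGs folder (every 5 minutes by default), and edits to files Airflow already knows are re-parsed roughly every 30 seconds. New DAGs start paused by default: unpause the DAG in the UI so the schedule can fire, or trigger a run manually.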
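The schedule "0 3 * * *" fires at 03:00 each day, evaluated in UTC here because start_date is a naive datetime and Airflow's default timezone is UTC. A stdlib-only sketch of the next fire time for this one daily expression; next_daily_fire is a hypothetical helper, not a general cron parser and not Airflow's timetable code.

```python
from datetime import datetime, timedelta, timezone


def next_daily_fire(now: datetime, hour: int = 3, minute: int = 0) -> datetime:
    """Next time a daily 'M H * * *' cron fires strictly after `now`, in UTC."""
    if now.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    now = now.astimezone(timezone.utc)
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate
```

Note that a wall-clock time like 12:00 in UTC+9 is already 03:00 UTC, so the next fire is the following day: the comparison happens in UTC, not in local time.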

The TaskFlow API

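Modern Airflow (2.0+) supports the @task decorator (used above), which is much cleaner than the old PythonOperator pattern. Functions become tasks; return values are passed downstream through XCom automatically. XComs are stored in the metadata DB by default, so pass small values (IDs, paths, counts) rather than whole datasets.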

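from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def orders_pipeline():

    @task
    def fetch_orders():
        return [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}]

    @task
    def total_amount(orders: list):
        return sum(o["amount"] for o in orders)

    @task
    def write_total(total: int):
        print(f"Total: {total}")

    @task
    def process_order(order: dict):
        return order["amount"] * 2          # stand-in for real per-order work

    @task
    def aggregate(results):
        values = list(results)              # one value per mapped task instance
        print(f"Processed {len(values)} orders, sum={sum(values)}")

    # Linear pipeline: each return value reaches the next task via XCom
    orders = fetch_orders()
    total = total_amount(orders)
    write_total(total)

    # Or fan-out / fan-in with dynamic task mapping (2.3+):
    # one process_order instance per order, then a single aggregate task
    results = process_order.expand(order=orders)
    aggregate(results)

orders_pipeline()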
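Conceptually, TaskFlow pushes each return value to XCom under its task ID (with a map index for mapped tasks; unmapped tasks use -1) and pulls it in the downstream task. A toy, in-memory model of that hand-off, including expand-style fan-out and the fan-in list; XComStore, run_task and run_mapped are hypothetical names, not Airflow APIs.

```python
from typing import Any, Callable


class XComStore:
    """In-memory stand-in for the XCom table, keyed by (task_id, map_index)."""

    def __init__(self) -> None:
        self._values: dict = {}

    def push(self, task_id: str, value: Any, map_index: int = -1) -> None:
        self._values[(task_id, map_index)] = value

    def pull(self, task_id: str) -> Any:
        # Unmapped tasks use map_index -1; a mapped task's results come
        # back as one list ordered by map_index (the fan-in side).
        if (task_id, -1) in self._values:
            return self._values[(task_id, -1)]
        mapped = sorted(((i, v) for (t, i), v in self._values.items() if t == task_id),
                        key=lambda pair: pair[0])
        return [v for _, v in mapped]


def run_task(store: XComStore, task_id: str, fn: Callable, *upstream: str) -> Any:
    """Run fn with its upstream tasks' XComs as arguments; push the result."""
    result = fn(*(store.pull(u) for u in upstream))
    store.push(task_id, result)
    return result


def run_mapped(store: XComStore, task_id: str, fn: Callable, upstream: str) -> None:
    """Fan-out: one 'task instance' per element of the upstream list."""
    for i, item in enumerate(store.pull(upstream)):
        store.push(task_id, fn(item), map_index=i)


store = XComStore()
run_task(store, "fetch_orders", lambda: [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}])
total = run_task(store, "total_amount", lambda orders: sum(o["amount"] for o in orders), "fetch_orders")
run_mapped(store, "process_order", lambda order: order["amount"] * 2, "fetch_orders")
print(total, store.pull("process_order"))  # 300 [200, 400]
```

This is also why XCom values should stay small: every hand-off is a row written to and read from the metadata DB.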

Sensors: wait for events

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.sensors.http import HttpSensor

# Wait for an S3 file to appear
wait_for_file = S3KeySensor(
    task_id="wait_for_data",
    bucket_name="my-bucket",
    bucket_key="data/{{ ds }}/input.csv",
    aws_conn_id="aws_default",
    poke_interval=60,
    timeout=60*60*3,          # 3 hours
    mode="reschedule",        # release the worker slot between checks instead of sleeping in it
)

# Wait for an API endpoint
wait_for_api = HttpSensor(
    task_id="api_ready",
    http_conn_id="my_api",
    endpoint="/health",
    response_check=lambda r: r.status_code == 200,
    poke_interval=30,
    timeout=60*10,            # the sensor default is 7 days; set something sensible
)
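Under the hood a sensor is a loop: call a check (the poke), and if it returns False wait poke_interval seconds, failing once more than timeout seconds have elapsed. In poke mode the task sleeps while holding its worker slot; in reschedule mode it exits and is re-queued for the next check. A toy model of that loop with an injectable clock, so it can be tested without real sleeping; wait_for and FakeClock are hypothetical names, not Airflow APIs.

```python
import time
from typing import Callable


class FakeClock:
    """Deterministic clock for tests: sleep() just advances the time."""

    def __init__(self) -> None:
        self.now = 0.0

    def monotonic(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def wait_for(poke: Callable[[], bool], poke_interval: float, timeout: float,
             clock=time) -> int:
    """Call poke() every poke_interval seconds until it returns True.

    Returns the number of pokes; raises TimeoutError once more than
    `timeout` seconds have elapsed after a failed poke.
    """
    start = clock.monotonic()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if clock.monotonic() - start > timeout:
            raise TimeoutError(f"condition not met within {timeout}s")
        # Poke mode: the worker sleeps here. Reschedule mode: the slot is freed here.
        clock.sleep(poke_interval)
```

With poke_interval=60 and a file that lands after two minutes, this returns after three pokes; in reschedule mode those two minutes cost no worker slot.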

Providers: the operator ecosystem

Airflow's killer feature is the provider packages: pre-built operators, hooks, and sensors (mostly official provider packages, plus a few community-maintained ones) for:

  • AWS — S3, EMR, SageMaker, Redshift, Glue, Athena, DynamoDB, ECS, Lambda, Step Functions, SQS, SNS
  • GCP — BigQuery, GCS, Dataflow, Composer, Vertex AI
  • Azure — Data Factory, Synapse, Cosmos DB
  • Databases — Postgres, MySQL, Snowflake, Redshift, Oracle, ClickHouse, Trino
  • Compute — Kubernetes (KubernetesPodOperator), Docker, Spark, Ray
  • Comms — Slack, email, PagerDuty
  • dbt, Spark, Hive, Presto, etc.

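Install each one with pip, e.g. pip install apache-airflow-providers-amazon or apache-airflow-providers-snowflake. The Airflow docs recommend installing against the constraints file for your Airflow version so dependency versions stay compatible.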
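To check which providers an environment actually has (the airflow providers list CLI command reports this too), a stdlib sketch using importlib.metadata. It relies only on the apache-airflow-providers- distribution-name prefix that the official packages share, so community packages won't show up; installed_providers is a hypothetical helper.

```python
from importlib.metadata import distributions

PREFIX = "apache-airflow-providers-"


def installed_providers() -> dict[str, str]:
    """Map each installed official provider distribution to its version."""
    found = {}
    for dist in distributions():
        # Normalize names like apache_airflow_providers_amazon to the hyphenated form
        name = (dist.metadata.get("Name") or "").lower().replace("_", "-").replace(".", "-")
        if name.startswith(PREFIX):
            found[name] = dist.version
    return dict(sorted(found.items()))


for name, version in installed_providers().items():
    print(f"{name}=={version}")
```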

Connections + Variables

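Connections (database URLs, AWS keys, API tokens) are referenced from operators by connection ID. Connections created in the UI (Admin → Connections) or CLI are stored in the metadata DB, with passwords and extras encrypted if a Fernet key is configured; they can also come from environment variables or a secrets backend. Variables (plain key-value settings) work the same way, with AIRFLOW_VAR_<KEY> as the env-var form.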

# Env var convention
export AIRFLOW_CONN_WAREHOUSE='postgresql://user:pw@db:5432/warehouse'

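# Then in the DAG (SQLExecuteQueryOperator comes from the common-sql provider)
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
SQLExecuteQueryOperator(task_id="...", conn_id="warehouse", sql="...")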

Secrets backends (Vault, see that tutorial; AWS Secrets Manager; GCP Secret Manager) let Airflow look up connections and variables from the external store at runtime, so credentials never have to be saved in the metadata DB.
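The env-var name is AIRFLOW_CONN_ plus the connection ID in upper case, and the value is a URI whose parts map onto the connection's fields (scheme to connection type, host, port, login, password, path to schema). A stdlib sketch of that mapping, showing why special characters in a password must be percent-encoded; conn_env_var and parse_conn_uri are hypothetical helpers, not Airflow's own parser (which also reads extras from the query string).

```python
from urllib.parse import quote, unquote, urlsplit


def conn_env_var(conn_id: str) -> str:
    """Env var name for a connection ID: warehouse -> AIRFLOW_CONN_WAREHOUSE."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def parse_conn_uri(uri: str) -> dict:
    """Toy split of a connection URI into Airflow-style fields (ignores extras)."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,               # URI scheme -> connection type
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "schema": parts.path.lstrip("/"),        # path -> schema (database name)
    }


# A password containing '@' or '/' must be percent-encoded inside the URI.
uri = f"postgresql://user:{quote('p@ss/word', safe='')}@db:5432/warehouse"
print(conn_env_var("warehouse"), "=", uri)
print(parse_conn_uri(uri))
```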

Scaling: executors

  • SequentialExecutor — dev only; one task at a time. The 2.x default when the metadata DB is SQLite.
  • LocalExecutor — parallel within one machine via multi-processing. Good for small / mid Airflow.
  • CeleryExecutor — queue-based; workers can be on multiple hosts. Most common production choice.
  • KubernetesExecutor — one Kubernetes pod per task. No idle worker pool to pay for, but every task pays pod start-up time. Right for K8s-heavy environments.
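  • CeleryKubernetesExecutor — hybrid; lightweight tasks via Celery, heavy tasks via K8s pods. Since 2.10 you can instead configure several executors at once and choose one per task, the successor to these hybrid executors.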
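The executor is chosen in configuration: the executor key in the [core] section of airflow.cfg, or the matching environment variable, which follows Airflow's AIRFLOW__{SECTION}__{KEY} naming (the official compose file sets AIRFLOW__CORE__EXECUTOR this way). A small sketch that builds such names; airflow_env_var is a hypothetical helper and the override values are illustrative.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Env var that overrides `key` in the [section] of airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Illustrative overrides for a Celery-based deployment
overrides = {
    airflow_env_var("core", "executor"): "CeleryExecutor",
    # [core] parallelism: max task instances running concurrently per scheduler
    airflow_env_var("core", "parallelism"): "32",
}
for name, value in overrides.items():
    print(f"{name}={value}")
```

Environment variables take precedence over airflow.cfg, which is why container setups tend to configure everything this way.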

Datasets + scheduling on data updates

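Airflow 2.4+ added Datasets: a task declares "I update dataset X" through its outlets, and a DAG can be scheduled to run when dataset Y is updated. Airflow only records that the producing task succeeded; it doesn't look at the data itself. It's a crude version of asset-based orchestration (like Dagster), and 3.x renames Datasets to Assets: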

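from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset
from airflow.decorators import task

my_dataset = Dataset("s3://my-bucket/processed/users.parquet")

with DAG("producer", schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False):
    @task(outlets=[my_dataset])          # marks my_dataset updated when this task succeeds
    def write_users():
        ...                               # write the parquet file here
    write_users()

# Downstream DAG runs each time my_dataset is updated (no cron schedule)
with DAG("consumer", schedule=[my_dataset], start_date=datetime(2026, 1, 1), catchup=False):
    @task
    def consume():
        ...
    consume()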
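A DAG scheduled on a list of datasets runs only after every dataset in the list has been updated since its previous run, where an update means a successful run of a task that lists the dataset in its outlets. A toy in-memory model of that rule; DatasetScheduler and its methods are hypothetical names, not Airflow APIs.

```python
from collections import defaultdict


class DatasetScheduler:
    """Toy model: trigger a consumer DAG once all its datasets have fresh updates."""

    def __init__(self) -> None:
        self.consumers: dict[str, set[str]] = {}          # dag_id -> required URIs
        self.pending: dict[str, set[str]] = defaultdict(set)  # dag_id -> updated URIs
        self.triggered: list[str] = []                    # consumer runs queued so far

    def register(self, dag_id: str, dataset_uris: list[str]) -> None:
        self.consumers[dag_id] = set(dataset_uris)

    def task_succeeded(self, outlets: list[str]) -> None:
        """Called when a producing task succeeds; marks its outlets as updated."""
        for dag_id, needed in self.consumers.items():
            self.pending[dag_id] |= needed & set(outlets)
            if self.pending[dag_id] == needed:
                self.triggered.append(dag_id)  # queue a consumer DAG run
                self.pending[dag_id].clear()   # next run needs fresh updates


sched = DatasetScheduler()
sched.register("consumer", ["s3://my-bucket/processed/users.parquet",
                            "s3://my-bucket/processed/orders.parquet"])
sched.task_succeeded(["s3://my-bucket/processed/users.parquet"])
print(sched.triggered)  # [] : orders.parquet not updated yet
sched.task_succeeded(["s3://my-bucket/processed/orders.parquet"])
print(sched.triggered)  # ['consumer']
```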

Airflow 3.x

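Airflow 3.x (released 2025) modernized the architecture: a task SDK and task execution API that decouple running tasks from the metadata DB and lay the groundwork for tasks in languages other than Python, plus DAG versioning, a new UI, and Datasets renamed to Assets. If you're starting fresh, start on 3.x; moving an existing 2.x deployment takes some migration work, since imports and several APIs changed.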

Airflow vs Dagster vs Prefect

  • Airflow — task-graph mental model; largest ecosystem of operators; battle-tested at huge scale. Heavier operational footprint.
  • Dagster (see that tutorial) — asset-graph; lineage as first-class; modern Python API. Smaller ecosystem.
  • Prefect — flow-based Python; cleaner UX; smaller scope than Airflow.
  • Argo Workflows (see that tutorial) — Kubernetes-native; each step runs as a container; less Python-first.

When Airflow is the right pick

  • You're already on Airflow + the team knows it.
  • Massive ecosystem of operators matters (you need 20+ integrations off-the-shelf).
  • Workflows are clearly task-graph-shaped, not asset-graph-shaped.

When it isn't

  • For "I have a data warehouse + dashboards + want lineage," Dagster maps the work more honestly.
  • For lightweight Python flows, Prefect is less ceremony.
  • For "just run these jobs as K8s pods," Argo Workflows is simpler.
  • For "run this Python script daily," a systemd timer is one unit, not 5 Airflow components.