Assigning tasks to specific machines with airflow
I'm new to Airflow.
I have a DAG which contains a task that should run on a specific machine (an EMR cluster in my case). How can I tell Airflow where to run specific tasks, so that every time the DAG runs, that task will run on that machine only?

Run your worker on that machine with a queue name. With the Airflow CLI, you could do something like this:
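For example (a sketch; the queue name my_queue is illustrative, and in Airflow 2.x with the CeleryExecutor the subcommand is airflow celery worker):

```bash
# Airflow 1.x
airflow worker -q my_queue

# Airflow 2.x equivalent
# airflow celery worker -q my_queue
```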
Then define that task to use that queue:
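A sketch of the task definition (the operator choice, task id, and surrounding DAG object are assumptions; the key point is the queue argument):

```python
from airflow.operators.bash_operator import BashOperator

task = BashOperator(
    task_id="task_on_emr",                       # hypothetical task name
    bash_command="echo running on the EMR node",
    queue="my_queue",   # must match the -q value the worker was started with
    dag=dag,            # assumes an existing DAG object defined elsewhere
)
```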
- 1 would running airflow worker -q my_queue take advantage of all processors on that cluster? or do you need to run multiple airflow worker commands – David Jul 6, 2017 at 15:48
- 2 suppose I add another airflow worker to the mix, do I need to edit celeryd_concurrency in the airflow.cfg if I add a worker with more than the previously defined concurrency value? Or could I simply override it with airflow worker -c 12 (if I load up a box with 12 concurrent threads) – David Jul 6, 2017 at 17:35
- 2 additionally -- suppose I have 5 independent tasks that lead to another task (i.e., 5 independent chains of A->B). if I run them with CeleryExecutor with concurrency = 4 , none of tasks B will start until all of tasks A finish. That means that 4 threads will work on task A; when they're done, three threads will sit idle while another thread will work on task A. once all finish, they get to task B. any ideas about this? – David Jul 6, 2017 at 18:03
- 3 Please ask questions as actual questions, and not in comments. The questions and answers here in a comment chain will not easily be found by someone with the same issue. – jhnclvr Jul 7, 2017 at 14:48
- 1 is it possible to have dynamic queue name like queue='{{ ti.xcom_pull("provide_queue_name") }}' ? – deviant Apr 25, 2018 at 10:50
The Airflow platform is a tool for describing, executing, and monitoring workflows.
Core Ideas ¶
In Airflow, a DAG -- or a Directed Acyclic Graph -- is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.
For example, a simple DAG could consist of three tasks: A, B, and C. It could say that A has to run successfully before B can run, but C can run anytime. It could say that task A times out after 5 minutes, and B can be restarted up to 5 times in case it fails. It might also say that the workflow will run every night at 10pm, but should not start until a certain date.
In this way, a DAG describes how you want to carry out your workflow; but notice that we haven't said anything about what we actually want to do! A, B, and C could be anything. Maybe A prepares data for B to analyze while C sends an email. Or perhaps A monitors your location so B can open your garage door while C turns on your house lights. The important thing is that the DAG isn't concerned with what its constituent tasks do; its job is to make sure that whatever they do happens at the right time, or in the right order, or with the right handling of any unexpected issues.
DAGs are defined in standard Python files that are placed in Airflow's DAG_FOLDER . Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks. In general, each one should correspond to a single logical workflow.
When searching for DAGs, Airflow only considers Python files that contain the strings "airflow" and "dag" by default (case-insensitive). To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
Airflow will load any DAG object it can import from a DAG file. Critically, that means the DAG must appear in globals() . Consider the following two DAGs. Only dag_1 will be loaded; the other one only appears in a local scope.
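A minimal sketch of that situation (DAG ids and the helper function are illustrative):

```python
from datetime import datetime
from airflow import DAG

# Appears in globals(), so Airflow will discover it.
dag_1 = DAG("this_dag_will_be_discovered", start_date=datetime(2021, 1, 1))

def my_function():
    # Only exists in the function's local scope, so Airflow ignores it.
    dag_2 = DAG("but_this_dag_will_not", start_date=datetime(2021, 1, 1))

my_function()
```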
Sometimes this can be put to good use. For example, a common pattern with SubDagOperator is to define the subdag inside a function so that Airflow doesn't try to load it as a standalone DAG.
Default Arguments ¶
If a dictionary of default_args is passed to a DAG, it will apply them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times.
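A short sketch of how default_args propagate to operators (the operator and argument values are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "airflow",
    "retries": 2,
    "start_date": datetime(2021, 1, 1),
}

dag = DAG("default_args_example", default_args=default_args, schedule_interval=None)

# The operator inherits owner, retries, and start_date from default_args.
task = BashOperator(task_id="print_date", bash_command="date", dag=dag)
print(task.owner)  # "airflow"
```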
Context Manager ¶
Added in Airflow 1.8
DAGs can be used as context managers to automatically assign new operators to that DAG.
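A sketch of the context-manager form (names are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("my_dag", start_date=datetime(2021, 1, 1)) as dag:
    # No explicit dag=... argument is needed inside the with-block.
    op = BashOperator(task_id="dummy", bash_command="echo hello")

print(op.dag)  # <DAG: my_dag>
```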
TaskFlow API ¶
New in version 2.0.0.
Airflow 2.0 adds a new style of authoring DAGs called the TaskFlow API, which removes a lot of the boilerplate around creating PythonOperators, managing dependencies between tasks, and accessing XCom values. (During development this feature was called "Functional DAGs", so if you see or hear any references to that, it's the same thing.)
Outputs and inputs are sent between tasks using XCom values . In addition, you can wrap functions as tasks using the task decorator . Airflow will also automatically add dependencies between tasks to ensure that XCom messages are available when operators are executed.
Example DAG built with the TaskFlow API
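A condensed sketch in the spirit of that example (function names and values are illustrative, not the exact example shipped with Airflow):

```python
import json
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(1), catchup=False)
def taskflow_etl():

    @task()
    def extract():
        return json.loads('{"a": 301.27, "b": 433.21}')

    @task()
    def transform(order_data: dict):
        return {"total": sum(order_data.values())}

    @task()
    def load(summary: dict):
        print(f"Total order value is {summary['total']:.2f}")

    # XComs and dependencies are wired automatically from the function calls.
    load(transform(extract()))

taskflow_dag = taskflow_etl()
```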
DAG decorator ¶
In addition to creating DAGs using a context manager, in Airflow 2.0 you can also create DAGs from a function. The DAG decorator turns a function into a DAG generator: any function decorated with @dag returns a DAG object.
The DAG decorator also sets up the parameters you have in the function as DAG params. This allows you to parameterize your DAGs and set the parameters when triggering the DAG manually. See Passing Parameters when triggering dags to learn how to pass parameters when triggering DAGs.
You can also use the parameters in Jinja templates through the {{ context.params }} dictionary.
Example DAG with decorator:
airflow/example_dags/example_dag_decorator.py
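Not that file verbatim, but the pattern looks roughly like this (names, params, and the schedule are illustrative):

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False)
def example_dag_decorator(email: str = "example@example.com"):
    """Function parameters such as `email` become DAG params."""

    @task()
    def send_greeting(to: str):
        print(f"Hello {to}")

    send_greeting(email)

# Assigning the result to a module-level variable puts the DAG in globals().
dag_object = example_dag_decorator()
```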
Note that Airflow will only load DAGs that appear in globals() , as noted in the scope section. This means you need to assign the returned DAG to a variable in the module scope; otherwise Airflow won't detect your decorated DAG.
executor_config ¶
The executor_config is an argument placed on operators that allows Airflow users to override tasks before launch. Currently this is primarily used by the KubernetesExecutor , but will soon be available for other overrides.
DAG Runs ¶
A DAG run is an instantiation of a DAG, containing task instances that run for a specific execution_date .
A DAG run is usually created by the Airflow scheduler, but can also be created by an external trigger. Multiple DAG runs may be running at once for a particular DAG, each of them having a different execution_date . For example, we might currently have two DAG runs that are in progress for 2016-01-01 and 2016-01-02 respectively.
execution_date ¶
The execution_date is the logical date and time which the DAG Run, and its task instances, are running for.
This allows task instances to process data for the desired logical date & time. While a task_instance or DAG run might have an actual start date of now, their logical date might be 3 months ago because we are busy reloading something.
In the prior example the execution_date was 2016-01-01 for the first DAG Run and 2016-01-02 for the second.
A DAG run and all task instances created within it are instanced with the same execution_date , so that logically you can think of a DAG run as simulating the DAG running all of its tasks at some previous date & time specified by the execution_date .
Tasks ¶
A Task defines a unit of work within a DAG; it is represented as a node in the DAG graph, and it is written in Python.
Each task is an implementation of an Operator, for example a PythonOperator to execute some Python code, or a BashOperator to run a Bash command.
The task implements an operator by defining specific values for that operator, such as a Python callable in the case of PythonOperator or a Bash command in the case of BashOperator .
Relations between Tasks ¶
Consider the following DAG with two tasks. Each task is a node in our DAG, and there is a dependency from task_1 to task_2:
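A sketch of such a DAG, using DummyOperator as a stand-in for real work:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG("example_dependencies", start_date=datetime(2021, 1, 1)) as dag:
    task_1 = DummyOperator(task_id="task_1")
    task_2 = DummyOperator(task_id="task_2")

    task_1 >> task_2  # task_2 depends on task_1
```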
We can say that task_1 is upstream of task_2, and conversely task_2 is downstream of task_1. When a DAG Run is created, task_1 will start running and task_2 waits for task_1 to complete successfully before it may start.
Python task decorator ¶
The Airflow task decorator converts any Python function into an Airflow operator. The decorated function can be called once to set the arguments and keyword arguments for operator execution.
The task decorator captures returned values and sends them to the XCom backend . By default, the returned value is saved as a single XCom value. You can set the multiple_outputs keyword argument to True to unroll dictionaries, lists or tuples into separate XCom values. This can be used with regular operators to create DAGs with the TaskFlow API .
Calling a decorated function returns an XComArg instance. You can use it to set templated fields on downstream operators.
You can call a decorated function more than once in a DAG. The decorated function will automatically generate a unique task_id for each generated operator.
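For instance, a hypothetical update_user task might be called once per user id; each call creates a separate operator with its own generated task_id:

```python
from datetime import datetime
from airflow import DAG
from airflow.decorators import task

@task()
def update_user(user_id: str):
    print(f"updating user {user_id}")

with DAG("update_users", start_date=datetime(2021, 1, 1), schedule_interval=None):
    # Generated task ids: update_user, update_user__1, update_user__2
    for user_id in ["u1", "u2", "u3"]:
        update_user(user_id)
```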
Task ids are generated by appending a number at the end of the original task id. For the above example, the DAG will have the following task ids: [update_user, update_user__1, update_user__2, ... update_user__n] .
Because task IDs are generated dynamically, users should be aware that adding or removing invocations of a task-decorated function may change the task_id of other tasks of the same type within a single DAG.
For example, if there are many task-decorated tasks without an explicitly given task_id, their task_ids will be generated sequentially: task__1 , task__2 , task__3 , and so on. If, after the DAG goes into production, someone inserts a new task before task__2 , every task_id after that point shifts by one place, producing task__1 , task__2 , task__3 , task__4 . At that point task__3 is no longer the same task as before, which may create confusion when analyzing the history logs / DagRuns of a DAG that changed over time.
Accessing current context ¶
To retrieve the current execution context you can use the get_current_context method. This gives you access to the context dictionary from within your operators, which is especially helpful when using the @task decorator.
The current context is accessible only during task execution; it is not accessible during pre_execute or post_execute . Calling this method outside of an execution context will raise an error.
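A sketch of retrieving the context from inside a @task -decorated function:

```python
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task()
def print_execution_date():
    # Only works while the task is actually executing.
    context = get_current_context()
    print(context["ds"])  # the execution date as YYYY-MM-DD
```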
Task Instances ¶
A task instance represents a specific run of a task and is characterized as the combination of a DAG, a task, and a point in time ( execution_date ). Task instances also have an indicative state, which could be "running", "success", "failed", "skipped", "up for retry", etc.
Tasks are defined in DAGs, and both are written in Python code to define what you want to do. Task Instances belong to DAG Runs, have an associated execution_date , and are instantiated, runnable entities.
Relations between Task Instances ¶
Again consider the two tasks from the earlier example ( task_1 upstream of task_2 ), defined for some DAG:
When we enable this DAG, the scheduler creates several DAG Runs - one with execution_date of 2016-01-01, one with execution_date of 2016-01-02, and so on up to the current date.
Each DAG Run will contain a task_1 Task Instance and a task_2 Task instance. Both Task Instances will have execution_date equal to the DAG Run's execution_date , and each task_2 will be downstream of (depends on) its task_1.
We can also say that task_1 for 2016-01-01 is the previous task instance of the task_1 for 2016-01-02. Or that the DAG Run for 2016-01-01 is the previous DAG Run to the DAG Run of 2016-01-02. Here, previous refers to the logical past/prior execution_date , that runs independently of other runs, and upstream refers to a dependency within the same run and having the same execution_date .
The Airflow documentation sometimes refers to previous instead of upstream in places, and vice-versa. If you find any occurrences of this, please help us improve by contributing some corrections!
Task Lifecycle ¶
A task goes through various stages from start to completion. In the Airflow UI (graph and tree views), these stages are displayed by a color representing each stage:

The complete lifecycle of the task looks like this:

The happy flow consists of the following stages:
No status (scheduler created empty task instance)
Scheduled (scheduler determined task instance needs to run)
Queued (scheduler sent task to executor to run on the queue)
Running (worker picked up a task and is now running it)
Success (task completed)
Operators ¶
While DAGs describe how to run a workflow, Operators determine what actually gets done by a task.
An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don't need to share resources with any other operators. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.
This is a subtle but very important point: in general, if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can't be avoided, Airflow does have a feature for operator cross-communication called XCom that is described in the section XComs
Airflow provides many built-in operators for many common tasks, including:
BashOperator - executes a bash command
PythonOperator - calls an arbitrary Python function
EmailOperator - sends an email
There are also other commonly used operators that are installed together with Airflow automatically, by pre-installing some provider packages (they are always available no matter which extras you chose when installing Apache Airflow):
SimpleHttpOperator - sends an HTTP request
SqliteOperator - SQLite DB operator
In addition to these basic building blocks, there are many more specific operators developed by the community that you can install by adding an extra (for example [mysql] ) when installing Airflow, or by installing additional packages manually (for example the apache-airflow-providers-mysql package).
Some examples of popular operators are:
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
DockerOperator
HiveOperator
S3FileTransformOperator
PrestoToMySqlOperator
SlackAPIOperator
But there are many, many more - you can see the list of those by following the providers documentation at Provider packages .
Operators are only loaded by Airflow if they are assigned to a DAG.
List Airflow operators
How-to guides for some Airflow operators
Sensors ¶
A Sensor is an Operator that waits (polls) for a certain time, file, database row, S3 key, another DAG/task, etc.
There are currently 3 different modes for how a sensor operates: poke (the default, where the sensor occupies a worker slot for its whole runtime and sleeps between checks), reschedule (where the sensor frees its worker slot between checks and is rescheduled at the next poke interval), and smart sensor (where a centralized set of smart sensor tasks batches the poking work of many sensors).
How to use:
For poke|reschedule mode, you can configure them at the task level by supplying the mode parameter, i.e. S3KeySensor(task_id='check-bucket', mode='reschedule', ...) .
For smart sensor , you need to configure it in airflow.cfg , for example:
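The relevant airflow.cfg section looks roughly like this (the shard settings and the list of enabled sensor classes are illustrative values, not defaults):

```ini
[smart_sensor]
use_smart_sensor = true
shard_code_upper_limit = 10000
# number of smart sensor DAGs to spread the sensor workload across
shards = 5
sensors_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
```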
For more information on how to configure smart-sensor and its architecture, see: Smart Sensor Architecture and Configuration
DAG Assignment ¶
Operators do not have to be assigned to DAGs immediately (previously dag was a required argument). However, once an operator is assigned to a DAG, it can not be transferred or unassigned. DAG assignment can be done explicitly when the operator is created, through deferred assignment, or even inferred from other operators.
Bitshift Composition ¶
We recommend setting operator relationships with bitshift operators rather than set_upstream() and set_downstream() .
Traditionally, operator relationships are set with the set_upstream() and set_downstream() methods. In Airflow 1.8, this can be done with the Python bitshift operators >> and << . The following four statements are all functionally equivalent:
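Assuming op1 and op2 are already-instantiated operators in the same DAG, the four forms are roughly:

```python
# Traditional API
op1.set_downstream(op2)
op2.set_upstream(op1)

# Bitshift composition (Airflow >= 1.8)
op1 >> op2
op2 << op1
```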
When using the bitshift to compose operators, the relationship is set in the direction that the bitshift operator points. For example, op1 >> op2 means that op1 runs first and op2 runs second. Multiple operators can be composed; keep in mind that the chain is evaluated left-to-right and the rightmost object is always returned. Bitshift composition can also be used with lists, in which case the relationship is applied to every operator in the list. We can put this all together to build a simple pipeline, as in the sketch below.
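A sketch, assuming op1 through op4 are operators in the same DAG:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("bitshift_composition", start_date=datetime(2021, 1, 1)) as dag:
    op1, op2, op3, op4 = [
        BashOperator(task_id=f"op{i}", bash_command="echo hi") for i in range(1, 5)
    ]

    # Chained composition: op1 -> op2 -> op3, and op4 -> op3
    op1 >> op2 >> op3 << op4

    # ...which is equivalent to:
    # op1.set_downstream(op2)
    # op2.set_downstream(op3)
    # op3.set_upstream(op4)

    # List composition: op1 -> op2 and op1 -> op3
    # op1 >> [op2, op3]
    # ...which is equivalent to:
    # op1.set_downstream([op2, op3])
```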
Relationship Builders ¶
Moved in Airflow 2.0
In Airflow 2.0 those two methods moved from airflow.utils.helpers to airflow.models.baseoperator .
The chain and cross_downstream functions provide easier ways to set relationships between operators in specific situations.
When setting a relationship between two lists, if we want all operators in one list to be upstream of all operators in the other, we cannot use a single bitshift composition; instead we would have to split one of the lists and compose each element separately. cross_downstream handles this list-to-list relationship more easily, as in the sketch below.
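A sketch of cross_downstream (operators and ids are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import cross_downstream
from airflow.operators.dummy import DummyOperator

with DAG("cross_downstream_example", start_date=datetime(2021, 1, 1)) as dag:
    op1, op2, op3, op4 = [DummyOperator(task_id=f"op{i}") for i in range(1, 5)]

    # Every operator in the first list becomes upstream of every operator in
    # the second list. Equivalent to: op1 >> [op3, op4]; op2 >> [op3, op4]
    cross_downstream([op1, op2], [op3, op4])
```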
When setting single-direction relationships across many operators, we could concatenate them with bitshift composition. This can also be accomplished using chain , even without naming each operator individually, and chain can handle a list of operators as well. When chain sets relationships between two lists of operators, the lists must be the same size. A sketch follows.
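A sketch of chain (operators and ids are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator

with DAG("chain_example", start_date=datetime(2021, 1, 1)) as dag:
    op1, op2, op3, op4, op5, op6 = [
        DummyOperator(task_id=f"t{i}") for i in range(1, 7)
    ]

    # Equivalent to: op1 >> op2 >> op3 >> op4
    chain(op1, op2, op3, op4)

    # Adjacent lists are paired element-wise, so they must be the same size.
    # chain(op1, [op2, op3], [op4, op5], op6) would set:
    #   op1 >> op2 >> op4 >> op6   and   op1 >> op3 >> op5 >> op6
```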
Workflows ¶
You're now familiar with the core building blocks of Airflow. Some of the concepts may sound very similar, but the vocabulary can be conceptualized like this:
DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.
DAG Run: An instance of a DAG for a particular logical date and time.
Operator: A class that acts as a template for carrying out some work.
Task: Defines work by implementing an operator, written in Python.
Task Instance: An instance of a task that has been assigned to a DAG and has a state associated with a specific DAG run (i.e., for a specific execution_date).
execution_date: The logical date and time for a DAG Run and its Task Instances.
By combining DAGs and Operators to create TaskInstances , you can build complex workflows.
Additional Functionality ¶
In addition to the core Airflow objects, there are a number of more complex features that enable behaviors like limiting simultaneous access to resources, cross-communication, conditional execution, and more.
Hooks ¶
Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators. They also use the airflow.models.connection.Connection model to retrieve hostnames and authentication information. Hooks keep authentication code and information out of pipelines, centralized in the metadata database.
Hooks are also very useful on their own to use in Python scripts, in airflow.operators.PythonOperator tasks, and in interactive environments like iPython or Jupyter Notebook.
List Airflow hooks
Pools ¶
Some systems can get overwhelmed when too many processes hit them at the same time. Airflow pools can be used to limit the execution parallelism on arbitrary sets of tasks. The list of pools is managed in the UI ( Menu -> Admin -> Pools ) by giving the pools a name and assigning it a number of worker slots. Tasks can then be associated with one of the existing pools by using the pool parameter when creating tasks (i.e., instantiating operators).
The pool parameter can be used in conjunction with priority_weight to define priorities in the queue, and which tasks get executed first as slots open up in the pool. The default priority_weight is 1 , and can be bumped to any number. When sorting the queue to evaluate which task should be executed next, we use the priority_weight , summed up with all of the priority_weight values from tasks downstream from this task. You can use this to bump a specific important task and the whole path to that task gets prioritized accordingly.
Tasks will be scheduled as usual while the slots fill up. Once capacity is reached, runnable tasks get queued and their state will show as such in the UI. As slots free up, queued tasks start running based on the priority_weight (of the task and its descendants).
Note that if tasks are not given a pool, they are assigned to the default pool default_pool . default_pool is initialized with 128 slots and can be changed through the UI or CLI (though it cannot be removed).
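A sketch of attaching a task to a pool and bumping its priority (the pool name is assumed to have been created under Menu -> Admin -> Pools first):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("pool_example", start_date=datetime(2021, 1, 1)) as dag:
    aggregate = BashOperator(
        task_id="run_aggregation",
        bash_command="echo aggregating",
        pool="reporting_db_pool",  # hypothetical pool created in the UI
        priority_weight=10,        # higher weight is scheduled first as slots open up
    )
```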
To combine Pools with SubDAGs see the SubDAGs section.
Connections ¶
The information needed to connect to external systems is stored in the Airflow metastore database and can be managed in the UI ( Menu -> Admin -> Connections ). A conn_id is defined there, and hostname / login / password / schema information attached to it. Airflow pipelines retrieve centrally-managed connections information by specifying the relevant conn_id .
Airflow also provides a mechanism to store connections outside the database, e.g. in environment variables . Additional sources may be enabled, e.g. AWS SSM Parameter Store , or you may roll your own secrets backend .
Many hooks have a default conn_id , where operators using that hook do not need to supply an explicit connection ID. For example, the default conn_id for the PostgresHook is postgres_default .
See Managing Connections for details on creating and managing connections.
XComs ¶
XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of "cross-communication". XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.
XComs can be "pushed" (sent) or "pulled" (received). When a task pushes an XCom, it makes it generally available to other tasks. Tasks can push XComs at any time by calling the xcom_push() method. In addition, if a task returns a value (either from its Operator's execute() method, or from a PythonOperator's python_callable function), then an XCom containing that value is automatically pushed.
Tasks call xcom_pull() to retrieve XComs, optionally applying filters based on criteria like key , source task_ids , and source dag_id . By default, xcom_pull() filters for the keys that are automatically given to XComs when they are pushed by being returned from execute functions (as opposed to XComs that are pushed manually).
If xcom_pull is passed a single string for task_ids , then the most recent XCom value from that task is returned; if a list of task_ids is passed, then a corresponding list of XCom values is returned.
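A sketch of explicit push and pull inside PythonOperator callables (keys and task ids are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def producer(**context):
    # Explicit push under a custom key...
    context["ti"].xcom_push(key="table_name", value="orders_2021")
    # ...and the return value is pushed automatically under "return_value".
    return 42

def consumer(**context):
    table = context["ti"].xcom_pull(task_ids="producer", key="table_name")
    answer = context["ti"].xcom_pull(task_ids="producer")  # default key
    print(table, answer)

with DAG("xcom_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    push = PythonOperator(task_id="producer", python_callable=producer)
    pull = PythonOperator(task_id="consumer", python_callable=consumer)
    push >> pull
```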
When specifying arguments that are part of the context, they will be automatically passed to the function.
It is also possible to pull XCom directly in a template, here's an example of what this may look like:
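For instance, a templated field could pull a value pushed by an upstream task (the task id and key below are assumptions):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("xcom_template_example", start_date=datetime(2021, 1, 1)) as dag:
    report = BashOperator(
        task_id="render_report",
        bash_command=(
            "echo building report for table "
            "{{ task_instance.xcom_pull(task_ids='producer', key='table_name') }}"
        ),
    )
```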
Note that XComs are similar to Variables , but are specifically designed for inter-task communication rather than global settings.
Custom XCom backend ¶
It is possible to change how XCom serializes and deserializes tasks' results. To do this, set the xcom_backend parameter in the Airflow config to a class that is a subclass of BaseXCom . To alter the serialization / deserialization mechanism, the custom class should override the serialize_value and deserialize_value methods.
It is also possible to override the orm_deserialize_value method, which is used for deserialization when recreating the ORM XCom object. This happens every time we query the XCom table, for example when we want to populate the XCom list view in the webserver. If your XCom backend performs expensive operations, or has large values that are not useful to show in such a view, override this method to provide an alternative representation. By default Airflow uses the BaseXCom.orm_deserialize_value method, which returns the value stored in the Airflow database.
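A skeletal sketch of such a backend (the class and its JSON-based storage are invented for illustration; real custom backends often serialize large values to object storage instead):

```python
import json

from airflow.models.xcom import BaseXCom

class JsonStringXComBackend(BaseXCom):
    """Hypothetical backend that stores values as JSON-encoded bytes.

    Enabled via airflow.cfg, e.g.:
    [core]
    xcom_backend = my_project.xcom_backends.JsonStringXComBackend  # hypothetical path
    """

    @staticmethod
    def serialize_value(value):
        return json.dumps(value).encode("utf-8")

    @staticmethod
    def deserialize_value(result):
        return json.loads(result.value)

    def orm_deserialize_value(self):
        # Keep the XCom list view in the webserver cheap to render.
        return "<serialized value>"
```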
See Modules Management for details on how Python and Airflow manage modules.
Variables ¶
Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow. Variables can be listed, created, updated and deleted from the UI ( Admin -> Variables ), code or CLI. In addition, json settings files can be bulk uploaded through the UI. While your pipeline code definition and most of your constants and variables should be defined in code and stored in source control, it can be useful to have some variables or configuration items accessible and modifiable through the UI.
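The calls discussed in the next paragraph look roughly like this (variable names are illustrative):

```python
from airflow.models import Variable

foo = Variable.get("foo")                         # plain string value
bar = Variable.get("bar", deserialize_json=True)  # second call: parsed as JSON
baz = Variable.get("baz", default_var=None)       # third call: None if not defined
```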
The second call assumes json content and will be deserialized into bar . Note that Variable is a sqlalchemy model and can be used as such. The third call uses the default_var parameter with the value None , which either returns an existing value or None if the variable isn't defined. The get function will throw a KeyError if the variable doesn't exist and no default is provided.
You can use a variable from a Jinja template, and deserialize a JSON object stored in a variable, with the syntax shown below:
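Inside any templated field, that looks like:

```
{{ var.value.<variable_name> }}
{{ var.json.<json_variable_name> }}
```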
See Managing Variables for details on managing variables.
Branching ¶
Sometimes you need a workflow to branch, or only go down a certain path based on an arbitrary condition which is typically related to something that happened in an upstream task. One way to do this is by using the BranchPythonOperator .
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to reference a task directly downstream from the BranchPythonOperator task.
Note that when a path is a downstream task of the returned task (list), it will not be skipped:

Paths of the branching task are branch_a , join and branch_b . Since join is a downstream task of branch_a , it will be excluded from the skipped tasks when branch_a is returned by the Python callable.
The BranchPythonOperator can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example:
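A sketch of XCom-driven branching (task ids and the pushed value are assumptions):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator

def push_branch_name(**context):
    context["ti"].xcom_push(key="branch", value="branch_a")

def choose(**context):
    # Follow whichever task_id the upstream task pushed.
    return context["ti"].xcom_pull(task_ids="decide_upstream", key="branch")

with DAG("xcom_branching", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    decide_upstream = PythonOperator(task_id="decide_upstream", python_callable=push_branch_name)
    branching = BranchPythonOperator(task_id="branching", python_callable=choose)
    branch_a = DummyOperator(task_id="branch_a")
    branch_b = DummyOperator(task_id="branch_b")

    decide_upstream >> branching >> [branch_a, branch_b]
```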
If you wish to implement your own operators with branching functionality, you can inherit from BaseBranchOperator , which behaves similarly to BranchPythonOperator but expects you to provide an implementation of the method choose_branch . As with the callable for BranchPythonOperator , this method should return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped.
SubDAGs ¶
SubDAGs are perfect for repeating patterns. Defining a function that returns a DAG object is a nice design pattern when using Airflow.
Airbnb uses the stage-check-exchange pattern when loading data. Data is staged in a temporary table, after which data quality checks are performed against that table. Once the checks all pass the partition is moved into the production table.
As another example, consider the following DAG:

We can combine all of the parallel task-* operators into a single SubDAG, so that the resulting DAG resembles the following:

Note that SubDAG operators should contain a factory method that returns a DAG object. This will prevent the SubDAG from being treated like a separate DAG in the main UI. For example:
airflow/example_dags/subdags/subdag.py
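Not that file verbatim, but the factory pattern looks roughly like this:

```python
from airflow import DAG
from airflow.operators.dummy import DummyOperator

def subdag(parent_dag_name, child_dag_name, args):
    """Return a DAG to be wrapped by a SubDagOperator in the main DAG."""
    dag_subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",  # parent.child convention
        default_args=args,
        schedule_interval="@daily",
    )

    for i in range(5):
        DummyOperator(
            task_id=f"{child_dag_name}-task-{i + 1}",
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag
```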
This SubDAG can then be referenced in your main DAG file:
airflow/example_dags/example_subdag_operator.py
You can zoom into a SubDagOperator from the graph view of the main DAG to show the tasks contained within the SubDAG:

Some other tips when using SubDAGs:
by convention, a SubDAG's dag_id should be prefixed by its parent and a dot. As in parent.child
share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)
SubDAGs must have a schedule and be enabled. If the SubDAG's schedule is set to None or @once , the SubDAG will succeed without having done anything
clearing a SubDagOperator also clears the state of the tasks within
marking success on a SubDagOperator does not affect the state of the tasks within
refrain from using depends_on_past=True in tasks within the SubDAG as this can be confusing
it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot
See airflow/example_dags for a demonstration.
Note that Airflow pools are not honored by SubDagOperator ; hence, resources could be consumed by SubDagOperator tasks beyond any pool limits.
TaskGroup ¶
TaskGroup can be used to organize tasks into hierarchical groups in Graph View. It is useful for creating repeating patterns and cutting down visual clutter. Unlike SubDagOperator , TaskGroup is a UI grouping concept. Tasks in TaskGroups live on the same original DAG. They honor all the pool configurations.
Dependency relationships can be applied across all tasks in a TaskGroup with the >> and << operators. For example, the following code puts task1 and task2 in TaskGroup group1 and then puts both tasks upstream of task3 :
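A sketch of that grouping (task and group names are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG("task_group_example", start_date=datetime(2021, 1, 1)) as dag:
    with TaskGroup("group1") as group1:
        task1 = DummyOperator(task_id="task1")
        task2 = DummyOperator(task_id="task2")

    task3 = DummyOperator(task_id="task3")

    # Both task1 and task2 become upstream of task3.
    group1 >> task3
```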
By default, child tasks and TaskGroups have their task_id and group_id prefixed with the group_id of their parent TaskGroup. This ensures uniqueness of group_id and task_id throughout the DAG. To disable the prefixing, pass prefix_group_id=False when creating the TaskGroup. This then gives the user full control over the actual group_id and task_id. They have to ensure group_id and task_id are unique throughout the DAG. The option prefix_group_id=False is mainly useful for putting tasks on existing DAGs into TaskGroup without altering their task_id.
Here is a more complicated example DAG with multiple levels of nested TaskGroups:
airflow/example_dags/example_task_group.py
This animated gif shows the UI interactions. TaskGroups are expanded or collapsed when clicked:
SLAs ¶
Service Level Agreements, or the time by which a task or DAG should have succeeded, can be set at a task level as a timedelta . If one or many instances have not succeeded by that time, an alert email is sent detailing the list of tasks that missed their SLA. The event is also recorded in the database and made available in the web UI under Browse->SLA Misses where events can be analyzed and documented.
SLAs can be configured for scheduled tasks by using the sla parameter. In addition to sending alerts to the addresses specified in a task's email parameter, the sla_miss_callback specifies an additional Callable object to be invoked when the SLA is not met.
If you don't want to check SLAs, you can disable SLA checking globally (for all DAGs) by setting check_slas=False under the [core] section in the airflow.cfg file:
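In airflow.cfg that looks like:

```ini
[core]
check_slas = False
```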
For information on the email configuration, see Email Configuration
Trigger Rules ¶
Though the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, Airflow allows for more complex dependency settings.
All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered. The default value for trigger_rule is all_success and can be defined as "trigger this task when all directly upstream tasks have succeeded". All other rules described here are based on direct parent tasks and are values that can be passed to any operator while creating tasks:
all_success : (default) all parents have succeeded
all_failed : all parents are in a failed or upstream_failed state
all_done : all parents are done with their execution
one_failed : fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success : fires as soon as at least one parent succeeds, it does not wait for all parents to be done
none_failed : all parents have not failed ( failed or upstream_failed ) i.e. all parents have succeeded or been skipped
none_failed_or_skipped : all parents have not failed ( failed or upstream_failed ) and at least one parent has succeeded.
none_skipped : no parent is in a skipped state, i.e. all parents are in a success , failed , or upstream_failed state
dummy : dependencies are just for show, trigger at will
Note that these can be used in conjunction with depends_on_past (boolean) that, when set to True , keeps a task from getting triggered if the previous schedule for the task hasn't succeeded.
One must be aware of the interaction between trigger rules and skipped tasks, especially tasks that are skipped at schedule level. Skipped tasks will cascade through the trigger rules all_success and all_failed , but not through all_done , one_failed , one_success , none_failed , none_failed_or_skipped , none_skipped and dummy .
For example, consider the following DAG:
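A sketch along those lines (the operator choice and the always-pick-branch-a callable are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

with DAG("branch_skip_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=lambda: "follow_branch_a",  # always choose branch a
    )
    follow_branch_a = DummyOperator(task_id="follow_branch_a")
    branch_false = DummyOperator(task_id="branch_false")
    join = DummyOperator(task_id="join")  # default trigger_rule="all_success"

    branching >> follow_branch_a >> join
    branching >> branch_false >> join
```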
In the case of this DAG, join is downstream of follow_branch_a and branch_false . The join task will show up as skipped because its trigger_rule is set to all_success by default and skipped tasks will cascade through all_success .

By setting trigger_rule to none_failed_or_skipped on the join task, join will be triggered as soon as branch_false has been skipped (a valid completion state) and follow_branch_a has succeeded, because skipped tasks do not cascade through none_failed_or_skipped .

Latest Run Only ¶
Standard workflow behavior involves running a series of tasks for a particular date/time range. Some workflows, however, perform tasks that are independent of run time but need to be run on a schedule, much like a standard cron job. In these cases, backfills or running jobs missed during a pause just wastes CPU cycles.
For situations like this, you can use the LatestOnlyOperator to skip tasks that are not being run during the most recent scheduled run for a DAG. The LatestOnlyOperator skips all of its direct downstream tasks if the current time is not between its execution_time and the next scheduled execution_time , or if the DAG Run has been externally triggered.
airflow/example_dags/example_latest_only_with_trigger.py
In the case of this DAG, the task task1 is directly downstream of latest_only and will be skipped for all runs except the latest. task2 is entirely independent of latest_only and will run in all scheduled periods. task3 is downstream of task1 and task2 and because of the default trigger_rule being all_success will receive a cascaded skip from task1 . task4 is downstream of task1 and task2 , but it will not be skipped, since its trigger_rule is set to all_done .
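Not the shipped example verbatim, but the wiring it describes is roughly:

```python
import datetime as dt
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    "latest_only_with_trigger",
    start_date=dt.datetime(2021, 1, 1),
    schedule_interval=dt.timedelta(hours=4),
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")
    task3 = DummyOperator(task_id="task3")
    task4 = DummyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1
    [task1, task2] >> task3
    [task1, task2] >> task4
```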

Zombies & Undeads ¶
Task instances die all the time, usually as part of their normal life cycle, but sometimes unexpectedly.
Zombie tasks are characterized by the absence of a heartbeat (emitted by the job periodically) and a running status in the database. They can occur when a worker node can't reach the database, when Airflow processes are killed externally, or when a node gets rebooted for instance. Zombie killing is performed periodically by the scheduler's process.
Undead processes are characterized by the existence of a process and a matching heartbeat, but Airflow isn't aware of this task as running in the database. This mismatch typically occurs as the state of the database is altered, most likely by deleting rows in the "Task Instances" view in the UI. Tasks are instructed to verify their state as part of the heartbeat routine, and terminate themselves upon figuring out that they are in this "undead" state.
Cluster Policy ¶
Cluster policies provide an interface for taking action on every Airflow task or DAG either at DAG load time or just before task execution. In this way users are able to do the following:
set default arguments on each DAG/task
check that a DAG/task meets required standards
perform custom logic for routing a task to a queue
And many other options. To use cluster-wide policies, users can define the following functions in their airflow_local_settings :
dag_policy - takes a dag argument of DAG type as input. This function allows users to define a dag-level policy which is executed for every DAG at loading time.
task_policy - takes a task argument of BaseOperator type as input. This function allows users to define a task-level policy which is executed for every task at DAG loading time.
task_instance_mutation_hook - takes a task_instance argument of TaskInstance type as input. This function allows users to define a task-level policy that is executed right before task execution.
In case of DAG and task policies users may raise AirflowClusterPolicyViolation to prevent a DAG from being imported or prevent a task from being executed if the task is not compliant with users' check.
Please note that cluster policies take precedence over task attributes defined in the DAG: if task.sla is defined in the DAG and is also mutated via cluster policy, the latter will take precedence.
In the next sections we show examples of each type of cluster policy.
Where to put airflow_local_settings.py ? ¶
Add an airflow_local_settings.py file to your $PYTHONPATH or to the $AIRFLOW_HOME/config folder.
DAG level cluster policy ¶
In this example we check if each DAG has at least one tag defined. Here is what it may look like:
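A sketch of such a policy in airflow_local_settings.py (the error message is illustrative):

```python
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models import DAG

def dag_policy(dag: DAG):
    """Reject DAGs that do not declare at least one tag."""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag is required."
        )
```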
Task level cluster policy ¶
For example, this function could apply a specific queue property when using a specific operator, or enforce a task timeout policy, making sure that no tasks run for more than 48 hours. Here's an example of what this may look like:
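A sketch, assuming we want to route HiveOperator tasks to a dedicated queue and cap runtimes at 48 hours (the queue name is illustrative):

```python
from datetime import timedelta

from airflow.models.baseoperator import BaseOperator

def task_policy(task: BaseOperator):
    # Route all Hive tasks to a dedicated queue.
    if task.task_type == "HiveOperator":
        task.queue = "hive_queue"

    # Enforce a cluster-wide cap on task runtime.
    if task.execution_timeout is None or task.execution_timeout > timedelta(hours=48):
        task.execution_timeout = timedelta(hours=48)
```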
As a more advanced example we may consider implementing checks that are intended to help teams using Airflow to protect against common beginner errors that may get past a code reviewer, rather than as technical security controls.
For example, don't run tasks without airflow owners:
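A sketch of such a check (treating the stock "airflow" owner as missing is an assumption of this example):

```python
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.baseoperator import BaseOperator

def task_must_have_owners(task: BaseOperator):
    if not task.owner or task.owner.lower() == "airflow":
        raise AirflowClusterPolicyViolation(
            f"Task {task.task_id} in DAG {task.dag_id} must have a non-default owner."
        )
```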
If you have multiple checks to apply, it is best practice to curate these rules in a separate python module and have a single policy / task mutation hook that performs multiple of these custom checks and aggregates the various error messages so that a single AirflowClusterPolicyViolation can be reported in the UI (and import errors table in the database).
For Example in airflow_local_settings.py :
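A sketch of aggregating several checks into a single policy; the my_company.airflow_checks module and its rule functions are hypothetical:

```python
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.baseoperator import BaseOperator

# Hypothetical module holding the individual rule functions.
from my_company.airflow_checks import task_must_have_owners, task_must_have_retries

TASK_RULES = [task_must_have_owners, task_must_have_retries]

def _check_task_rules(current_task: BaseOperator):
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as exc:
            notices.append(str(exc))
    if notices:
        raise AirflowClusterPolicyViolation(
            f"DAG policy violations for task {current_task.task_id}: " + "; ".join(notices)
        )

def task_policy(task: BaseOperator):
    _check_task_rules(task)
```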
Task instance mutation hook ¶
The task instance mutation hook can be used, for example, to re-route a task to a different queue during retries:
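A sketch (the retry threshold and queue name are illustrative):

```python
from airflow.models.taskinstance import TaskInstance

def task_instance_mutation_hook(task_instance: TaskInstance):
    # On retries, send the task to a different queue.
    if task_instance.try_number >= 2:
        task_instance.queue = "retry_queue"
```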
Documentation & Notes ¶
It's possible to add documentation or notes to your DAGs & task objects that become visible in the web interface ("Graph View" & "Tree View" for DAGs, "Task Details" for tasks). There is a set of special task attributes (such as doc_md ) that get rendered as rich content if defined:
Please note that for DAGs, doc_md is the only attribute interpreted.
This is especially useful if your tasks are built dynamically from configuration files, it allows you to expose the configuration that led to the related tasks in Airflow.
This content will get rendered as markdown respectively in the "Graph View" and "Task Details" pages.
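A sketch of attaching such documentation to a DAG and a task (names and content are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("documented_dag", start_date=datetime(2021, 1, 1)) as dag:
    dag.doc_md = """
    ### Nightly export
    This DAG was generated from `config/export.yaml` (hypothetical path).
    """

    export = BashOperator(task_id="export", bash_command="echo exporting")
    export.doc_md = "Runs the **export** step; see the config file for details."
```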
Jinja Templating ¶
Airflow leverages the power of Jinja Templating and this can be a powerful tool to use in combination with macros (see the Macros reference section).
For example, say you want to pass the execution date as an environment variable to a Bash script using the BashOperator .
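A sketch of that pattern (the script path is illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("templated_env_example", start_date=datetime(2021, 1, 1)) as dag:
    run_script = BashOperator(
        task_id="run_script",
        bash_command="./run_export.sh",         # hypothetical script
        env={"EXECUTION_DATE": "{{ ds }}"},     # rendered by Jinja before execution
    )
```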
Here, {{ ds }} is a macro, and because the env parameter of the BashOperator is templated with Jinja, the execution date will be available as an environment variable named EXECUTION_DATE in your Bash script.
You can use Jinja templating with every parameter that is marked as "templated" in the documentation. Template substitution occurs just before the pre_execute function of your operator is called.
You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to: fields registered in template_fields property will be submitted to template substitution, like the path field in the example below:
template_fields property can equally be a class variable or an instance variable.
Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields:
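A sketch of both cases with invented operators; only the template_fields mechanism itself is the point:

```python
from airflow.models.baseoperator import BaseOperator

class MyDataReader(BaseOperator):
    template_fields = ["path"]  # `path` goes through Jinja substitution

    def __init__(self, my_path, **kwargs):
        super().__init__(**kwargs)
        self.path = my_path

    def execute(self, context):
        self.log.info("Reading from %s", self.path)

# Usage: MyDataReader(my_path="/tmp/{{ ds }}/my_file", task_id="read", dag=dag)

class MyDataTransformer(BaseOperator):
    # Deep substitution works as long as intermediate fields are templated too.
    template_fields = ["reader"]

    def __init__(self, my_reader, **kwargs):
        super().__init__(**kwargs)
        self.reader = my_reader  # e.g. a MyDataReader whose fields are templated

    def execute(self, context):
        self.log.info("Transforming data from %s", self.reader.path)
```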
You can pass custom options to the Jinja Environment when creating your DAG. One common usage is to avoid Jinja from dropping a trailing newline from a template string:
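For example, assuming an Airflow version that supports jinja_environment_kwargs:

```python
from datetime import datetime
from airflow import DAG

my_dag = DAG(
    "custom_jinja_env",
    start_date=datetime(2021, 1, 1),
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # other Jinja Environment options can go here
    },
)
```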
See Jinja documentation to find all available options.

Exceptions ¶
Airflow defines a number of exceptions; most of these are used internally, but a few are relevant to authors of custom operators or Python callables called from PythonOperator tasks. Normally any exception raised from an execute method or Python callable will either cause a task instance to fail if it is not configured to retry or has reached its limit on retry attempts, or to be marked as "up for retry". A few exceptions can be used when different behavior is desired:
AirflowSkipException can be raised to set the state of the current task instance to "skipped"
AirflowFailException can be raised to set the state of the current task to "failed" regardless of whether there are any retry attempts remaining.
The example below illustrates some of these possibilities:
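A sketch of a Python callable using both exceptions (the flags are illustrative):

```python
from airflow.exceptions import AirflowFailException, AirflowSkipException

def check_partition(partition_exists: bool, source_is_broken: bool):
    """Callable for a PythonOperator."""
    if source_is_broken:
        # Fail immediately; retrying will not help.
        raise AirflowFailException("Upstream source is misconfigured, not retrying.")
    if not partition_exists:
        # Nothing to do for this run; mark the task as skipped.
        raise AirflowSkipException("No data for this execution date.")
    print("Processing partition...")
```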
List of Airflow exceptions
Packaged DAGs ¶
While you will often specify DAGs in a single .py file, it might sometimes be required to combine a DAG and its dependencies. For example, you might want to combine several DAGs so you can version and manage them together, or you might need an extra module that is not available by default on the system you are running Airflow on. To allow this, you can create a zip file that contains the DAG(s) in the root of the zip file and has the extra modules unpacked in directories.
For instance you can create a zip file that looks like this:
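For example, a layout along these lines (file names are illustrative):

```
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
```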
Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py . It will not go into subdirectories as these are considered to be potential packages.
In case you would like to add module dependencies to your DAG you basically would do the same, but then it is more suitable to use a virtualenv and pip.
- the zip file will be inserted at the beginning of the module search list ( sys.path ), and as such it will be available to any other code that resides within the same interpreter.
- packaged DAGs cannot be used with pickling turned on.
- packaged DAGs cannot contain dynamic libraries (e.g. libz.so); these need to be available on the system if a module needs them. In other words, only pure Python modules can be packaged.
.airflowignore ¶
A .airflowignore file specifies the directories or files in DAG_FOLDER or PLUGINS_FOLDER that Airflow should intentionally ignore. Each line in .airflowignore specifies a regular expression pattern, and directories or files whose names (not DAG ids) match any of the patterns will be ignored (under the hood, Pattern.search() is used to match the pattern). Overall it works like a .gitignore file. Use the # character to indicate a comment; all characters on a line following a # will be ignored.
The .airflowignore file should be put in your DAG_FOLDER . For example, you can prepare a .airflowignore file with these contents:
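These two patterns are the ones that produce the matches described in the next paragraph:

```
project_a
tenant_[\d]
```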
Then files like project_a_dag_1.py , TESTING_project_a.py , tenant_1.py , project_a/dag_1.py , and tenant_1/dag_1.py in your DAG_FOLDER would be ignored (If a directory's name matches any of the patterns, this directory and all its subfolders would not be scanned by Airflow at all. This improves efficiency of DAG finding).
The scope of a .airflowignore file is the directory it is in plus all its subfolders. You can also prepare .airflowignore file for a subfolder in DAG_FOLDER and it would only be applicable for that subfolder.
Configure worker queues
A worker queue is a set of configurations that apply to a group of workers in your Deployment. In Apache Airflow, a worker is responsible for executing tasks that have been scheduled and queued by the scheduler. On Astro, each worker is a Kubernetes Pod that is hosted within a Kubernetes node in your Astro cluster.
Use worker queues to create optimized execution environments for different types of tasks in the same Deployment. You can use worker queues to:
- Separate resource-intensive tasks, such as those that execute machine learning models, from tasks that require minimal resources, such as those that execute SQL queries.
- Separate short-running tasks from long-running tasks.
- Isolate a single task from other tasks in your Deployment.
- Allow some workers to scale to zero but keep a minimum of 1 for other types of workers.
By default, all tasks run in a default worker queue that does not require configuration or code. To enable worker types or configurations for different groups of tasks, you can create additional worker queues in the Cloud UI and assign tasks to queues in your DAG code.
Worker queues can enhance performance, decrease cost, and increase the reliability of task execution in your Deployment. Specifically:
- Executing a task with dedicated hardware that best fits the needs of that task can result in faster performance. In some cases, this can decrease the duration of a DAG by up to 50%.
- Paying for larger workers only for select tasks means that you can lower your infrastructure cost by not paying for that worker when your tasks don't need it.
- Separating tasks that have different characteristics often means they're less likely to result in a failed or zombie state.
By configuring multiple worker queues and assigning tasks to these queues based on the requirements of the tasks, you can enhance the performance, reliability, and throughput of your Deployment. For example, consider the following scenario:
- You are running Task A and Task B in a Deployment on an AWS cluster.
- Task A and Task B are dependent on each other, so they need to run in the same Deployment.
- Task A is a long-running task that uses a lot of CPU and little memory, while Task B is a short-running task that uses minimal amounts of CPU and memory.
You can assign Task A to a worker queue that is configured to use the c6i.4xlarge worker type that's optimized for compute. Then, you can assign Task B to a worker queue that is configured to use the m5.xlarge worker type that is smaller and optimized for general usage.
Worker queue settings
You can configure each worker queue on Astro with the following settings:
- Name: The name of your worker queue. Use this name to assign tasks to the worker queue in your DAG code. Worker queue names must consist only of lowercase letters and hyphens. For example, machine-learning-tasks or short-running-tasks .
- Worker Type: The size and type of workers in the worker queue, defined as a node instance type that is supported by the cloud provider of your cluster. For example, a worker type might be m5.2xlarge or c6i.4xlarge for a Deployment running on an AWS cluster. A worker’s total available CPU, memory, storage, and GPU is defined by its worker type. Actual worker size is equivalent to the total capacity of the worker type minus Astro’s system overhead.
- Max Tasks per Worker: The maximum number of tasks that a single worker can run at a time. If the number of queued and running tasks exceeds this number, a new worker is added to run the remaining tasks. This value is equivalent to worker concurrency in Apache Airflow. It is 16 by default.
- Worker Count : The minimum and maximum number of workers that can run at a time. The number of running workers changes regularly based on Maximum Tasks per Worker and the current number of tasks in a queued or running state. By default, the minimum number of workers is 1 and the maximum is 10.
Default worker queue
Each Deployment requires a worker queue named default to run tasks. Tasks that are not assigned to a worker queue in your DAG code are executed by workers in the default worker queue.
If you don’t change any settings in the default worker queue:
- A maximum of 16 tasks can run at one time per worker. If more than 16 tasks are queued or running, a new worker is added to run the remaining tasks.
- A maximum of 10 workers can run at once, meaning that a maximum of 160 tasks can be in a running state at a time. Remaining tasks will stay in a queued or scheduled state until running tasks complete.
You can change all settings of the default worker queue except for its name.
Worker autoscaling logic
The number of workers running per worker queue on your Deployment at a given time is based on two values:
- The total number of tasks in a queued or running state
- The worker queue's setting for Maximum Tasks per Worker
The calculation is made based on the following expression:
[Number of workers]= ([Queued tasks]+[Running tasks])/(Maximum tasks per worker)
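As a hypothetical illustration, assuming the result is rounded up to the nearest whole worker: with 3 queued tasks, 10 running tasks, and Maximum Tasks per Worker set to 16, (3 + 10) / 16 ≈ 0.8, so a single worker suffices; with 40 such tasks, 40 / 16 = 2.5 rounds up to 3 workers, subject to the queue's minimum and maximum worker counts.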
Deployment parallelism is the maximum number of tasks that can run concurrently across worker queues. To ensure that you can always run as many tasks as your worker queues allow, parallelism is calculated with the following expression:
[Parallelism]= ([The sum of all 'Max Worker Count' values for all worker queues] * [The sum of all 'Maximum tasks per worker' values for all worker queues]) .
KEDA computes these calculations every ten seconds. When KEDA determines that it can scale down a worker, it waits for five minutes after the last running task on the worker finishes before terminating the worker Pod.
To learn more about how changes to a Deployment can affect worker resource allocation, see What happens during a code deploy .
Request a worker type
Your organization can enable up to 10 different worker types for each cluster. After a worker type is enabled on an Astro cluster, the worker type becomes available to any Deployment in that cluster and appears in the Worker Type menu of the Cloud UI.
- Review the list of supported worker types for your cloud provider. See AWS , Azure , or GCP .
- Contact Astronomer support with the name of the worker type(s) you want to enable for your cluster. For example, m6i.2xlarge .
For more information on requesting cluster changes, see Modify a cluster .
Create a worker queue
If you prefer, you can also run the astro deployment worker-queue create command in the Astro CLI to create a worker queue. See the CLI Command Reference .
In the Cloud UI, select a Workspace, click Deployments , and then select a Deployment.
Click the Worker Queues tab and then click Worker Queue .
Configure the worker queue’s settings. You can't change the name of a worker queue after you create it.
Click Create Queue .
You can create, update, and delete multiple worker queues at once using a Deployment file. See Deployments as Code .
Assign tasks to a worker queue
By default, all tasks run in the default worker queue. To run tasks on a different worker queue, assign the task to the worker queue in your DAG code.
To assign an Airflow task to a worker queue:
In the Cloud UI, select a Workspace and select a Deployment.
Click the Worker Queues tab.
Copy the name of the worker queue you want to assign a task to.
In your DAG code, add a queue='<worker-queue-name>' argument to the definition of the task. If a task is assigned to a queue that does not exist or is not referenced properly, the task might remain in a queued state and fail to execute. Make sure that the name of the queue in your DAG code matches the name of the queue in the Cloud UI.
For example, all instances of this task will run in the short-running-tasks queue:
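A sketch of that assignment (the operator and task name are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_quick_query():
    print("short-running work")

with DAG("worker_queue_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    quick_task = PythonOperator(
        task_id="quick_task",
        python_callable=run_quick_query,
        queue="short-running-tasks",  # must match the worker queue name in the Cloud UI
    )
```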
Update a worker queue
If you prefer, you can run the astro deployment worker-queue update command in the Astro CLI to update a worker queue. See the CLI Command Reference .
You can update worker queue configurations at any time. The worker queue name can't be changed.
If you need to change the worker type of an existing worker queue, Astronomer recommends making the change at a time when it will not affect production pipelines. After you've changed a worker type, Astronomer recommends waiting a minimum of five minutes before pushing new code to your Deployment.
Click Edit for the worker queue that you want to update.
Update the worker queue settings, and then click Update Queue .
The Airflow components of your Deployment automatically restart to apply the updated resource allocations. This action is equivalent to deploying code to your Deployment and does not impact running tasks, which have 24 hours to complete before their workers are terminated. See What happens during a code deploy .
Delete a worker queue
If you prefer, you can also run the astro deployment worker-queue delete command in the Astro CLI to delete a worker queue. See the CLI Command Reference .
All scheduled tasks that are assigned to a worker queue after the worker queue is deleted remain in a queued state indefinitely and won't execute. To avoid stalled task runs, ensure that you reassign all tasks from a worker queue before deleting it. You can either remove the worker queue argument or assign the task to a different queue.
Click Delete for the worker queue that you want to delete.
Enter Delete <worker-queue-name> and then click Yes, Continue .
Understanding the Airflow Celery Executor Simplified 101
Aditya Jadon • February 11th, 2022
Data Engineering Pipelines play a vital role in managing the flow of business data for companies. Organizations spend a significant amount of money on developing and managing Data Pipelines, so they can get streamlined data to accomplish their daily activities with ease. Many tools in the market help companies manage their messaging and task distributions for scalability.
Apache Airflow is a workflow management platform that helps companies orchestrate their Data Pipeline tasks and save time. As data and the number of backend tasks grow, it becomes necessary to scale up and make resources available for all the tasks. The Airflow Celery Executor makes it easier for developers to build scalable applications by distributing tasks across multiple machines. With the help of Airflow Celery, companies can dispatch many tasks for execution without any lag.
Apache Airflow Celery Executor is easy to use as it uses Python scripts and DAGs for scheduling, monitoring, and executing tasks. In this article, you will learn about Airflow Celery Executor and how it helps in scaling out and easily distributing tasks. Also, you will read about the architecture of the Airflow Celery Executor and how its process executes the tasks.
Prerequisites
- A brief overview of Apache Airflow.
Introduction to Apache Airflow
Apache Airflow is an open-source workflow management platform to programmatically author, schedule, and monitor workflows. In Airflow, workflows are the DAGs (Directed Acyclic Graphs) of tasks and are widely used to schedule and manage the Data Pipeline . Airflow is written in Python, and workflows are created in Python scripts. Airflow using Python allows Developers to make use of libraries in creating workflows.
The Airflow workflow engine executes complex Data Pipeline jobs, ensures that each task runs on time and in the correct order, and makes sure every task gets the required resources. Airflow can run DAGs on a defined schedule or in response to external event triggers.
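To make this concrete, here is a minimal sketch of a workflow defined as a Python script; the DAG id, task ids, and commands are invented for illustration and assume an Airflow 2.4+ installation.

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

# A two-task DAG: "extract" must succeed before "load" runs.
with DAG(
    dag_id="daily_pipeline",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'extracting data'")
    load = BashOperator(task_id="load", bash_command="echo 'loading data'")

    extract >> load  # set the downstream dependency
```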
Some of the main features of Apache Airflow are listed below.
- Open Source : Airflow is free to use and has a lot of active users and an interactive community. Its resources are easily available over the web.
- Robust Integrations : Airflow comes with ready-to-use operators and many plug-and-play integrations that let users run tasks on Google Cloud Platform, Amazon AWS, Microsoft Azure, and more.
- Easy to Use : Airflow is written in Python, which means users can create workflows using Python scripts and import libraries that make the job easier.
- Interactive UI : Airflow also offers a web application that allows users to monitor the real-time status of tasks running and DAGs. Users can schedule and manage their workflows with ease.
Hevo Data , a No-code Data Pipeline, helps to load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services and simplifies the ETL process. It supports 150+ data sources ( including 30+ free data sources ), and setup is a 3-step process: select the data source, provide valid credentials, and choose the destination. Hevo not only loads the data onto the desired Data Warehouse/destination but also enriches the data and transforms it into an analysis-ready form without having to write a single line of code.
Its completely automated pipeline offers data to be delivered in real-time without any loss from source to destination. Its fault-tolerant and scalable architecture ensures that the data is handled in a secure, consistent manner with zero data loss and supports different forms of data. The solutions provided are consistent and work with different BI tools as well.
Check out why Hevo is the Best:
- Secure : Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
- Schema Management : Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
- Minimal Learning : Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
- Hevo Is Built To Scale : As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
- Incremental Data Load : Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
- Live Support : The Hevo team is available round the clock to extend exceptional support to its customers through chat, E-Mail, and support calls.
- Live Monitoring : Hevo allows you to monitor the data flow and check where your data is at a particular point in time.
Introduction to the Airflow Celery Executor
Airflow Celery is a task queue that helps users scale out and integrate with other languages. It comes with the tools and support you need to run such a system in production.
Executors in Airflow are the mechanism by which task instances get run. Airflow ships with several executors, but the most widely used is the Airflow Celery Executor, which scales out by distributing the workload to multiple Celery workers that can run on different machines.
The CeleryExecutor distributes tasks to its workers with the help of messages. The scheduler adds a message to the queue, and the Airflow Celery broker delivers that message to an Airflow Celery worker for execution. If the worker assigned to a job goes down for any reason, Airflow Celery quickly adapts and assigns the task to another worker.
Airflow Celery Executor Setup
To set up the Airflow Celery Executor, you first need to set up an Airflow Celery backend using a message broker service such as RabbitMQ or Redis. After that, change the airflow.cfg file to point the executor parameter to CeleryExecutor and enter the required related settings.
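As a rough sketch, the relevant airflow.cfg entries could look like the following; the broker and result backend URLs are illustrative assumptions and depend entirely on your environment.

```ini
[core]
executor = CeleryExecutor

[celery]
# Example values only: a local Redis broker and a PostgreSQL result backend.
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow
```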
The prerequisites for setting up the Airflow Celery Executor Setup are listed below:
- Airflow is installed on the local machine and properly configured.
- Airflow configuration should be homogeneous across the cluster.
- Before executing any operators, the workers need to have the operators' dependencies met in that context, for example by installing the required Python libraries.
- The workers should have access to the DAGs directory – DAGS_FOLDER.
To start an Airflow Celery worker, use the following command.
airflow celery worker
Once the service has started, it is ready to receive tasks. As soon as a task is sent in its direction, the worker starts the job.
To stop the worker, use the following command.
airflow celery stop
You can also use Flower, a GUI web application that lets users monitor the workers. To install the Flower Python library, use the following command in the terminal.
pip install 'apache-airflow[celery]'
You can start the Flower web server by using the command given below.
airflow celery flower
Airflow Celery Executor Architecture
The architecture of the Airflow Celery Executor consists of several components, listed below:
- Workers : Execute the tasks assigned to them by Airflow Celery.
- Scheduler : It is responsible for adding the necessary tasks to the queue.
- Database : It contains all the information related to the status of tasks, DAGs, Variables, connections, etc.
- Web Server : The HTTP server provides access to information related to DAG and task status.
- Broker : This component of the Celery queue stores commands for execution.
- Result Backend : It stores the status of all completed commands.
Now that you have seen the different components of the Airflow Celery Executor architecture, let's understand how these components communicate with each other during the execution of a task.
- Web server –> Workers : Fetches task execution logs from the workers.
- Web server –> DAG files : Reads the DAG structure.
- Web server –> Database : Fetches the status of the tasks.
- Workers –> DAG files : Read the DAG structure and execute the tasks.
- Workers –> Database : Get and store information about connection configuration, variables, and XCom.
- Workers –> Celery’s result backend : Save the status of completed tasks.
- Workers –> Celery’s broker : Store the commands for execution.
- Scheduler –> DAG files : Reads the DAG structure and schedules the tasks.
- Scheduler –> Database : Stores information about DAG runs and related tasks.
- Scheduler –> Celery’s result backend : Gets information about the status of completed tasks.
- Scheduler –> Celery’s broker : Puts the commands to be executed.

Task Execution Process of Airflow Celery
In this section, you will learn how tasks are executed by the Airflow Celery Executor. When a task begins to execute, two main processes are already running:
- SchedulerProcess : Processes the tasks and runs with the help of the Airflow CeleryExecutor .
- WorkerProcess : Observes the queue and waits for new tasks to appear. It also has WorkerChildProcess instances that wait for new tasks.
Two backend components are also involved in the process while executing the task:
- ResultBackend
- QueueBroker
Along with these, two other processes are created during execution. The processes are listed below:
- RawTaskProcess : It is the process with user code.
- LocalTaskJobProcess : The logic for this process is described by the LocalTaskJob . Its job is to monitor the status of RawTaskProcess . All the new processes are started from TaskRunner .
The step-by-step process of how tasks are executed using the above processes, and how the data and instructions flow between them, is given below.
- SchedulerProcess processes the tasks and when it gets a task that needs to be executed, then it sends it to the QueueBroker to add the task to the queue.
- Simultaneously, the QueueBroker starts querying the ResultBackend for the status of the task. When the QueueBroker becomes aware of the task, it sends the information about it to one WorkerProcess .
- When WorkerProcess receives the information, it assigns a single task to one WorkerChildProcess .
- After receiving a task, WorkerChildProcess executes the handling function for the task i.e. execute_command() . This creates the new process LocalTaskJobProcess .
- The logic of LocalTaskJobProcess is described by LocalTaskJob . It starts the new RawTaskProcess using TaskRunner .
- Both the process RawTaskProcess and LocalTaskJobProcess are stopped when they finish their work.
- Then, WorkerChildProcess notifies the main process WorkerProcess about the completion of the task and its availability for subsequent tasks.
- WorkerProcess then saves the status information back into the ResultBackend.
- Now, whenever SchedulerProcess requests for the status from ResultBackend, it receives the status information of the task.
Conclusion
In this article, you learned about Apache Airflow, Airflow Celery Executor, and how it helps companies achieve scalability. Also, you read about the architecture of Airflow Celery Executor and its task execution process. Airflow Celery uses message brokers to receive tasks and distributes those on multiple machines for higher performance.
Companies need to analyze their business data stored in multiple data sources. The data needs to be loaded to the Data Warehouse to get a holistic view of the data. Hevo Data is a No-code Data Pipeline solution that helps to transfer data from 150+ sources to desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.
Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand.
Share your experience of learning about Airflow Celery Executor in the comments section below!
- Airflow Celery
- Apache Airflow
- Celery Executor
- Data Pipeline
Towards Data Science

Jul 3, 2022
3 steps for Building Airflow Pipelines with Efficient Resource Utilisation
This blog looks at some Airflow features valuable for managing resources in a workflow. We will explore some interrelated concepts that affect resource utilisation of the underlying infrastructure for any data pipeline. In particular, we will explore the following concepts:
- Airflow Pools for capping resource allocation to a group of tasks based on a predefined metric.
- Parallelism & Concurrency for efficiently scaling the pipelines to utilise the available infrastructure fully.
- Priority Weights for prioritising and allocating resources to crucial tasks before others.
This blog does not focus on introducing Airflow; to learn more about building pipelines with Apache Airflow, check out my other blogs here . Without any further ado, let us start exploring the topics mentioned above.
1. Airflow Pools
As data engineers, we build multiple pipelines responsible for running specific workflows. A solution requires a DAG or a group of DAGs designed to achieve desired results. For instance, a set of DAGs to load data into a data warehouse, another group to deploy and monitor machine learning models and some DAGs to start or stop the project’s infrastructure.
With so many DAGs, which sometimes require running several of them simultaneously, it becomes crucial to manage the allocation of the underlying resources. It is essential to do so so that Airflow does not overwhelm the infrastructure or hinder business-critical tasks due to resource constraints such as the number of tasks/DAGs that can run concurrently. We can perform such allocation of tasks to resources using the concept of pools.
Similar to a swimming pool, a pool in Airflow is an area where processes run. It contains slots that operators in Airflow can use once assigned to a pool. A task can use one or more slots based on its configuration. By default, each task uses one slot and gets allocated to the default_pool , which contains 128 slots.
Running all tasks in a single pool is problematic. For instance, if the Airflow instance is utilising all slots in a pool and needs to run more tasks, those tasks are queued and wait for the ongoing processes to complete.
Using Pools in Airflow
As shown below, we can access pools in Airflow using the Admin panel. We can also use it to view/modify all existing pools or create new ones.
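Pools can also be managed from the Airflow 2 CLI instead of the Admin panel; a small sketch with made-up pool names and slot counts:

```bash
# Create (or update) a pool with a given number of slots and a description.
airflow pools set critical_pool 20 "Business-critical tasks"
airflow pools set non_critical_pool 10 "Everything else"

# List all existing pools and their slot counts.
airflow pools list
```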
Once created, we can allocate tasks to pools using the pool attribute inside an operator. We can have operators within the same DAG or multiple DAGs assigned to different pools based on logical differences. Moreover, if some tasks require more resources than others within a pool, we can allocate more resources to those tasks using the pool_slots attribute inside an operator.
For instance, we can observe from the DAG below that we have some critical and some non-critical tasks. We allocate the tasks to different pools based on their criticality. Additionally, we assign more pool_slots to essential tasks to let Airflow know that those tasks require more resources.
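The sketch below shows how the pool and pool_slots attributes can be set on operators; the DAG id, task ids, and pool names are assumptions for illustration, and the pools must already exist.

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="pool_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    critical_task = BashOperator(
        task_id="load_billing_data",
        bash_command="echo 'critical work'",
        pool="critical_pool",      # runs in the business-critical pool
        pool_slots=2,              # occupies two slots while it runs
    )
    non_critical_task = BashOperator(
        task_id="refresh_report_cache",
        bash_command="echo 'non-critical work'",
        pool="non_critical_pool",  # pool_slots defaults to 1
    )
```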
Now that we know how to configure pools, some additional steps are necessary to use them effectively.
- Allocate tasks to different pools. Running all tasks in a single pool can be problematic. For instance, if an Airflow instance is utilising all slots and requires running more tasks, tasks, irrespective of their importance, are queued until an ongoing process frees a place. Therefore, we can create a pool for business-critical jobs and another for non-critical tasks and accordingly assign operations to the pool type.
- Besides separating pools, we also need to manage slots inside them. We must ensure that the business-critical pool has enough slots so that tasks do not wait long before starting the processing.
- The number of pools depends on use cases. For instance, we could have three pools for high, medium and low critical tasks or separate pools for different workflows, such as one pool for managing infrastructure, another for managing data pipelines and so on.
- The number of slots in a pool. A few factors that can help determine the number of slots are: the number of tasks that need to run in parallel, the number of slots required by each task, and how much delay each task can tolerate. For instance, if a pool needs to run ten tasks in parallel for a pipeline, with each task requiring two slots, and all tasks are critical and must run with no delay, then we should allocate at least 20 slots to the pool to run the pipeline efficiently.
2. Parallelism & Concurrency
In this section, we will look at some airflow configurations necessary for pools to work suitably.
- Parallelism : It is defined as the number of tasks that can be executed simultaneously for an Airflow instance , i.e. including all DAGs. If the Airflow instance is at its maximum allowed parallelism capacity, upcoming tasks get queued irrespective of their pool configurations. For instance, let’s assume a case with the following configuration: 100 tasks to run, a pool capacity of 200 slots, and 1 slot per task. Suppose parallelism is set to its default value of 32. In such a case, even though the pool can accommodate all tasks simultaneously, only 32 tasks will run at any given time due to the max parallelism constraint.
- Dag_concurrency : It is defined as the maximum number of tasks that can be executed simultaneously in a DAG . Analogous to parallelism, DAG concurrency can affect the resource utilisation of a workflow. Where parallelism affects the queuing of tasks at the Airflow level, i.e. across all DAGs, dag_concurrency affects the queuing of tasks at the DAG level. For instance, let’s assume a case with the following configuration: 100 tasks to run, parallelism of 128, a pool capacity of 200 slots, and 1 slot per task. Suppose dag_concurrency is set to its default of 16. Although the rest of the configuration supports more parallel processing, it is capped by the DAG concurrency, so all tasks beyond the limit of 16 get queued.
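Both settings live in airflow.cfg (or the equivalent environment variables). The values below are illustrative, and the sketch assumes an Airflow version that still uses the dag_concurrency name; newer releases call the same setting max_active_tasks_per_dag.

```ini
[core]
# Maximum task instances running at once across the whole Airflow instance.
parallelism = 128
# Maximum task instances running at once within a single DAG.
dag_concurrency = 64
```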
3. Priority Weights
While building big data pipelines, there are times when we would like to prioritise a group of tasks over others. For instance, if we have multiple data sources and processing some data sources is more critical than others in an ETL process, we can configure our workflow to prioritise those tasks.
We can observe from the DAG above that Airflow runs one set of tasks, i.e. ETL for data_source_1 before data_source_2 . We can achieve it by setting the following configurations:
- priority_weight : The priority weight is an operator’s attribute that assigns priority to each task.
- weight_rule : The weight rule determines the method for computing weights for the tasks.
In Airflow, we have three weight rules as follows:
1. Absolute : It allows the Airflow instance to prioritise tasks based on the priority_weight assigned to each task. For instance, let’s assume priority_weight = 2 for all tasks in the critical_data branch and priority_weight = 1 for all tasks in the non_critical_data branch. Now, for the DAG below, all tasks with higher priority_weights, i.e. the critical_data tasks, are scheduled and executed first.
2. Upstream : Using an upstream weight rule, the effective weight of an operator equals the sum of the weights of its upstream (ancestor) tasks, so the Airflow instance ends up prioritising the tasks that sit further downstream in a DAG. For instance, let’s assume priority_weight = 1 for all tasks. Now, for the DAG below, the task load_data_source_1_pod_0 would have an effective priority_weight of 5 , that is 4 from the tasks it depends on and 1 for itself.
3. Downstream : Using a downstream weight rule (the default), the effective weight of an operator equals the sum of the weights of its downstream (descendant) tasks, so the Airflow instance ends up prioritising the tasks that sit further upstream in a DAG. For instance, let’s assume priority_weight = 1 for all tasks. Now, for the DAG below, the task extract_data_source_1_pod_0 would have an effective priority_weight of 4, that is 3 from the tasks that depend on it and 1 for itself.
It is essential to know that task priorities are only comparable between tasks that belong to the same pool. If operators in a DAG are assigned to different pools, then within each pool the higher-priority tasks are executed first. A higher priority_weight implies that Airflow will run those tasks before serving any other queued job.
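As a sketch of how these attributes are set, the snippet below gives a critical task a higher priority_weight than a non-critical one under the absolute weight rule; the DAG id and task ids are invented for illustration.

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="priority_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    critical = BashOperator(
        task_id="etl_data_source_1",
        bash_command="echo 'critical source'",
        priority_weight=2,        # scheduled before lower-weight tasks in the same pool
        weight_rule="absolute",   # other options are "upstream" and "downstream"
    )
    non_critical = BashOperator(
        task_id="etl_data_source_2",
        bash_command="echo 'non-critical source'",
        priority_weight=1,
        weight_rule="absolute",
    )
```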
Building data pipelines that utilise infrastructure efficiently is a blend of multiple components. This blog explored some of those interrelated concepts so that the pipelines can scale based on underlying resources and logical flow.
Vachan Anand
A consultant with an interest in Data Science, Data Engineering and Cloud Technology.
March 4, 2023 • 3 min read
How Apache Airflow Distributes Jobs on Celery workers
Written by Hugo Lime

The life of a distributed task instance

Discover what happens when Apache Airflow performs task distribution on Celery workers through RabbitMQ queues.
Apache Airflow is a tool to create workflows such as an extract-load-transform pipeline on AWS . A workflow is a directed acyclic graph (DAG) of tasks and Airflow has the ability to distribute tasks on a cluster of nodes. Let’s see how it does that.

RabbitMQ is a message broker. Its job is to manage communication between multiple services by operating message queues. It provides an API for other services to publish and to subscribe to the queues.
Celery is a task queue. It can distribute tasks on multiple workers by using a protocol to transfer jobs from the main application to Celery workers. It relies on a message broker to transfer the messages.
Inside Apache Airflow , tasks are carried out by an executor . The main types of executors are:
- Sequential Executor : Each task is run locally (on the same machine as the scheduler) in its own python subprocess. They are run sequentially which means that only one task can be executed at a time. It is the default executor.
- Local Executor : It is the same as the sequential executor except that multiple tasks can run in parallel. It needs a metadata database (where DAG and task statuses are stored) that supports parallelism, like MySQL. Setting up such a database requires some extra work since the default configuration uses SQLite.
- Celery Executor : The workload is distributed on multiple celery workers which can run on different machines. It is the executor you should use for availability and scalability.
Distributed Apache Airflow Architecture
Apache Airflow is split into different processes which run independently from each other.
When setting up Apache Airflow with the celery executor to use a distributed architecture, you have to launch a bunch of these processes and other services:
- A metadata database (MySQL): it contains the status of the DAG runs and task instances.
- Airflow web server : a web interface to query the metadata to monitor and execute DAGs.
- Airflow scheduler : checks the status of the DAGs and tasks in the metadata database, creates new ones if necessary, and sends the tasks to the queues.
- A message broker (RabbitMQ): it stores the task commands to be run in queues.
- Airflow Celery workers : they retrieve the commands from the queues, execute them and update the metadata.
You can look at the Clairvoyant blog to set up everything.
In the end, a typical architecture looks like:

Journey of a task instance
Let’s now dive deeper into each step to understand the complete process and to know how to monitor and debug them.
1. In the beginning, the scheduler creates a new task instance in the metadata database with the scheduled state:
2. Then the scheduler uses the Celery Executor which sends the task to the message broker. RabbitMQ queues can be explored from the management UI on port 15672. You can see the number of messages in the queue and open each message.

If you look closely at the payload created by the Celery Executor, you will see that it contains the command that the celery worker should execute:
airflow run tutorial Hello_World 2019-03-24T13:52:56.271392+00:00
3. The celery worker then receives the command from the queue, which is now empty.

4. The celery worker updates the metadata to set the status of the task instance to running :
5. The celery worker executes the command. The worker status can be monitored from the Flower web interface by running airflow flower . It is a simple web server on which celery workers regularly report their status.

6. Once the task is finished, the celery worker updates the metadata to set the status of the task instance to success:
That’s it. The task is now complete. If other ones depend on its completion, they will now be created by the scheduler and follow the same process.
Multiple Celery workers
Obviously, what we want to achieve with a Celery Executor is to distribute the workload on multiple nodes. It only makes sense if multiple tasks are running at the same time. Let’s try with a simple DAG:
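The DAG itself appears only as a screenshot in the original post; a rough sketch consistent with the task names used below, written with Airflow 2-style imports as an assumption, could look like this:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="tutorial",
    start_date=datetime(2019, 3, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    hello_world = BashOperator(task_id="Hello_World", bash_command="echo 'Hello World'")
    sleep15 = BashOperator(task_id="sleep15", bash_command="sleep 15")
    sleep20 = BashOperator(task_id="sleep20", bash_command="sleep 20")

    # Both sleep tasks become runnable as soon as Hello_World succeeds,
    # so the scheduler can push them to the queue at the same time.
    hello_world >> [sleep15, sleep20]
```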

In our example, just after the Hello_World task is completed, the scheduler pushes both sleep15 and sleep20 tasks to the queue to be executed by the celery workers.
We can check with Flower that each celery worker received one task from the queue and that they are executing their assigned task simultaneously:

The scheduler now waits for both tasks to be reported as successful before sending the next one to the queue.
I hope you now have a clearer understanding of all the execution steps of task distribution with Apache Airflow.
If you need help, please post a comment or ask the community on the Apache Airflow Slack .
Thanks to Flavian Hautbois, Florian Carra, and Alexandre Sapet.