Introduction to Tasks ¶

A task can execute any one of the following types of SQL code:

Single SQL statement

Call to a stored procedure

Procedural logic using Snowflake Scripting

Tasks can be combined with table streams for continuous ELT workflows to process recently changed table rows. Streams ensure exactly once semantics for new or changed data in a table.

Tasks can also be used independently to generate periodic reports by inserting or merging rows into a report table, or to perform other periodic work.

Compute Resources ¶

Tasks require compute resources to execute SQL code. Either of the following compute models can be chosen for individual tasks:

Snowflake-managed (i.e. serverless compute model)

User-managed (i.e. virtual warehouse)

Serverless Tasks ¶

The serverless compute model for tasks enables you to rely on compute resources managed by Snowflake instead of user-managed virtual warehouses. The compute resources are automatically resized and scaled up or down by Snowflake as required for each workload. Snowflake determines the ideal size of the compute resources for a given run based on a dynamic analysis of statistics for the most recent previous runs of the same task. The maximum size for a serverless task run is equivalent to an XXLARGE warehouse. Multiple workloads in your account share a common set of compute resources.

The option to enable the serverless compute model must be specified when creating a task. The CREATE TASK syntax is nearly identical to tasks that rely on user-managed virtual warehouses. Omit the WAREHOUSE parameter to allow Snowflake to manage the compute resources for the task. Note that the role that executes the CREATE TASK command must have the global EXECUTE MANAGED TASK privilege. For more information about the access control requirements for tasks, see Task Security .
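For illustration, a serverless task could look like the following sketch; the task name, schedule, and SQL body are placeholders, and the WAREHOUSE parameter is simply omitted:

```sql
CREATE TASK mydb.myschema.my_serverless_task
  SCHEDULE = 'USING CRON 0 2 * * * UTC'
AS
  INSERT INTO mydb.myschema.report_table
    SELECT CURRENT_TIMESTAMP();
```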

Billing for runs of serverless tasks differs somewhat from the standard credit consumption model for tasks that rely on warehouses for compute resources. For information, see Billing for Task Runs .

Serverless tasks cannot invoke the following object types and functions:

UDFs (user-defined functions) that contain Java or Python code.

Stored procedures written in Scala (using Snowpark), or stored procedures that call UDFs containing Java or Python code.

User-managed Tasks ¶

If you prefer, you can alternatively manage the compute resources for individual tasks by specifying an existing virtual warehouse when creating the task. This option requires that you choose a warehouse that is sized appropriately for the SQL actions that are executed by the task.

Choosing a Warehouse Size ¶

If you choose to use existing warehouses to supply the compute resources for individual tasks, we recommend that you follow the best practices described in Warehouse Considerations . We suggest that you analyze the average run time for a single task or DAG of tasks using a specific warehouse based on warehouse size and clustering, as well as whether or not the warehouse is shared by multiple processes or is dedicated to running this single task (or DAG).

Query the TASK_HISTORY Account Usage view (in the SNOWFLAKE shared database). The average difference between the scheduled and completed times for a task is the expected average run time for the task, including any period in which the task was queued. A task is queued when other processes are currently using all of the compute resources in the warehouse.
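A query along these lines can serve as a starting point; the task name and lookback window are illustrative, and the column names follow the TASK_HISTORY Account Usage view:

```sql
SELECT name,
       AVG(DATEDIFF('second', scheduled_time, completed_time)) AS avg_run_seconds
  FROM snowflake.account_usage.task_history
 WHERE name = 'MY_TASK'
   AND scheduled_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
   AND state = 'SUCCEEDED'
 GROUP BY name;
```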

Unless the SQL statements defined for the tasks can be optimized (either by rewriting the statements or using stored procedures), this is the expected average run time for the task (or DAG). Choose the right size for the warehouse based on your analysis to ensure the task (or DAG) finishes running within this window.

The following diagram shows a window of 1 minute in which a single task queued for 20 seconds and then ran for 40 seconds.

Example task batch window

The following diagram shows a DAG that requires 5 minutes on average to complete for each run. The diagram shows the window for 2 runs of the DAG to complete. This window is calculated from the time the root task is scheduled to start until the last child task in the DAG has completed running. In this example, the DAG is shared with other, concurrent operations that queue while each of the 3 tasks in the DAG is running. These concurrent operations consume all available resources when each task in the DAG finishes running and before the next task starts running. As a result, the window for each task includes some amount of queuing while it waits for other operations to finish and relinquish compute resources.

Note that even if this DAG ran on a dedicated warehouse, a brief lag would be expected after a predecessor task finishes running and any child task is executed; however, no queueing for shared resources with other operations would occur. The warehouse size you choose should be large enough to accommodate multiple child tasks that are triggered simultaneously by predecessor tasks.

Example DAG batch window

Recommendations for Choosing a Compute Model ¶

The following table describes various factors that can help you decide when to use serverless tasks versus user-managed tasks:

Note that the maximum size for a serverless task run is equivalent to an XXLARGE warehouse. If a task workload requires a larger warehouse, create a user-managed task that references a warehouse of the required size.

Task Scheduling ¶

A standalone task or the root task in a DAG generally runs on a schedule. You can define the schedule when creating a task (using CREATE TASK) or later (using ALTER TASK ).

Snowflake ensures only one instance of a task with a schedule (i.e. a standalone task or the root task in a DAG) is executed at a given time. If a task is still running when the next scheduled execution time occurs, then that scheduled time is skipped.

Snowflake automatically resizes and scales the compute resources for serverless tasks. For tasks that rely on a warehouse to provide compute resources, choose an appropriate warehouse size for a given task to complete its workload within the defined schedule. For information, see Choosing a Warehouse Size (in this topic).

Task Scheduling and Daylight Saving Time ¶

The cron expression in a task definition supports specifying a time zone. A scheduled task runs according to the specified cron expression in the local time for a given time zone. Special care should be taken with regard to scheduling tasks for time zones that recognize daylight saving time. Tasks scheduled during specific times on days when the transition from standard time to daylight saving time (or the reverse) occurs can have unexpected behaviors.

For example:

During the autumn change from daylight saving time to standard time, a task scheduled to start at 1 AM in the America/Los_Angeles time zone (i.e. 0 1 * * * America/Los_Angeles ) would run twice : once at 1 AM and then again when 1:59:59 AM shifts to 1:00:00 AM local time. That is, there are two points in time when the local time is 1 AM.

During the spring change from standard time to daylight saving time, a task scheduled to start at 2 AM in the America/Los_Angeles time zone (i.e. 0 2 * * * America/Los_Angeles ) would not run at all because the local time shifts from 1:59:59 AM to 3:00:00 AM. That is, there is no point during that day when the local time is 2 AM.

To avoid unexpected task executions due to daylight saving time, either :

Do not schedule tasks to run at a specific time between 1 AM and 3 AM (daily, or on days of the week that include Sundays), or

Manually adjust the cron expression for tasks scheduled during those hours twice each year to compensate for the time change due to daylight saving time.

DAG of Tasks ¶

A Directed Acyclic Graph (DAG) is a series of tasks composed of a single root task and additional tasks, organized by their dependencies. DAGs flow in a single direction, meaning a task later in the series cannot prompt the run of an earlier task (i.e. a loop). Each task (except the root task) can have multiple predecessor tasks (dependencies); likewise, each task can have multiple subsequent (child) tasks that depend on it. A task runs only after all of its predecessor tasks have run successfully to completion.

The root task should have a defined schedule that initiates a run of the DAG. Each of the other tasks has at least one defined predecessor to link the tasks in the DAG. A child task runs only after all of its predecessor tasks run successfully to completion.

You can specify the predecessor tasks when creating a new task (using CREATE TASK … AFTER) or later (using ALTER TASK … ADD AFTER). A DAG is limited to a maximum of 1000 tasks total (including the root task). A single task can have a maximum of 100 predecessor tasks and 100 child tasks.

In the following basic example, the root task prompts Tasks B and C to run simultaneously. Task D runs when both Tasks B and C have completed their runs.

Basic DAG example
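A sketch in SQL of how this DAG could be wired together (warehouse, task names, and task bodies are placeholders):

```sql
-- Root task with a schedule.
CREATE TASK task_a
  WAREHOUSE = my_wh
  SCHEDULE = '60 MINUTE'
AS
  SELECT 1;

-- Tasks B and C run after the root task completes.
CREATE TASK task_b WAREHOUSE = my_wh AFTER task_a AS SELECT 1;
CREATE TASK task_c WAREHOUSE = my_wh AFTER task_a AS SELECT 1;

-- Task D runs only after both B and C complete.
CREATE TASK task_d WAREHOUSE = my_wh AFTER task_b, task_c AS SELECT 1;
```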

The following practical example shows how a DAG could be used to update dimension tables in a sales database before aggregating fact data:

Sales database DAG example

A further example shows the concluding task in a DAG calling an external function to trigger a remote messaging service to send a notification that all previous tasks have run successfully to completion.

Cloud messaging notification DAG example

Task Ownership ¶

All tasks in a DAG must have the same task owner (i.e. a single role must have the OWNERSHIP privilege on all of the tasks) and be stored in the same database and schema.

Transferring ownership of a task severs the dependency between this task and any predecessor and child tasks. For more information, see Link Severed Between Predecessor and Child Tasks (in this topic).

When ownership of all tasks in a DAG is transferred at once, through either of the following activities, the relationships between all tasks in the DAG are retained:

The current owner of all tasks that comprise the DAG is dropped (using DROP ROLE ). Ownership of the objects owned by the dropped role is transferred to the role that executes the DROP ROLE command.

Ownership of all tasks that comprise the DAG is explicitly transferred to another role (e.g. by executing GRANT OWNERSHIP on all tasks in a schema).

Overlapping DAG Runs ¶

By default, Snowflake ensures that only one instance of a particular DAG is allowed to run at a time. The next run of a root task is scheduled only after all tasks in the DAG have finished running. This means that if the cumulative time required to run all tasks in the DAG exceeds the explicit scheduled time set in the definition of the root task, at least one run of the DAG is skipped. The behavior is controlled by the ALLOW_OVERLAPPING_EXECUTION parameter on the root task; the default value is FALSE. Setting the parameter value to TRUE permits DAG runs to overlap.

In addition, a child task begins its run only after all predecessor tasks for the child task have successfully completed their own runs. A task that executes time-intensive SQL operations delays the start of any child task that identifies the task as a predecessor.

In the following example, a DAG run is scheduled to start when a prior run has not completed yet. The period of overlap, or concurrency, is identified in red. The diagram also identifies the span of time when each task is queued before running in the user-managed warehouse. Note that if Snowflake-managed compute resources are used, there is no queuing period:

Overlapping DAG runs

Overlapping runs may be tolerated (or even desirable) when read/write SQL operations executed by overlapping runs of a DAG do not produce incorrect or duplicate data. However, for other DAGs, task owners (i.e. the role with the OWNERSHIP privilege on all tasks in the DAG) should set an appropriate schedule on the root task and choose an appropriate warehouse size (or use Snowflake-managed compute resources) to ensure an instance of the DAG finishes to completion before the root task is next scheduled to run.

To better align a DAG with the schedule defined in the root task:

If feasible, increase the scheduling time between runs of the root task.

Consider modifying compute-heavy tasks to use Snowflake-managed compute resources. If the task relies on user-managed compute resources, increase the size of the warehouse that runs large or complex SQL statements or stored procedures in the DAG.

Analyze the SQL statements or stored procedure executed by each task. Determine if code could be rewritten to leverage parallel processing.

If none of the above solutions help, consider whether it is necessary to allow concurrent runs of the DAG by setting ALLOW_OVERLAPPING_EXECUTION = TRUE on the root task. This parameter can be defined when creating a task (using CREATE TASK) or later (using ALTER TASK).

Viewing Dependent Tasks in a DAG ¶

To view either the direct child tasks for a root task or all tasks in a DAG:

Query the TASK_DEPENDENTS table function (in the Snowflake Information Schema ). Note that to retrieve all tasks in a DAG, input the root task when calling the function. If you input a child task, the function returns the children (and the children of those children, etc.) of the specified task.
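For example, to list all tasks in the DAG rooted at a given task (the database, schema, and task names are placeholders):

```sql
SELECT *
  FROM TABLE(my_db.INFORMATION_SCHEMA.TASK_DEPENDENTS(
    TASK_NAME => 'my_db.my_schema.root_task',
    RECURSIVE => TRUE));
```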

DAG Runs with Suspended Child Tasks ¶

When the root task is suspended, you can resume or suspend any child tasks using ALTER TASK … RESUME | SUSPEND. Resuming any suspended child tasks is not required before you resume the root task.

When a DAG runs with one or more suspended child tasks, the run ignores those tasks. A child task with multiple predecessors runs as long as at least one of the predecessors is in a resumed state, and all resumed predecessors run successfully to completion.

Resuming Suspended Tasks in a DAG ¶

To recursively resume all tasks in a DAG, query the SYSTEM$TASK_DEPENDENTS_ENABLE function rather than resuming each task individually (using ALTER TASK … RESUME).

Link Severed Between Predecessor and Child Tasks ¶

Dependencies among tasks in a DAG can be severed as a result of any of the following actions:

Remove predecessors for a child task using ALTER TASK … REMOVE AFTER.

Drop predecessors for a child task using DROP TASK.

Transfer ownership of a child task to a different role using GRANT OWNERSHIP.

If any combination of the above actions severs the relationship between the child task and all predecessors, then the former child task becomes either a standalone task or a root task, depending on whether other tasks identify the task as their predecessor.

Versioning of Runs ¶

When a standalone task or the root task in a DAG is first resumed (or manually executed using EXECUTE TASK ), an initial version of the task is set. If the task is a root task, then a version of the entire DAG, including all properties for all tasks in the DAG, is set. The standalone task or DAG runs using this version. After a task is suspended and modified, a new version is set when the standalone or root task is resumed or manually executed.

To modify or recreate any task in a DAG, the root task must first be suspended (using ALTER TASK … SUSPEND). When the root task is suspended, all future scheduled runs of the root task are cancelled; however, if any tasks are currently running (i.e. tasks in an EXECUTING state), these tasks and any descendant tasks continue to run using the current version.

If the definition of a stored procedure called by a task changes while the DAG is executing, the new programming could be executed when the stored procedure is called by the task in the current run.

For example, suppose the root task in a DAG is suspended, but a scheduled run of this task has already started. The owner of all tasks in the DAG modifies the SQL code called by a child task while the root task is still running. The child task runs and executes the SQL code in its definition using the version of the DAG that was current when the root task started its run. When the root task is resumed or is manually executed, a new version of the DAG is set. This new version includes the modifications to the child task.

To retrieve the history of task versions, query TASK_VERSIONS Account Usage view (in the SNOWFLAKE shared database).

Setting Session Parameters for Tasks ¶

You can set session parameters for the session in which a task runs. To do so, modify an existing task and set the desired parameter values (using ALTER TASK … SET session_parameter = value [, session_parameter = value ... ] ).

A task supports all session parameters. For the complete list, see Parameters .

Note that a task does not support account or user parameters.
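For example, the following statement sets a session parameter for the session in which the task runs (the task name, parameter, and value are illustrative):

```sql
ALTER TASK mydb.myschema.my_task SET TIMEZONE = 'America/Los_Angeles';
```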

Automatically Suspend Tasks After Failed Runs ¶

Optionally suspend tasks automatically after a specified number of consecutive runs that either fail or time out. This feature can reduce costs by suspending tasks that consume Snowflake credits but fail to run to completion. Failed task runs include runs in which the SQL code in the task body either produces a user error or times out. Task runs that are skipped, canceled, or that fail due to a system error are considered indeterminate and are not included in the count of failed task runs.

Set the SUSPEND_TASK_AFTER_NUM_FAILURES = num parameter on a standalone task or the root task in a DAG. When the parameter is set to a value greater than 0 , the following behavior applies to runs of the standalone task or DAG:

Standalone tasks are automatically suspended after the specified number of consecutive task runs either fail or time out.

The root task is automatically suspended after the run of any single task in a DAG fails or times out the specified number of times in consecutive runs.

The parameter can be set when creating a task (using CREATE TASK ) or later (using ALTER TASK ). The setting applies to tasks that rely on either Snowflake-managed compute resources (i.e. serverless compute model) or user-managed compute resources (i.e. a virtual warehouse).

The SUSPEND_TASK_AFTER_NUM_FAILURES parameter can also be set at the account, database, or schema level. The setting applies to all standalone or root tasks contained in the modified object. Note that explicitly setting the parameter at a lower (i.e. more granular) level overrides the parameter value set at a higher level.
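For example (the task name and threshold are placeholders):

```sql
-- Suspend the standalone or root task after 3 consecutive failed or timed-out runs.
ALTER TASK mydb.myschema.my_task SET SUSPEND_TASK_AFTER_NUM_FAILURES = 3;
```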

Manually Executing Tasks ¶

The EXECUTE TASK command manually triggers a single run of a scheduled task (either a standalone task or the root task in a DAG) independent of the schedule defined for the task. A successful run of a root task triggers a cascading run of child tasks in the DAG as their predecessor tasks complete, as though the root task had run on its defined schedule.

This SQL command is useful for testing new or modified standalone tasks and DAGs before you enable them to execute SQL code in production.

Call this SQL command directly in scripts or in stored procedures. In addition, this command supports integrating tasks in external data pipelines. Any third-party services that can authenticate into your Snowflake account and authorize SQL actions can execute the EXECUTE TASK command to run tasks.
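For example (the task name is a placeholder):

```sql
EXECUTE TASK mydb.myschema.my_task;
```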

Viewing the Task History for Your Account ¶

You can view the task history for your account using SQL or Snowsight. To view task history in Snowsight, refer to Viewing Task History in Snowsight .

The following roles (or roles with the specified privileges) can use SQL to view the task history within a specified date range:

Account administrator (i.e. users with the ACCOUNTADMIN role).

Task owner (i.e. role that has the OWNERSHIP privilege on a task).

Any role that has the global MONITOR EXECUTION privilege.

To view the run history for a single task:

Query the TASK_HISTORY table function (in the Snowflake Information Schema).

To view details on a DAG run that is currently scheduled or is executing:

Query the CURRENT_TASK_GRAPHS table function (in the Snowflake Information Schema).

To view the history for DAG runs that executed successfully, failed, or were cancelled in the past 60 minutes:

Query the COMPLETE_TASK_GRAPHS table function (in the Snowflake Information Schema), or query the COMPLETE_TASK_GRAPHS view (in Account Usage).

Billing for Task Runs ¶

The costs associated with running a task to execute SQL code differ depending on the source of the compute resources for the task:

User-managed warehouse: Snowflake bills your account for credit usage based on warehouse usage while a task is running, similar to the warehouse usage for executing the same SQL statements in a client or the Snowflake web interface. Per-second credit billing and warehouse auto-suspend give you the flexibility to start with larger warehouse sizes and then adjust the size to match your task workloads.

Serverless compute model: Snowflake bills your account based on actual compute resource usage, in contrast with customer-managed virtual warehouses, which consume credits when active and may sit idle or be overutilized. Charges are calculated based on total usage of the resources (including cloud services usage) measured in compute-hours of credit usage.

Snowflake analyzes task runs in the task history dynamically to determine the ideal size of the compute resources, and suspends these compute resources to save costs. Snowflake manages load capacity, ensuring optimal compute resources to meet demand. To recover the management costs of Snowflake-provided compute resources, we apply a 1.5x multiplier to resource consumption. Note, however, that the serverless compute model could still reduce compute costs over user-managed warehouses; in some cases significantly.

Snowflake credits charged per compute-hour:

Snowflake-managed compute resources: 1.5

Cloud services: 1

Billing is similar to other Snowflake features such as Automatic Clustering of tables, Database Replication and Failover/Failback, and Snowpipe.

To retrieve the current credit usage for a specific task, query the SERVERLESS_TASK_HISTORY table function. Execute the following statement as the task owner (i.e. the role that has the OWNERSHIP privilege on the task):
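A possible form of the query is sketched below; the exact columns returned may differ, and <database_name> and <task_name> are placeholders explained below:

```sql
SELECT start_time,
       end_time,
       task_name,
       credits_used
  FROM TABLE(<database_name>.INFORMATION_SCHEMA.SERVERLESS_TASK_HISTORY(
    DATE_RANGE_START => DATEADD('day', -7, CURRENT_TIMESTAMP()),
    DATE_RANGE_END   => CURRENT_TIMESTAMP(),
    TASK_NAME        => '<task_name>'));
```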

Where:

<database_name>: Name of the database that contains the task.

<task_name>: Name of the task.

To retrieve the current credit usage for all serverless tasks, query the SERVERLESS_TASK_HISTORY view. Execute the following statement as an account administrator (i.e. a user with the ACCOUNTADMIN role):
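A sketch of such a query (column names follow the Account Usage view; adjust as needed):

```sql
SELECT start_time,
       end_time,
       task_name,
       credits_used
  FROM snowflake.account_usage.serverless_task_history
 ORDER BY start_time DESC;
```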

Understanding the System Service ¶

Snowflake runs tasks with the privileges of the task owner (i.e. the role that has OWNERSHIP privilege on the task), but task runs are not associated with a user. Instead, each run is executed by a system service. Tasks are decoupled from specific users to avoid complications that can arise when users are dropped, locked due to authentication issues, or have roles removed.

Because task runs are decoupled from a user, the query history for task runs is associated with the system service. SYSTEM is not a user in the account; it is a behind-the-scenes service. As such, there are no user credentials for this service, and no individual (from Snowflake or in your account) can assume its identity. Activity for the system service is limited to your account. The same encryption protections and other security protocols are built into this service as are enforced for other operations.

To support creating and managing tasks, Snowflake provides the following set of special DDL commands:

CREATE TASK

DESCRIBE TASK

In addition, providers can view, grant, or revoke access to the necessary database objects for ELT using the following standard access control DDL:

GRANT <privileges>

REVOKE <privileges>

SHOW GRANTS

Task Functions ¶

To support retrieving information about tasks, Snowflake provides the following set of SQL functions:

SYSTEM$CURRENT_USER_TASK_NAME

TASK_HISTORY

TASK_DEPENDENTS

Task Security ¶

Access Control Privileges ¶

Creating Tasks ¶

Creating tasks requires a role with a minimum of the following privileges:

Owning Tasks ¶

After a task is created, the task owner (i.e. the role that has the OWNERSHIP privilege on the task) must have the following privileges:

In addition, the role must have the permissions required to run the SQL statement executed by the task.

Resuming or Suspending Tasks ¶

In addition to the task owner, a role that has the OPERATE privilege on the task can suspend or resume the task. This role must have the USAGE privilege on the database and schema that contain the task. No other privileges are required.

When a task is resumed, Snowflake verifies that the task owner role has the privileges listed in Owning Tasks (in this topic).

Creating a Task Administrator Role ¶

For ease of use, we recommend creating a custom role (e.g. taskadmin ) and assigning the EXECUTE TASK privilege to this role. Any role that can grant privileges (e.g. SECURITYADMIN or any role with the MANAGE GRANTS privilege) can then grant this custom role to any task owner role to allow altering their own tasks. To remove the ability for the task owner role to execute the task, it is only necessary to revoke this custom role from the task owner role. Note that if you choose not to create this custom role, an account administrator must revoke the EXECUTE TASK privilege from the task owner role.

For example, create a custom role named taskadmin and grant that role the EXECUTE TASK privilege. Assign the taskadmin role to a task owner role named myrole :
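A sketch of the statements involved (role names match the example above):

```sql
USE ROLE securityadmin;

CREATE ROLE taskadmin;

-- Grant the global EXECUTE TASK privilege to the new role.
GRANT EXECUTE TASK ON ACCOUNT TO ROLE taskadmin;

-- Grant the taskadmin role to the task owner role.
GRANT ROLE taskadmin TO ROLE myrole;
```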

For more information on creating custom roles and role hierarchies, see Configuring Access Control .

Dropping a Task Owner Role ¶

When the owner role of a given task (i.e. the role with the OWNERSHIP privilege on the task) is deleted, the task is “re-possessed” by the role that dropped the owner role. This ensures that ownership moves to a role that is closer to the root of the role hierarchy. When a task is re-possessed, it is automatically paused, i.e., all executions currently in flight complete processing, but new executions will not be scheduled until the task is resumed explicitly by the new owner. The rationale for this is to prevent a user with access to a particular role from leaving behind tasks that suddenly execute with higher permissions when the role is removed.

If the role that a running task is executing under is dropped while the task is running, the task completes processing under the dropped role.

This section provides a high-level overview of the task setup workflow.

Complete the steps in Creating a Task Administrator Role (in this topic) to create a role that can be used to execute the commands in the following steps.

Create a task using CREATE TASK . The task is suspended by default.

Verify the SQL statement that you will reference in a task executes as expected before you create the task. Tasks are intended to automate SQL statements or stored procedures that have already been tested thoroughly.

Execute ALTER TASK … RESUME to allow the task to run based on the parameters specified in the task definition.
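Putting the steps together, a minimal user-managed task might be created and enabled as sketched below; the names, warehouse, schedule, and SQL body are all placeholders:

```sql
CREATE TASK mydb.myschema.nightly_report
  WAREHOUSE = my_wh
  SCHEDULE = 'USING CRON 0 3 * * * UTC'
AS
  INSERT INTO mydb.myschema.report_table
    SELECT CURRENT_TIMESTAMP();

-- Tasks are created in a suspended state; resume to start scheduled runs.
ALTER TASK mydb.myschema.nightly_report RESUME;
```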

How can I run a DAG with one task?

Does anyone know how to launch a DAG with one task? The code above doesn't work, although the DAG itself is created properly.

2 Answers

Make sure that your indentation is correct. Your task should be defined inside the scope of the DAG context manager, otherwise you must provide the dag parameter to your task:
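The code from the question and answer is not included in this extract; a minimal sketch of the context-manager approach (the dag_id, schedule, and task are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="single_task_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Defined inside the `with DAG(...)` block, so the task is attached
    # to the DAG automatically.
    my_task = BashOperator(task_id="my_task", bash_command="echo hello")
```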

Or without using context manager:
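A sketch of the same DAG without the context manager, passing dag explicitly:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="single_task_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)

# Outside the context manager, each task must reference the DAG explicitly.
my_task = BashOperator(task_id="my_task", bash_command="echo hello", dag=dag)
```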

Can you try giving schedule_interval as a parameter to the DAG, instead of in default_args?

Hope this works.

A DAG Run is an object representing an instantiation of the DAG in time. Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the tasks states. Each DAG Run is run separately from one another, meaning that you can have many runs of a DAG at the same time.

DAG Run Status ¶

A DAG Run status is determined when the execution of the DAG is finished. The execution of the DAG depends on its containing tasks and their dependencies. The status is assigned to the DAG Run when all of the tasks are in one of the terminal states (i.e. when no transition to another state is possible), such as success , failed , or skipped . The DAG Run status is assigned based on the so-called “leaf nodes”, or simply “leaves”. Leaf nodes are the tasks with no children.

There are two possible terminal states for the DAG Run:

success if all of the leaf nodes states are either success or skipped ,

failed if any of the leaf nodes state is either failed or upstream_failed .

Be careful if some of your tasks have a specific trigger rule defined. These can lead to unexpected behavior. For example, if you have a leaf task with trigger rule “all_done” , it is executed regardless of the states of the rest of the tasks, and if it succeeds, then the whole DAG Run is marked as success , even if something failed in the middle.

Cron Presets ¶

You may set your DAG to run on a simple schedule by setting its schedule argument to either a cron expression , a datetime.timedelta object, or one of the following cron “presets”. For more elaborate scheduling requirements, you can implement a custom timetable .

You can use an online editor for CRON expressions such as Crontab guru .

Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend.

Data Interval ¶

Each DAG run in Airflow has an assigned “data interval” that represents the time range it operates in. For a DAG scheduled with @daily , for example, each of its data intervals would start at midnight (00:00) and end at midnight (24:00) of each day.

A DAG run is usually scheduled after its associated data interval has ended, to ensure the run is able to collect all the data within the time period. In other words, a run covering the data period of 2020-01-01 generally does not start to run until 2020-01-01 has ended, i.e. after 2020-01-02 00:00:00.

All dates in Airflow are tied to the data interval concept in some way. The “logical date” (also called execution_date in Airflow versions prior to 2.2) of a DAG run, for example, denotes the start of the data interval, not when the DAG is actually executed.

Similarly, since the start_date argument for the DAG and its tasks points to the same logical date, it marks the start of the DAG’s first data interval , not when tasks in the DAG will start running. In other words, a DAG run will only be scheduled one interval after start_date .

If a cron expression or timedelta object is not enough to express your DAG’s schedule, logical date, or data interval, see Timetables . For more information on logical date , see Running DAGs and What does execution_date mean?

Re-run DAG ¶

There can be cases where you will want to execute your DAG again. One such case is when the scheduled DAG run fails.

An Airflow DAG defined with a start_date , possibly an end_date , and a non-dataset schedule, defines a series of intervals which the scheduler turns into individual DAG runs and executes. The scheduler, by default, will kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.

If your DAG is not written to handle its catchup (for example, if it is not limited to the data interval but instead operates on the current time), then you will want to turn catchup off. This can be done by setting catchup=False in DAG or catchup_by_default=False in the configuration file. When turned off, the scheduler creates a DAG run only for the latest interval.
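The original example DAG is not reproduced in this extract; the discussion below assumes roughly the following shape (names are illustrative):

```python
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# A daily DAG starting on 2015-12-01 with catchup disabled.
with DAG(
    dag_id="example_catchup",
    start_date=datetime.datetime(2015, 12, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    EmptyOperator(task_id="do_nothing")
```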

In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM (or from the command line), a single DAG Run will be created with a data interval between 2016-01-01 and 2016-01-02, and the next one will be created just after midnight on the morning of 2016-01-03 with a data interval between 2016-01-02 and 2016-01-03.

If the dag.catchup value had been True instead, the scheduler would have created a DAG Run for each completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02, as that interval hasn’t completed) and the scheduler will execute them sequentially.

Catchup is also triggered when you turn off a DAG for a specified period and then re-enable it.

This behavior is great for atomic datasets that can easily be split into periods. Turning catchup off is great if your DAG performs catchup internally.

There can be cases when you want to run the DAG for a specified historical period. For example, a data-filling DAG is created with start_date 2019-11-21 , but another user requires the output data from a month earlier, i.e. 2019-10-21 . This process is known as Backfill.

You may want to backfill the data even in cases when catchup is disabled. This can be done through the CLI by running the command below:
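In Airflow 2 the command has roughly this form (dag_id and the dates are placeholders):

```
airflow dags backfill --start-date START_DATE --end-date END_DATE dag_id
```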

The backfill command will re-run all the instances of the dag_id for all the intervals within the start date and end date.

Re-run Tasks ¶

Some of the tasks can fail during the scheduled run. Once you have fixed the errors after going through the logs, you can re-run the tasks by clearing them for the scheduled date. Clearing a task instance doesn’t delete the task instance record. Instead, it updates max_tries to 0 and sets the current task instance state to None , which causes the task to re-run.

Click on the failed task in the Tree or Graph views and then click on Clear . The executor will re-run it.

There are multiple options you can select to re-run -

Past - All the instances of the task in the runs before the DAG’s most recent data interval

Future - All the instances of the task in the runs after the DAG’s most recent data interval

Upstream - The upstream tasks in the current DAG

Downstream - The downstream tasks in the current DAG

Recursive - All the tasks in the child DAGs and parent DAGs

Failed - Only the failed tasks in the DAG’s most recent run

You can also clear the task through CLI using the command:
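In Airflow 2 this looks roughly like the following (dag_id, the task regex, and the dates are placeholders):

```
airflow tasks clear -t task_regex --start-date START_DATE --end-date END_DATE dag_id
```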

For the specified dag_id and time interval, the command clears all instances of the tasks matching the regex. For more options, you can check the help of the clear command :

External Triggers ¶

Note that DAG Runs can also be created manually through the CLI by running the following command:
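In Airflow 2 the command has roughly this form (the logical date and dag_id are placeholders):

```
airflow dags trigger --exec-date logical_date dag_id
```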

The DAG Runs created externally to the scheduler get associated with the trigger’s timestamp and are displayed in the UI alongside scheduled DAG runs. The logical date passed inside the DAG can be specified using the -e argument. The default is the current date in the UTC timezone.

In addition, you can also manually trigger a DAG Run using the web UI (tab DAGs -> column Links -> button Trigger Dag )

Passing Parameters when triggering dags ¶

When triggering a DAG from the CLI, the REST API or the UI, it is possible to pass configuration for a DAG Run as a JSON blob.

Example of a parameterized DAG:
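The example DAG is not included in this extract; a minimal sketch of a parameterized DAG that reads a key from dag_run.conf in a templated field (the conf1 key is illustrative):

```python
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    parameterized_task = BashOperator(
        task_id="parameterized_task",
        # dag_run.conf is only resolved in templated fields such as bash_command.
        bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    )
```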

Note : The parameters from dag_run.conf can only be used in a template field of an operator.

Using CLI ¶
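For example, passing the configuration JSON with the --conf flag when triggering the DAG sketched above:

```
airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
```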

To Keep in Mind ¶

Marking task instances as failed can be done through the UI. This can be used to stop running task instances.

Marking task instances as successful can be done through the UI. This is mostly to fix false negatives, or for instance, when the fix has been applied outside of Airflow.

Write Airflow DAGs

This guide shows you how to write an Apache Airflow directed acyclic graph (DAG) that runs in a Cloud Composer environment.

Structuring an Airflow DAG

An Airflow DAG is defined in a Python file and is composed of the following components: A DAG definition, operators, and operator relationships. The following code snippets show examples of each component out of context:

A DAG definition.

composer/workflows/simple.py:

```python
import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
```

Operators to describe the work to be done. An instantiation of an operator is called a task .

composer/workflows/simple.py:

```python
from airflow.operators import bash_operator
from airflow.operators import python_operator


def greeting():
    import logging
    logging.info('Hello World!')


# An instance of an operator is called a task. In this case, the
# hello_python task calls the "greeting" Python function.
hello_python = python_operator.PythonOperator(
    task_id='hello',
    python_callable=greeting)

# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
    task_id='bye',
    bash_command='echo Goodbye.')
```

Task relationships to describe the order in which the work should be completed.

composer/workflows/simple.py:

```python
# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash
```

Full DAG workflow example in Python

The following workflow is a complete working DAG template that is composed of two tasks: a hello_python task and a goodbye_bash task:
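The full template is not reproduced in this extract; assembled from the snippets above, it looks roughly like this:

```python
import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    'start_date': datetime.datetime(2018, 1, 1),
}

with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    def greeting():
        import logging
        logging.info('Hello World!')

    # The hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # The goodbye_bash task runs a Bash command.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
```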

Airflow tutorials and concepts

See the Airflow tutorial and Airflow concepts for more information on defining Airflow DAGs.

Airflow operators

The following examples show a few popular Airflow operators. For an authoritative reference of Airflow operators, see the Apache Airflow API Reference or browse the source code of the core , contrib , and providers operators.

BashOperator

Use the BashOperator to run command-line programs.

Cloud Composer runs the provided commands in a Bash script on a worker. The worker is a Debian-based Docker container and includes several packages.

PythonOperator

Use the PythonOperator to run arbitrary Python code.

Cloud Composer runs the Python code in a container that includes packages for the Cloud Composer image version used in your environment.

To install additional Python packages, see Installing Python Dependencies .

Google Cloud Operators

Use the Google Cloud Airflow operators to run tasks that use Google Cloud products. Cloud Composer automatically configures an Airflow connection to the environment's project.

BigQuery operators query and process data in BigQuery .

composer/workflows/bq_notify.py:

```python
from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
    task_id="bq_recent_questions_query",
    configuration={
        "query": {
            "query": RECENT_QUESTIONS_QUERY,
            "useLegacySql": False,
            "destinationTable": {
                "projectId": project_id,
                "datasetId": bq_dataset_name,
                "tableId": bq_recent_questions_table_id,
            },
        }
    },
    location=location,
)
```

composer/airflow_1_samples/bq_notify.py:

```python
from airflow.contrib.operators import bigquery_operator

# Query recent StackOverflow questions.
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id="bq_recent_questions_query",
    sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
    """.format(
        max_date=max_query_date, min_date=min_query_date
    ),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id,
)
```

Dataflow operators run Apache Beam jobs in Dataflow .

Cloud Data Fusion operators manage and run Cloud Data Fusion pipelines.

Dataproc operators run Hadoop and Spark jobs in Dataproc .

Datastore operators read and write data in Datastore .

AI Platform operators run training and prediction jobs in AI Platform .

Cloud Storage operators read and write data in Cloud Storage .

EmailOperator

Use the EmailOperator to send email from a DAG. To send email from a Cloud Composer environment, you must configure your environment to use SendGrid .

Notifications

Set email_on_failure to True to send an email notification when an operator in the DAG fails. To send email notifications from a Cloud Composer environment, you must configure your environment to use SendGrid .

DAG workflow guidelines

Use Airflow 2 instead of Airflow 1.

The Airflow community does not publish new minor or patch releases for Airflow 1 anymore.

Place any custom Python libraries in a DAG's ZIP archive in a nested directory. Do not place libraries at the top level of the DAGs directory.

When Airflow scans the dags/ folder, Airflow only checks for DAGs in Python modules that are in the top-level of the DAGs folder and in the top level of a ZIP archive also located in the top-level dags/ folder. If Airflow encounters a Python module in a ZIP archive that does not contain both airflow and DAG substrings, Airflow stops processing the ZIP archive. Airflow returns only the DAGs found up to that point.

For fault tolerance, do not define multiple DAG objects in the same Python module.

Do not use SubDAGs. Use alternatives instead.

Place files that are required at DAG parse time into dags/ folder, not in the data/ directory.

Implement unit tests for your DAGs

Test developed or modified DAGs as recommended in instructions for testing DAGs

Verify that developed DAGs do not increase DAG parse times too much.

Airflow tasks can fail for multiple reasons. To avoid failures of whole DAG runs, we recommend enabling task retries. Setting maximum retries to 0 means that no retries are performed.

We recommend overriding the default_task_retries option with a value for task retries other than 0 . In addition, you can set the retries parameter at the task level (if necessary).

If you want to use GPUs in your Airflow tasks, create a separate GKE cluster based on nodes using machines with GPUs. Use GKEStartPodOperator to run your tasks.

Avoid running CPU- and memory-heavy tasks in the cluster's node pool where other Airflow components (schedulers, workers, web servers) are running. Use KubernetesPodOperator or GKE operators instead.

When deploying DAGs into an environment, upload only the files that are absolutely necessary for interpreting and executing DAGs into the /dags folder.

Limit the number of DAG files in /dags folder

Airflow continuously parses DAGs in the /dags folder. The parsing process loops through the DAGs folder, and the number of files that need to be loaded (with their dependencies) impacts the performance of DAG parsing and task scheduling. It is much more efficient to use 100 files with 100 DAGs each than 10,000 files with 1 DAG each, so this optimization is recommended. It is a balance between parsing time and the efficiency of DAG authoring and management.

For example, to deploy 10,000 DAG files, you could create 100 ZIP archives, each containing 100 DAG files.

In addition to the hints above, if you have more than 10,000 DAG files, generating DAGs programmatically might be a good option. For example, you can implement a single Python DAG file that generates some number of DAG objects (e.g. 20 or 100 DAG objects).

FAQs for writing DAGs

How do I minimize code repetition if I want to run the same or similar tasks in multiple DAGs?

We suggest defining libraries and wrappers to minimize the code repetition.

How do I reuse code between DAG files?

Put your utility functions in a local Python library and import the functions. You can reference the functions in any DAG located in the dags/ folder in your environment's bucket.

How do I minimize the risk of different definitions arising?

For example, you have two teams that want to aggregate raw data into revenue metrics. The teams write two slightly different tasks that accomplish the same thing. Define libraries to work with the revenue data so that the DAG implementers must clarify the definition of revenue that's being aggregated.

How do I set dependencies between DAGs?

This depends on how you want to define the dependency.

If you have two DAGs (DAG A and DAG B) and you want DAG B to trigger after DAG A, you can put a TriggerDagRunOperator at the end of Dag A.

If DAG B depends only on an artifact that DAG A generates, such as a Pub/Sub message, then a sensor might work better.

If DAG B is integrated closely with DAG A, you might be able to merge the two DAGs into one DAG.

How do I pass unique run IDs to a DAG and its tasks?

For example, you want to pass Dataproc cluster names and file paths.

You can generate a random unique ID by returning str(uuid.uuid4()) in a PythonOperator . This places the ID into XComs so that you can refer to the ID in other operators via templated fields.

Before generating a uuid , consider whether a DagRun-specific ID would be more valuable. You can also reference these IDs in Jinja substitutions by using macros .

How do I separate tasks in a DAG?

Each task should be an idempotent unit of work. Consequently, you should avoid encapsulating a multi-step workflow within a single task, such as a complex program running in a PythonOperator .

Should I define multiple tasks in a single DAG to aggregate data from multiple sources?

For example, you have multiple tables with raw data and want to create daily aggregates for each table. The tasks are not dependent on each other. Should you create one task and DAG for each table or create one general DAG?

If you are okay with each task sharing the same DAG-level properties, such as schedule_interval , then it makes sense to define multiple tasks in a single DAG. Otherwise, to minimize code repetition, multiple DAGs can be generated from a single Python module by placing them into the module's globals() .

How do I limit the number of concurrent tasks running in a DAG?

For example, you want to avoid exceeding API usage limits/quotas or avoid running too many simultaneous processes.

You can define Airflow pools in the Airflow web UI and associate tasks with existing pools in your DAGs.

FAQs for using operators

Should I use the DockerOperator ?

We do not recommend using the DockerOperator , unless it's used to launch containers on a remote Docker installation (not within an environment's cluster). In a Cloud Composer environment the operator does not have access to Docker daemons.

Instead, use KubernetesPodOperator or GKEStartPodOperator . These operators launch Kubernetes pods into Kubernetes or GKE clusters respectively. Note that we don't recommend launching pods into an environment's cluster, because this can lead to resource competition.

Should I use the SubDagOperator ?

We do not recommend using the SubDagOperator .

Use alternatives as suggested in the Grouping Tasks instructions.

Should I run Python code only in PythonOperators to fully separate Python dependencies?

Depending on your goal, you have a few options.

If your only concern is maintaining separate Python dependencies, you can use the PythonVirtualenvOperator .

Consider using the KubernetesPodOperator . This operator allows you to define Kubernetes pods and run the pods in other clusters.

How do I add custom binary or non-PyPI packages?

You can install packages hosted in private package repositories .

You can also use the KubernetesPodOperator to run a Kubernetes pod with your own image built with custom packages.

How do I uniformly pass arguments to a DAG and its tasks?

You can use Airflow's built-in support for Jinja templating to pass arguments that can be used in templated fields.

When does template substitution happen?

Template substitution occurs on Airflow workers just before the pre_execute function of an operator is called. In practice, this means that templates are not substituted until just before a task runs.

How do I know which operator arguments support template substitution?

Operator arguments that support Jinja2 template substitution are explicitly marked as such.

Look for the template_fields field in the Operator definition, which contains a list of argument names that will undergo template substitution.

For example, see the BashOperator , which supports templating for the bash_command and env arguments.

Avoid using deprecated Airflow operators

Operators listed in the following table are deprecated. Avoid using them in your DAGs. Instead, use provided up-to-date alternatives.

Create dynamic Airflow tasks

With the release of Airflow 2.3 , you can write DAGs that dynamically generate parallel tasks at runtime. This feature, known as dynamic task mapping, is a paradigm shift for DAG design in Airflow.

Prior to Airflow 2.3, tasks could only be generated dynamically at the time that the DAG was parsed, meaning you had to change your DAG code if you needed to adjust tasks based on some external factor. With dynamic task mapping, you can easily write DAGs that create tasks based on your current runtime environment.

In this guide, you'll learn about dynamic task mapping and complete an example implementation for a common use case.

Assumed knowledge ​

To get the most out of this guide, you should have an understanding of:

Dynamic task concepts ​

The Airflow dynamic task mapping feature is based on the MapReduce programming model. Dynamic task mapping creates a single task for each input. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task. In practice, this means that your DAG can create an arbitrary number of parallel tasks at runtime based on some input parameter (the map), and then if needed, have a single task downstream of your parallel mapped tasks that depends on their output (the reduce).

Airflow tasks have two new functions available to implement the map portion of dynamic task mapping. For the task you want to map, all operator parameters must be passed through one of the following functions: expand() , which receives the parameters to map over (one mapped task instance is created per input), and partial() , which receives the parameters that remain constant across all mapped instances.

Airflow 2.4 added support for mapping over multiple sets of keyword arguments. This type of mapping uses the expand_kwargs() function instead of expand() .

In the following example, the task uses both .partial() and .expand() to dynamically generate three task runs:
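The guide's code snippet is omitted in this extract; a minimal sketch of such a task (the values for x and y are illustrative):

```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def dynamic_map_example():
    @task
    def add(x, y):
        return x + y

    # .partial() fixes y; .expand() maps over the list of x values,
    # creating one mapped task instance per element.
    add.partial(y=10).expand(x=[1, 2, 3])


dynamic_map_example()
```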

This expand function creates three mapped add tasks, one for each entry in the x input list. The partial function specifies a value for y that remains constant in each task.

When you work with mapped tasks, keep the following in mind:

For additional examples of how to apply dynamic task mapping functions, see Dynamic Task Mapping in the official Airflow documentation.

The Airflow UI provides observability for mapped tasks in the Graph View and the Grid View .

In the Graph View , mapped tasks are identified with a set of brackets [ ] followed by the task ID. The number in the brackets is updated for each DAG run to reflect how many mapped instances were created.

Mapped Graph

Click the mapped task to display the Mapped Instances list and select a specific mapped task run to perform actions on.

Mapped Actions

Select one of the mapped instances to access links to other views such as Instance Details, Rendered, Log, XCom, and so on.

Mapped Views

The Grid View shows task details and history for each mapped task. All mapped tasks are combined into one row on the grid. In the following image, this is shown as mix_cross_and_zip [ ] . Click the task to view details for each individual mapped instance below the Mapped Tasks tab.

Mapped Grid

Mapping over the result of another operator ​

You can use the output of an upstream operator as the input data for a dynamically mapped downstream task.

In this section you'll learn how to pass mapping information to a downstream task for each of the following scenarios:

If both tasks are defined using the TaskFlow API, you can provide a function call to the upstream task as the argument for the expand() function.

If you are mapping over the results of a traditional operator, you need to provide the argument for expand() using the .output attribute of the task object.

When mapping a traditional PythonOperator over results from an upstream TaskFlow task you need to modify the format of the output to be accepted by the op_args argument of the traditional PythonOperator.

When mapping a traditional PythonOperator over the result of another PythonOperator use the .output attribute on the task object and make sure the format returned by the upstream task matches the format expected by the op_args parameter.

Mapping over multiple parameters ​

You can use one of the following methods to map over multiple parameters:

Cross-product ​

The default behavior of the expand() function is to create a mapped task instance for every possible combination of all provided inputs. For example, if you map over three keyword arguments and provide two options to the first, four options to the second, and five options to the third, you would create 2x4x5=40 mapped task instances. One common use case for this method is tuning model hyperparameters.

The following task definition maps over three options for the bash_command parameter and three options for the env parameter. This will result in 3x3=9 mapped task instances. Each bash command runs with each definition for the environment variable WORD .
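The original task definition is omitted in this extract; a sketch of what it might look like (the commands and WORD values are illustrative):

```python
from airflow.operators.bash import BashOperator

# expand() over two parameters creates the cross product:
# 3 bash commands x 3 env dicts = 9 mapped task instances.
cross_product_example = BashOperator.partial(
    task_id="cross_product_example",
).expand(
    bash_command=[
        "echo $WORD",
        "echo `expr length $WORD`",
        "echo ${WORD//e/X}",
    ],
    env=[{"WORD": "hello"}, {"WORD": "tea"}, {"WORD": "goodbye"}],
)
```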

The nine mapped task instances of the task cross_product_example run all possible combinations of the bash command with the env variable:

Sets of keyword arguments ​

To map over sets of inputs to two or more keyword arguments (kwargs), you can use the expand_kwargs() function in Airflow 2.4 and later. You can provide the sets of parameters as a list of dictionaries or as an XComArg . In the following example, the operator gets 3 sets of commands, resulting in 3 mapped task instances.
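The original snippet is omitted in this extract; a sketch of the expand_kwargs() pattern (commands and WORD values are illustrative):

```python
from airflow.operators.bash import BashOperator

# Each dictionary supplies one complete set of keyword arguments,
# producing three mapped task instances of t1.
t1 = BashOperator.partial(task_id="t1").expand_kwargs(
    [
        {"bash_command": "echo $WORD", "env": {"WORD": "hello"}},
        {"bash_command": "echo `expr length $WORD`", "env": {"WORD": "tea"}},
        {"bash_command": "echo ${WORD//e/X}", "env": {"WORD": "goodbye"}},
    ]
)
```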

The task t1 will have three mapped task instances printing their results into the logs:

In Airflow 2.4 and later you can provide sets of positional arguments to the same keyword argument. For example, the op_args argument of the PythonOperator. You can use the built-in zip() Python function if your inputs are in the form of iterables such as tuples, dictionaries, or lists. If your inputs come from XCom objects, you can use the .zip() method of the XComArg object.

Provide positional arguments with the built-in Python zip()

The zip() function takes in an arbitrary number of iterables and uses their elements to create a zip-object containing tuples. There will be as many tuples as there are elements in the shortest iterable. Each tuple contains one element from every iterable provided. For example:
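A quick illustration with made-up values:

```python
# zip() pairs elements by position; the output stops at the shortest input.
print(list(zip([1, 2, 3], ["a", "b", "c"], [10, 20])))
# [(1, 'a', 10), (2, 'b', 20)]
```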

The following code snippet shows how a list of zipped arguments can be provided to the expand() function in order to create mapped tasks over sets of positional arguments. In the TaskFlow API version of the DAG, each set of positional arguments is passed to the zipped_x_y_z argument. In the DAG using a traditional PythonOperator, each set of positional arguments is unpacked into the arguments x, y, and z, because op_args expects an iterable.
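A hedged sketch of both variants, using illustrative input lists:

```python
from pendulum import datetime
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator

# Illustrative inputs zipped into sets of positional arguments.
zipped_arguments = [list(t) for t in zip([1, 2, 3], [10, 20, 30], [100, 200, 300])]


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def zip_taskflow_version():
    @task
    def add_numbers(zipped_x_y_z):
        x, y, z = zipped_x_y_z
        return x + y + z

    # Each set of positional arguments arrives as one zipped_x_y_z value.
    add_numbers.expand(zipped_x_y_z=zipped_arguments)


zip_taskflow_version()


def add_numbers_function(x, y, z):
    return x + y + z


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def zip_traditional_version():
    # op_args expects an iterable, so each set is unpacked into x, y and z.
    PythonOperator.partial(
        task_id="add_numbers",
        python_callable=add_numbers_function,
    ).expand(op_args=zipped_arguments)


zip_traditional_version()
```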

The task add_numbers will have three mapped task instances, one for each tuple of positional arguments:

Provide positional arguments with XComArg.zip()

It is also possible to zip XComArg objects. If the upstream task has been defined using the TaskFlow API, provide the function call. If the upstream task uses a traditional operator, provide task_object.output or XComArg(task_object). In the following example, you can see the results of three tasks being zipped together to form the zipped_arguments ([(1, 10, 100), (2, 1000, 200), (1000, 1000, 300)]).

To mimic the behavior of the zip_longest() function, you can add the optional fillvalue keyword argument to the .zip() method. If you specify a default value with fillvalue , the method produces as many tuples as the longest input has elements and fills in missing elements with the default value. If fillvalue was not specified in the example below, zipped_arguments would only contain one tuple [(1, 10, 100)] since the shortest list provided to the .zip() method is only one element long.
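A hedged reconstruction of such an example, with upstream tasks chosen so that the zipped output matches the values quoted above:

```python
from pendulum import datetime
from airflow.decorators import dag, task


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def xcomarg_zip_demo():
    @task
    def first_list():
        return [1, 2]

    @task
    def second_list():
        return [10]

    @task
    def third_list():
        return [100, 200, 300]

    # .zip() pairs the XComArg outputs positionally; fillvalue pads the
    # shorter lists so the result has as many tuples as the longest input.
    zipped_arguments = first_list().zip(
        second_list(), third_list(), fillvalue=1000
    )
    # -> [(1, 10, 100), (2, 1000, 200), (1000, 1000, 300)]

    @task
    def add_nums(zipped_x_y_z):
        return sum(zipped_x_y_z)

    add_nums.expand(zipped_x_y_z=zipped_arguments)


xcomarg_zip_demo()
```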

The add_nums task will have three mapped instances with the following results:

Mapping over task groups

As of Airflow 2.5, you can dynamically map over a task group that uses the @task_group decorator. The syntax for dynamically mapping over a task group is the same as dynamically mapping over a single task.

You can also dynamically map over multiple task group input parameters as you would for regular tasks using a cross-product, zip function, or sets of keyword arguments. For more on this, see Mapping over multiple parameters .
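A short sketch of a dynamically mapped task group; the group structure and file names are illustrative:

```python
from pendulum import datetime
from airflow.decorators import dag, task, task_group


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def mapped_task_group_demo():
    @task_group
    def process_file(file_name):
        @task
        def download(name):
            return f"downloaded {name}"

        @task
        def archive(result):
            print(result)

        archive(download(file_name))

    # The same expand() syntax used for a single task maps the whole group.
    process_file.expand(file_name=["a.csv", "b.csv", "c.csv"])


mapped_task_group_demo()
```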

Transform outputs with .map

There are use cases where you want to transform the output of an upstream task before another task dynamically maps over it, for example if the upstream traditional operator returns its output in a fixed format, or if you want to skip certain mapped task instances based on a logical condition.

The .map() method was added in Airflow 2.4. It accepts a Python function and uses it to transform an iterable input before a task dynamically maps over it.

You can call .map() directly on a task using the TaskFlow API ( my_upstream_task_flow_task().map(mapping_function) ) or on the output object of a traditional operator ( my_upstream_traditional_operator.output.map(mapping_function) ).

The downstream task is dynamically mapped over the object created by the .map() method using either .expand() for a single keyword argument or .expand_kwargs() for a list of dictionaries containing sets of keyword arguments.

The code snippet below shows how to use .map() to skip specific mapped tasks based on a logical condition.
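A hedged sketch of this pattern; the input strings are illustrative and chosen so that the mapped instances at index 0 and 2 are the ones skipped:

```python
from pendulum import datetime
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def map_and_skip_demo():
    @task
    def list_strings():
        # Elements 0 and 2 start with "skip" and will be skipped downstream.
        return ["skip_hello", "hi", "skip_hallo"]

    def skip_strings_starting_with_skip(string):
        if string.startswith("skip"):
            raise AirflowSkipException(f"Skipping {string}")
        return string + "!"

    # .map() transforms each element before the downstream task maps over it.
    transformed_list = list_strings().map(skip_strings_starting_with_skip)

    @task
    def say(string):
        print("Say " + string)

    say.expand(string=transformed_list)


map_and_skip_demo()
```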

In the grid view you can see how the mapped task instances 0 and 2 have been skipped.

Skipped Mapped Tasks

Example implementation

For this example, you'll implement one of the most common use cases for dynamic tasks: processing files in Amazon S3. In this scenario, you'll use an ELT framework to extract data from files in Amazon S3, load the data into Snowflake, and transform the data using Snowflake's built-in compute. It's assumed that the files will be dropped daily, but it's unknown how many will arrive each day. You'll leverage dynamic task mapping to create a unique task for each file at runtime. This gives you the benefit of atomicity, better observability, and easier recovery from failures.

All code used in this example is located in the dynamic-task-mapping-tutorial repository .

The example DAG completes the following steps:

The Graph View for the DAG looks similar to this image:

ELT Graph

When dynamically mapping tasks, make note of the format needed for the parameter you are mapping. In the previous example, you wrote your own Python function to get the Amazon S3 keys because the S3toSnowflakeOperator requires each s3_key parameter to be in a list format, and the s3_hook.list_keys function returns a single list with all keys. By writing your own simple function, you can turn the hook results into a list of lists that can be used by the downstream operator.
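A minimal sketch of the kind of helper described here, assuming an S3Hook with an "aws_default" connection; the task name and parameters are illustrative, not the ones used in the tutorial repository:

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def get_s3_keys(bucket: str, prefix: str):
    """Wrap each S3 key in its own list so the downstream transfer operator
    receives one single-key list per mapped task instance."""
    s3_hook = S3Hook(aws_conn_id="aws_default")
    keys = s3_hook.list_keys(bucket_name=bucket, prefix=prefix) or []
    return [[key] for key in keys]
```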


2 Easy Ways to Trigger Airflow DAGs in Apache Airflow

Yash Arora • February 17th, 2022

DAG scheduling may seem difficult at first, but it really isn’t. And, if you are a data professional who wants to learn more about data scheduling and how to trigger Airflow DAGs, you’re at the right place.


But why did the need for data scheduling surface in the first place? The answer lies in the imperative to process the correct data at the right time. With its flexibility, Airflow gives users the ability to schedule their DAG Runs. And with the help of the Airflow Scheduler, users can ensure that future as well as past DAG Runs are performed at the right time and in the correct order.

In this guide, we will discuss, in detail, the concept of scheduling and DAG Runs in Airflow. And, moving forward, we’ll be examining two methods to Trigger Airflow DAGs. Let’s begin.

What is Apache Airflow?


Apache Airflow, an open-source workflow management platform, is a tool to manage data engineering pipelines.

Written in Python and built on standard features of the language, Airflow enables its users to efficiently schedule data processing for engineering pipelines. The platform works as a building block that allows users to stitch together the many technologies present in today's technological landscapes.

Among Apache Airflow's key features, scheduling stands out: it helps developers schedule tasks and assign instances for a DAG Run at a scheduled interval. By definition, the Apache scheduler's job is to monitor and stay in sync with all DAG objects, employ a repetitive process to store DAG parsing results, and always be on the lookout for active tasks to be triggered.

But how does Scheduling work in Airflow?

For example, if you run a DAG with a "schedule_interval" of one day and the run stamp is set at 2022-02-16, the task will trigger soon after "2022-02-16T23:59." Hence, the instance gets triggered once the set period has been reached.

The Apache Scheduler is custom-built to work seamlessly in an Airflow production environment. To start the scheduler, run the "airflow scheduler" command; the "airflow.cfg" file contains all you need to know about its configuration.

DAG stands for Directed Acyclic Graph. These graphs are a pictorial representation of tasks in a hierarchy. Each task is shown in the graph with the flow of execution from one task to another.

On the other hand, a DAG Run works as an extension — or as defined in the Apache documentation, “an instantiation” — of the DAG in time.

All DAG Runs abide by a schedule, but a DAG itself might or might not have one. The schedule is set through the "schedule_interval" DAG argument, which can be treated as a cron expression. Airflow also accepts cron presets, for example @hourly (0 * * * *), @daily (0 0 * * *), @weekly (0 0 * * 0), @monthly (0 0 1 * *), and @yearly (0 0 1 1 *).


Two Methods to Trigger Airflow DAGs

This section of the blog post discusses the two ways available to trigger Airflow DAGs: on a schedule and manually.

To trigger Airflow DAGs on a schedule, you first need to specify the "start_date" and "schedule_interval" parameters, and then upload the DAG file to your environment.

To specify the scheduling parameters, define how often the DAG runs by placing a value in the "schedule_interval" parameter, and define when Airflow should start scheduling DAG tasks by placing a value in the "start_date" parameter.

Note: Airflow schedules DAG Runs based on the minimum "start_date" for tasks, creating each DAG Run at the end of the interval defined by the "schedule_interval" argument of the DAG.

Look at the example given below; it shows a DAG that runs periodically. The DAG runs every hour, starting from 15:00 on April 5, 2021. According to these parameters and the working principles of DAGs, Airflow schedules the first DAG Run at 16:00 on April 5, 2021.
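A hedged sketch of a DAG matching that description (the operator used is illustrative; EmptyOperator requires Airflow 2.3 or later):

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Hourly schedule starting at 15:00 on April 5, 2021, so the first
# DAG Run is created at 16:00 on April 5, 2021.
with DAG(
    dag_id="hourly_example",
    start_date=datetime(2021, 4, 5, 15, 0),
    schedule_interval=timedelta(hours=1),
    catchup=False,
):
    EmptyOperator(task_id="placeholder_task")
```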

When you trigger an Airflow DAG manually, Airflow performs an additional DAG Run. That means that if you manually trigger a DAG that already runs on its own schedule, Airflow goes ahead and runs that DAG without affecting the scheduled runs. There are two methods to trigger Airflow DAGs manually:

Method 1: Trigger Airflow DAGs manually using the Airflow UI in GCC:

Step 1: In GCC, open the Environments page.

Step 2: Now, in the Airflow webserver column, an Airflow link will be present for your environment. Click on that.

Step 3: On the Airflow web interface, find the DAGs page. Then, in the Links column for your DAG, click the "Trigger Dag" button.

Step 4: You can also specify the DAG run configuration, but it’s optional.

Step 5: Click on the “Trigger” button.

Method 2: Trigger Airflow DAGs manually using gcloud in GCC:

For Airflow 1.10.*, the CLI command, “trigger_dag” is used to trigger the DAG Run. Look at the code given below:
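A sketch of the gcloud invocation for Airflow 1.10.*; ENVIRONMENT_NAME, LOCATION, and DAG_ID are placeholders:

```
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID
```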

For Airflow 2, the CLI command, “dags trigger” is used to trigger the DAG Run. Look at the code given below:
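And the equivalent sketch for Airflow 2, using the same placeholders:

```
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID
```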

Note: In the commands above, replace ENVIRONMENT_NAME, LOCATION, and DAG_ID with the name of your environment, its location, and the ID of the DAG you want to trigger.

In this guide, we learned in detail about the different ways to trigger Airflow DAGs. We also made our way through the basics of scheduling, triggers, and DAG Runs.




Optimizing for DAG and task complexity in Airflow

Written By: Aravind Ramesh

My first data engineering project at DoubleVerify (DV) involved processing files containing billions of rows. The pipeline contained many intermediate steps with complex dependencies and was responsible for applying a series of transformations and enrichments, as well as storing the results in a database.

In this blog post, I will talk about our journey in building this pipeline on Luigi, what we learned from it, and how we migrated to Apache Airflow. I will also describe the various approaches we took to scale, parallelize, and optimize our data pipelines, and will highlight key points to consider while building such pipelines.

The following sketch shows a high-level overview of our data pipeline, which was responsible for determining if a real user viewed an ad impression for one or two continuous seconds.

The first version of our pipeline was in Luigi. Luigi offered many benefits in terms of defining dependencies, automatic retries of failed tasks, and creating dynamic tasks. Dynamic tasks helped create a task per file chunk, which further helped parallelize processing.

However, Luigi had the following limitations:

Airflow, by this time, was gaining huge traction due to its powerful UI, which rendered visualizing, backfilling, and performing manual reruns of failed tasks trivial. Thus, it became a natural choice for us to try Airflow out.

However, one area that challenged us was the replication of the dynamic task pattern. While dynamic tasks are valuable additional tools for scaling and parallelizing your pipeline, we quickly realized that using them would come at the expense of making the Directed Acyclic Graph (DAG) very complex.

Are there any alternative solutions to finding the right balance between the task and DAG complexity? Let us dive in!

Approach 1: Single-task, serial implementation

The most straightforward implementation is to create a single task that will process all the file chunks serially. The following pseudo-code demonstrates this idea:
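A hedged sketch of that idea; the chunk-listing and processing helpers are placeholders for the real logic:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator


def list_file_chunks():
    # Placeholder for whatever produces the list of file chunks.
    return [f"chunk_{i:04d}" for i in range(100)]


def process_chunk(chunk):
    # Placeholder for the real transformation/enrichment logic.
    print(f"processing {chunk}")


def process_all_chunks():
    # A single task walks every chunk serially.
    for chunk in list_file_chunks():
        process_chunk(chunk)


with DAG(
    dag_id="serial_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    PythonOperator(
        task_id="process_all_chunks",
        python_callable=process_all_chunks,
    )
```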

The code snippet above renders the following DAG.

The main advantage of this approach is that everything is encapsulated within a single task. Either all your data is processed and your task is green, or your data is not processed and your task is red.

The disadvantages of this approach are that the file chunks are processed serially, which does not scale, and that a rerun after a failure reprocesses every chunk from scratch.

Fortunately, Airflow also has dynamic tasks like Luigi, which we will explore next.

Approach 2: Dynamic tasks

This approach is an improvement on the serial option and is similar to how we implemented it in Luigi. It helps parallelize processing and confine failures to individual file chunks.

The following pseudo-code demonstrates this idea:
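One plausible shape of this approach, creating one task per chunk at DAG parse time (again with placeholder helpers):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator


def list_file_chunks():
    return [f"chunk_{i:04d}" for i in range(100)]


def process_chunk(chunk):
    print(f"processing {chunk}")


with DAG(
    dag_id="dynamic_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    # One task per file chunk; the DAG grows with the number of chunks.
    for chunk in list_file_chunks():
        PythonOperator(
            task_id=f"process_{chunk}",
            python_callable=process_chunk,
            op_args=[chunk],
        )
```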

However, this implementation suffers from the following downsides:

* Scalable in terms of processing but, overall, puts a lot of load on the infrastructure.

Approach 1 and Approach 2 are examples of task complexity versus DAG complexity, and show how the two complexities are at odds with each other. Task complexity is more straightforward to manage than DAG complexity but requires reimplementing a lot of what Airflow offers. On the other hand, DAG complexity allows you to implement powerful dynamic tasks but severely limits operability. Is there an alternate approach?

Before jumping into the final solution, let us break down the problem into two parts: how to avoid reprocessing already-completed files on a rerun, and how to parallelize the processing.

Approach 3: Single task with state store

One of the main challenges with the single-task approach was reusability during reruns. Tracking file status is a quick way to ensure that a file processed in the previous attempts is not reprocessed when rerun.

The main idea here is to modify the task to perform a lookup of file/processing status.

This can be implemented as demonstrated in the following pseudo code:
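A minimal sketch of the lookup, using a simple file-backed store as a stand-in for Redis, a database, or XComs:

```python
import json
from pathlib import Path

STATE_FILE = Path("/tmp/chunk_state.json")  # stand-in for a real state store


def load_state():
    return json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}


def save_state(state):
    STATE_FILE.write_text(json.dumps(state))


def process_all_chunks():
    state = load_state()
    for chunk in list_file_chunks():  # placeholder helper from the earlier sketch
        if state.get(chunk) == "done":
            continue  # already processed in a previous attempt, skip it
        process_chunk(chunk)          # placeholder helper from the earlier sketch
        state[chunk] = "done"
        save_state(state)
```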

This lookup ensures that, on a task rerun, file chunks that were processed successfully in a previous attempt are skipped and only unprocessed or failed chunks are picked up.

Why is this better? For one, we will not have to cherry-pick all the failed tasks in the UI that need to be rerun. Instead, rerunning the single task automatically resolves this issue. Secondly, accidental reruns will have no consequences, making tasks idempotent by default.

Where should the states be stored? They can be stored anywhere, such as Airflow XComs, external caches like memcache/Redis, a database, or a file. The choice of the store itself is irrelevant as long as you can easily incorporate the necessary code in your task to store and retrieve the states.

Let us now address the second question: How do we parallelize processing?

Approach 4: Single task with state store & parallelism

The “for” loop in the code snippet above is the source of the problem that prevents us from scaling. Parallelizing this can take many shapes and forms, with the simplest being utilizing Python’s ThreadPoolExecutor, as demonstrated in the code snippet below.
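A hedged sketch of that change, reusing the placeholder state-store helpers from the previous sketch:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_all_chunks_in_parallel():
    state = load_state()
    pending = [c for c in list_file_chunks() if state.get(c) != "done"]

    failed = []
    # The serial for loop becomes a thread pool; tune max_workers as needed.
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = {executor.submit(process_chunk, c): c for c in pending}
        for future in as_completed(futures):
            chunk = futures[future]
            try:
                future.result()
                state[chunk] = "done"  # record success so reruns skip it
                save_state(state)
            except Exception as exc:
                print(f"{chunk} failed: {exc}")
                failed.append(chunk)   # left unmarked so a rerun retries it

    if failed:
        raise RuntimeError(f"{len(failed)} chunks failed")
```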

In our experiments, we were able to reduce the processing time of the task from six hours to 30 minutes, with thread pools for processing about 5,000 file chunks.

The max_workers parameter can further help improve performance. While there is no theoretical limit, setting this parameter to an exceptionally high number could result in diminishing returns.

This approach handles all of our previously discussed concerns.

Optimizing further with KubernetesPodOperator

Using the KubernetesPodOperator (KPO) instead of the PythonOperator helps further optimize the pipeline and results in a separation of concerns. The KPO runs any Docker image in a separate container/pod, giving us the flexibility to specify the resources required, while the PythonOperator runs the task within the Airflow worker.
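A hedged sketch of a KPO task with explicit resource requests; the import path and parameters vary by provider version, and the image, namespace, and resource values here are assumptions:

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s

# In practice this would be defined inside a DAG context.
process_chunks = KubernetesPodOperator(
    task_id="process_chunks",
    name="process-chunks",
    namespace="data-pipelines",
    image="my-registry/chunk-processor:latest",
    arguments=["--date", "{{ ds }}"],
    # The pod gets its own resources instead of sharing the Airflow worker's.
    container_resources=k8s.V1ResourceRequirements(
        requests={"cpu": "2", "memory": "4Gi"},
        limits={"cpu": "4", "memory": "8Gi"},
    ),
)
```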

So, are dynamic tasks bad?

Not at all! My colleague, Keo, wrote an excellent post about how we use dynamic tasks, and I highly encourage you to check it out. The general rule of thumb is not to overdo it and to consider the operational implications of using dynamic tasks. Following this rule can help save a lot of time and frustration in the long run. While we do not use Airflow directly to manage the life cycle of a single file transformation, we took inspiration from it, such as storing state in a cache and launching jobs dynamically. We also avoided completely reinventing the wheel and continue to find the Airflow UI invaluable. Think carefully about which features you want to leverage when using a new technology, and remember that it does not have to be all or nothing.


