Getting Started
If you are looking to write abstractions, or want to start converting existing DAGs into data-driven DAGs, start by writing some abstractions.
If you already have some abstractions and are looking to start using them to define concrete DAGs, you can jump straight to using abstractions.
Writing Abstractions
There are primarily three tiers of abstractions: DAG factories (DagFactory), operator factories (OperatorFactory), and components (OperatorComponent). All three are interfaces over a pydantic model, and thus provide much of the same functionality. Let’s explore their differences and where each one is helpful.
Common workflows that all abstractions share are:
- Can use parse_obj to instantiate the class from a dictionary of data (e.g., from a YAML or JSON file) (see the sketch after this list)
- Can also be directly constructed in-code, just like any other pydantic-based class
- Can have custom field validation that goes well beyond mere type verification or coercion (e.g. checking that a string matches a regex, that a number is within a given range, or converting a string to an enum)
- *args, **kwargs can often be passed to the primary methods of the various factories; however, it is recommended to make each class’s attributes self-contained such that the factory can be executed without any additional arguments.
- Any classes you write that should not be able to be instantiated should inherit abc.ABC
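As a rough sketch of these shared capabilities, here is a hypothetical operator factory (the class names, fields, and validation rule below are illustrative, not part of data_dag):

import abc
import re

from pydantic import validator

from data_dag.operator_factory import OperatorFactory


class BaseSensorFactory(OperatorFactory, abc.ABC):
    # Per the recommendation above, base classes that are not meant to be
    # instantiated directly inherit abc.ABC.
    pass


class FileSensorFactory(BaseSensorFactory):
    name: str
    path: str

    @validator('name')
    def name_must_be_identifier(cls, value):
        # Custom validation beyond simple type coercion
        if not re.fullmatch(r'[a-z_][a-z0-9_]*', value):
            raise ValueError(f'{value!r} is not a valid task name')
        return value

    def make_operator(self):
        ...  # would return an Airflow sensor/operator here


# Instantiated from a dictionary of data (e.g. loaded from YAML or JSON)...
factory = FileSensorFactory.parse_obj({'name': 'wait_for_input', 'path': '/data/input.csv'})

# ...or constructed directly in code, like any other pydantic model
factory = FileSensorFactory(name='wait_for_input', path='/data/input.csv')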
DAG Factory
A DagFactory constructs a complete DAG and contains related DAG metadata (such as schedule, name, and start date). It can compose together operator factories, be entirely self-contained, or a mix of both.
from typing import List

from data_dag.dag_factory import DagFactory
from data_dag.operator_factory import OperatorFactory


class SimpleChainDag(DagFactory):
    ops: List[OperatorFactory]

    def _make_dag(self, *args, **kwargs) -> None:
        operator_instances = [op.make_operator() for op in self.ops]
        for a, b in zip(operator_instances[:-1], operator_instances[1:]):
            a >> b
The base factory class provides definitions for virtually all attributes of DAG, such that your custom factory classes need only define any additional attributes needed for your particular DAG. It also provides a convenience implementation of make_dag() that is effectively the following:
def make_dag(self, *args, **kwargs):
    with DAG(**self.dag_args) as dag:
        self._make_dag(*args, **kwargs)
    return dag
That is, your custom factory generally only needs to define _make_dag() to create and link the operators needed for your DAG. See the example DAG for reference.
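For example, a DAG could be built from the factory above roughly as follows. This is a hedged sketch: EchoStep is a hypothetical operator factory, and we assume dag_id is the only DAG attribute that must be supplied here.

from airflow.operators.bash import BashOperator


# A hypothetical, trivial operator factory just to populate the chain
class EchoStep(OperatorFactory):
    name: str

    def make_operator(self):
        return BashOperator(task_id=f'echo_{self.name}', bash_command=f'echo {self.name}')


dag_metadata = SimpleChainDag(
    dag_id='simple_chain',
    ops=[EchoStep(name='first'), EchoStep(name='second')],
)

# In a module under Airflow's dags/ folder, expose the result so Airflow can discover it
dag = dag_metadata.make_dag()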
Operator Factory
An OperatorFactory is used to create one or more operators as a pattern. The result should generally be a single operator-like object or a list (though your calling code will need to know how to handle the latter properly).
Let’s create, as an example, a universal Docker abstraction. We’ll start with a simple wrapper around a DockerOperator:
from data_dag.operator_factory import OperatorFactory
from airflow.providers.docker.operators.docker import DockerOperator


class DockerContainer(OperatorFactory):
    name: str
    image: str
    command: str

    def make_operator(self):
        return DockerOperator(
            task_id=f'docker_run_{self.name}',
            image=self.image,
            command=self.command,
        )
To expand this, let’s say we wish to reuse this factory to create a Kubernetes operator. In this case, we pass Kubernetes-specific configuration into the factory, though in practice this might be better handled using external configuration, such as an Airflow connection. We might end up with the following:
...
from typing import Optional, Dict


class DockerContainer(OperatorFactory):
    ...
    kubernetes_config: Optional[Dict]

    def make_docker_operator(self):
        return DockerOperator(...)

    def make_kubernetes_operator(self):
        return KubernetesPodOperator(..., **self.kubernetes_config)

    def make_operator(self):
        return self.make_docker_operator()
We could then expose this additional functionality to data files by allowing the data to select, at runtime, which operator to use: regular Docker or Kubernetes.
import enum


class DockerRuntime(enum.Enum):
    LOCAL = 'docker'
    KUBERNETES = 'kubernetes'


class DockerContainer(OperatorFactory):
    ...
    runtime: DockerRuntime

    def make_operator(self):
        if self.runtime == DockerRuntime.LOCAL:
            return self.make_docker_operator()
        elif self.runtime == DockerRuntime.KUBERNETES:
            return self.make_kubernetes_operator()
This could be instantiated with the following data:
name: hello_world
image: 'python:3'
command: python -c "print('hello world!')"
runtime: docker
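Parsing that data through the factory might then look roughly like this, assuming the fields defined above and pydantic’s usual coercion of 'docker' into DockerRuntime.LOCAL:

import yaml

# A sketch of parsing the data above into the factory
data = yaml.safe_load("""
name: hello_world
image: 'python:3'
command: python -c "print('hello world!')"
runtime: docker
""")

container = DockerContainer.parse_obj(data)
assert container.runtime is DockerRuntime.LOCAL  # pydantic coerces 'docker' to the enum
# container.make_operator() would then build the appropriate operator,
# typically while a DAG is being constructed.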
Let’s say that, for some reason, you want the ability to build a Docker image before running it. This would be easy to integrate into the factory:
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup


class DockerContainer(OperatorFactory):
    ...
    build_config: Optional[Dict]

    def make_build_operator(self):
        return BashOperator(
            task_id=f'docker_build_{self.name}',
            bash_command=f'docker build -t {self.image} ...',
        )

    def make_operator(self):
        with TaskGroup(group_id=f'run_{self.name}') as group:
            if self.runtime == DockerRuntime.LOCAL:
                run_operator = self.make_docker_operator()
            elif self.runtime == DockerRuntime.KUBERNETES:
                run_operator = self.make_kubernetes_operator()

            if self.build_config:
                build_operator = self.make_build_operator()
                build_operator >> run_operator
        return group
Note
In most cases, when creating multiple operators, you should wrap them up in an airflow.utils.task_group.TaskGroup so that the result can be treated like a single operator. This makes single operators and compositions of operators indistinguishable from outside your factory, allowing both to be used identically.
This approach lets factories become flexible abstractions for common functionality. Additionally, if necessary, it would be straightforward to break an abstraction apart into a composition of multiple simpler factories.
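Because the factory now returns a TaskGroup, calling code can treat it exactly like a single operator. A rough sketch of using it inside a DAG (the DAG itself and the start task are illustrative):

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # Airflow 2.3+; older versions use DummyOperator

container = DockerContainer(
    name='hello_world',
    image='python:3',
    command="python -c \"print('hello world!')\"",
    runtime=DockerRuntime.LOCAL,
)

with DAG(dag_id='docker_example', schedule_interval=None) as dag:
    start = EmptyOperator(task_id='start')
    run = container.make_operator()  # a TaskGroup that behaves like a single operator
    start >> run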
Components
Not every abstraction directly maps to a DAG or operator, or even a collection of operators. For example, you might want an abstraction for a file in S3 that can be re-used in a variety of other operator factories. This can still be data-driven using OperatorComponent:
from data_dag.operator_factory import OperatorComponent, OperatorFactory


class S3File(OperatorComponent):
    bucket: str
    key: str


class CopyFileToS3(OperatorFactory):
    local_file: str
    s3_file: S3File

    def make_operator(self):
        ...
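In data, the component simply nests inside the factory that uses it, and parsing works the same way as for any other pydantic model (the values here are illustrative):

# A sketch of parsing a factory whose data nests a component
copy_task = CopyFileToS3.parse_obj({
    'local_file': '/tmp/report.csv',
    's3_file': {'bucket': 'my-bucket', 'key': 'reports/report.csv'},
})
# copy_task.s3_file is an S3File instance rather than a raw dictionary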
In some cases, it may be desirable to create a very simple component or factory with a single required field. Consider the following trivial example:
from airflow.operators.bash import BashOperator

from data_dag.operator_factory import SimpleOperatorComponent, SimpleOperatorFactory


class Path(SimpleOperatorComponent):
    path: str
    is_file: bool = True


class CheckFileExists(SimpleOperatorFactory):
    file_path: Path

    def make_operator(self):
        return BashOperator(...)
By extending SimpleOperatorComponent instead of the ordinary OperatorComponent, this object can be defined in data using just a string rather than a full dictionary:
file_path: 'path/to/file.txt'
# is equivalent to
file_path:
  path: 'path/to/file.txt'
The same is true of SimpleOperatorFactory relative to OperatorFactory. In fact, since the above sample code uses both a SimpleOperatorComponent and a SimpleOperatorFactory, the entire operator can be defined using a string rather than a full dictionary:
CheckFileExists.parse_obj('path/to/file.txt')
# CheckFileExists(file_path=Path(path='path/to/file.txt', is_file=True))
In many cases, this allows you to create fine-grained types that keep data organized, without sacrificing the readability of the data needed to define those types:
from typing import List

from pydantic import Field


class Schema(SimpleOperatorComponent):
    name: str


class Column(SimpleOperatorComponent):
    name: str
    type: str = 'VARCHAR'


class Table(OperatorFactory):
    schema_: Schema = Field(alias='schema')
    name: str
    columns: List[Column]
schema: 'my_schema'
name: 'my_table'
columns:
  - col1
  - col2
  - name: col3
    type: INTEGER
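Parsing that data would produce fully structured objects, roughly as follows (a sketch, assuming the classes above):

Table.parse_obj({
    'schema': 'my_schema',
    'name': 'my_table',
    'columns': ['col1', 'col2', {'name': 'col3', 'type': 'INTEGER'}],
})
# Table(
#     schema_=Schema(name='my_schema'),
#     name='my_table',
#     columns=[
#         Column(name='col1', type='VARCHAR'),
#         Column(name='col2', type='VARCHAR'),
#         Column(name='col3', type='INTEGER'),
#     ],
# )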
Using Abstractions
All abstractions in data_dag are pydantic models, meaning that all of the usual pydantic features are available. This section is a quick introduction to how those features allow for making data-driven DAGs.
Interacting with dictionaries
Several common data formats, such as YAML and JSON, serialize collections of simple objects: dictionaries, lists, numbers, and strings. Different formats support different types and features, so each may be appropriate under different circumstances.
When these files are loaded into memory, e.g. using json or pyyaml, the result is an in-memory object composed of simple data types. Beyond the most trivial use cases, these objects are typically nested and represent a variety of application-specific details. Working directly with these raw dictionaries and lists tends to involve a lot of boilerplate code; however, if we can map this data onto classes, creating useful object instances, we can tie the data directly to the functionality related to it.
To do this mapping from in-memory data (no matter where it was loaded from or how it was constructed), pydantic allows us to use MyType.parse_obj(data), where MyType is the factory or component that is represented by the data.
Note
This approach requires that your full top-level data object maps directly onto a Python type. In general, such a type is either easy to write or worth adding anyway; however, you can also parse out pieces of the data, or restructure it, to match the data types you’ve defined, which is especially useful if you’re working with a legacy or shared data schema that isn’t easy to modify. Another option is to add a custom __init__ to a top-level type that knows how to interpret a slightly different input data schema, as sketched below.
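As a minimal sketch of that last option (the class, its fields, and the legacy key name are hypothetical):

from typing import List

from data_dag.dag_factory import DagFactory


class ReportDag(DagFactory):
    reports: List[str]

    def __init__(self, **data):
        # Hypothetical legacy schema calls this field 'report_names'; remap it
        # before pydantic validates. parse_obj() routes through __init__, so
        # ReportDag.parse_obj(legacy_data) benefits from this remapping too.
        if 'report_names' in data and 'reports' not in data:
            data['reports'] = data.pop('report_names')
        super().__init__(**data)

    def _make_dag(self):
        ...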
For example, consider that you already have the following model:
from data_dag.operator_factory import OperatorFactory
from data_dag.dag_factory import DagFactory
from typing import List


class EmailUser(OperatorFactory):
    email_address: str
    message_html: str

    def make_operator(self):
        ...


class EmailAllUsers(DagFactory):
    emails: List[EmailUser]
    email_upon_completion: EmailUser

    def _make_dag(self):
        ...
You could load an email list from the following YAML:
dag_id: email_users
schedule_interval: '@weekly'
emails:
  - email_address: sample@example.com
    message_html: "<p>Hello, world!</p>"
  - email_address: sample2@example.com
    message_html: "<p>Hi, you're our second email</p>"
email_upon_completion:
  email_address: admin@my.site.com
  message_html: "<p>All messages sent</p>"
This would produce the following in-memory object:
data = {
    'dag_id': 'email_users',
    'schedule_interval': '@weekly',
    'email_upon_completion': {
        'email_address': 'admin@my.site.com',
        'message_html': '<p>All messages sent</p>',
    },
    'emails': [
        {'email_address': 'sample@example.com', 'message_html': '<p>Hello, world!</p>'},
        {'email_address': 'sample2@example.com', 'message_html': "<p>Hi, you're our second email</p>"},
    ],
}
Which we can then parse as:
dag_metadata = EmailAllUsers.parse_obj(data)
# EmailAllUsers(
#     dag_id='email_users',
#     ...
#     emails=[
#         EmailUser(
#             email_address='sample@example.com',
#             message_html='<p>Hello, world!</p>'),
#         EmailUser(
#             email_address='sample2@example.com',
#             message_html="<p>Hi, you're our second email</p>")
#     ],
#     email_upon_completion=EmailUser(
#         email_address='admin@my.site.com',
#         message_html='<p>All messages sent</p>'
#     )
# )
To finally convert that metadata object into an Airflow DAG, we can run dag_metadata.make_dag(). Typically, this would be done in a loader script that’s designed to interact cleanly with Airflow.
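data_dag doesn’t prescribe what that loader script looks like. A minimal sketch, assuming the YAML definitions live in a dag_definitions/ folder next to a loader module inside Airflow’s dags/ directory, and that dag_id is available as an attribute on the parsed metadata:

import pathlib

import yaml

# Hypothetical: wherever EmailAllUsers is actually defined in your project
from my_project.email_dags import EmailAllUsers

DEFINITIONS_DIR = pathlib.Path(__file__).parent / 'dag_definitions'

for yaml_path in DEFINITIONS_DIR.glob('*.yaml'):
    data = yaml.safe_load(yaml_path.read_text())
    dag_metadata = EmailAllUsers.parse_obj(data)
    # Airflow discovers DAGs by scanning module-level globals in files under dags/
    globals()[dag_metadata.dag_id] = dag_metadata.make_dag()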
You can also generate these objects dynamically in-code:
# e.g. using a Django-style ORM and a template engine
email_tasks = [
    EmailUser(email_address=user.email, message_html=message_template.render(user))
    for user in User.objects.all()
]
admin_email_task = EmailUser(
    email_address=User.objects.get(admin=True).email,
    message_html=success_template.render(),
)
dag_metadata = EmailAllUsers(
    dag_id='email_users',
    schedule_interval='@weekly',
    emails=email_tasks,
    email_upon_completion=admin_email_task,
)
Note that you can also use pydantic to map the resulting in-memory objects back into data, either into a dictionary or directly into JSON:
dag_metadata.dict()
dag_metadata.json()
This additionally means that you can generate the DAG metadata in one location (say, nightly in a web application), save it out to a known location, then restore it from that location back into your Airflow server to be rendered as a functional DAG. This can be done without direct communication between your servers and without introducing the possibility of arbitrary code execution in your Airflow DAGs, since all code is self-contained and only validated parameters are allowed to be passed around.
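A sketch of that round trip, with a shared file path standing in for whatever storage the two systems actually share:

import json

# On the producing system (e.g. a nightly job in a web application):
with open('/shared/email_users.json', 'w') as f:  # hypothetical shared location
    f.write(dag_metadata.json())

# Later, on the Airflow server:
with open('/shared/email_users.json') as f:
    restored = EmailAllUsers.parse_obj(json.load(f))

dag = restored.make_dag()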
Warning
While this is similar, for example, to using JSON serialization to define Celery tasks, keep in mind that Airflow renders the final result in its entirety, all at once and repeatedly every few seconds, so it is subject to Airflow’s scaling limitations. These limitations mean that individual DAGs should generally be kept fairly small (no more than a few hundred operators) and that the number of DAGs should likewise be kept limited, depending on the scale of your deployment.