Managed Airflow: Running a Basic Task on AWS, GCP, and Astronomer
The steps required to get a basic Airflow task running on each platform.
Intro
The leading solution for orchestrating jobs in modern data engineering environments is Apache Airflow, an open-source orchestration platform created at Airbnb and later donated to the Apache Software Foundation. Organizations have many options for deploying Airflow. Self-hosting generally involves using the freely available Helm charts to deploy Airflow on-prem or in a generic cloud Kubernetes service like EKS. Organizations seeking to avoid the complexity of maintaining an Airflow deployment will generally select from hosted offerings that provide Airflow as a service.
The three leading hosted solutions are:
- Google Cloud Composer (GCC) on Google Cloud
- Amazon Managed Workflows for Apache Airflow (MWAA) on AWS
- Astronomer (Astro)
This article provides only brief background on each service, focusing instead on the steps required to get a basic Airflow task running on each platform. Wherever possible, I will use infrastructure-as-code practices for each platform rather than the online consoles. If you are looking for an article focused on the platform trade-offs, I suggest reading Paul Fry's article on managed Airflow solutions.
Task Definition
First, let's define a local Airflow task and ensure it behaves as expected. The demo task will hit the NBA API to gather NBA games for the most recent season, find the game with the highest single-team point total, and send that game's ID as a Slack message to a configured Slack connection.
import os

import pendulum
from airflow.decorators import dag, task
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from nba_api.stats.endpoints import leaguegamefinder

# Mac local fix: avoid macOS system proxy lookups, which can hang or crash
# forked task processes when running Airflow locally.
os.environ["no_proxy"] = "*"


@dag(
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1),
    catchup=False,
)
def game_alerter():
    @task()
    def find_max_pts_game_id():
        # Pull every team game for the 2022-23 NBA season and return the
        # GAME_ID of the game with the highest single-team point total.
        games_df = leaguegamefinder.LeagueGameFinder(
            player_or_team_abbreviation="T",
            season_nullable="2022-23",
            league_id_nullable="00",
        ).league_game_finder_results.get_data_frame()
        max_game_id = games_df.loc[games_df["PTS"].idxmax(), "GAME_ID"]
        return max_game_id

    max_game_id = find_max_pts_game_id()

    # Passing the XCom result as `text` makes the Slack task run downstream
    # of find_max_pts_game_id.
    slack = SlackAPIPostOperator(
        task_id="send_max_game_id_to_slack",
        channel="#general",
        text=max_game_id,
        slack_conn_id="slack_api_default",
    )


game_alerter()
Try installing Airflow and setting up the Slack API connection to verify the sample task works. In this article, we'll assume this file lives in dags/example.py.
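As a quick local check, something like the following should work; this is a sketch that assumes Airflow 2.x in a fresh Python environment and reuses the connection-string placeholder from the rest of the article:
pip install apache-airflow apache-airflow-providers-slack nba_api
export AIRFLOW__CORE__DAGS_FOLDER="$(pwd)/dags"
export AIRFLOW_CONN_SLACK_API_DEFAULT='<your_airflow_slack_conn_string>'
airflow db init   # initialize the local metadata database
airflow dags test game_alerter 2023-01-01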
Google Cloud Composer (GCC)
Google Cloud Composer (GCC) is GCP's hosted Airflow offering, introduced in July 2018. The entire GCC configuration can be completed using the Google Terraform provider. First, let's define some Terraform variables for storing project and connection information. Note that a plain Terraform variable is not best practice for the Airflow connection string: in production, a secrets backend should be used to store our Slack API authentication info.
variable "project" {
type = string
default = "<your_project>"
}
variable "project_num" {
type = string
default = "<your_project_num>"
}
variable "airflow_slack_conn" {
type = string
default = "<your_airflow_slack_conn_string>"
}
Now, set up the GCC environment. Note that we will use the pypi_packages configuration to install the NBA API library and the Airflow Slack provider module, then use the env_variables config to notify Airflow of the Slack connection.
provider "google" {
project = var.project
region = "us-central1"
}
resource "google_project_service" "composer_api" {
provider = google
project = var.project
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
disable_on_destroy = false
}
resource "google_project_iam_member" "service_agent_role" {
provider = google
project = var.project
member = "serviceAccount:service-${var.project_num}@cloudcomposer-accounts.iam.gserviceaccount.com"
role = "roles/composer.ServiceAgentV2Ext"
}
resource "google_composer_environment" "example_environment" {
provider = google
name = "test-environment"
config {
software_config {
image_version = "composer-2.4.4-airflow-2.5.3"
pypi_packages = {
nba_api = ""
apache-airflow-providers-slack = ""
}
env_variables = {
AIRFLOW_CONN_SLACK_API_DEFAULT = var.airflow_slack_conn
}
}
}
}
Finally, we upload our DAG to the Cloud Storage bucket created for the Composer environment, indicating to GCC that it should register this DAG.
# Dynamically upload our test DAG to the Cloud Storage bucket.
resource "google_storage_bucket_object" "dag" {
  name         = "dags/example.py"
  source       = "dags/example.py"
  content_type = "text/x-python"
  # Extract the bucket name from the environment's gs://<bucket>/dags prefix.
  bucket       = element(split("/", google_composer_environment.example_environment.config[0].dag_gcs_prefix), 2)
}
Running this Terraform plan takes roughly 25 minutes. Afterwards, we can trigger our DAG using the Airflow UI via the GCP dashboard, or by using the Airflow CLI via the gcloud utility.
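For example, with the environment name and region used above, a CLI trigger looks roughly like this:
gcloud composer environments run test-environment \
  --location us-central1 \
  dags trigger -- game_alerter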
Don't forget to clean up your resources. Note that not all resources are accounted for in the Terraform code provided: when an environment is created, GCP also creates a storage bucket for that environment's data and allocates Compute Engine disk space for the Redis queue backing Cloud Composer's Celery executors. If we want to shut down everything in one go, we can take advantage of the Terraform export functionality provided by GCP. Run the following command to get importable Terraform definitions printed to stdout:
gcloud beta resource-config bulk-export --resource-types=StorageBucket,ComputeDisk --project=... --resource-format terraform
Then, import each of these using the terraform import command. Now, we can deallocate all resources at once with a terraform destroy invocation.
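The exact resource addresses depend on what the bulk export emits; as a rough sketch with hypothetical addresses:
# Hypothetical resource addresses; match them to the exported definitions.
terraform import google_storage_bucket.composer_bucket <exported_bucket_name>
terraform import google_compute_disk.composer_redis_disk projects/<your_project>/zones/<zone>/disks/<disk_name>
terraform destroy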
Amazon Managed Workflows (MWAA)
Amazon introduced its hosted Airflow offering, Managed Workflows for Apache Airflow (MWAA), in November 2020. We'll begin our MWAA setup by using the Amazon-provided CloudFormation template for MWAA. This constructs the basic cloud resources supporting MWAA, including a VPC, an MWAA environment, an execution role, and an S3 bucket. After creating the stack, upload our DAG file and a requirements.txt dependency file to the newly created S3 bucket.
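As a sketch with a hypothetical bucket name, the uploads (and the object-version lookup used below) look like:
aws s3 cp dags/example.py s3://<your_mwaa_bucket>/dags/example.py
aws s3 cp requirements.txt s3://<your_mwaa_bucket>/requirements.txt
# Retrieve the version ID referenced by RequirementsS3ObjectVersion below.
aws s3api list-object-versions --bucket <your_mwaa_bucket> --prefix requirements.txt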
Then, we'll add configuration informing MWAA how to find the requirements file, and configure the secrets backend, which is required for instantiating our Slack connection:
RequirementsS3ObjectVersion: <requirements_object_version>
RequirementsS3Path: requirements.txt
AirflowConfigurationOptions:
  secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
  secrets.backend_kwargs: '{"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}'
Now, add a new secret containing our Airflow-formatted Slack connection string:
SlackConnectionSecret:
  Type: AWS::SecretsManager::Secret
  Properties:
    Name: airflow/connections/slack_api_default
    SecretString: <your_airflow_slack_conn_string>
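If you would rather keep the connection string out of the CloudFormation template, the same secret can be created with the AWS CLI instead (a sketch, using the same placeholder):
aws secretsmanager create-secret \
  --name airflow/connections/slack_api_default \
  --secret-string '<your_airflow_slack_conn_string>'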
Next, ensure the execution role has access to Secrets Manager by adding the SecretsManagerReadWrite managed policy to the role:
ManagedPolicyArns:
  - arn:aws:iam::aws:policy/SecretsManagerReadWrite
Lastly, the NBA API unfortunately blocks requests from AWS IP addresses. We can work around this by using a $3 proxy from IPRoyal and providing the proxy credentials to the LeagueGameFinder call in our Airflow task. After all of these changes, we are able to trigger and run the Airflow job via the Airflow UI.
Astronomer (Astro)
Astronomer is a company focused exclusively on products in the orchestration space, with Airflow as its underlying technology. It announced its hosted Airflow offering in 2018 and has gone on to raise roughly $300M to support these efforts. Creating an Astronomer deployment involves the Astro CLI and a deployment file. We can create a basic deployment using the following configuration:
deployment:
  configuration:
    name: test
    description: ""
    runtime_version: 9.1.0
    dag_deploy_enabled: false
    ci_cd_enforcement: false
    scheduler_size: small
    is_high_availability: false
    executor: CeleryExecutor
    scheduler_au: 10
    scheduler_count: 1
    cluster_name: us-central1
    workspace_name: Phil's Workspace
    deployment_type: HOSTED_SHARED
    cloud_provider: gcp
    region: us-central1
Simply run astro deployment create --deployment-file astr.yaml to construct the deployment. This will instantiate the deployment on Astro's side, enabling us to release new DAGs to the deployment.
The deployment process for Astro involves building an Astronomer Runtime image locally and uploading that image to an Astronomer-hosted Docker registry. This build is driven by a project folder, which can be created using the astro dev init command. Once this folder is created, we can make three changes (sketched below):
- Populate the requirements.txt file with our dependencies.
- Add the AIRFLOW_CONN_SLACK_API_DEFAULT environment variable to the Dockerfile with our Slack connection string.
- Move our sample task into the dags/ folder.
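A rough sketch of those three changes, run from the project root (the relative path to the sample DAG is an assumption):
echo "nba_api" >> requirements.txt
echo "apache-airflow-providers-slack" >> requirements.txt
# Bake the Slack connection into the image; the placeholder is your own connection string.
echo 'ENV AIRFLOW_CONN_SLACK_API_DEFAULT="<your_airflow_slack_conn_string>"' >> Dockerfile
# Assumed location of the sample DAG from earlier in the article.
cp ../dags/example.py dags/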
Once this is complete, we can build the image and deploy it using the astro deploy command. Note: you will need Docker set up for this to succeed; on my M1 Mac, I used Colima as the Docker runtime. After the deploy completes, we are able to trigger the task via the Airflow UI or the CLI support provided by Astronomer.