Airflow Logging: Task logs to Elasticsearch
This is part five of a five-part series addressing Airflow at an enterprise scale. I will update these with links as they are published.
- Airflow: Planning a Deployment
- Airflow + Helm: Simple Airflow Deployment
- More Charts: Adding TLS to Airflow
- Enterprise Auth for Airflow: Azure AD
- Airflow Logging: Task logs to Elasticsearch
Previously, we enabled authentication and authorization for Airflow using Azure Active Directory App Registrations. We mapped our application roles to the provided Airflow roles and implemented the get_oauth_user_info
method. This post will focus on task logging for Airflow.
Logging Architecture
Airflow uses the standard Python logging framework and configuration schemes, but it is unusual in where its log information comes from. Airflow distinguishes between application logs (web server, scheduler, etc.) and task logs (logs from Spark jobs, SQL queries, etc.). This matters because your data scientists shouldn’t be burdened with sifting through application logs to diagnose, troubleshoot, and remediate a failed DAG run.
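Concretely, the two kinds of logs live in different places out of the box: application logs go to each component’s stdout, while task logs are written to a file per task attempt. The commands and path below are only illustrative (the exact names and layout depend on your Airflow version and release):

# Application logs: stdout of the scheduler, webserver, and worker pods
kubectl logs -n airflow deployment/airflow-scheduler

# Task logs: one file per task attempt on the worker, roughly
# {base_log_folder}/{dag_id}/{task_id}/{execution_date}/{try_number}.log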

We will use an Elastic Cloud instance as our log aggregation platform and the Filebeat Helm chart to configure and deploy a log shipper.
Charts, Charts, Charts
As always, start by modifying the Airflow values file.
elasticsearch:
  enabled: true
  secretName: elastic-secret
config:
  elasticsearch:
    frontend: https://{elasticsearch host}:{elasticsearch port}/app/discover#/?_a=(columns:!(message),filters:!(),index:dff5d120-5d3d-11ec-b070-4d64674090d3,interval:auto,query:(language:kuery,query:'log_id:"{log_id}"'),sort:!(log.offset,asc))&_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:now-1y,to:now))
    host: https://{elasticsearch host}:{elasticsearch port}
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
    end_of_log_mark: end_of_log
    write_stdout: 'True'
    json_fields: asctime, filename, lineno, levelname, message
  elasticsearch_configs:
    use_ssl: 'True'
    verify_certs: 'False'
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'
Let’s walk through the key configurations here:
- `elasticsearch.secretName: elastic-secret` points to a not-yet-created secret containing the Elasticsearch connection string.
- `config.elasticsearch.frontend` contains a few hardcoded values for the index (this must be the index ID, not the index name) and the time range. You will need to change the index to match your environment.
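With `write_stdout` and `json_format` enabled, the Elasticsearch task handler emits each task log record to stdout as a single JSON line containing the configured `json_fields`, plus metadata such as the `log_id` the web server later queries on. The exact fields vary by Airflow and provider version; a made-up record looks roughly like this:

{"asctime": "2021-12-15 14:03:22,114", "filename": "taskinstance.py", "lineno": 1262, "levelname": "INFO", "message": "Starting attempt 1 of 1", "log_id": "…"}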
Create the Secret for Elasticsearch
We need to create a Kubernetes Secret that holds the Elasticsearch connection string. Secret values are base64-encoded, and the connection string takes the form https://{username}:{password}@{host}:{port},
so the command
echo -n "https://{username}:{password}@{host}:{port}" | openssl base64
will give the appropriate value for the secret. Inserting that into the YAML manifest gives us the needed resource.
apiVersion: v1
kind: Secret
metadata:
  name: elastic-secret
  namespace: airflow
type: Opaque
data:
  connection: *********************************
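Assuming the manifest above is saved as elastic-secret.yaml and the Airflow release is named airflow (adjust both to your setup), apply the secret and roll out the new values:

kubectl apply -f elastic-secret.yaml

# Redeploy Airflow so the chart picks up the elasticsearch settings
helm upgrade airflow apache-airflow/airflow -n airflow -f values.yaml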
Adding the Filebeat Chart
Filebeat is a lightweight log shipper created by Elastic, the organization behind Elasticsearch and the ELK stack. We will deploy Filebeat as a DaemonSet, ensuring one pod runs on each node and mounts the /var/log/containers directory. Filebeat will use its `autodiscover` feature to watch for containers in the `airflow` namespace of the cluster; when it finds a log file for such a container, it forwards it to Elasticsearch.
Add the repo.
helm repo add elastic https://helm.elastic.co
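Then refresh the local chart cache so the latest Filebeat chart is available:

helm repo update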
Add the values file.
daemonset:
  filebeatConfig:
    filebeat.yml: |
      logging.json: true
      filebeat.config:
        inputs:
          # Mounted `filebeat-inputs` configmap:
          path: ${path.config}/inputs.d/*.yml
          # Reload inputs configs as they change:
          reload.enabled: false
        modules:
          path: ${path.config}/modules.d/*.yml
          # Reload module configs as they change:
          reload.enabled: false
      filebeat.autodiscover:
        providers:
          - type: kubernetes
            templates:
              - condition:
                  equals:
                    kubernetes.namespace: airflow
                config:
                  - type: container
                    paths:
                      - /var/log/containers/*${data.kubernetes.container.id}.log
                    multiline.pattern: '^[[:space:]]'
                    multiline.negate: false
                    multiline.match: after
                    include_lines: ['^{']
                    processors:
                      - decode_json_fields:
                          fields: ["message"]
                          max_depth: 1
                          target: ""
                          overwrite_keys: true
                          add_error_key: true
      output.elasticsearch:
        index: "airflow-dags-%{+yyyy.MM.dd}"
      cloud:
        id: {get this from your elastic cloud instance}
        auth: {username}:{password}
Notes on the key configurations for Filebeat:
- We want to autodiscover container logs for containers in the `airflow` namespace. You’ll need to change this to match your environment.
- The `message` field is the actual log statement from Airflow and is a string of escaped JSON. The `decode_json_fields` processor parses this field and writes the result to the top level of the JSON object.
- We want to write to the `airflow-dags-%{+yyyy.MM.dd}` index. The date suffix is expanded by Filebeat, and we need to create a matching index pattern in Elasticsearch.
- I am using the Elastic Cloud service, so I can authenticate with the `cloud.id` and `cloud.auth` settings. Depending on your environment, you may need to configure the `output.elasticsearch` hosts and credentials instead.
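With the values saved (assumed here as filebeat-values.yaml), install the chart. I deploy it into the airflow namespace for convenience, but since the DaemonSet reads node-level log paths, any namespace works:

helm install filebeat elastic/filebeat -n airflow -f filebeat-values.yaml

# Verify a Filebeat pod is running on each node
kubectl get daemonset -n airflow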
Task Logging in Action
With these changes deployed, task logs from all DAG runs will be forwarded to the Elasticsearch cluster. In the Task Instance dialog, you will find a “View Logs in Elasticsearch (by attempts)” button that will navigate to the configured `frontend` URL.
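If you prefer to search in Kibana’s Discover view directly, the link simply applies a KQL filter on the `log_id` built from `log_id_template`; the exact formatting of the execution date varies by Airflow version, but the query looks roughly like:

log_id:"example_dag_example_task_2021-12-15T14:00:00+00:00_1"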


And that’s it. Your task logs are available in Elasticsearch under the `airflow-dags-%{+yyyy.MM.dd}` index.
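If nothing shows up, a quick sanity check is to confirm that Filebeat is creating the daily indices (host and credentials are placeholders):

curl -u {username}:{password} "https://{elasticsearch host}:{elasticsearch port}/_cat/indices/airflow-dags-*?v"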