Airflow Logging: Task logs to Elasticsearch

This is part three of a five-part series addressing Airflow at an enterprise scale. I will update these with links as they are published.

Previously, we enabled authentication and authorization for Airflow using Azure Active Directory App Registrations. We mapped our application roles to the provided Airflow roles and implemented the get_oauth_user_info method. This post will focus on task logging for Airflow.

Logging Architecture

Airflow uses the standard Python logging framework and configuration schemes, but it is unusual in where its log information comes from. Airflow distinguishes between application logs (web server, scheduler, etc.) and task logs (logs from Spark jobs, SQL queries, etc.). This matters because your data scientists shouldn’t be burdened with sifting through application logs to identify, troubleshoot, and remediate a failed DAG run.
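
To make the distinction concrete, here is a minimal sketch of a DAG whose log output lands in the task log rather than the application logs; the DAG and task names are hypothetical.

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

logger = logging.getLogger(__name__)

def extract():
    # Anything logged (or printed) inside the task callable is captured by the
    # task log handler, not the scheduler/web server application logs.
    logger.info("Starting extract step")

with DAG(
    dag_id="example_task_logging",  # hypothetical DAG for illustration
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonOperator(task_id="extract", python_callable=extract)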

We will use an Elastic Cloud instance as our log aggregation platform and the Filebeat Helm chart to configure and deploy a log shipper.

Charts, Charts, Charts

As always, start by modifying the Airflow chart’s values file.

elasticsearch:
  enabled: true
  secretName: elastic-secret
config:
  elasticsearch:
    frontend: https://{elasticsearch host}:{elasticsearch port}/app/discover#/?_a=(columns:!(message),filters:!(),index:dff5d120-5d3d-11ec-b070-4d64674090d3,interval:auto,query:(language:kuery,query:'log_id:"{log_id}"'),sort:!(log.offset,asc))&_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:now-1y,to:now))
    host: https://{elasticsearch host}:{elasticsearch port}
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
    end_of_log_mark: end_of_log
    write_stdout: 'True'
    json_fields: asctime, filename, lineno, levelname, message
  elasticsearch_configs:
    use_ssl: 'True'
    verify_certs: 'False'
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'

Let’s walk through the key configurations here:

  • elasticsearch.secretName: elastic-secret points to a Kubernetes secret, created below, that holds the Elasticsearch connection string.
  • config.elasticsearch.frontend contains a few hardcoded values for the index (this must be the index pattern’s ID, not the index name) and the time range. You will need to change the index ID to match your environment; the sketch below shows how the {log_id} placeholder in this URL gets filled.
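
For context on how the frontend URL is used: Airflow fills the {log_id} placeholder with the rendered log_id_template for the task instance being viewed. A quick sketch, with hypothetical DAG, task, and date values:

log_id_template = "{dag_id}_{task_id}_{execution_date}_{try_number}"

# Hypothetical task instance values, purely for illustration.
log_id = log_id_template.format(
    dag_id="example_task_logging",
    task_id="extract",
    execution_date="2022-01-01T00:00:00+00:00",
    try_number=1,
)
print(log_id)
# example_task_logging_extract_2022-01-01T00:00:00+00:00_1

That rendered value is what the log_id:"{log_id}" query in the frontend link searches for in Elasticsearch.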

Create the Secret for Elasticsearch

We need to create a secret in Kubernetes that holds the Elasticsearch connection string. Kubernetes stores secret data base64 encoded, and the connection string takes the form https://{username}:{password}@{host}:{port}, so the command

echo -n "https://{username}:{password}@{host}:{port}" | openssl base64

will give the appropriate value for the secret. Inserting that into the YAML manifest gives us the needed resource.

apiVersion: v1
kind: Secret
metadata:
  name: elastic-secret
  namespace: airflow
type: Opaque
data:
  connection: *********************************
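
A minimal sketch of rolling these changes out, assuming the manifest above is saved as elastic-secret.yaml and that Airflow was deployed from the official apache-airflow/airflow chart as a release named airflow in the airflow namespace; adjust to your deployment:

# Create the secret first so the chart can reference it.
kubectl apply -f elastic-secret.yaml

# Apply the updated values to the existing Airflow release.
helm upgrade airflow apache-airflow/airflow --namespace airflow -f values.yaml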

Adding the Filebeat Chart

Filebeat is a lightweight log shipper created by Elastic, the organization behind Elasticsearch and the ELK stack. We will deploy Filebeat as a DaemonSet so that one pod runs on each node and mounts the /var/log/containers directory. Filebeat will use its `autodiscover` feature to watch for containers in the `airflow` namespace of the cluster; when it finds a log file for a container in that namespace, it forwards the logs to Elasticsearch.

Add the repo.

helm repo add elastic https://helm.elastic.co

Add the values file for the Filebeat chart.

daemonset:
  filebeatConfig:
    filebeat.yml: |
      logging.json: true
      filebeat.config:
        inputs:
          # Mounted `filebeat-inputs` configmap:
          path: ${path.config}/inputs.d/*.yml
          # Reload inputs configs as they change:
          reload.enabled: false
        modules:
          path: ${path.config}/modules.d/*.yml
          # Reload module configs as they change:
          reload.enabled: false
      filebeat.autodiscover:
        providers:
          - type: kubernetes
            templates:
            - condition:
                equals:
                  kubernetes.namespace: airflow
              config:
                - type: container
                  paths:
                  - /var/log/containers/*${data.kubernetes.container.id}.log
                  multiline.pattern: '^[[:space:]]'
                  multiline.negate: false
                  multiline.match: after
                  include_lines: ['^{']
      
      processors:
      - decode_json_fields:
          fields: ["message"]
          max_depth: 1
          target: ""
          overwrite_keys: true
          add_error_key: true

      output.elasticsearch:
        index: "airflow-dags-%{+yyyy.MM.dd}"

      cloud:
        id: {get this from your elastic cloud instance}
        auth: {username}:{password}

Notes on the key configurations for Filebeat

  • We want to autodiscover container logs for containers in the airflow namespace. You’ll need to change this to match your environment.
  • The message field is the actual log statement from Airflow and is a string of escaped JSON. We want to parse this field and write it to the top level of the JSON object.
  • We want to write to the airflow-dags-%{+yyyy.MM.dd} index. The %{+yyyy.MM.dd} suffix is a Filebeat date format string (not Jinja) that resolves to the event’s date, producing one index per day, and we need to create a matching index pattern (airflow-dags-*) in Elasticsearch.
  • I am using the Elastic Cloud service, so I can authenticate with the cloud.id and cloud.auth configurations. Depending on your environment, you may need to supply hosts and credentials under output.elasticsearch instead. Once the values are in place, install the chart as shown below.
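
With the values file ready, install the chart. The release name, namespace, and values file name below are assumptions; adjust them to match your cluster:

helm install filebeat elastic/filebeat --namespace airflow -f filebeat-values.yaml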

Task Logging in Action

With these changes deployed, task logs from all DAG runs will be forwarded to the Elasticsearch cluster. In the Task Instance dialog, you will find a “View Logs in Elasticsearch (by attempts)” button that will navigate to the configured `frontend` URL.

And that’s it. Your task logs are available in Elasticsearch under the `airflow-dags-%{+yyyy.MM.dd}` index.
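
If you want to confirm documents are arriving outside of the Airflow UI, a quick count against the daily indices is enough; the host and credentials below are the same placeholders used earlier:

curl -s -u "{username}:{password}" \
  "https://{elasticsearch host}:{elasticsearch port}/airflow-dags-*/_count"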
