Airflow Logging: Task logs to Elasticsearch

This is part three of a five-part series addressing Airflow at an enterprise scale. I will update these with links as they are published.

Previously, we enabled authentication and authorization for Airflow using Azure Active Directory App Registrations. We mapped our application roles to the provided Airflow roles and implemented the get_oauth_user_info method. This post will focus on task logging for Airflow.

Logging Architecture

Airflow uses the standard Python logging framework and configuration schemes, but it is unusual in where its log information comes from. Airflow distinguishes between application logs (web server, scheduler, etc.) and task logs (logs for Spark jobs, SQL queries, etc.). This matters because your data scientists shouldn’t have to sift through application logs to find, troubleshoot, and remediate a failed DAG run.
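To make the distinction concrete, here is a minimal sketch of where task logs come from: whatever a task's callable emits through Python's standard logging module while it runs. The function name `extract_orders` and the logger name are hypothetical, and the in-memory handler only stands in for the handler Airflow would attach during a real task run.

```python
import io
import logging

LOGGER_NAME = "airflow.task.example"  # hypothetical logger name for this sketch


def extract_orders(row_count: int) -> int:
    """Hypothetical task callable; its log calls are what become task logs."""
    log = logging.getLogger(LOGGER_NAME)
    log.info("Extracted %d rows", row_count)
    return row_count


# Stand-in for the task handler: capture the records in memory.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

task_logger = logging.getLogger(LOGGER_NAME)
task_logger.setLevel(logging.INFO)
task_logger.addHandler(handler)
task_logger.propagate = False  # keep the sketch's output out of the root logger

extract_orders(42)
captured = buffer.getvalue()
```

Scheduler and web server messages never pass through this path, which is why the two kinds of logs can be shipped and searched separately.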

We will use an Elastic Cloud instance as our log aggregation platform and the Filebeat Helm chart to configure and deploy a log shipper.

Charts, Charts, Charts

As always, start by modifying the airflow values files.

elasticsearch:
  enabled: true
  secretName: elastic-secret
config:
  elasticsearch:
    frontend: https://{elasticsearch host}:{elasticsearch port}/app/discover#/?_a=(columns:!(message),filters:!(),index:dff5d120-5d3d-11ec-b070-4d64674090d3,interval:auto,query:(language:kuery,query:'log_id:"{log_id}"'),sort:!(log.offset,asc))&_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:now-1y,to:now))
    host: https://{elasticsearch host}:{elasticsearch port}
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
    end_of_log_mark: end_of_log
    write_stdout: 'True'
    json_fields: asctime, filename, lineno, levelname, message
  elasticsearch_configs:
    use_ssl: 'True'
    verify_certs: 'False'
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'

Let’s walk through the key configurations here:

  • elasticsearch.secretName: elastic-secret points to a secret, created in the next section, that contains the Elasticsearch connection string.
  • config.elasticsearch.frontend contains a few hardcoded values: the index (this must be the index ID, not the index name) and the time range. You will need to change the index to match your environment.
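The way log_id_template and the {log_id} placeholder in frontend fit together can be sketched in a few lines of Python. This is an illustration under assumptions, not Airflow's actual code: it assumes plain str.format substitution and URL-quoting of the log ID, and the Kibana host and shortened URL template are hypothetical.

```python
from urllib.parse import quote

LOG_ID_TEMPLATE = "{dag_id}_{task_id}_{execution_date}_{try_number}"

# Hypothetical, shortened frontend template; {log_id} is the only placeholder left
# once the host and port have been filled in by hand.
FRONTEND_TEMPLATE = (
    "https://kibana.example.com/app/discover#/?_a=(query:(query:'log_id:{log_id}'))"
)


def render_log_id(dag_id: str, task_id: str, execution_date: str, try_number: int) -> str:
    """Build the identifier that ties every log line to one task attempt."""
    return LOG_ID_TEMPLATE.format(
        dag_id=dag_id,
        task_id=task_id,
        execution_date=execution_date,
        try_number=try_number,
    )


def render_frontend_url(log_id: str) -> str:
    """Substitute the URL-quoted log ID into the frontend template."""
    return FRONTEND_TEMPLATE.format(log_id=quote(log_id))


log_id = render_log_id("etl", "load", "2021-12-15T00:00:00+00:00", 1)
url = render_frontend_url(log_id)
```

Because every attempt of every task gets its own log_id, a single query on that field is enough to isolate one attempt's output.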

Create the Secret for Elasticsearch

We need to create a secret in Kubernetes that holds the Elasticsearch connection string. Values in a Secret’s data field must be base64-encoded, and this connection string takes the form https://{username}:{password}@{host}:{port}, so the command

echo -n "https://{username}:{password}@{host}:{port}" | openssl base64 -A

will give the appropriate value for the secret. Inserting that into the YAML manifest gives us the needed resource.

apiVersion: v1
kind: Secret
metadata:
  name: elastic-secret
  namespace: airflow
type: Opaque
data:
  connection: *********************************
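If you would rather script the encoding, the following sketch produces the same kind of value in Python. The helper name and sample credentials are hypothetical, and percent-encoding the username and password is my assumption for handling characters such as @ or : in a password; verify that your client decodes them before relying on it.

```python
import base64
from urllib.parse import quote


def build_connection_secret(username: str, password: str, host: str, port: int) -> str:
    """Return the base64 value for the Secret's `connection` key."""
    # Assumption: percent-encode credentials so '@' or ':' cannot break the URL.
    user = quote(username, safe="")
    pwd = quote(password, safe="")
    connection = f"https://{user}:{pwd}@{host}:{port}"
    # b64encode emits a single line, with no wrapping newlines.
    return base64.b64encode(connection.encode("utf-8")).decode("ascii")


# Hypothetical sample values.
encoded = build_connection_secret("elastic", "p@ss:word", "es.example.com", 9243)
decoded = base64.b64decode(encoded).decode("utf-8")
```

Decoding the value again, as in the last line, is a quick way to check that what you paste into the manifest is what you intended.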

Adding the Filebeat Chart

Filebeat is a lightweight log shipper created by Elastic, the organization behind Elasticsearch and the ELK stack. We will configure Filebeat as a DaemonSet, ensuring that one pod runs on each node and mounts the /var/log/containers directory. Filebeat will use its `autodiscover` feature to watch for containers in the `airflow` namespace of the cluster. When it finds a log file for a container in that namespace, it will forward the contents to Elasticsearch.

Add the repo.

helm repo add elastic https://helm.elastic.co

Add the values files.

daemonset:
  filebeatConfig:
    filebeat.yml: |
      logging.json: true
      filebeat.config:
        inputs:
          # Mounted `filebeat-inputs` configmap:
          path: ${path.config}/inputs.d/*.yml
          # Reload inputs configs as they change:
          reload.enabled: false
        modules:
          path: ${path.config}/modules.d/*.yml
          # Reload module configs as they change:
          reload.enabled: false
      filebeat.autodiscover:
        providers:
          - type: kubernetes
            templates:
            - condition:
                equals:
                  kubernetes.namespace: airflow
              config:
                - type: container
                  paths:
                  # Filebeat's autodiscover examples put ${data.kubernetes.container.id} here
                  - /var/log/containers/*${}.log
                  multiline.pattern: '^[[:space:]]'
                  multiline.negate: false
                  multiline.match: after
                  include_lines: ['^{']
      processors:
      - decode_json_fields:
          fields: ["message"]
          max_depth: 1
          target: ""
          overwrite_keys: true
          add_error_key: true

      output.elasticsearch:
        index: "airflow-dags-%{+yyyy.MM.dd}"

      cloud:
        id: {get this from your elastic cloud instance}
        auth: {username}:{password}

Notes on the key configurations for Filebeat

  • We want to autodiscover container logs for containers in the airflow namespace. You’ll need to change this to match your environment.
  • The message field is the actual log statement from Airflow and is a string of escaped JSON. We want to parse this field and write it to the top level of the JSON object.
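  • We want to write to the airflow-dags-%{+yyyy.MM.dd} index. The %{+yyyy.MM.dd} suffix is a Filebeat format string (not Jinja) that expands to the event’s date, giving one index per day, so we need to create a matching index pattern (for example airflow-dags-*) in Kibana.
  • I am using the Elastic Cloud service, so I can authenticate with the cloud.id and cloud.auth configurations. Depending on your environment, you may need to configure the connection under output.elasticsearch instead.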
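To see what the include_lines filter and the decode_json_fields processor do to an event, here is a rough Python approximation. It is a sketch of the behavior described above, not Filebeat's implementation; the function name, the sample event, and the exact shape of the error key are assumptions.

```python
import json
import re

# Mirrors include_lines: ['^{'] and keeps only lines that start with '{'.
INCLUDE_LINE = re.compile(r"^\{")


def process_event(event: dict):
    """Approximate include_lines plus decode_json_fields with target "" and overwrite_keys."""
    message = event.get("message", "")
    if not INCLUDE_LINE.search(message):
        return None  # non-JSON lines are dropped before any processor runs
    try:
        decoded = json.loads(message)
    except json.JSONDecodeError as exc:
        # Roughly what add_error_key does: keep the event and record the failure.
        return {**event, "error": {"message": str(exc), "type": "json"}}
    if not isinstance(decoded, dict):
        return event
    # target "" writes to the top level; decoded keys overwrite existing ones.
    return {**event, **decoded}


# Hypothetical event as Filebeat might read it from a container log file.
raw_event = {
    "message": '{"levelname": "INFO", "message": "Task started", "log_id": "etl_load_2021-12-15_1"}',
    "kubernetes": {"namespace": "airflow"},
}
processed = process_event(raw_event)
dropped = process_event({"message": "plain text line"})
```

After processing, log_id and levelname are ordinary top-level fields, which is what makes the log_id query in the frontend URL possible.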

Task Logging in Action

With these changes deployed, task logs from all DAG runs will be forwarded to the Elasticsearch cluster. In the Task Instance dialog, you will find a “View Logs in Elasticsearch (by attempts)” button that will navigate to the configured `frontend` URL.

And that’s it. Your task logs are available in Elasticsearch under the `airflow-dags-%{+yyyy.MM.dd}` index.
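If you want to look logs up outside the Airflow UI, the sketch below shows how the daily index name and a query for one task attempt could be built in Python. The helper names are hypothetical, and the query body is only one reasonable way to express "all lines for this log_id, in file order"; pass it to whichever Elasticsearch client you use.

```python
from datetime import date


def daily_index(day: date, prefix: str = "airflow-dags-") -> str:
    """Name of the index Filebeat writes to for a given day."""
    # %{+yyyy.MM.dd} in the Filebeat setting corresponds to %Y.%m.%d in strftime.
    return f"{prefix}{day:%Y.%m.%d}"


def log_query(log_id: str) -> dict:
    """Search body selecting every line of one task attempt, ordered by file offset."""
    return {
        "query": {"match_phrase": {"log_id": log_id}},
        "sort": [{"log.offset": {"order": "asc"}}],
    }


index_name = daily_index(date(2021, 12, 15))
body = log_query("etl_load_2021-12-15T00:00:00+00:00_1")
```

Sorting on log.offset mirrors the sort used in the frontend URL, so the lines come back in the order the task wrote them.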
