Airflow Logging: Task logs to Elasticsearch

This is part three of a five-part series addressing Airflow at an enterprise scale. I will update these with links as they are published.

Previously, we enabled authentication and authorization for Airflow using Azure Active Directory App Registrations. We mapped our application roles to the provided Airflow roles and implemented the get_oauth_user_info method. This post will focus on task logging for Airflow.

Logging Architecture

Airflow uses the standard Python logging framework and configuration schemes, but is unusual in where its log information comes from: it distinguishes between application logs (web server, scheduler, etc.) and task logs (logs from Spark jobs, SQL queries, etc.). This distinction matters because your data scientists shouldn’t be burdened with sifting through application logs to identify, troubleshoot, and remediate a failed DAG run.

We will use an Elastic Cloud instance as our log aggregation platform and the filebeat helm chart to configure and deploy a log shipper.

Charts, Charts, Charts

As always, start by modifying the Airflow values file.

elasticsearch:
  enabled: true
  secretName: elastic-secret
config:
  elasticsearch:
    frontend: https://{elasticsearch host}:{elasticsearch port}/app/discover#/?_a=(columns:!(message),filters:!(),index:dff5d120-5d3d-11ec-b070-4d64674090d3,interval:auto,query:(language:kuery,query:'log_id:"{log_id}"'),sort:!(log.offset,asc))&_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:now-1y,to:now))
    host: https://{elasticsearch host}:{elasticsearch port}
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
    end_of_log_mark: end_of_log
    write_stdout: 'True'
    json_fields: asctime, filename, lineno, levelname, message
  elasticsearch_configs:
    use_ssl: 'True'
    verify_certs: 'False'
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'

Let’s walk through the key configurations here:

  • elasticsearch.secretName: elastic-secret points to a secret, which we will create shortly, containing the Elasticsearch connection string.
  • config.elasticsearch.frontend contains a few hardcoded values for the index (it must be the index id, not the index name) and the time range. You will need to change the index id to match your environment.
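To see how the pieces fit together, here is how the log_id_template renders for a single task try. All of the values below are hypothetical; the resulting string is the log_id that the frontend link searches for in Elasticsearch:

```shell
# Hypothetical task-instance values:
dag_id="example_dag"
task_id="extract"
execution_date="2021-12-01T00:00:00+00:00"
try_number=1

# Airflow renders "{dag_id}_{task_id}_{execution_date}_{try_number}" as:
echo "${dag_id}_${task_id}_${execution_date}_${try_number}"
```

Every JSON log line shipped to Elasticsearch carries this log_id, which is what lets the UI link jump straight to the right task attempt.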

Create the Secret for Elasticsearch

We need to create a secret in Kubernetes that holds the Elasticsearch connection string. Kubernetes expects secret values to be base64 encoded, and the connection string takes the form https://{username}:{password}@{host}:{port}, so the command

echo -n "https://{username}:{password}@{host}:{port}" | openssl base64

will give the appropriate value for the secret. Inserting that into the YAML manifest gives us the needed resource.
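For example, with placeholder credentials (not real ones), you can confirm the encoding round-trips cleanly before pasting it into the manifest:

```shell
# Encode a placeholder connection string (substitute your real credentials).
encoded=$(echo -n "https://user:pass@host:9200" | openssl base64)
echo "$encoded"

# Decoding returns the original string, confirming the value is correct.
echo "$encoded" | openssl base64 -d
```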

apiVersion: v1
kind: Secret
metadata:
  name: elastic-secret
  namespace: airflow
type: Opaque
data:
  connection: *********************************

Adding the Filebeat Chart

Filebeat is a lightweight log shipper created by Elastic, the organization behind Elasticsearch and the ELK stack. We will deploy Filebeat as a DaemonSet, ensuring one pod runs on each node and mounts the /var/log/containers directory. Filebeat will use its `autodiscover` feature to watch for containers in the `airflow` namespace of the cluster. If it finds a log file for a container in that namespace, it will forward it to Elasticsearch.

Add the repo.

helm repo add elastic https://helm.elastic.co

Add the values file.

filebeatConfig:
  filebeat.yml: |
    logging.json: true
    filebeat.config:
      inputs:
        # Mounted `filebeat-inputs` configmap:
        path: ${path.config}/inputs.d/*.yml
        # Reload inputs configs as they change:
        reload.enabled: false
      modules:
        path: ${path.config}/modules.d/*.yml
        # Reload module configs as they change:
        reload.enabled: false
    filebeat.autodiscover:
      providers:
        - type: kubernetes
          templates:
            - condition:
                equals:
                  kubernetes.namespace: airflow
              config:
                - type: container
                  paths:
                    - /var/log/containers/*${data.kubernetes.container.id}.log
                  multiline.pattern: '^[[:space:]]'
                  multiline.negate: false
                  multiline.match: after
                  include_lines: ['^{']
    processors:
      - decode_json_fields:
          fields: ["message"]
          max_depth: 1
          target: ""
          overwrite_keys: true
          add_error_key: true
    # Required when overriding the default index name:
    setup.ilm.enabled: false
    setup.template.name: "airflow-dags"
    setup.template.pattern: "airflow-dags-*"
    output.elasticsearch:
      index: "airflow-dags-%{+yyyy.MM.dd}"
    cloud:
      id: {get this from your elastic cloud instance}
      auth: {username}:{password}
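With the repo added and the values saved (here to filebeat-values.yaml, a file name chosen for illustration), the chart can be deployed. The namespace below is an assumption; adjust it for your cluster:

```shell
# Pull the latest chart metadata from the elastic repo.
helm repo update

# Install (or upgrade) Filebeat as a DaemonSet using our values file.
# "kube-system" is only an example namespace.
helm upgrade --install filebeat elastic/filebeat \
  --namespace kube-system \
  -f filebeat-values.yaml
```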

Notes on the key configurations for Filebeat

  • We want to autodiscover container logs for containers in the airflow namespace. You’ll need to change this to match your environment.
  • The message field is the actual log statement from Airflow and is a string of escaped JSON. We want to parse this field and write it to the top level of the JSON object.
  • We want to write to the airflow-dags-%{+yyyy.MM.dd} index. The date suffix is a Filebeat format string (evaluated per event, not a Jinja template), and we need to create a matching index pattern in Kibana.
  • I am using the Elastic Cloud service, so I can authenticate with the cloud.id and cloud.auth configurations. You may need to use the output.elasticsearch.hosts setting instead, depending on your environment.
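The effect of decode_json_fields can be simulated locally with jq. The sample event below is made up: the escaped JSON string in message is parsed and merged into the root of the event, with the inner message overwriting the outer one (overwrite_keys: true):

```shell
# A raw Filebeat event: "message" holds the Airflow JSON log line as an escaped string.
event='{"message":"{\"levelname\":\"INFO\",\"message\":\"Task exited with return code 0\"}"}'

# Roughly what decode_json_fields (target: "", overwrite_keys: true) does:
echo "$event" | jq '. + (.message | fromjson)'
```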

Task Logging in Action

With these changes deployed, task logs from all DAG runs will be forwarded to the Elasticsearch cluster. In the Task Instance dialog, you will find a “View Logs in Elasticsearch (by attempts)” button that will navigate to the configured `frontend` URL.
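To spot-check that log lines are actually landing in the index, you can query Elasticsearch directly. The host, credentials, and log_id below are all hypothetical; substitute your own:

```shell
# Search today's index for log lines belonging to one task attempt.
curl -s -u "elastic:changeme" \
  "https://my-deployment.es.io:9243/airflow-dags-*/_search?q=log_id:%22example_dag_extract_2021-12-01T00:00:00%2B00:00_1%22&size=5"
```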

And that’s it. Your task logs are available in Elasticsearch under the `airflow-dags-%{+yyyy.MM.dd}` index.
