Creating Data Pipelines With Apache Airflow and MinIO

by MinIO, October 19th, 2023


Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. It was originally developed by the engineering team at Airbnb and was later donated to the Apache Software Foundation, where it is licensed under the Apache License 2.0.


Airflow is commonly used in data engineering and data science pipelines to automate the execution of tasks, such as data transformation, loading and analysis. It is also used in other industries, such as finance, healthcare and e-commerce, to automate business processes.


Airflow is very flexible with regard to what it can connect to. This includes data lakes, data warehouses, databases, APIs and, of course, object stores. It excels in those use cases that benefit from data-pipelines-as-code, such as:


  • Automation-driven data flows

  • Machine learning model training and retraining

  • Backups and snapshots


Airflow is written in Python and uses a directed acyclic graph (DAG) to represent the workflow. Each node in the DAG represents a task, and the edges between the nodes represent dependencies between the tasks. A DAG does not care about the tasks themselves, just the order, number of retries etc. A complex DAG can become brittle and difficult to troubleshoot, particularly if there are dozens of tasks that must be managed by the architect.
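To make the idea of tasks and dependencies concrete, here is a minimal sketch that uses Python's standard graphlib module rather than Airflow itself. The task names are hypothetical, and the snippet only illustrates how a DAG determines a valid run order; it is not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "archive_raw": {"extract"},
    "load": {"transform"},
}


def execution_order(deps):
    """Return one valid run order in which every task follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because the graph is acyclic, at least one such order always exists; if a cycle were introduced, graphlib would raise a CycleError instead of returning an order.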


Airflow provides a web interface to manage and monitor the workflows, as well as an API to create, update, and delete workflows. It also has a rich set of features, including support for scheduling, alerting, testing, and version control.


MinIO is the perfect companion for Airflow because of its industry-leading performance and scalability, which puts every data-intensive workload within reach. By storing petabytes of data in MinIO buckets, you can create data pipelines in Airflow that process vast amounts of data, and fast storage is essential for those DAGs to run as quickly as possible. Once the processing is done, you can even store the end result back in a MinIO bucket for other tools to consume. MinIO is capable of tremendous performance: a recent benchmark achieved 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs.


In addition to this, Apache Airflow can also store its logs in a MinIO bucket. This can be helpful in cloud or container orchestration environments, where the local filesystem is ephemeral and the logs can be lost if the machine is terminated or the container is stopped.


To minimize the risk of data loss in these environments, it is recommended to use a more durable and cloud-native storage solution like MinIO to store petabytes of data and logs. This ensures that the data is persisted even if the machine or container is terminated. Further, the data and logs can be accessed from any location, networking and security permitting.


There are several reasons to use MinIO with Apache Airflow:


  • MinIO keeps Airflow DAG runs fast: its high-throughput performance lets pipelines read and write data quickly at a fraction of the cost. By keeping all the data required for your DAGs in MinIO, you can realize considerable savings on data storage costs while getting a strong performance-to-cost ratio.
  • MinIO relies on erasure coding to provide high availability and durability for your data and also has configurable data protection policies to ensure that your data is not lost even in the event of server failures or other disasters.
  • MinIO can store data in multiple regions and replicate it between them, allowing you to store your data in a location that is close to your users or in a region that is compliant with your data regulations. This can reduce latency and improve the performance of your workflow.
  • MinIO integrates seamlessly with Apache Airflow, allowing you to use the S3 API to store and retrieve your data and other logs. This makes it easy to set up and use MinIO with Airflow with very little additional configuration. MinIO’s cloud-native integrations mean that it works smoothly with the most widely implemented software and frameworks.

Apache Airflow and MinIO Tutorial

In this tutorial we’ll show you multiple use cases of Airflow with MinIO.


  • First we’ll show you how to send logs to a MinIO bucket from Airflow DAG runs.
  • Next we’ll create a custom DAG to send objects from an API to a MinIO bucket after post processing.

Install Airflow

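Install Airflow using pip. If pip is not installed, install it first; if the python command is not available, symlink it to python3.


apt-get install python3-pip

ln -s /usr/bin/python3 /usr/bin/python

AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# Pin dependencies with the constraints file published for this Airflow and Python version
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"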


Install the Amazon provider for connecting to MinIO


pip install apache-airflow-providers-amazon


Start airflow in standalone mode


airflow standalone


Configuring Logs

Create a bucket in MinIO using mc mb


mc alias set minio http://<IP>:9000 minioadmin minioadmin
mc mb minio/airflow-logs


Open /root/airflow/airflow.cfg and add the following settings under [logging]


[logging]
remote_logging = True
remote_base_log_folder = s3://airflow-logs
remote_log_conn_id = my_s3_conn
encrypt_s3_logs = False


For remote_base_log_folder use the bucket name you created in MinIO in the previous step


The remote_log_conn_id should match the name of the connection ID we’ll create in the next step.
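In container environments it can be more convenient to pass these settings as environment variables than to edit airflow.cfg. Airflow reads configuration options from variables named AIRFLOW__{SECTION}__{KEY}. The sketch below derives those names for the logging settings shown above; the helper name airflow_env_var is an illustrative assumption, not part of Airflow.

```python
def airflow_env_var(section, key):
    """Build the environment variable name Airflow reads for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Same values as the [logging] section above.
logging_settings = {
    "remote_logging": "True",
    "remote_base_log_folder": "s3://airflow-logs",
    "remote_log_conn_id": "my_s3_conn",
    "encrypt_s3_logs": "False",
}

env = {airflow_env_var("logging", key): value for key, value in logging_settings.items()}

# Print shell export lines that could be added to a container entrypoint.
for name, value in env.items():
    print(f"export {name}={value}")
```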


Within the Airflow UI, go to Admin -> Connections



Create a new connection with the name my_s3_conn.


Enter minioadmin for the Access Key and Secret Key.


In Extras, let's set the URL to our local MinIO deployment with the following syntax


{ "endpoint_url": "http://<ip>:9000" }
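If you would rather not create the connection by hand in the UI, the same connection can be supplied through an environment variable named AIRFLOW_CONN_{CONN_ID}. The sketch below builds that variable in JSON form. It assumes Airflow 2.3 or later (which accepts JSON-encoded connections) and the Amazon provider's convention of storing the access key in login and the secret key in password; the helper name minio_connection_env is hypothetical.

```python
import json


def minio_connection_env(conn_id, access_key, secret_key, endpoint_url):
    """Return (variable name, JSON value) describing an AWS-type connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "aws",
            "login": access_key,      # used as the access key
            "password": secret_key,   # used as the secret key
            "extra": {"endpoint_url": endpoint_url},
        }
    )
    return name, value


name, value = minio_connection_env(
    "my_s3_conn", "minioadmin", "minioadmin", "http://<ip>:9000"
)
print(f"{name}='{value}'")
```

Connections defined this way do not appear in the Admin -> Connections list, so the UI route described here remains the easier one to inspect while experimenting.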


Now, to test and confirm this is working, go to DAGs -> example_sensor_decorator and enable this DAG.


Trigger the DAG using the “Play” button on the right-hand side.



After a few seconds, once the DAG is finished running, run the following command to see the logs. For each DAG run, a separate log folder is created.



mc ls minio/airflow-logs


Using a MinIO bucket to store logs for Airflow DAG runs is just one of the use cases we’re exploring. In the next phase we’ll create a custom DAG to demonstrate more use cases.

Creating a custom DAG

In this example we’re going to create a custom DAG. What will this DAG do?


  • We’ll connect to the Ghost Blog API

  • Fetch blogs based on certain parameters

  • Back them up to a bucket in MinIO


Let’s set up the framework for the DAG.


Import the required Python libraries for the DAG framework

import pendulum

from airflow.decorators import dag, task


Create a schedule for how often to run the DAG

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)


Next let's create a Task to pull the blogs from the Ghost API and put them in a MinIO bucket.


Let’s import a couple of python packages to connect to the Ghost API and MinIO bucket


import json
import requests

from minio import Minio
import urllib3


Using the requests module, let's connect to the Ghost API and fetch some blogs


api_token = "<token>"

page = 1
total_pages = 1

while page <= total_pages:
    api_url = ("https://minio.ghost.io/ghost/api/content/posts/?limit=1&page=%s&key=%s" % (page, api_token))
    # Issue the request once and decode the JSON body from that response
    response = requests.get(api_url)
    response_json = response.json()

    print(response_json["meta"])
    print(response_json["posts"][0]["url"])

    total_pages = response_json["meta"]["pagination"]["pages"]

    page = page + 1
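The pagination logic in the loop above can be separated from the HTTP call, which makes it easy to exercise without a network connection or an API token. The following is a minimal sketch under that assumption: fetch_all_posts and fake_fetch_page are hypothetical names, and the response shape mirrors the "posts" and "meta" → "pagination" → "pages" fields used above.

```python
def fetch_all_posts(fetch_page):
    """Collect posts from every page.

    fetch_page(page) must return the decoded JSON for that page, shaped like
    {"posts": [...], "meta": {"pagination": {"pages": N}}}.
    """
    posts = []
    page = 1
    total_pages = 1
    while page <= total_pages:
        body = fetch_page(page)
        posts.extend(body["posts"])
        # Each response reports the total page count; update the loop bound from it.
        total_pages = body["meta"]["pagination"]["pages"]
        page += 1
    return posts


# Stand-in for the real API call, so the loop can be exercised offline.
def fake_fetch_page(page):
    return {
        "posts": [{"url": f"https://example.com/post-{page}/"}],
        "meta": {"pagination": {"pages": 3}},
    }


posts = fetch_all_posts(fake_fetch_page)
print(len(posts))
```

In the DAG itself, fetch_page would be a small function that wraps the requests.get call for a given page and returns its decoded JSON.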


Putting blogs in MinIO bucket


config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "<ip>:9000", # host:port only, without a URL scheme
  "minio_username": "minioadmin",
  "minio_password": "minioadmin",
}

# Since we are using self-signed certs we need to disable TLS verification
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
               secure=True,
               access_key=config["minio_username"],
               secret_key=config["minio_password"],
               http_client = http_client
               )

# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

# bucket_name and object_path are placeholders for the source bucket and
# object key; set them for your own input source before running this part
minio_client.fget_object(bucket_name, object_path, object_path)
print("- Doing some pseudo image resizing or ML processing on %s" % object_path)
minio_client.fput_object(config["dest_bucket"], object_path, object_path)
print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))


Please note the above code is not meant to work “out of the box” but rather give you the idea and show you the path to create your own DAG using your preferred input source; the destination will always be MinIO.
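One piece the snippets leave open is how a fetched blog post becomes an object in the bucket. A minimal sketch of that step, assuming each post is a dictionary carrying a "slug" or "id" field, is to serialize the post to JSON and derive an object name from it. The helper name post_to_object and the "posts/" prefix are illustrative choices, not part of the original pipeline.

```python
import io
import json


def post_to_object(post):
    """Turn one post dict into (object_name, data_stream, length) for upload."""
    # Assumes each post carries a "slug" field; fall back to "id" otherwise.
    key = post.get("slug") or post["id"]
    payload = json.dumps(post, indent=2).encode("utf-8")
    return f"posts/{key}.json", io.BytesIO(payload), len(payload)


object_name, data, length = post_to_object(
    {"id": "1", "slug": "hello-minio", "title": "Hello"}
)
print(object_name, length)

# With a configured client, the result could then be uploaded with:
#   minio_client.put_object(config["dest_bucket"], object_name, data, length)
```

Streaming from memory with put_object avoids writing each post to the local filesystem first, which matters in the ephemeral container environments discussed earlier.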

Cloud-Native Data Pipelines with Airflow and MinIO

The possibilities are endless when you have cloud-native high-performance storage such as MinIO integrated with cloud-native tools such as Airflow. In this example, we’ve shown you some of the basics, such as saving DAG logs in a MinIO bucket and also writing a custom DAG that can talk to any API and perform operations on it, in this case to back up an entire blog to a MinIO bucket.


But this is just the beginning. When you go cloud-native, you tap into myriad integrated frameworks. With Airflow, you can create any number of multi-cloud pipelines. For instance, you can ETL thousands of terabytes of unstructured data into structured data placed in MinIO that other processes can then read and analyze. You could even use this pipeline as an Image Resizer (similar to Orchestrate Complex Workflows Using Apache Kafka and MinIO) by taking images of various sizes, resizing them to the size required by your business, and then putting them into another MinIO bucket where they are available for web serving.


Give Airflow and MinIO a try for yourself. If you have any questions, be sure to reach out to us on Slack!