paint-brush
How to Seamlessly Transfer From Airflow to Apache DolphinScheduler With Air2phinby@zhoujieguang
878 reads
878 reads

How to Seamlessly Transfer From Airflow to Apache DolphinScheduler With Air2phin

by Zhou JieguangApril 9th, 2024
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

Some users originally used Airflow as their scheduling system. However, Airflow only allows workflow definition through code and lacks resource and project granularity, which fails to meet customer needs in scenarios requiring strong permission control. Consequently, some users need to migrate their scheduling system from Airflow to Apache Dolphinscheduler.
featured image - How to Seamlessly Transfer From Airflow to Apache DolphinScheduler With Air2phin
Zhou Jieguang HackerNoon profile picture

Migration Background

Some users originally used Airflow as their scheduling system. However, Airflow only allows workflow definition through code and lacks resource and project granularity which fails to meet customer needs in scenarios requiring strong permission control. Consequently, some users need to migrate their scheduling system from Airflow to Apache Dolphinscheduler.


With a commitment to addressing user needs, Whaleops developed the Air2phin migration tool to facilitate a smoother transition to DolphinScheduler. Since Airflow workflows are defined using Python code, and some metadata only exists in the Python code without being persisted to the database, it’s necessary to parse Python code for analysis and migration.

Why Migrate to DolphinScheduler

Both Airflow and DolphinScheduler are task scheduling systems addressing workflow orchestration, each with its strengths. This section will focus on DolphinScheduler’s advantages over Airflow. A detailed comparison will be covered in future articles:


  1. Both are mature open-source workflow scheduling systems with robust communities. The subtle differences include:
  • Apache DolphinScheduler: Emphasizes visualization with supplementary API, offers granular permission management, more workflow levels, lower usage costs, and democratizes data.


  • Airflow: Defines workflows through code, catering to advanced developers with high flexibility but higher usage costs, primarily targeting developers.


2. Since DolphinScheduler stores workflow definitions, task definitions, and task relationships in the database, it has several advantages:

  • No extra steps are needed when adding master or worker nodes; in contrast, Airflow requires copying DAG files to new nodes.
  • Eliminates latency issues inherent in file parsing, unlike Airflow’s loop-based scheduling which consumes substantial CPU resources.
  • Retains complete historical data on workflow and task statuses. In Airflow, deleted tasks in the latest definitions cannot be traced back.
  • Natively supports versioning information, which requires Git logs and reverting in Airflow.


3. DolphinScheduler features a resource center, making it easier for users to manage local and remote resources. In Airflow, external resources typically need version control alongside Git.


4. Besides offline scheduling tasks, DolphinScheduler supports real-time tasks, data materials, and physical resource monitoring, focusing on practical scheduling functionalities. Airflow currently concentrates more on offline workflow scheduling.


5. DolphinScheduler is a distributed, centerless system with higher master server resource utilization. Airflow’s scheduler consumes more CPU resources in scanning and scheduling tasks.

Demands and Challenges

Demands: The core demand for a migration tool is to transform Airflow DAGs into DolphinScheduler workflows with minimal human intervention. However, a balance is needed to avoid overly complex, less maintainable, and less generalizable programs, especially when accommodating different Airflow versions.


Challenges

  • Syntax Differences: Airflow and DolphinScheduler’s Python SDKs share basic Python syntax (for, if, else) but differ in task names and parameters. For example, Airflow’s bash operator corresponds to Shell in DolphinScheduler’s SDK, with different parameters.


  • Task Type Variance: Both platforms might allow some degree of customization, like custom plugins. However, they differ in the number and abstraction of task types, with some unique to each platform.


  • Scheduling Variance: Airflow uses Cron expressions (e.g., “5 4 * * *”) or Python’s datetime.timedelta, while DolphinScheduler employs more detailed Cron expressions, like “0 5 4 * * ? *”.


  • Built-in Time Parameter Differences: Airflow handles time parameters using macros and provides a Jinja2 template for calculations (e.g., ds_add(‘2015–01–06’, -5)). DolphinScheduler has its own definitions and rules, like using “yyyy-MM-dd” for run times and “yyyy-MM-dd+1” for time adjustments.


  • Migration Rule Extension: Both Airflow and DolphinScheduler’s Python SDK may update their APIs over time. Migration tool rules need to be simple to modify and add, minimizing maintenance costs.


  • Different Airflow Versions: Variations between Airflow versions might impact syntax, e.g., airflow.operators.bash_operator in versions before 2.0.0 and airflow.operators.bash in 2.0.0 and later.

Introduction to the Migration Tool

What Is Air2phin?

Air2phin is a rule-based AST (Abstract Syntax Tree) transformer that converts Airflow DAG files into pydolphinscheduler definition files. It uses LibCST for parsing and transforming Python code and YAML files to define transformation rules. It’s a tool to assist users in conversion, not an automatic one.

Air2phin Data Flow!

  • Retrieves original Airflow DAGs definitions from standard input or files.
  • Loads conversion rules from a YAML file into air2phin.
  • Parses Airflow DAGs content into a CST tree.
  • Modifies the CST tree based on transformation rules.
  • Outputs the transformed result to standard output or files.

How to Use Air2phin?

Air2phin, being a Python package, requires installation via pip. Once installed, it can be used to convert all the DAGs in Airflow into definitions for the DolphinScheduler Python SDK. After this transformation, executing these SDK scripts with Python will submit the converted workflows to DolphinScheduler.


# Install package

python -m pip install --upgrade air2phin


# Migrate Airflow's DAGs

air2phin migrate -i ~/airflow/dags


In a production environment, the ~/airflow/dags directory may contain numerous DAGs. Air2phin, by default, processes these DAGs sequentially. For more efficient processing, you can use the --multiprocess option to enable multi-process execution.


# Use multiprocess to convert the Airflow DAGs files

air2phin migrate -i --multiprocess 12 ~/airflow/dags


After completing the above transformation, the migration from Airflow DAG files to DolphinScheduler Python SDK scripts is accomplished. These scripts can then be submitted to DolphinScheduler for execution.


# Install apache-dolphinscheduler according to the Apache DolphinScheduler server you use, ref: https://dolphinscheduler.apache.org/

python/main/#versionpython -m pip install apache-dolphinscheduler


# Submit your DolphinScheduler Python SDK definition

python ~/airflow/dags/tutorial.py

Defining Custom Conversion Rules in Air2phin?

Most Airflow users customize some operators. To convert these operators, users need to define rules. Fortunately, Air2phin’s rules are based on YAML files, which means that users can easily add new rules. The following is a rule conversion YAML file that converts a user-defined Redshift operator into a DolphinScheduler SQL task type.


It is assumed here that the user has customized a redshift operator based on airflow.providers.postgres.operators.postgres. The operator code is as follows:

from airflow.providers.postgres.operators.postgres import PostgresOperator

class RedshiftOperator(PostgresOperator):
def __init__(
self,
*,
sql: str | Iterable[str],
my_custom_conn_id: str = 'postgres_default',
autocommit: bool = False,
parameters: Iterable | Mapping | None = None,
database: str | None = None,
runtime_parameters: Mapping | None = None,
**kwargs,
) -> None:
super().__init__(
sql=sql,
postgres_conn_id=my_custom_conn_id,
autocommit=autocommit,
parameters=parameters,
database=database,
runtime_parameters=runtime_parameters,
**kwargs,
)


Since this is a user-defined operator, it is definitely not in the built-in conversion rules of air2phin, so we need to customize a YAML file of conversion rules.


name: RedshiftOperator

migration:
module:
- action: replace
src: utils.operator.RedshiftOperator.RedshiftOperator
dest: pydolphinscheduler.tasks.sql.Sql
- action: add
module: pydolphinscheduler.resources_plugin.Local
parameter:
- action: replace
src: task_id
dest: name
- action: add
arg: datasource_name
default:
type: str
value: "redshift_read_conn"
- action: add
arg: resource_plugin
default:
type: code
value: Local(prefix="/path/to/dir/")


Customers only need to add this file to the rule path of air2phin to realize the conversion action of the custom operator.


air2phin migrate --custom-rules /path/to/RedshiftOperator.yaml ~/airflow/dags

How Does Air2phin Address Migration Challenges?

Air2phin addresses the various challenges posed by the migration process:

Syntax Differences

Since both Airflow and DolphinScheduler Python SDK are written in Python, the basic syntax related to Python is similar. However, since Airflow and DolphinScheduler Python SDK are two unrelated sets of APIs, there are some differences between the two in terms of specific parameters, classes, and functions. air2phin is used to solve this problem. It resolves the differences and implements the migration process from one platform to another by defining appropriate YAML conversion rules for this part of the difference.

YAML File Conversion Rules

Parameter mapping: For different naming or structures of parameters, mapping rules can be defined in the YAML file to map the parameter names of Airflow to the corresponding parameters of the DolphinScheduler Python SDK.


Class and function conversion: If Airflow and DolphinScheduler Python SDK use different class names or functions, you can define class and function conversion rules in a YAML file to map Airflow’s class names and functions to their DolphinScheduler Python SDK equivalents.


Error handling and alerting: Given that the two platforms may have different error handling and alerting mechanisms, how to map Airflow’s error handling to the equivalent mechanism of DolphinScheduler can be defined in the YAML file.


By formulating these conversion rules, you can ensure that during the migration process, Airflow’s task code is converted into the code required by the DolphinScheduler Python SDK platform according to the definition of the YAML file to adapt to the differences between platforms and ensure that new and modified tasks flexibility.

Timing Scheduling Differences

There are also some differences between Airflow and DolphinScheduler Python SDK in terms of scheduled scheduling configuration. Airflow uses standard Cron expressions to define task scheduling, while DolphinScheduler Python SDK adopts a more precise Cron scheduling strategy. This difference may affect the precise scheduling and execution frequency of tasks.


• Airflow’s Cron expressions: Airflow uses general Cron expressions to define the scheduling frequency of tasks. A Cron expression consists of five or six fields, representing minutes, hours, dates, months, and days of the week. It allows users to define relatively loose scheduling rules, such as once an hour, once a day, etc.


• Precise Cron scheduling of DolphinScheduler Python SDK: DolphinScheduler introduces a more precise Cron scheduling strategy. It splits Cron expressions into two parts: Basic Cron and Advanced Cron. Basic Cron is used to define rough scheduling rules for tasks, such as minutes, hours, dates, etc. Advanced Cron is used to define more precise scheduling rules, including second-level accuracy. This allows DolphinScheduler to implement more fine-grained task scheduling, which is suitable for scenarios with higher execution time requirements, such as the financial field.


Since the accuracy of DolphinScheduler Python SDK is higher than that of Airflow, there will be no problem of accuracy loss during conversion, and this problem will be easily solved.

Built-in Time Parameter Differences

Built-in time parameter differences refer to the different ways in which Airflow and DolphinScheduler Python SDK use built-in time parameters in task scheduling. Airflow uses Jinja2’s macro function to implement built-in time parameters, while DolphinScheduler’s Python SDK uses a custom way to implement these parameters. These two implementation methods may lead to differences in usage and understanding.


• Airflow’s Jinja2 macro: Airflow’s built-in time parameters are implemented through Jinja2 macros. Jinja2 macros allow the use of special placeholders and functions in DAGs files for dynamically generating scheduling times. For example, you can use {{ macros.ds_add(ds, 1) }} to add one day to the scheduling time.


• Custom implementation of DolphinScheduler Python SDK: When implementing built-in time parameters, DolphinScheduler’s Python SDK may use some custom functions or classes instead of directly using Jinja2 macros. These custom implementation methods may require specific configuration and processing on the DolphinScheduler platform.


Therefore, you need to pay attention to the following when migrating:


1. Different syntax and methods: The syntax and usage of Airflow’s Jinja2 macro are very different from the custom implementation of the DolphinScheduler Python SDK, which may cause parameters not to be migrated correctly part of the time. Air2phin will retain its original values for some parameters that cannot be automatically migrated.


2. Functional similarity: Although implemented differently, both aim to provide built-in time parameters for task scheduling. Ensure that migrated tasks correctly use the new platform’s built-in timing parameters.

Extension of Migration Rules

Airflow allows users to define and use custom Operators, Hooks, Sensors, etc. as needed to meet specific task requirements. These custom components may be used in DAGs, and the way they are implemented and called may require special handling during the migration process. The simplest way to deal with it is to use the method mentioned in the above question “How Air2phin defines its own conversion rules.” As long as the custom task can be defined in DolphinScheduler, the task can be migrated from Airflow to DolphinScheduler.

Different Versions of Airflow Migration

Different versions of Airflow have different operator syntax. In versions before 2.0.0, Airflow only supports bash with the class airflow.operators.bash_operator.BashOperator. However, in versions 2.0.0 and later, Airflow More recommended for bash is the airflow.operators.bash.BashOperator class which is also compatible with Airflow.operators.bash_operator.BashOperator.


There are many similar situations, so Air2phin needs to be compatible with the shell task type converted from the above two types into DolphinScheduler.


We implement the transformation of multiple classes by supporting lists in YAML. For details, see the migration.module.* nodes below.


migration:
module:
- action: replace
src:
- airflow.operators.bash.BashOperator
- airflow.operators.bash_operator.BashOperator
dest: pydolphinscheduler.tasks.shell.Shell
parameter:
- action: replace
src: task_id
dest: name
- action: replace
src: bash_command
dest: command

User Benefits

The Air2phin migration tool allows users to convert Airflow’s DAGs code to DolphinScheduler Python SDK through simple configuration, bringing many benefits to users:


• Simplify the migration process: The migration tool can automatically handle code conversion, avoiding the complex process of manual line-by-line migration, greatly reducing the workload of developers.


• Save time and costs: Manually migrating code requires a significant investment of time and human resources. Use migration tools to complete your migration quickly and efficiently, saving time and money.


• Reduce errors: Manual migration easily introduces errors, but migration tools can automatically perform conversions based on predefined rules, reducing potential human errors.


• Standardize code style: The migration tool can generate code based on predefined rules and templates to ensure consistent code style and reduce maintenance costs.


• Lower the technical threshold: The migration tool can hide the underlying technical details, allowing developers who are not familiar with DolphinScheduler to easily migrate tasks.


• Flexibility and customizability: Good migration tools usually offer some customizable options to meet the needs of different projects while maintaining flexibility.


Overall, using Air2phin can significantly improve the efficiency and quality of the migration process, reduce risks, and at the same time, reduce the workload of developers, bringing time and resource savings to the team, as well as a better development experience.

Problems that Air2phin Cannot Solve Yet

Air2phin is a tool that helps users migrate from Airflow to Apache DolphinScheduler more easily. The keyword for this is “assistance”, which means that it can reduce the user’s migration cost, but it cannot be fully automated. The currently known unsolvable problems are as follows:


• Task types that do not exist in DolphinScheduler cannot be migrated: Some task types only exist in Airflow, but do not exist in DolphinScheduler. These tasks cannot be automatically migrated and need to be processed manually. For example, the Discord operator does not exist in DolphinScheduler, so the original Discord operator definition will be retained and needs to be processed manually by the user.


• Some task attributes cannot be migrated to DolphinScheduler: Some task attributes in Airflow do not exist in DolphinScheduler, such as successc_callback and retry_callback. These attributes will be directly abandoned during the migration process.

Reference

Cited Source

  • [1] AWS Performance Evaluation: Link