
Setting up Data Streams Using NiPyAPI: UI-Free Guide

by Temirlan Amanbayev, May 7th, 2024

Too Long; Didn't Read

In this article, I'll share how I use NiPyAPI to manage Apache NiFi data streams programmatically, bypassing the GUI entirely. This approach not only saves time but also increases the reliability and reproducibility of our data flow setups.

Introduction

Hello, my name is Temirlan, and I'm a Staff Software Engineer in the GovTech domain. My daily work often involves streamlining complex workflows and improving system efficiency, and in that setting graphical user interfaces (GUIs) can be more of a hindrance than a help. This is especially true in environments where precision, automation, and scalability are key.


From my experience, while GUIs can be visually appealing and user-friendly for general tasks, they often lack the flexibility needed for more sophisticated operations. This leads to inefficiencies and frustrations when handling tasks like setting up data streams, which are crucial in our field. That’s why I prefer a code-first approach, using APIs that align more naturally with my work as a software engineer.


APIs offer the precision and programmability that GUIs often fail to deliver, allowing for more controlled and customizable management of technology. In this article, I'll share how I use NiPyAPI to manage Apache NiFi data streams programmatically, bypassing the GUI entirely. This approach not only saves time but also increases the reliability and reproducibility of our data flow setups.

Use Case Overview

In this use case, we will explore how a small NiFi flow can automate the retrieval of weather data from an online source. The primary goal is to demonstrate the power and flexibility of NiPyAPI for building data streams without a graphical user interface. The scenario involves the following steps:


  1. Data Retrieval: A GenerateFlowFile processor fires every 10 seconds and triggers an InvokeHTTP processor, which sends a GET request to the Open-Meteo weather API. In this example the request asks for the daily maximum temperature; a similar request could cover other meteorological metrics such as humidity or wind speed.


  2. Data Processing: Once the data is retrieved, it needs to be processed and formatted appropriately. In the script this step is a ReplaceText processor configured with a placeholder regular expression; in practice it might parse the JSON response, filter out unnecessary details, or reshape the data into a structure that is useful for downstream applications.


  3. Data Transmission to Kafka: After processing, the transformed data is sent to a Kafka topic. This step integrates the weather data into a larger data ecosystem, making it available for real-time analytics or for consumption by other applications within the organization.


  4. Logging Actions: To ensure traceability and to aid in debugging, each HTTP response is also routed to a LogMessage processor, which can record every retrieval in the NiFi log alongside the main path to Kafka.
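
Before making any NiFi call, the steps above can be written down as plain data and checked for consistency. The sketch below is only an illustration: `PROCESSORS`, `CONNECTIONS`, and `validate_flow` are hypothetical names that do not come from NiPyAPI, while the processor names, processor types, and relationships mirror the script in the next section.

```python
# Hypothetical, NiFi-free description of the flow built later in this article.
# Keys are the processor names used in the script, values are NiFi processor types.
PROCESSORS = {
    "GenerateFlowFile": "GenerateFlowFile",  # timer-driven trigger
    "GetHTTP": "InvokeHTTP",                 # fetches the weather data
    "Logger": "LogMessage",                  # logs the HTTP response
    "ReplaceText": "ReplaceText",            # simple transformation
    "PublishKafka": "PublishKafka",          # sends the result to Kafka
}

# (connection name, source processor, target processor, relationship)
CONNECTIONS = [
    ("FlowFile to HTTP", "GenerateFlowFile", "GetHTTP", "success"),
    ("HTTP to logger", "GetHTTP", "Logger", "Response"),
    ("HTTP to ReplaceText", "GetHTTP", "ReplaceText", "Response"),
    ("ReplaceText to Kafka", "ReplaceText", "PublishKafka", "success"),
]


def validate_flow(processors, connections):
    """Return a list of problems; an empty list means the spec is consistent."""
    problems = []
    seen_names = set()
    for name, source, target, _relationship in connections:
        # The script looks connections up by name, so names must be unique.
        if name in seen_names:
            problems.append(f"duplicate connection name: {name}")
        seen_names.add(name)
        # Every endpoint must refer to a processor that is actually defined.
        for endpoint in (source, target):
            if endpoint not in processors:
                problems.append(f"{name}: unknown processor {endpoint}")
    return problems


if __name__ == "__main__":
    print(validate_flow(PROCESSORS, CONNECTIONS) or "flow spec is consistent")
```

Keeping the topology in one place like this makes it easy to review the flow in a pull request before it is ever applied to a NiFi instance.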


Let’s Go and See How it Looks in Our Code!

"""
This script creates a data stream flow that gets data from a weather site, logs it, applies a simple
transformation, and then publishes the data to a Kafka topic.

Modules:
--------
nipyapi: Python client for NiFi API

Configuration:
--------------
nipyapi.config.nifi_config.host: Base URL of the NiFi REST API

Functions:
----------
nipyapi.canvas.get_root_pg_id: Get the root process group id
nipyapi.canvas.get_process_group: Get the process group
nipyapi.canvas.create_process_group: Create a new process group
nipyapi.canvas.get_processor: Get the processor
nipyapi.canvas.get_processor_type: Get the processor type by name
nipyapi.canvas.create_processor: Create a new processor
nipyapi.canvas.list_all_connections: List all connections
nipyapi.canvas.create_connection: Create a new connection

Variables:
----------
root_id: The root process group id
root_process_group: The root process group
pg_name: The name of the process group
existing_pg: The existing process group
my_pg: The process group
processor_name_gff: The name of the GenerateFlowFile processor
existing_processors_gff: The existing GenerateFlowFile processor
generate_flowfile_processor: The GenerateFlowFile processor
http_processor_name: The name of the GetHTTP processor
existing_processors_http: The existing GetHTTP processor
get_http_processor: The GetHTTP processor
log_processor_name: The name of the Logger processor
existing_processors_log: The existing Logger processor
get_logger_processor: The Logger processor
existing_connections: The existing connections
connection_ff_to_http_name: The name of the connection from FlowFile to HTTP
is_connection_ff_to_http: The connection from FlowFile to HTTP
connection_http_to_logger: The name of the connection from HTTP to logger
is_connection_http_to_logger: The connection from HTTP to logger
replace_text_processor_name: The name of the ReplaceText processor
existing_processors_replace_text: The existing ReplaceText processor
replace_text_processor: The ReplaceText processor
publish_kafka_processor_name: The name of the PublishKafka processor
existing_processors_publish_kafka: The existing PublishKafka processor
publish_kafka_processor: The PublishKafka processor
connection_http_to_replace_text: The name of the connection from GetHTTP to ReplaceText
is_connection_http_to_replace_text: The connection from GetHTTP to ReplaceText
connection_replace_text_to_kafka: The name of the connection from ReplaceText to Kafka
is_connection_replace_text_to_kafka: The connection from ReplaceText to Kafka
"""
import nipyapi

nipyapi.config.nifi_config.host = 'http://localhost:8888/nifi-api'

root_id = nipyapi.canvas.get_root_pg_id()
root_process_group = nipyapi.canvas.get_process_group(root_id, 'id')

pg_name = 'weather-python'
existing_pg = nipyapi.canvas.get_process_group(pg_name, identifier_type='name')
if existing_pg is None:
    my_pg = nipyapi.canvas.create_process_group(
        parent_pg=root_process_group,
        new_pg_name=pg_name,
        location=(400.0, 100.0)
    )
    print(f"Created new process group: {pg_name}")
else:
    my_pg = existing_pg
    print(f"Using existing process group: {pg_name}")

processor_name_gff = 'GenerateFlowFile'
existing_processors_gff = nipyapi.canvas.get_processor(processor_name_gff)
if existing_processors_gff is None:
    generate_flowfile_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('GenerateFlowFile'),
        name=processor_name_gff,
        location=(500.0, 100.0),
        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'Custom Text': 'Example text',
                'File Size': '1 KB',
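                # now() supplies the current timestamp; the date parameters in the
                # weather URL below take a plain yyyy-MM-dd date
                'time': '${now():format("yyyy-MM-dd", "GMT")}'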
            },
            scheduling_period='10 sec',
            scheduling_strategy='TIMER_DRIVEN'
        )
    )
    print(f"Created new processor: {processor_name_gff}")
else:
    generate_flowfile_processor = existing_processors_gff
    print(f"Using existing processor: {processor_name_gff}")

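# The processor is named 'GetHTTP', but its type is InvokeHTTP (see get_processor_type below)
http_processor_name = 'GetHTTP'
existing_processors_http = nipyapi.canvas.get_processor(http_processor_name)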
if existing_processors_http is None:
    get_http_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('InvokeHTTP'),
        name=http_processor_name,
        location=(500.0, 400.0),
        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'HTTP Method': 'GET',
                'Remote URL': 'https://api.open-meteo.com/v1/bom?latitude=51.1801&longitude=71.446&daily=temperature_2m_max&timezone=auto&start_date=${time}&end_date=${time}'
            },
            auto_terminated_relationships=['Retry', 'No Retry', 'Original', 'Failure']
        )
    )
    print(f"Created processor: {http_processor_name}")
else:
    get_http_processor = existing_processors_http
    print(f"Using existing processor: {http_processor_name}")

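# The processor is named 'Logger'; its type is LogMessage
log_processor_name = 'Logger'
existing_processors_log = nipyapi.canvas.get_processor(log_processor_name)
if existing_processors_log is None:
    get_logger_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('LogMessage'),
        name=log_processor_name,
        location=(1200.0, 400.0),
        config=nipyapi.nifi.ProcessorConfigDTO(
            # Logger ends this branch, so terminate its only outgoing relationship
            auto_terminated_relationships=['success']
        )
    )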
    print(f"Created processor: {log_processor_name}")
else:
    get_logger_processor = existing_processors_log
    print(f"Using existing processor: {log_processor_name}")

existing_connections = nipyapi.canvas.list_all_connections(my_pg.id)
connection_ff_to_http_name = 'FlowFile to HTTP'
is_connection_ff_to_http = next((conn for conn in existing_connections if conn.component.name == connection_ff_to_http_name), None)
if is_connection_ff_to_http is None:
    nipyapi.canvas.create_connection(
        source=generate_flowfile_processor,
        target=get_http_processor,
        relationships=['success'],
        name=connection_ff_to_http_name
    )
    print(f"Connected {processor_name_gff} to {http_processor_name}")
else:
    print(f"Connection '{connection_ff_to_http_name}' already exists")

connection_http_to_logger = 'HTTP to logger'
is_connection_http_to_logger = next((conn for conn in existing_connections if conn.component.name == connection_http_to_logger), None)
if is_connection_http_to_logger is None:
    nipyapi.canvas.create_connection(
        source=get_http_processor,
        target=get_logger_processor,
        relationships=['Response'],
        name=connection_http_to_logger
    )
    print(f"Connected {http_processor_name} to {log_processor_name}")
else:
    print(f"Connection '{connection_http_to_logger}' already exists")


replace_text_processor_name = 'ReplaceText'
existing_processors_replace_text = nipyapi.canvas.get_processor(replace_text_processor_name)
if existing_processors_replace_text is None:
    replace_text_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('ReplaceText'),
        name=replace_text_processor_name,
        location=(800.0, 400.0),
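        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'Replacement Strategy': 'Regex Replace',
                # Placeholders: substitute the real pattern and replacement
                'Search Value': 'regex_to_search',
                'Replacement Value': 'replacement_value'
            },
            # 'failure' has no downstream connection, so terminate it
            auto_terminated_relationships=['failure']
        )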
    )
    print(f"Created processor: {replace_text_processor_name}")
else:
    replace_text_processor = existing_processors_replace_text
    print(f"Using existing processor: {replace_text_processor_name}")


publish_kafka_processor_name = 'PublishKafka'
existing_processors_publish_kafka = nipyapi.canvas.get_processor(publish_kafka_processor_name)
if existing_processors_publish_kafka is None:
    publish_kafka_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('PublishKafka'),
        name=publish_kafka_processor_name,
        location=(1100.0, 400.0),
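        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'Kafka Brokers': 'kafka:9092',
                'Topic Name': 'my_awesome_topic',
            },
            # PublishKafka is the last step, so terminate both of its relationships
            auto_terminated_relationships=['success', 'failure']
        )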
    )
    print(f"Created processor: {publish_kafka_processor_name}")
else:
    publish_kafka_processor = existing_processors_publish_kafka
    print(f"Using existing processor: {publish_kafka_processor_name}")


connection_http_to_replace_text = 'HTTP to ReplaceText'
is_connection_http_to_replace_text = next((conn for conn in existing_connections if conn.component.name == connection_http_to_replace_text), None)
if is_connection_http_to_replace_text is None:
    nipyapi.canvas.create_connection(
        source=get_http_processor,
        target=replace_text_processor,
        relationships=['Response'],
        name=connection_http_to_replace_text
    )
    print(f"Connected {http_processor_name} to {replace_text_processor_name}")
else:
    print(f"Connection '{connection_http_to_replace_text}' already exists")


connection_replace_text_to_kafka = 'ReplaceText to Kafka'
is_connection_replace_text_to_kafka = next((conn for conn in existing_connections if conn.component.name == connection_replace_text_to_kafka), None)
if is_connection_replace_text_to_kafka is None:
    nipyapi.canvas.create_connection(
        source=replace_text_processor,
        target=publish_kafka_processor,
        relationships=['success'],
        name=connection_replace_text_to_kafka
    )
    print(f"Connected {replace_text_processor_name} to {publish_kafka_processor_name}")
else:
    print(f"Connection '{connection_replace_text_to_kafka}' already exists")
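
The script repeats the same look-up-then-create pattern for every processor and connection. A minimal sketch of how that pattern could be factored out is shown below. `get_or_create` and `find_connection_by_name` are hypothetical helpers, not NiPyAPI functions, and the stub objects exist only so that the example runs without a NiFi instance.

```python
from types import SimpleNamespace


def get_or_create(find, create):
    """Return (obj, created): the object found by find(), or a new one from create()."""
    existing = find()
    if existing is None:
        return create(), True
    return existing, False


def find_connection_by_name(connections, name):
    """Return the first connection whose component.name equals name, else None."""
    return next((c for c in connections if c.component.name == name), None)


if __name__ == "__main__":
    canvas = {}  # stand-in for the NiFi canvas: name -> object

    def make(name):
        # Stub that mimics the entity.component.name shape used in the script
        canvas[name] = SimpleNamespace(component=SimpleNamespace(name=name))
        return canvas[name]

    # Running the same step twice shows the idempotent behaviour
    for attempt in (1, 2):
        _, created = get_or_create(
            find=lambda: canvas.get("Logger"),
            create=lambda: make("Logger"),
        )
        print(f"run {attempt}: {'created' if created else 'reused'} Logger")
```

In the real script, `find` would wrap a call such as `nipyapi.canvas.get_processor(name)` and `create` would wrap `nipyapi.canvas.create_processor(...)`. One caveat: according to the NiPyAPI documentation, the lookup functions can return a list when several objects match the given name, so a production version would need to handle that case as well.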


Summary

In this article, we explored how to build a lightweight ETL (Extract, Transform, Load) process with NiPyAPI, a Python client for the Apache NiFi API, adopting a no-UI approach that relies on programmatic interaction with NiFi.


We detailed a practical use case where a NiFi flow was configured to fetch, process, and transmit weather data to a Kafka topic for further use within an enterprise ecosystem. This process involved:


  • Fetching real-time weather data from an online source without the need for manual intervention.


  • Transforming the raw data into a structured format suitable for analytical purposes and ensuring it aligns with downstream system requirements.


  • Seamlessly transmitting the processed data to Kafka, demonstrating how NiFi can integrate into broader data architectures.


  • Logging the retrieved responses to maintain an audit trail and facilitate debugging and operational transparency.
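
The fetch step depends on the request URL being well-formed, in particular the `start_date` and `end_date` parameters that the flow fills from the `time` attribute. As a quick check outside NiFi, the sketch below rebuilds the same URL in Python. It assumes the Open-Meteo endpoint expects dates in `yyyy-MM-dd` form; `build_weather_url` is a hypothetical helper, and the base URL and query parameters are copied from the script above.

```python
from datetime import date
from urllib.parse import urlencode

# Endpoint and parameters taken from the 'Remote URL' property in the script
BASE_URL = "https://api.open-meteo.com/v1/bom"


def build_weather_url(day):
    """Build the request URL for a single day (assumes ISO yyyy-MM-dd dates)."""
    params = {
        "latitude": 51.1801,
        "longitude": 71.446,
        "daily": "temperature_2m_max",
        "timezone": "auto",
        "start_date": day.isoformat(),
        "end_date": day.isoformat(),
    }
    return f"{BASE_URL}?{urlencode(params)}"


if __name__ == "__main__":
    # Paste the printed URL into a browser or curl to inspect the raw response
    print(build_weather_url(date.today()))
```

Checking the request this way helps separate problems with the API call itself from problems with the NiFi configuration.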


By employing a code-first approach using NiPyAPI, we showcased how software engineers can enhance automation, reduce reliance on graphical user interfaces, and increase the scalability and reliability of data flows. This method is especially beneficial in fields like GovTech, where accuracy, efficiency, and reliability are paramount.


The techniques discussed provide a foundation for engineers looking to implement similar ETL workflows in their projects, driving forward the capabilities of modern data handling and integration.