ETL and Data Pipelines using Airflow and Kafka
Introduction
In these notes I will cover data processing methods such as ETL (Extract, Transform, Load) and ELT, their differences and trends. I will also go into data extraction techniques — such as database querying, web scraping, and APIs — emphasizing the highly important role of data transformation in adapting information for specific applications.
Practical implementation of ETL pipelines using Bash scripts scheduled with cron is also discussed, with a focus on differences between batch and streaming pipelines and techniques to address bottlenecks when they occur. I also cover the advantages of Apache Airflow’s DAG-based representation of pipelines for its code-based clarity and collaborative benefits. The platform’s user-friendly interface and essential DAG file components are explored, emphasizing the importance of efficient log handling.
Transitioning to Apache Kafka, an open-source event streaming platform, my notes outline core components like brokers, topics, and processors. The role of the Kafka Streams API in facilitating data processing within these pipelines is detailed, accompanied by insights into popular Kafka service providers and guidance on building event streaming pipelines.
Content
- ETL and ELT Processes
- ETL using Shell
- What are Data Pipelines?
- Building Data Pipelines using Apache Airflow
- Building Streaming Pipelines using Kafka
ETL and ELT Processes
ETL Fundamentals
ETL, or Extract, Transform, Load, is a data pipeline engineering methodology automating the acquisition and preparation of data for analytics environments like data warehouses or data marts. It involves curating data from diverse sources, standardizing its format, and loading it into a new environment for visualization, exploration, and modeling, supporting automation and decision-making.
Extraction: Extraction configures access to the data and reads it into an application, and is often automated. Methods include web scraping with tools like Python or R and using APIs to query data programmatically. Extraction varies from batch processes for static data archives to real-time streaming from dynamic sources like weather stations, social feeds, or IoT devices.
Data Transformation: Data transformation, or wrangling, adapts data to target system and use case requirements. Processes include cleaning (fixing errors), filtering (selecting needed data), joining disparate sources, feature engineering (creating KPIs), and formatting for compatibility.
Data Loading: Loading involves writing data to new destinations like databases or data warehouses, making it accessible for analytics applications. This step is crucial for end-users to derive value through dashboards, reports, and advanced analytics like forecasting.
Use Cases: ETL pipelines address various scenarios, from digitizing analog data (e.g., paper documents) to capturing historical data from OLTP systems for analysis in OLAP systems. Additionally, ETL prepares data for dashboards and machine learning models, enhancing decision-making across operations, sales, marketing, and executive levels.
ELT Basics
The ELT process, standing for Extract, Load, and Transform, is an automated data pipeline methodology differing from ETL due to its unique sequence of stages. In ELT, data is directly loaded in its original form into a destination environment like a data lake. This allows on-demand transformation within the destination platform, enabling dynamic user-driven alterations.
Extraction: Similar to ETL, the Extraction phase retrieves data from various sources asynchronously, funneling it into an application.
Loading: Raw data is directly loaded into the new environment, facilitating direct use with modern analytics tools. The Transformation process within ELT occurs dynamically within the destination, supporting interactive exploration, visualization, and advanced analytics like modeling.
ELT Use Cases: ELT finds applications in high-performance computing and Big Data scenarios, handling scale variations, real-time analytics on streaming data, and integrating globally distributed data sources efficiently.
Rationale for ELT: Cloud computing advancements cater to Big Data demands, offering scalability, cost-efficiency, and resource flexibility. ELT’s separation of data movement and processing optimizes efficiency. It retains a replica of the source data, preventing information loss during transformations, crucial for change requests upstream in the pipeline.
Flexibility and Data Preservation: ELT’s flexibility ensures diverse applications from the same data source without information loss risks present in various transformation processes. Storing raw data mitigates permanent information loss, providing flexibility for future alterations.
Comparing ETL and ELT
Differences between ETL and ELT: In ETL pipelines, transformations occur within the data pipeline before reaching the destination, while ELT decouples transformations, allowing them in the destination environment at will. ETL is rigid and fixed in purpose, whereas ELT is flexible, offering self-serve analytics.
Handling Big Data: ETL typically deals with structured, relational data on traditional computing resources, posing scalability challenges. ELT handles any data type, leveraging cloud computing for on-demand scalability, especially with Big Data.
Data Discovery and Time-to-Insight: ETL pipelines require development team intervention for changes, causing delays. ELT provides agility, allowing end users, with minimal training, to connect, experiment, create dashboards, and run predictive models independently.
Evolution from ETL to ELT: Driven by the demand to release raw data to a broader user base, ELT is an evolution of ETL. Traditional ETL includes a staging area, resembling a data lake, yet it remains private. The shift towards self-service data platforms reflects the changing landscape.
Trends: While conventional ETL retains relevance, there’s a trend favoring modern ELT. This shift addresses pain points such as prolonged time-to-insight, Big Data scalability challenges, and the siloed nature of data.
The current landscape is steering towards ELT, driven by its flexibility, agility, and solutions to contemporary challenges.
Data Extraction Techniques
Raw Data Sources
Examples of raw data sources include diverse types, such as:
- Archived text and images from paper documents and PDFs
- Web pages with text, tables, images, and links
- Analog audio and video from tapes or real-time streaming
- Survey, statistical, and economic data
- Transactional data from business, financial, real estate, and point-of-sale transactions
- Event-based data from social media streams
- Weather data from station networks
- Internet of Things (IoT) sensor streams
- Medical records, including prescription history and images
- Personal genetic data in DNA and RNA samples
Data Extraction Techniques
Various techniques cater to different data sources and use cases, such as the ones below (a short Python extraction sketch follows the list):
- Optical character recognition (OCR) for digitizing text from paper
- Analog-to-digital converters (ADCs) for digitizing audio and images
- Web scraping for crawling web pages
- APIs for extracting data from online repositories
- SQL and NoSQL languages for querying databases
- Edge computing for feature extraction from raw data
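To give a concrete feel for two of these techniques, here is a minimal Python sketch combining an API call with a SQL query. The URL, database file, and table name are hypothetical placeholders, and the requests package is assumed to be installed.
#extract records from a hypothetical REST API
import sqlite3
import requests
response = requests.get("https://api.example.com/v1/readings", timeout=10)
records = response.json()
#extract rows from a relational database with SQL
connection = sqlite3.connect("source.db")
rows = connection.execute("SELECT id, value FROM readings").fetchall()
connection.close()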
Use Cases
Examples of high-level use cases with corresponding raw data sources and extraction techniques include:
- API usage to integrate data from multiple sources into a central repository
- APIs capturing periodic or asynchronous events for archiving
- Edge computing reducing IoT data volume by extracting features at the source
- Migration of impractical-to-extract data for further processing, analysis, or modeling
- Utilizing medical imaging devices and biometric sensors for diagnostic data acquisition.
Data is ubiquitous, often sensitive, and extraction techniques are tailored to source types and usage intentions.
Data Transformation Techniques
Data Transformation
Data transformation primarily involves formatting data to suit specific applications, encompassing various operations like the following; a short pandas sketch follows the list:
- Data typing: Casting data into appropriate types such as integer, float, string, object, and category.
- Data structuring: Converting data formats like JSON, XML, or CSV to database tables.
- Anonymizing and encrypting transformations for privacy and security.
- Cleaning operations: Removing duplicate records and filling missing values.
- Normalizing data: Ensuring units are comparable, e.g., using a common currency.
- Filtering, sorting, aggregating, and binning operations: Accessing the right data at a suitable level of detail.
- Joining or merging disparate data sources.
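To make a few of these operations concrete, here is a minimal pandas sketch; the file names, column names, and exchange rate are purely illustrative.
import pandas as pd
orders = pd.read_csv("orders.csv")        #e.g. order_id, customer_id, amount, date
customers = pd.read_csv("customers.csv")  #e.g. customer_id, country
orders = orders.drop_duplicates()                    #cleaning: remove duplicate records
orders["amount"] = orders["amount"].fillna(0.0)      #cleaning: fill missing values
orders = orders[orders["amount"] > 0]                #filtering: keep only real sales
merged = orders.merge(customers, on="customer_id")   #joining disparate sources
merged["amount_eur"] = merged["amount"] * 0.92       #normalizing to a common currency
kpi = merged.groupby("country")["amount_eur"].sum()  #aggregating into a simple KPI
kpi.to_csv("revenue_by_country.csv")                 #formatting for loading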
Schema-on-Write vs. Schema-on-Read
In ETL pipelines, the conventional schema-on-write approach conforms data to a defined schema before loading, ensuring consistent structure but limiting versatility. ELT, in contrast, applies the schema to raw data after reading, providing versatility with ad-hoc schemas and broader data access without rigorous pre-processing.
Information Loss in Transformation
Whether intentional or accidental, information loss can occur in transformation processes. ETL processes may lose information during operations like compression, filtering, or aggregation. ELT, by copying data over as-is, preserves the original information content, avoiding loss scenarios.
Examples of Information Loss
- Lossy data compression: Converting floating-point values to integers or reducing bitrates on audio/video.
- Filtering: Permanent filtering can discard information.
- Aggregation: Summaries such as average yearly sales cannot be broken back down into daily or monthly figures once the detail is discarded.
- Edge computing devices: False negatives in surveillance devices designed to stream alarm signals, not raw data.
Understanding information loss in transformations is crucial, and the choice between ETL and ELT impacts data integrity and accessibility.
Data Loading Techniques
Various data loading techniques include:
Full Loading vs. Incremental Loading
- Full Loading: Loads an initial history into a database in one go, for example when tracking data begins in a new data warehouse.
- Incremental Loading: Inserts new data or updates already loaded data. Used for accumulating transaction history. Depending on the volume and velocity, it can be batch or stream loaded.
Scheduled vs. On-Demand Loading:
- Scheduled Loading: Periodic loading of daily transactions to a database, automated by script tasks.
- On-Demand Loading: Triggered by events such as the source data reaching a specified size, a measurement event (motion, sound, or a temperature change), or a user request, as when a video is streamed on YouTube.
Batch vs. Stream Loading:
- Batch Loading: Data loaded in time-defined chunks, typically accumulated over hours to days.
- Stream Loading: Real-time loading as data becomes available.
- Micro-batch Loading: Loads small, time-bound batches of recent data at short intervals, approximating streaming for near-real-time needs.
Push vs. Pull Data Loading:
- Pull Method: Clients request data from the server (e.g., RSS feeds, email).
- Push Method: Clients subscribe to a server service that delivers data as it becomes available (e.g., push notifications, instant messaging services).
Parallel Loading
- Boosts loading efficiency by employing multiple data streams or loading chunks simultaneously.
These techniques accommodate various data volume, velocity, and demand scenarios, optimizing loading efficiency based on the nature and availability of data.
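As a toy illustration of incremental loading, the sketch below appends only rows newer than the latest timestamp already present in the destination; SQLite is used for brevity, and the table and column names are hypothetical.
import sqlite3
source = sqlite3.connect("source.db")
warehouse = sqlite3.connect("warehouse.db")
#find the high-water mark already loaded into the destination
last_loaded = warehouse.execute(
    "SELECT COALESCE(MAX(created_at), '1970-01-01') FROM transactions"
).fetchone()[0]
#pull only the new rows from the source and append them
new_rows = source.execute(
    "SELECT id, amount, created_at FROM transactions WHERE created_at > ?",
    (last_loaded,),
).fetchall()
warehouse.executemany("INSERT INTO transactions VALUES (?, ?, ?)", new_rows)
warehouse.commit()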
ETL using Shell
A shell is a user interface for Unix-like operating systems that can interpret commands and run other programs. It also enables access to files, utilities, and applications, and is an interactive scripting language. You can use a shell to automate tasks, navigate and work with files and directories, and perform file compression and archiving.
Example Use Case
In this task, the goal is to report hourly average, minimum, and maximum temperatures from a remote sensor, updating a dashboard every minute.
We are given:
- ‘get_temp_api’ — reads the temperature from a remote sensor
- ‘load_stats_api’ — loads stats to the dashboard repo
Here’s a streamlined example workflow using Bash scripting:
Extraction: Read the current temperature using ‘get_temp_api’ and append it to ‘temperature.log’. Keep only the last 60 readings.
Transformation: Create a Python script ‘get_stats.py’ to process the 60-minute log, calculating temperature stats and saving them to ‘temp_stats.csv’.
Loading: Load the computed stats into the reporting system using the provided API.
Creating the ETL Bash Script:
- Start by creating a Bash script named ‘Temperature_ETL.sh’ using the ‘touch’ command.
- Open the script in a text editor and begin with the Bash shebang to set it as a Bash shell script.
- Comment out the steps for extraction, transformation, and loading for clarity within the script.
touch Temperature_ETL.sh
gedit Temperature_ETL.sh
#!/bin/bash
#extract reading with get_temp_api
#append reading to temperature.log
#60 minutes of readings
#aggregate readings with get_stats.py
#load stats with load_stats_api
Extraction:
- Initialize the ‘temperature.log’ file via the command line using ‘touch’.
- Use commands in the text editor to read temperature via the API and append it to ‘temperature.log’.
- Keep the last 60 lines of the log file to maintain the latest readings.
- Incorporate the Python script ‘get_stats.py’ to calculate temperature stats and write them to ‘temp_stats.csv’.
touch temperature.log
#extract reading with get_temp_api
get_temp_api >> temperature.log
#keep only the last 60 minutes of readings (write to a temporary file first;
#redirecting tail straight back onto temperature.log would truncate it before it is read)
tail -n 60 temperature.log > temperature.tmp && mv temperature.tmp temperature.log
#aggregate readings with get_stats.py
python3 get_stats.py temperature.log temp_stats.csv
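The script ‘get_stats.py’ itself is not shown in the course material; a minimal sketch, assuming each line of the log holds a single numeric reading, could look like this:
#get_stats.py (sketch) -- usage: python3 get_stats.py temperature.log temp_stats.csv
import sys
infile, outfile = sys.argv[1], sys.argv[2]
with open(infile) as f:
    readings = [float(line) for line in f if line.strip()]
with open(outfile, "w") as f:
    f.write("min,max,avg\n")
    f.write(f"{min(readings)},{max(readings)},{sum(readings) / len(readings)}\n")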
Loading:
- Load the computed stats into the reporting system via the API by specifying ‘temp_stats.csv’ as a command line argument.
- Ensure the script is executable by setting permissions using ‘chmod’.
#load stats with load_stats_api
load_stats_api temp_stats.csv
#set permission
chmod +x Temperature_ETL.sh
Scheduling:
- Open the crontab editor.
- Schedule the ETL script to run every minute using the editor.
- Save the changes and exit.
#open crontab
crontab -e
#scheduling job in crontab
* * * * * /path/to/Temperature_ETL.sh
Your ETL job is now set and operational, running every minute to fetch, process, and report temperature statistics from the remote sensor.
What are Data Pipelines?
Introduction to Data Pipelines
The concept of a pipeline is universal, extending to any set of processes sequentially connected. This implies that the output of one process becomes the input for the subsequent process in a sequential chain. For instance, imagine a chain of friends passing boxes one by one to their nearest friend to relocate them — each friend acting as a processor with the identical function: get a box, pass a box. In mass production, the transformations may vary from stage to stage.
Data pipelines specifically deal with the movement or modification of data. These pipelines aim to transport data from one place or format to another, constituting a system that extracts data and guides it through optional transformation stages for final loading. While low-level hardware architectures are included in this definition, our focus here is on data pipelines driven by software processes, such as commands, programs, and processing threads. A prime example is the simple Bash pipe command in Linux, acting as the ‘glue’ connecting these processes.
Visualizing data flowing through a pipeline can be conceived as data packets, referring broadly to units of data. These packets can range from a single record or event to large data collections. In this context, data packets are queued for ingestion into the pipeline, the length of the data pipeline denotes the time it takes for a single packet to traverse it, and the spacing between successive packet arrivals reflects the pipeline’s throughput delays.
Data Pipeline Key Performance Indicators
- Latency: It is the total time for a single packet of data to pass through the pipeline. Latency is the sum of the individual times spent during each processing stage within the pipeline, constrained by the slowest process in the pipeline. For instance, the loading time of a web page is dictated by the server’s speed, regardless of the internet service’s speed.
- Throughput: This refers to the amount of data that can be processed through the pipeline per unit of time. Increasing throughput involves processing larger packets per unit of time, analogous to passing bigger boxes in our friend chain example.
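As a quick worked example, if a three-stage pipeline spends 2, 3, and 5 seconds per packet, its latency is 2 + 3 + 5 = 10 seconds, while its throughput is capped by the slowest stage at one packet every 5 seconds unless that stage is parallelized.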
Applications of Data Pipelines:
- Simple Copying Pipeline: Involves copying data from one location to another, such as in file backups.
- Data Lake Integration: Integrating disparate raw data sources into a data lake.
- Transactional Records Movement: Transferring transactional records to a data warehouse.
- IoT Data Streaming: Streaming data from IoT devices to provide information in dashboards or alerting systems.
- Data Preparation for Machine Learning: Preparing raw data for machine learning development or production.
- Message Sending and Receiving: Involves applications like email, SMS, or online video meetings.
Key Data Pipeline Processes
Data pipeline processes typically follow a structured series of stages:
- Extraction: Involves retrieving data from one or more sources.
- Ingestion: The extracted data is ingested into the pipeline for further processing.
- Transformation: Optional stages within the pipeline can transform the data.
- Loading: The final stage loads the transformed data into a destination facility.
- Scheduling/Triggering: A mechanism schedules or triggers jobs to run as needed.
- Monitoring: The entire workflow is monitored for efficient operation.
- Maintenance and Optimization: Regular maintenance and optimization activities are performed to ensure smooth pipeline operation.
Monitoring Considerations
- Latency: Time taken for data packets to flow through the pipeline.
- Throughput Demand: Volume of data passing through the pipeline over time.
- Errors and Failures: Network overloads and failures at source or destination systems.
- Utilization Rate: How fully the pipeline’s resources are utilized, impacting cost.
- Logging and Alerting: Logging events and alerting administrators on failures is crucial.
Load Balanced Pipeline
- Efficient pipeline operation involves minimizing latency and balancing workloads; a load-balanced pipeline has no bottlenecks.
- The goal is to ensure no stage is left idle while the pipeline is operating, avoiding upstream bottlenecks.
- Load balancing across all stages implies uniform processing time for each packet, eliminating bottlenecks.
Handling Unbalanced Loads
- In cases where a stage exhibits higher latency (bottleneck), parallelization is a solution.
- Parallelization involves splitting data into concurrent stages, reducing latency with some overhead.
- While perfect load balancing is rare due to time and cost constraints, parallelizing bottleneck stages improves flow alignment.
- Parallelizing can involve replicating a process on multiple CPUs, cores, or threads.
- Data packets are distributed among replicated channels to speed up the bottlenecked stage.
- Dynamic or non-linear pipelines incorporate parallelism, unlike static or serial pipelines.
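As a rough sketch of parallelizing a bottleneck stage across multiple cores, the snippet below replicates a stage with a process pool; the transformation function is a stand-in for whatever stage dominates the latency.
from multiprocessing import Pool

def slow_transform(packet):
    #placeholder for an expensive per-packet transformation
    return packet * 2

if __name__ == "__main__":
    packets = range(1000)
    with Pool(processes=4) as pool:   #replicate the stage across 4 workers
        results = pool.map(slow_transform, packets)
    print(len(results))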
Stage Synchronization
- Input and output (I/O) buffers help synchronize stages and regulate variable processing rates, enhancing throughput.
- Single I/O buffers are used for load distribution and gathering in parallelized stages.
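A minimal sketch of an I/O buffer between two stages: a bounded queue lets a fast producer and a slower consumer run at their own rates.
import queue
import threading

buffer = queue.Queue(maxsize=10)   #the I/O buffer between the two stages

def producer():
    for packet in range(100):
        buffer.put(packet)         #blocks when the buffer is full
    buffer.put(None)               #sentinel: no more packets

def consumer():
    while True:
        packet = buffer.get()
        if packet is None:
            break
        _ = packet * 2             #stand-in for a slower downstream stage

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()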
Batch vs. Streaming Data Pipeline Use Cases
Batch Data Pipelines
- Operate on batches of data
- Operate on entire datasets at scheduled intervals (hours to weeks).
- Triggered by schedules or when data reaches a specific size.
- Suitable for accuracy-centric tasks without depending on data recency.
- Historically vital but facing competition from rapidly evolving streaming tech.
- Ideal for critical accuracy; streaming tech is catching up for mission-critical tasks.
Streaming Data Pipelines
- Process individual data packets (like transactions) in real-time.
- Aim for minimal latency and immediate processing of records or events.
- Store event streams for historical reference and interaction among users/systems.
- Use micro-batches for near-real-time processing and load balancing.
Batch vs. Stream Processing
- Batch offers high-quality output but increased latency due to cleaning and accuracy.
- Stream processing prioritizes low latency at the cost of error tolerance and cleaning.
Lambda Architecture
- Hybrid approach handling Big Data by combining batch and streaming methods.
- Batches delivered to batch layer, real-time data streamed to speed layer, integrated in serving layer.
- Addresses latency gap, useful when historical data access and speed are crucial.
- Downside: Complex design but suitable for accuracy and speed requirements.
Use Cases
Batch Data Pipelines:
- Periodic data backups and transaction history loading.
- Processing customer orders, billing, and slowly varying data modeling.
- Mid- to long-range sales and weather forecasting, historical data analysis.
- Diagnostic medical imaging.
Streaming Data Pipelines:
- Streaming movies, music, and podcasts.
- Social media feeds, sentiment analysis, fraud detection.
- User behavior analysis, targeted advertising.
- Stock market trading, real-time pricing, recommender systems.
Data Pipeline Tools and Technologies
In the world of data pipelines, a multitude of open-source and commercial tools cater to different needs. For robust ETL (Extract, Transform, Load) and ELT (Extract, Load, Transform) processes, consider the following features:
Fully Automated Pipelines:
- Seamless automation from data extraction to loading.
- Rule recommendations for extraction, transformation, and loading.
- Some tools automate data crawling.
User-Friendly Interfaces:
- Drag-and-drop GUI for rule specification and “no-code” ETL.
- Transformation support for complex operations like string manipulation and data merging.
Security and Compliance:
- Encryption of data in transit and at rest.
- Compliance with industry regulations (e.g., HIPAA, GDPR).
Some open-source data pipelines include:
Python and Pandas
- Python, coupled with Pandas, offers versatility in building data pipelines.
- Pandas uses data frames for tabular data but may face scalability challenges.
- Alternatives like Dask, Vaex, and Apache Spark handle big data scaling.
Apache Airflow
- Open-source “configuration as code” platform based on Python, originally open-sourced by Airbnb.
- Author, schedule, and monitor data pipeline workflows.
- Scalable, supporting parallel compute nodes, and integrates with major cloud platforms.
Talend Open Studio
- Open-source platform for big data migration, data warehousing, and profiling.
- Supports collaboration, monitoring, and scheduling.
- Features a drag-and-drop GUI with automatic Java code generation.
Some of the most popular commercial enterprise tools include:
AWS Glue
- Fully managed ETL service for easy data preparation.
- Crawls data sources, suggests schemas, and facilitates analytics job creation.
- Integrates seamlessly with AWS services.
Panoply
- Focuses on ELT, emphasizing data connection and integration without coding.
- SQL functionality for generating data views.
- Integrates with popular BI tools like Tableau and PowerBI.
Alteryx
- Commercial, self-service data analytics platform with drag-and-drop tools.
- No SQL or programming knowledge required.
- Versatile for creating and maintaining complex data pipelines.
IBM InfoSphere DataStage
- Data integration tool for ETL and ELT pipelines.
- Part of IBM InfoSphere Information Server.
- Utilizes drag-and-drop framework and parallel processing for scalability.
IBM Streams
- Streaming data pipeline technology for real-time analytical applications.
- Uses Streams Processing Language (SPL), Java, Python, or C++.
- IBM Streams Flows facilitates visual development with drag-and-drop features.
Stream-Processing Technologies
- Apache Storm, SQLstream, Apache Samza, Apache Spark, Azure Stream Analytics, Apache Kafka.
Building Data Pipelines using Apache Airflow
Apache Airflow stands out as a powerful open-source workflow orchestration tool with robust community support. Unlike Big Data tools focused on data streaming, such as Apache Kafka, Apache Storm, Apache Spark, or Flink, Apache Airflow excels as a workflow manager, especially for batch data pipelines. Its main components are:
- Scheduler: Triggers scheduled workflows.
- Executor: Runs tasks by assigning them to Workers.
- Web Server: Hosts an interactive UI for DAG inspection, triggering, and debugging.
- DAG Directory: Stores DAG files accessible by Scheduler, Executor, and Workers.
- Metadata Database: Maintains the state of each DAG and its tasks.
DAG and Task Lifecycle
DAG (Directed Acyclic Graph) specifies task dependencies and execution order.
Task States:
- No status: Task not queued for execution.
- Scheduled: Task scheduled to run as per dependencies.
- Removed: Task vanished since the run started.
- Upstream failed: Failure in an upstream task.
- Queued: Task awaiting worker availability.
- Running: Task being executed by a worker.
- Success: Task completed without errors.
- Failed: Task encountered an error during execution.
- Up for retry: Failed task with remaining retry attempts, to be rescheduled.
- Ideal Task Flow: ‘No Status’ to ‘Scheduled’ to ‘Queued’ to ‘Running’ and finally to ‘Success.’
Features and Benefits
- Pure Python: Build workflows using standard Python for full flexibility.
- Useful UI: Monitor, schedule, and manage workflows with a sophisticated web app.
- Integration: Plug-and-play integrations, like IBM Cloudant, ready for task execution.
- Easy to Use: Python knowledge is sufficient; Airflow doesn’t limit pipeline scope.
- Open Source: Active community, allowing users to share improvements via pull requests.
Principles of Airflow Pipelines
- Scalable: Modular architecture with message queues for infinite scalability.
- Dynamic: Python-defined pipelines allowing dynamic generation and simultaneous tasks.
- Extensible: Define custom operators and extend libraries for environment-specific needs.
- Lean: Explicit and lean pipelines with built-in parameterization using Jinja templating.
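As a small illustration of the built-in Jinja templating, the sketch below echoes Airflow’s {{ ds }} run-date macro inside a Bash command. It assumes a DAG object named ‘dag’ is already defined and uses the same import style as the example DAG later in these notes.
from airflow.operators.bash_operator import BashOperator

templated_task = BashOperator(
    task_id='print_run_date',
    bash_command="echo 'This run covers {{ ds }}'",   #{{ ds }} expands to the logical run date
    dag=dag,
)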
Use Cases
- ‘Sift’: Defined ML pipeline dependencies.
- ‘SeniorLink’: Enhanced visibility and decoupled batch processes.
- ‘Experity’: Deployed as an enterprise scheduling tool.
- ‘Onefootball’: Orchestrated SQL transformations and sent daily analytics emails.
Advantages of using Data Pipelines as DAGs
A ‘DAG’ or directed acyclic graph is a specific type of graph used to represent relationships without loops or cycles. It’s composed of nodes and edges, with directed edges showing the flow between nodes.
- Graph: nodes and edges
- Directed graph: each edge has a direction (a → b)
- Acyclic: no cycles or loops
Examples of DAGs
- A simple DAG consists of a single root node connected to a terminal node.
- Trees, like family trees or directory structures, are DAGs with single root and terminal nodes.
- DAGs differ from trees as they can have multiple root nodes or nodes with no parents.
Role of DAGs in Apache Airflow
- DAGs define workflow or pipeline tasks, with nodes representing individual tasks and edges dictating the sequence in which they run.
- Each task in the DAG is coded using Python operators to execute specific actions.
- Operators include Python, SQL, and Bash operators, along with sensors that poll for a certain time or conditions to be met.
Logical Blocks in an Airflow DAG Script
- Library imports: Import necessary Python libraries.
- DAG arguments: Define default arguments for the DAG, like the ‘start date.’
- DAG definition: Instantiate the DAG with specific attributes.
- Task definitions: Define individual tasks (nodes) in the DAG.
- Task pipeline: Specify task dependencies to establish the flow between tasks.
An example of a Python script with these logical blocks appears in the ‘Building DAG using Airflow’ section below.
Deploying a DAG
- The Airflow Scheduler, a persistent service in an Airflow production setup, manages workflow deployment.
- The Scheduler orchestrates task execution based on specified dependencies and schedules.
- Once initiated, the Scheduler follows the ‘start date’ and schedule intervals to trigger DAG runs.
Benefits of Expressing Data Pipelines as DAGs
- Maintainability: Clear code allows easy understanding and modification.
- Version Control: Code revisions can be tracked using systems like Git.
- Collaboration: Development and maintenance become collaborative efforts among teams.
- Testability: Unit tests ensure revised code functions as intended.
Apache Airflow UI
Opening Apache Airflow in your browser lands you on the ‘DAGs View,’ a table with data on each DAG in your environment. Rows contain interactive details such as DAG name, run schedule (in crontab format), owner, task status from the current run, status of previous runs, and quick links for more information.
On the left, you can drill down into interactive details, and the next column holds a toggle to pause or unpause a DAG. There are multiple options for visualizing DAGs.
Visualizing a DAG
- For instance, ‘example_bash_operator,’ running in production, opens in the ‘Tree View.’
- It provides a timeline of DAG and task status for each run.
- You can select the base date and a number of runs to display. Colors indicate statuses.
Graph View
- Another DAG, displayed in ‘Graph View,’ shows tasks and dependencies at the bottom.
- Tasks are color-coded by operator type. Filtering options are available.
- ‘Task Instance Context Menu’ allows drilling down on task instances for details.
Code View
- Clicking the Code button reveals Python source code defining your DAG.
- It displays building blocks, such as library imports and task definitions (e.g., Bash Operators).
Task Duration
- ‘Task Duration’ provides a timeline chart of task durations, highlighting the last N runs.
The UI offers comprehensive insights, enabling users to manage, monitor, and troubleshoot their DAGs efficiently.
Building DAG using Airflow
An Apache Airflow DAG is a Python script with distinct logical blocks:
Library Imports
- Import necessary Python libraries, including the DAG class, Bash Operator, and date-time module.
DAG Arguments
- Specify default arguments as a Python dict, covering ownership, start date (e.g., July 28, 2021), retry settings, and retry delay.
DAG Definition
- Instantiate the workflow as a DAG object, specifying name, description, default arguments, and scheduling instructions (e.g., run every five seconds).
Task Definitions
- Define tasks (nodes) within the DAG. In this example, two Bash Operators, ‘print hello’ and ‘print date,’ each with a unique ID and Bash commands.
Task Pipeline
- Specify task dependencies using the double ‘greater than’ notation. For instance, ‘print date’ depends on the success of ‘print hello.’
Let’s create a simple Airflow DAG script:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

#DAG arguments
default_args = {
    'owner': 'you',
    'start_date': datetime(2021, 7, 28),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

#DAG definition
dag = DAG(
    'simple_example',
    description='A simple example DAG',
    default_args=default_args,
    schedule_interval=timedelta(seconds=5),
)

#task definitions
task1 = BashOperator(
    task_id='print_hello',
    bash_command="echo 'Greetings. The date and time are'",
    dag=dag,
)

task2 = BashOperator(
    task_id='print_date',
    bash_command="date",
    dag=dag,
)

#task pipeline
task1 >> task2
This script creates a DAG named ‘simple_example’ with two tasks: ‘print_hello’ and ‘print_date.’ The latter depends on the success of the former. The DAG runs every five seconds, providing a clear template for building Airflow workflows.
Airflow Monitoring and Logging
Logging in Apache Airflow is crucial for developers to monitor task statuses during DAG runs and diagnose/debug issues. By default, Airflow logs are stored locally for quick review, especially in development environments. In production, logs can be sent to cloud storage (IBM Cloud, AWS, Azure) or indexed/searched in systems like Elasticsearch and Splunk.
Log File Structure
- Log files are organized by DAG and Task IDs.
- Path convention: logs/dag_id/task_id/execution_date/try_number.log
- Example: logs/dummy_dag/task1/execution_date/1.log
Reviewing Logs
- Log files contain useful information: running command, results, task outcomes, etc.
- Task events can be quickly reviewed via the Airflow webserver UI.
- Search events using DAG ID, Task ID, Execution Date for a specific overview.
Monitoring Metrics
Airflow provides three metric types:
- Counters: Continuous metrics that always increase (e.g., counts of successful/failed tasks).
- Gauges: Fluctuating metrics (e.g., running tasks, DAG bag sizes).
- Timers: Time-related metrics (e.g., task duration, time to task completion).
Storing and analyzing metrics
- Counters and gauges can be sent to dedicated repositories and tools.
- For production, Airflow recommends StatsD for metric gathering and monitoring.
- Prometheus is suggested for metrics monitoring and analysis, providing interactive visual analytics on aggregated metrics in a dashboard.
Building Streaming Pipelines using Kafka
Distributed Event Streaming Platform Components
An Event, in the context of event streaming, refers to data describing the observable state updates of an entity over time. Examples include GPS coordinates of a car, room temperature, blood pressure measurements, or RAM usage of an application.
Events come in different formats:
- primitive types like text, numbers, or dates
- key-value pair format with values as primitive or complex types (e.g., JSON, XML)
- key-value with a timestamp for time sensitivity.
One Source to One Destination: Event streaming is the continuous generation of real-time events by sources (sensors, databases, applications) and their transport to destinations (file systems, databases, applications).
Many Sources to Many Destinations: Managing multiple distributed event sources and destinations with varied communication protocols (FTP, HTTP, JDBC, SCP) poses challenges. An Event Streaming Platform (ESP) acts as middleware, simplifying the handling of diverse event-based ETL.
ESP Components
- Event Broker: Core component receiving and consuming events.
- Event Storage: Stores received events for asynchronous retrieval by destinations.
- Analytic and Query Engine: Queries and analyzes stored events.
Event Broker is the core component. It includes:
- Ingester: Efficiently receives events from diverse sources.
- Processor: Performs operations like serialization, deserialization, compression, decompression, encryption, and decryption.
- Consumption: Retrieves events from storage and distributes them to subscribed destinations.
Popular ESP Solutions:
- Apache Kafka
- Amazon Kinesis
- Apache Flink
- Apache Spark
- Apache Storm
Apache Kafka: One of the most popular ESPs with unique features and widespread application scenarios.
Overview of Apache Kafka
Implementing an ESP from scratch can be challenging, but thankfully, there are excellent open-source and commercial solutions available. One standout is Apache Kafka, widely recognized as a leading event streaming platform.
Common Kafka Use Cases
- User activity tracking
- Metric streaming (sensors, GPS, monitoring)
- Centralized log collection for large-scale enterprise infrastructures
- Handling payments and transactions in finance sectors
Kafka efficiently ingests events, making them available for:
- Subscriptions and consumption
- Storage and movement to other databases and backups
- Real-time processing and analytics, including machine learning and AI
- Notification generation (email, text, instant messages)
- Data governance and auditing for regulatory compliance
Kafka follows a distributed client-server model:
- Server Side: Consists of a cluster of brokers managed by ZooKeeper for efficient collaboration.
- Network Communication: Utilizes TCP for data exchange between clients and servers.
- Client Side: Offers diverse clients, including CLI, Java, Scala, REST APIs, and third-party options.
Why is Kafka Popular?
- Scalability: Distributes data across multiple brokers, ensuring scalability and high throughput.
- Highly Reliable: Uses multiple partitions and replications for reliability.
- Permanent Persistency: Stores events permanently, enabling consumption at the consumer’s convenience.
- Open Source: Freely available, allowing customization based on specific requirements.
Kafka Challenges and Solutions
While Kafka is open source and well-documented, deploying and configuring it can be complex. Commercial service providers offer solutions like Confluent Cloud, IBM Event Streams, and Amazon Managed Streaming for Apache Kafka. These services provide managed Kafka offerings with added features like security, disaster recovery, and continuous monitoring. They simplify the deployment process, making Kafka accessible for various streaming requirements.
Building Event Streaming Pipelines using Kafka
A Kafka cluster comprises multiple brokers, each serving to receive, store, process, and distribute events. These brokers, orchestrated by ZooKeeper, manage topics — likened to databases storing specific event types like logs or transactions.
Partitioning and Replication: Kafka employs partitioning and replication for fault tolerance and parallel event handling. Even if some brokers fail, Kafka ensures continuity by distributing topic partitions across operational brokers.
Managing Topics with Kafka CLI: The Kafka command-line interface offers functionalities like creating, listing, describing, and deleting topics within a Kafka cluster. Commands include creating topics with defined partitions and replications and obtaining details about topics and their configurations:
#creating a topic (the commands assume a broker listening on localhost:9092)
kafka-topics --bootstrap-server localhost:9092 --create --topic log_topic --partitions 2 --replication-factor 2
#listing topics
kafka-topics --bootstrap-server localhost:9092 --list
#describing topics
kafka-topics --bootstrap-server localhost:9092 --describe --topic log_topic
#removing topics
kafka-topics --bootstrap-server localhost:9092 --delete --topic log_topic
Kafka Producers
Kafka producers are client apps publishing events to topic partitions. Events can be associated with keys for selective partitioning. The producer CLI facilitates managing producers and publishing events to specified topics with associated keys.
Reading Events with Consumers: Consumers subscribe to topics and read stored events, maintaining offsets for sequential reading. By resetting offsets, consumers can replay events from the beginning. Kafka consumers and producers operate independently, allowing events to be stored and consumed without synchronization.
Kafka Producer CLI
Kafka producers, responsible for publishing events, offer key-based association for orderly partitioning. Producers don’t need synchronization with consumers, allowing independent schedules for event consumption.
#publishing events to log_topic (assumes a broker on localhost:9092)
kafka-console-producer --bootstrap-server localhost:9092 --topic log_topic
#publishing events to user_topic with keys
kafka-console-producer --bootstrap-server localhost:9092 --topic user_topic --property "parse.key=true" --property "key.separator=,"
Kafka Consumer CLI
Consumers subscribe to topics and read events in order, storing offsets to keep track of their position in the stream. A playback option allows reading events from the beginning.
#reading events from log_topic (assumes a broker on localhost:9092)
kafka-console-consumer --bootstrap-server localhost:9092 --topic log_topic
#reading all events from the beginning
kafka-console-consumer --bootstrap-server localhost:9092 --topic log_topic --from-beginning
An End-to-End Event Streaming Pipeline: Weather Example
Step 1: Define Event Sources
Imagine you want to analyze weather and Twitter event streams to understand public reactions to extreme weather on Twitter. You’ll utilize two key event sources:
- IBM Weather API: Provides real-time and forecasted weather data in JSON format.
- Twitter API: Offers real-time tweets and mentions in JSON format.
Step 2: Configure Kafka Topics
In your Kafka cluster, create dedicated topics for weather and Twitter events, ensuring proper partitions and replications to handle data flow efficiently.
#creating topics EXAMPLE (assumes a broker on localhost:9092)
kafka-topics --bootstrap-server localhost:9092 --create --topic weather_topic --partitions 2 --replication-factor 2
kafka-topics --bootstrap-server localhost:9092 --create --topic twitter_topic --partitions 2 --replication-factor 2
Step 3: Develop Producers
For each event source, develop specific producers. These will serialize JSON data into bytes and publish them to respective Kafka topics.
#weather producer EXAMPLE
kafka-console-producer --bootstrap-server localhost:9092 --topic weather_topic
#twitter producer with key-value pairs EXAMPLE
kafka-console-producer --bootstrap-server localhost:9092 --topic twitter_topic --property "parse.key=true" --property "key.separator=,"
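Beyond the console producer, a small kafka-python sketch of a weather producer that serializes JSON events to bytes before publishing might look like this; the payload fields are illustrative.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("weather_topic", {"location": "NYC", "temperature": 38, "unit": "C"})
producer.flush()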
Step 4: Implement Consumers
Create dedicated consumers for weather and Twitter events. These consumers will deserialize bytes from Kafka topics, converting them back into JSON data for processing.
#weather consumer EXAMPLE
kafka-console-consumer --bootstrap-server localhost:9092 --topic weather_topic
#twitter consumer EXAMPLE
kafka-console-consumer --bootstrap-server localhost:9092 --topic twitter_topic
Step 5: Integrate a DB Writer for Persistence
If you want to write event data to a relational database, use a DB writer. This component parses the JSON records received by the consumers and generates corresponding database records.
#db writer EXAMPLE
import json
import sqlite3

def write_to_database(record):
    #assumes an SQLite database with a single-column 'events' table,
    #e.g. CREATE TABLE events (payload TEXT)
    connection = sqlite3.connect("event_database.db")
    cursor = connection.cursor()
    cursor.execute("INSERT INTO events VALUES (?)", (json.dumps(record),))
    connection.commit()
    connection.close()

record = {"event_type": "weather", "data": {"temperature": 25, "location": "NYC"}}
write_to_database(record)
Step 6: Database Interaction with SQL
Use SQL insert statements to write records into the database. This step completes the transition of data from Kafka topics to a persistent storage solution.
-- SQL insert EXAMPLE
INSERT INTO events VALUES ('{"event_type": "weather", "data": {"temperature": 25, "location": "NYC"}}');
Step 7: Visualization and Analysis
Finally, query the database records for visualization and analysis; a dashboard is a good way to derive insights from the collected and stored event data.
This end-to-end pipeline showcases the seamless integration of various components, emphasizing the flexibility and power of Kafka in managing event streams.
Kafka Streaming Process
Data engineers, in the world of event streaming, not only transport data but also engage in essential processing tasks like filtering, aggregation, and enhancement. Applications tailored for this purpose are known as stream processing applications. For Kafka-based stream processing applications, a direct approach involves implementing an ad hoc data processor to read from one topic, process the data, and publish it to another topic. Let’s see an illustrative example:
Raw Weather Data Processing
- Request raw weather JSON data from a weather API.
- Initiate a weather producer to publish raw data to a designated topic.
- Start a consumer to read raw weather data from the topic.
Ad Hoc Data Processor:
- Create a custom processor to filter raw weather data, e.g., isolating extreme weather events.
- This processor could be a script or an application interacting with Kafka clients.
- Processed data is sent to another producer and published to a new topic.
Processed Data Consumption:
- A dedicated consumer reads the processed weather data.
- The data is then sent to a dashboard for visualization.
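Putting these pieces together, a hedged kafka-python sketch of such an ad hoc processor might look like this; the topic names and the 35-degree threshold are illustrative.
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "raw_weather_topic",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
for message in consumer:
    event = message.value
    if event.get("temperature", 0) >= 35:   #filter: keep only extreme heat events
        producer.send("processed_weather_topic", event)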
While effective, ad hoc processors can become complex with numerous topics. Kafka offers a streamlined solution with the Kafka Streams API.
Kafka Streams API
Simplified Processing:
- Facilitates stream processing with a simple client library.
- Processes and analyzes data stored in Kafka topics (input and output).
- Ensures each record is processed only once.
- Processes one record at a time for efficiency.
Stream Processing Topology:
- Built on a computational graph known as a stream-processing topology.
- Nodes represent stream processors performing transformations.
- Source Processor: Consumes streams from Kafka topics.
- Sink Processor: Publishes received streams to Kafka topics.
Redesigning the Weather Data Processing with Kafka Streams API
Topology Overview:
- Raw weather topic and processed weather topic in Kafka.
- Three stream processors: source, filter, and sink.
Stream Processing Flow:
- Source processor consumes raw weather streams.
- Filter processor screens streams based on high temperatures.
- Sink processor publishes filtered streams to the processed weather topic.
In conclusion, leveraging Kafka Streams API simplifies the design of stream processing applications, offering a more efficient alternative to complex ad hoc processors, especially when dealing with diverse topics.
kafka-python Package
To wrap up this long post, I’d like to give a short overview of kafka-python. Using kafka-python you can easily manage all the Kafka functionalities we discussed in this post.
Installing kafka-python
pip install kafka-python
KafkaAdminClient Class
- KafkaAdminClient facilitates essential administrative operations on Kafka servers.
- Examples include creating/deleting topics and updating configurations.
KafkaAdminClient Object
from kafka import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092",
    client_id='test'
)
Managing Topics
#creating a new topic
topic_list = []
new_topic = NewTopic(name="bankbranch", num_partitions=2, replication_factor=1)
topic_list.append(new_topic)
admin_client.create_topics(new_topics=topic_list)
#describing a topic (ConfigResource and ConfigResourceType come from kafka.admin)
from kafka.admin import ConfigResource, ConfigResourceType

configs = admin_client.describe_configs(
    config_resources=[ConfigResource(ConfigResourceType.TOPIC, "bankbranch")]
)
KafkaProducer
- KafkaProducer is employed for publishing messages to topics.
- We demonstrate publishing JSON messages.
Creating KafkaProducer
from kafka import KafkaProducer
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
Producing Messages
#producing json messages
producer.send("bankbranch", {'atmid': 1, 'transid': 100})
producer.send("bankbranch", {'atmid': 2, 'transid': 101})
KafkaConsumer
- KafkaConsumer is used for consuming messages from topics.
- We showcase message consumption in Python.
from kafka import KafkaConsumer
consumer = KafkaConsumer('bankbranch')
Consuming Messages
#printing messages
for msg in consumer:
    print(msg.value.decode("utf-8"))
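As a variant, the Python analogue of the CLI’s --from-beginning playback is the auto_offset_reset parameter; this is a sketch, and with no consumer group, offsets are not committed, so the topic is replayed each time.
from kafka import KafkaConsumer

replay_consumer = KafkaConsumer(
    'bankbranch',
    bootstrap_servers="localhost:9092",
    auto_offset_reset='earliest',   #start from the earliest available offset
)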
Disclaimer: The notes and information presented in this blog post were compiled during the course “ETL and Data Pipelines with Shell, Airflow and Kafka” and are intended to provide an educational overview of the subject matter for personal use.