Prefect is an open-source workflow automation and orchestration tool for data engineering, machine learning, and other data-related tasks. It allows you to define, schedule, and execute complex data workflows in a straightforward manner.
Prefect workflows are defined in Python code. Each step in the workflow is represented as a “task,” and tasks can be connected to form a directed acyclic graph (DAG). The workflow defines the order in which tasks run and can include conditional logic and branching. Prefect also provides built-in scheduling, so you can attach cron-like schedules to a flow. Flows can be parameterized as well, which lets you run the same flow with different input values.
This tutorial explores how CrateDB and Prefect come together to streamline extract, transform, and load (ETL) processes with a few lines of Python code.
Before we begin, ensure you have the following prerequisites installed on your system:
- Python 3.x: Prefect is a Python-based workflow management system, so you’ll need Python installed on your machine.
- CrateDB: To work with CrateDB, create a new cluster in CrateDB Cloud. You can choose the CRFREE tier cluster, which does not require any payment information.
- Prefect: Install Prefect using pip by running the following command in your terminal or command prompt:
pip install -U prefect
- To get started with Prefect, you need to connect to Prefect’s API: the easiest way is to sign up for a free forever Cloud account at https://app.prefect.cloud/.
- Once you create a new account, create a new workspace with a name of your choice.
- Run `prefect cloud login` to log into Prefect Cloud from the local environment.
Now you are ready to build your first data workflows!
We’ll dive into the basics of Prefect by creating a simple workflow with tasks that fetch data from a source, perform basic transformations, and load it into CrateDB. For this example, we will use the yellow taxi trip data, which includes pickup time, geo-coordinates, number of passengers, and several other variables. The goal is to create a workflow that does a basic transformation on this data and inserts it into a CrateDB table named `trip_data`:
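import pandas as pd
from prefect import flow, task
from crate import client

CSV_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
# Placeholder connection string: replace the credentials and host with your own.
# The crate:// SQLAlchemy dialect talks to CrateDB's HTTP endpoint (port 4200 by default).
URI = "crate://admin:password@host:4200"


@task()
def extract_data(url: str):
    # Download the gzipped CSV file and parse it into a pandas DataFrame.
    df = pd.read_csv(url, compression="gzip")
    return df


@task()
def transform_data(df):
    # Keep only the trips whose passenger count is not zero.
    df = df[df['passenger_count'] != 0]
    return df


@task()
def load_data(table_name, df):
    # Write the DataFrame to CrateDB, replacing the table if it already exists.
    df.to_sql(table_name, URI, if_exists="replace", index=False)


@flow(name="ETL workflow", log_prints=True)
def main_flow():
    raw_data = extract_data(CSV_URL)
    data = transform_data(raw_data)
    load_data("trip_data", data)


if __name__ == '__main__':
    main_flow()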
- We start defining the flow by importing the necessary modules: `prefect` for working with workflows, `pandas` for data manipulation, and `crate` for interacting with CrateDB.
- Next, we specify the connection parameters for CrateDB and the URL for a file containing the dataset. You should modify these values according to your CrateDB Cloud setup.
- We define three tasks using the `@task` decorator: `extract_data(url)`, `transform_data(df)`, and `load_data(table_name, df)`. Each task represents a unit of work in the workflow:
  - The `extract_data()` task loads the data from the CSV file into a pandas data frame.
  - The `transform_data()` task takes the data frame and returns only the rows where the `passenger_count` value is different from 0.
  - The `load_data()` task connects to CrateDB and loads the data into the `trip_data` table.
- We define the workflow, name it “ETL workflow”, and specify the sequence of tasks: `extract_data()`, `transform_data()`, and `load_data()`.
- Finally, we execute the flow by calling `main_flow()`. This runs the workflow, and each task is executed in the order defined.
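To see what the filter in `transform_data()` does, it can help to run it on a small hand-made data frame. The sample below is invented for illustration; only the `passenger_count` column name comes from the dataset. It also shows one detail worth knowing: pandas treats a missing value as different from 0, so rows with a missing `passenger_count` pass the filter. The extra `dropna()` step is an optional variation, not something the tutorial's flow does.

```python
import pandas as pd

# Hand-made sample for illustration; the real dataset has many more columns.
sample = pd.DataFrame(
    {
        "passenger_count": [1, 0, 2, None],
        "trip_distance": [1.2, 0.5, 3.4, 2.0],
    }
)

# Same filter as transform_data(): drop rows with zero passengers.
kept = sample[sample["passenger_count"] != 0]

# NaN != 0 evaluates to True, so the row with a missing count is still in `kept`.
# Optional variation: also drop rows where the count is missing.
strict = kept.dropna(subset=["passenger_count"])

print(len(sample), len(kept), len(strict))  # prints: 4 3 2
```

Whether missing counts should be kept or dropped depends on how the table will be used, so treat the stricter variant as a design choice rather than a fix.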
When you run this Python script, the workflow reads the trip data from a CSV file, transforms it, and loads it into the CrateDB table. You can see the state of the flow run in the Flow Runs tab in the Prefect UI.
You can enrich the ETL pipeline with many advanced features available in Prefect, such as parameterization, error handling, and retries. Finally, after the workflow has run successfully, you can query the data in CrateDB.
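One way to inspect the loaded data is to run an aggregate query from Python. The sketch below assumes the `trip_data` table created by the flow and uses placeholder host and credentials; the helper names `trips_per_passenger_count` and `connect_to_cratedb` are hypothetical. The query function relies only on the standard Python DB-API cursor interface, which the `crate` client implements.

```python
QUERY = (
    "SELECT passenger_count, COUNT(*) AS trips "
    "FROM trip_data "
    "GROUP BY passenger_count "
    "ORDER BY passenger_count"
)


def trips_per_passenger_count(connection):
    """Return (passenger_count, trips) rows using any DB-API 2.0 connection."""
    cursor = connection.cursor()
    try:
        cursor.execute(QUERY)
        return cursor.fetchall()
    finally:
        cursor.close()


def connect_to_cratedb():
    """Open a connection with the crate client.

    Assumption: the URL, username, and password are placeholders and must be
    replaced with the values of your own cluster.
    """
    from crate import client  # imported here so the query helper stays driver-agnostic

    return client.connect(
        "https://host:4200",
        username="admin",
        password="password",
    )
```

With a reachable cluster, `trips_per_passenger_count(connect_to_cratedb())` would return one row per distinct passenger count. The same statement can also be run in the console of the CrateDB Admin UI.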
In this tutorial, you built a simple Prefect workflow, defined its tasks, and orchestrated the transformation and loading of data into CrateDB. Both tools offer extensive feature sets that you can use to optimize and scale your data workflows further.
As you continue exploring, don’t forget to check out the reference documentation. If you have further questions or would like to learn more about updates, features, and integrations, join the CrateDB community. Happy data wrangling!