CrateDB and Apache Airflow: Building a data ingestion pipeline


Due to a change in the data format provided by the upstream party (a switch from CSV to Parquet), this implementation became outdated. An updated version of this post is available.


In the first two posts of this series, we introduced Apache Airflow as an orchestration tool to run recurring CrateDB queries.

One of Airflow’s strengths is its large ecosystem of providers, enabling connectivity to a wide range of systems. The scope of this post is to build a data pipeline involving additional providers to regularly import new data into CrateDB.

Use case

The use case is based on the NYC Taxi Data repository, which contains multiple years of raw taxi trip data from New York City. Instead of using the shell scripts provided there, we will implement the ingest process via Airflow operators.

The ingest process will consist of several steps:

  1. Retrieve a text file from GitHub that lists all available raw CSV files with taxi trips
  2. Retrieve a list of already imported files from a CrateDB table
  3. Iterate over all not yet imported files, and for each:
    1. Import from S3 into a CrateDB staging table
    2. Copy from the staging table to the destination
    3. Mark the file as processed

Airflow setup

We assume that a basic Astronomer/Airflow setup is already in place, as described in our first post of this series. If you are not using Astronomer, certain setup-related details might differ.

In addition to the basic Astronomer setup, we need to add an HTTP connection to GitHub.

This is done via an environment variable, which we add as a new line to our .env file (your preferred deployment mechanism for environment variables might differ):

AIRFLOW_CONN_HTTP_RAW_GITHUB=http://https%3A%2F%2Fraw.githubusercontent.com

The environment variable’s value looks unusual but is syntactically correct. The initial http:// refers to the type of the Airflow connection and is not part of the actual URL. The URL (including the usage of HTTPS) follows as a URL-encoded value after the connection type.
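
To check that Airflow picks up the connection, a task can reference it by its connection id, which Airflow derives from the environment variable name (http_raw_github). The following is a minimal sketch; the DAG boilerplate and the endpoint path pointing at the URL list in the NYC Taxi Data repository are assumptions for illustration:

import pendulum
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

# Minimal sketch: fetch the list of raw CSV URLs through the GitHub connection.
# The connection id "http_raw_github" is derived from the environment variable
# name AIRFLOW_CONN_HTTP_RAW_GITHUB. The endpoint path is an assumption and
# needs to point at the actual URL list file in the NYC Taxi Data repository.
with DAG(
    dag_id="nyc_taxi_connection_check",
    start_date=pendulum.datetime(2021, 11, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
) as dag:
    download_data_urls = SimpleHttpOperator(
        task_id="download_data_urls",
        http_conn_id="http_raw_github",
        method="GET",
        endpoint="toddwschneider/nyc-taxi-data/master/setup_files/raw_data_urls.txt",
        log_response=True,
    )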

Data model

We start by creating two tables in CrateDB: a temporary staging table (nyc_taxi.load_trips_staging) and the final destination table (nyc_taxi.trips):

CREATE TABLE IF NOT EXISTS "nyc_taxi"."load_trips_staging" (
   "VendorID" INTEGER,
   "tpep_pickup_datetime" TEXT,
   "tpep_dropoff_datetime" TEXT,
   "passenger_count" INTEGER,
   "trip_distance" REAL,
   "RatecodeID" INTEGER,
   "store_and_fwd_flag" TEXT,
   "PULocationID" INTEGER,
   "DOLocationID" INTEGER,
   "payment_type" INTEGER,
   "fare_amount" REAL,
   "extra" REAL,
   "mta_tax" REAL,
   "tip_amount" REAL,
   "tolls_amount" REAL,
   "improvement_surcharge" REAL,
   "total_amount" REAL,
   "congestion_surcharge" REAL
);

CREATE TABLE IF NOT EXISTS "nyc_taxi"."trips" (
   "id" TEXT NOT NULL,
   "cab_type_id" INTEGER,
   "vendor_id" TEXT,
   "pickup_datetime" TIMESTAMP WITH TIME ZONE,
   "pickup_year" TIMESTAMP WITH TIME ZONE GENERATED ALWAYS AS DATE_TRUNC('year', "pickup_datetime"),
   "pickup_month" TIMESTAMP WITH TIME ZONE GENERATED ALWAYS AS DATE_TRUNC('month', "pickup_datetime"),
   "dropoff_datetime" TIMESTAMP WITH TIME ZONE,
   "store_and_fwd_flag" TEXT,
   "rate_code_id" INTEGER,
   "pickup_location" GEO_POINT,
   "dropoff_location" GEO_POINT,
   "passenger_count" INTEGER,
   "trip_distance" DOUBLE PRECISION,
   "trip_distance_calculated" DOUBLE PRECISION GENERATED ALWAYS AS DISTANCE("pickup_location", "dropoff_location"),
   "fare_amount" DOUBLE PRECISION,
   "extra" DOUBLE PRECISION,
   "mta_tax" DOUBLE PRECISION,
   "tip_amount" DOUBLE PRECISION,
   "tolls_amount" DOUBLE PRECISION,
   "ehail_fee" DOUBLE PRECISION,
   "improvement_surcharge" DOUBLE PRECISION,
   "congestion_surcharge" DOUBLE PRECISION,
   "total_amount" DOUBLE PRECISION,
   "payment_type" TEXT,
   "trip_type" INTEGER,
   "pickup_location_id" INTEGER,
   "dropoff_location_id" INTEGER
)
PARTITIONED BY ("pickup_year");

Additionally, we create a helper table that will store the files already processed:

CREATE TABLE IF NOT EXISTS "nyc_taxi"."load_files_processed" (
   "file_name" TEXT,
   PRIMARY KEY ("file_name")
)
CLUSTERED INTO 1 SHARDS;
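
To illustrate how this bookkeeping table will be used, the following sketch reads the already-processed files and marks a new file as processed via a PostgresHook. The connection id cratedb_connection is an assumption and stands for whichever Postgres connection points at your CrateDB cluster:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Minimal sketch of the bookkeeping around nyc_taxi.load_files_processed.
# "cratedb_connection" is an assumed Airflow connection id pointing at CrateDB.
def get_processed_files():
    hook = PostgresHook(postgres_conn_id="cratedb_connection")
    rows = hook.get_records("SELECT file_name FROM nyc_taxi.load_files_processed;")
    # Return a plain list so it can be passed between tasks via XCom.
    return [row[0] for row in rows]

def mark_file_processed(file_name):
    hook = PostgresHook(postgres_conn_id="cratedb_connection")
    hook.run(
        "INSERT INTO nyc_taxi.load_files_processed (file_name) VALUES (%s);",
        parameters=(file_name,),
    )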

Airflow DAG

The Airflow DAG is composed of tasks and their dependencies. We mostly make use of the flexible PythonOperator, but let’s walk through the individual tasks:

  • download_data_urls: A SimpleHttpOperator that fetches a text file from the NYC Taxi Data GitHub repository. The text file includes links to files on S3, each of which contains one month of raw data as a CSV file. The content of the text file is stored via Airflow’s inter-task communication mechanism, XCom. Only the URLs of the S3 files are stored, not the content of the CSV files.
  • clean_data_urls: Using the PythonOperator, we convert the retrieved text file into a proper Python data type (a list) and filter out certain CSV file URLs. The NYC taxi data comprises data from multiple taxi operators, with certain differences in the corresponding CSV schemas; we limit the scope of this post to Yellow Cab taxis. The resulting list is again stored via XCom.
  • get_processed_files: Loads data from the above-defined table nyc_taxi.load_files_processed via a PythonOperator/PostgresHook combination and stores the result via XCom.
  • identify_missing_urls: Subtracts the set of already processed file URLs from the set of available file URLs. The result is a set of file URLs to be imported that we again store via XCom.
  • purge_staging_init: Deletes all data from the table nyc_taxi.load_trips_staging. Later on, while processing the CSV files, we always clean up the staging table once a file has been processed. However, that cleanup might terminate prematurely, e.g. due to an error, so we clean the staging table here as well.
  • process_new_files: This task is slightly more complex, as it iterates over all files returned by identify_missing_urls. For each file, several tasks are invoked, all of them PostgresOperator tasks (see the sketch further below):
    1. Issue a COPY FROM statement. CrateDB will pull the CSV file directly from S3, so we don’t need to download it explicitly upfront
    2. Run an INSERT INTO nyc_taxi.trips (...) SELECT ... FROM nyc_taxi.load_trips_staging to copy data from the staging table to its final destination table
    3. Trigger a DELETE FROM nyc_taxi.load_trips_staging to empty the staging table for the next file

The full DAG file is available in our GitHub repository.
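
For orientation, below is a condensed sketch of the per-file SQL steps described above. It is illustrative only: the connection id cratedb_connection, the DAG boilerplate, the example file URL, and the shortened column mapping in the INSERT statement are assumptions; the DAG in the repository remains the authoritative implementation.

import pendulum
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Illustrative sketch of the steps run for a single file. "cratedb_connection"
# is an assumed Airflow connection id, and FILE_URL stands in for one of the
# S3 URLs identified by identify_missing_urls.
FILE_URL = "https://example-bucket.s3.amazonaws.com/yellow_tripdata_2019-01.csv"

with DAG(
    dag_id="nyc_taxi_import_sketch",
    start_date=pendulum.datetime(2021, 11, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
) as dag:
    # 1. CrateDB pulls the CSV file directly from S3 into the staging table.
    copy_to_staging = PostgresOperator(
        task_id="copy_to_staging",
        postgres_conn_id="cratedb_connection",
        sql=f"""
            COPY nyc_taxi.load_trips_staging
            FROM '{FILE_URL}'
            WITH (format = 'csv', empty_string_as_null = true);
        """,
    )

    # 2. Copy from the staging table to the destination table.
    #    The column mapping is shortened here for brevity.
    copy_to_trips = PostgresOperator(
        task_id="copy_to_trips",
        postgres_conn_id="cratedb_connection",
        sql="""
            INSERT INTO nyc_taxi.trips
                (id, pickup_datetime, dropoff_datetime, passenger_count,
                 trip_distance, total_amount)
            SELECT gen_random_text_uuid(),
                   CAST(tpep_pickup_datetime AS TIMESTAMP WITH TIME ZONE),
                   CAST(tpep_dropoff_datetime AS TIMESTAMP WITH TIME ZONE),
                   passenger_count,
                   trip_distance,
                   total_amount
            FROM nyc_taxi.load_trips_staging;
        """,
    )

    # 3. Empty the staging table for the next file.
    purge_staging = PostgresOperator(
        task_id="purge_staging",
        postgres_conn_id="cratedb_connection",
        sql="DELETE FROM nyc_taxi.load_trips_staging;",
    )

    copy_to_staging >> copy_to_trips >> purge_staging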

Summary

We demonstrated how a multi-step data pipeline can be implemented with Airflow and CrateDB. Airflow makes it easy to implement and orchestrate the preparatory steps that identify which data needs to be ingested. CrateDB integrates seamlessly via standard PostgreSQL operators. The capability of the COPY FROM statement to download files directly from their source saves us from having to handle the actual data in Airflow. Instead, we can operate entirely on small amounts of metadata within Airflow and leave the heavy processing to CrateDB.
