Retrieving records in bulk with a list of primary key values

When we send SQL statements to CrateDB they need to be parsed, but in most situations we do not think about this because the resources used for parsing the statements are trivial in relation to what is required to actually execute the queries.

One exception to this is when INSERTing a large amount of rows, and for this case CrateDB has a very efficient bulk operations interface which can also be used for UPDATEs and DELETEs.

However I recently came across an unusual requirement, we had a very large table with a primary key made of multiple fields, and given tens of thousands of values for these fields we needed to retrieve all the corresponding records.

Let me exemplify the situation with this table definition:

CREATE TABLE sensor_data (
  ts TIMESTAMP
  ,machine_id TEXT
  ,sensor_type SMALLINT
  ,payload OBJECT
  ,PRIMARY KEY (ts,machine_id,sensor_type)
);

Let’s also create some sample data:

INSERT INTO sensor_data (ts,machine_id,sensor_type,payload )
SELECT now()
	,concat('machine',a.b)
	,random()*10
	,{"test"='abc'}
FROM GENERATE_SERIES(1,100000) a(b);

There are different approaches we could use to retrieve multiple rows for the given PK values, such as:

SELECT * FROM sensor_data WHERE ts='2024-02-21 08:00:00.000Z' AND machine_id='machine1' AND sensor_type=8 
UNION 
SELECT * FROM sensor_data WHERE ts='2024-02-21 08:00:00.000Z' AND machine_id='machine2' AND sensor_type=5

or:

SELECT * 
FROM sensor_data 
WHERE (ts='2024-02-21 08:00:00.000Z' AND machine_id='machine1' AND sensor_type=8)
OR (ts='2024-02-21 08:00:00.000Z' AND machine_id='machine2' AND sensor_type=5);

This works reasonably well up to a few hundred records, but let’s see what happens if we try to use this approach to lookup tens of thousands of different records in a single statement as it was the requirement in this very particular case.

Let’s dynamically generate a query like the above but for 10,000 records:

WITH thedata
AS (
	SELECT CONCAT (
			'OR (ts=',(ts::BIGINT)::TEXT
			,' and machine_id=''',machine_id
			,''' and sensor_type=',sensor_type
			,')'
			) AS onewherecondition
	FROM sensor_data 
	LIMIT 10000
	)
SELECT CONCAT (
		'SELECT * FROM sensor_data WHERE '
		,replace(replace(replace({ "thearray" = array_agg(onewherecondition) }::TEXT, '{"thearray":["OR ', ''), '","', ' '), '"]}', '')
		,';')
FROM thedata;

This will generate a very long statement, and when we try to run it we may get:

StackOverflowError[null]

or

io.crate.exceptions.SQLParseException: line 1:1: statement is too large (stack overflow while parsing)

So we will need a different strategy, and we also want this to run as quickly as possible.

Let’s start by preparing a CSV file with 10,000 primary key values we will use for testing:

pip install crash
crash -c "SELECT ts,machine_id,sensor_type FROM sensor_data LIMIT 10000;" --format csv > pkvalues.csv

What we are going to do now is take advantage of a system column called _id which exists on all CrateDB tables. This column contains a unique identifier for each row, and for tables with a PK defined it is a compound string representation of all primary key values of that row. The useful characteristic here is that the value is deterministic, given 2 tables with the same PK definition rows with the same PK values will have the same _id values.

So to perform this “bulk SELECT” we are going to use a staging table defined with the same PK as the main table. The Python code below bulk loads the values from the CSV file to the staging table and then uses the _id values to locate all the rows we are interested in:

pip install pandas "crate[sqlalchemy]" --upgrade
import pandas as pd
import sqlalchemy as sa
from crate.client.sqlalchemy.support import insert_bulk

df = pd.read_csv("pkvalues.csv")

engine = sa.create_engine(
    "crate://localhost:4200",
    connect_args={"verify_ssl_cert": False},
)
connection = engine.connect()

connection.execute(sa.text("DROP TABLE IF EXISTS relevant_pk_values;"))
connection.execute(
    sa.text(
        """
        CREATE TABLE relevant_pk_values (
            ts TIMESTAMP
            ,machine_id TEXT
            ,sensor_type SMALLINT
            ,PRIMARY KEY (ts,machine_id,sensor_type)
        ) CLUSTERED INTO 1 SHARDS;
        """
    )
)

df.to_sql(
    name="relevant_pk_values",
    con=engine,
    if_exists="append",
    index=False,
    chunksize=5_000,
    method=insert_bulk,
)
connection.execute(sa.text("REFRESH TABLE relevant_pk_values;"))

resultset = connection.execute(
    sa.text(
        """
        SELECT *
        FROM sensor_data
        WHERE _id IN (SELECT _id FROM relevant_pk_values);
        """
    )
)

I hope you found this interesting, if you have any questions please do not hesitate to reach out to us through the CrateDB Community.

2 Likes