Load testing CrateDB using Locust

Introduction

As with any other database, users want to run performance tests against CrateDB to get a feel for how their workload performs.

CrateDB offers a couple of tools for specific use cases. For example, NodeIngestBench allows you to run high-performance ingest benchmarks against a CrateDB cluster, and the timeit command of the cr8 toolkit measures the runtime of a given SQL statement on a cluster.
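
For reference, such a cr8 run can look roughly like the sketch below; it assumes a local node listening on the default HTTP port 4200, and that cr8 reads the statement to time from stdin:

pip3 install cr8
echo "SELECT 1" | cr8 timeit --hosts localhost:4200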

We use Locust as the framework to run load tests with a customizable set of SQL statements. Locust is a great, flexible, open-source Python framework that can swarm the database with users and report the RPS (requests per second) for different queries. This short blog shows how to use Locust to load test CrateDB in your environment.

For this blog, I’m running a 3-node cluster created in a local Docker environment as described in this tutorial.

First, we must set up the data model and load some data. I’m using DBeaver to connect in this case, but this can also be done with the CrateDB CLI tools or the Admin UI that comes with both the self-managed and fully managed CrateDB solutions.
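
If you prefer the command line, connecting with crash (the CrateDB shell) looks like this; host and port are assumptions for a local cluster:

pip3 install crash
crash --hosts localhost:4200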

Create the following tables:

CREATE TABLE "weather_data" (
       "timestamp" TIMESTAMP,
       "location" VARCHAR,
       "temperature" DOUBLE,
       "humidity" DOUBLE,
       "wind_speed" DOUBLE
);
CREATE TABLE IF NOT EXISTS "weekly_aggr_weather_data" (
       "week" TIMESTAMP,
       "location" VARCHAR,
       "avgtemp" DOUBLE,
       "maxhumid" DOUBLE,
       "minwind" DOUBLE,
       "lastupdated" TIMESTAMP,
       PRIMARY KEY (week, location)
);

Create the user that will be used later on:

CREATE USER locust WITH (password = 'loadtest');
GRANT ALL PRIVILEGES ON TABLE weather_data TO locust;
GRANT ALL PRIVILEGES ON TABLE weekly_aggr_weather_data TO locust;

Load some data into the weather_data table using the following statement:

COPY weather_data
FROM 'https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_weather.csv.gz'
WITH (format = 'csv', compression = 'gzip', empty_string_as_null = true);

The weather_data table should now have 70k rows of data.

SELECT COUNT(*) FROM weather_data;

count(*)|
--------+
   70000|

We leave the other table empty as that one will be populated as part of the load test.

Install Locust

In this case, I installed Locust on my Mac, but in an acceptance environment you probably want to run Locust on one or more dedicated “driver” machines. Especially when you want to push the database hard, you need enough firepower on the driver side.
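
A single driver can become the bottleneck before the database does. Locust supports distributed runs out of the box; a minimal sketch, where MASTER_IP is a placeholder for the address of the master machine:

# On the master machine (serves the web UI and aggregates the results)
locust --master

# On each worker/driver machine
locust --worker --master-host MASTER_IP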

Install Python (3.8 or later)

# Install Locust via pip
pip3 install locust

Validate your installation

locust -V
# locust 2.18.3 from /opt/homebrew/lib/python3.11/site-packages/locust (python 3.11.6)

Run Locust

Start with a simple test to ensure you can connect to the database. Copy the code below into a file named locustfile.py.

from locust import HttpUser, task, tag, between, constant_throughput


class QuickstartUser(HttpUser):
    # wait_time = between(1, 5)
    # Using constant_throughput in combination with the number of users gives you control over the pace.
    # A constant_throughput of 1 makes each user run (at most) 1 task per second.
    # https://docs.locust.io/en/stable/writing-a-locustfile.html#wait-time-attribute
    # Starting with 200 users will therefore end up at 200 queries/sec.
    wait_time = constant_throughput(1.0)

    # Disable TLS certificate verification (useful with self-signed certificates).
    def on_start(self):
        self.client.verify = False

    # Start with the queries you want to execute
    @tag("query0")
    @task(1)
    def query0(self):
        self.client.post(
            "/_sql",
            json={"stmt": "SELECT * FROM weather_data LIMIT 100"},
            auth=("locust", "loadtest"),
            name="query0",
        )

Some explanation of the code above:

With wait_time = between(1, 5), you control the number of queries and their randomization: each user waits a random interval of between 1 and 5 seconds between queries. Another option, which gives you more control over the number of queries executed per second, is wait_time = constant_throughput(1.0), which makes every user execute (at most) one task per second; with constant_throughput(2.0), every user executes two tasks per second.
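
For clarity, here are the two pacing options side by side, as they would appear in the user class (use one at a time):

# Option 1: each user pauses a random 1-5 seconds between tasks
wait_time = between(1, 5)

# Option 2: each user runs (at most) 1 task per second
wait_time = constant_throughput(1.0)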

For every query you want to include in your test, you will need to create a block like this:

    # Start with the queries you want to execute
    @tag("query0")
    @task(1)
    def query0(self):
        self.client.post(
            "/_sql",
            json={"stmt": "SELECT * FROM weather_data LIMIT 100"},
            auth=("locust", "loadtest"),
            name="query0",
        )

The @tag is just a logical name you give to this query. With @task, you give a weight to the query: if you have two queries and want one of the statements executed twice as often, you can control this with @task(2). The auth tuple in the self.client.post call holds the username and password used to connect to the database, and name is the label under which the query appears in the Locust report.

Let’s see how to run Locust. Run the following command to start the Locust application. The --web-auth option simply prevents unauthenticated access to the web interface.

locust --web-auth "crate:crate"

Open a browser and navigate to the web interface (at http://localhost:8089).

Sign in using the username and password you specified when starting Locust (--web-auth username:password).

[Screenshot: Locust sign-in page]

This will take you to the “Start new load test” page.

Define the number of users and the spawn rate. As this is an initial test, we leave the numbers as they are. Fill in the connection details of your CrateDB cluster; mine runs on localhost, port 60961 (the default HTTP port is 4200).

[Screenshot: starting the initial load test]

Click “Start swarming” to start the load test.

As you can see, one query is being executed at an RPS of 1. The number of failures should be 0. If you stop this test and start a new one with ten users, you should get an RPS of 10.

Now that we have confirmed that Locust is running correctly, we can expand the number of queries we execute. For this blog, we are adding the following queries.

-- Avg Temperature per City
SELECT location, round(AVG(temperature)) AS avg_temp
FROM weather_data
WHERE location = 'CITY'
GROUP BY location
ORDER BY 2 DESC;

-- When was Max Temp 
SELECT location,
    max(temperature) AS highest_temp,
    max_by(timestamp, temperature) AS time_of_highest_temp
FROM weather_data
GROUP BY location;

-- Bridge the gaps (not all readings have values; with LAG and LEAD we can interpolate the missing values).
WITH OrderedData AS (
    SELECT timestamp,
           location,
           temperature,
           LAG(temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS prev_temp,
           LEAD(temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS next_temp
    FROM weather_data
)
SELECT timestamp,
       location,
       temperature,
       (prev_temp + next_temp) / 2 AS interpolated_temperature
FROM OrderedData
ORDER BY location, timestamp
LIMIT 1000;

-- Bridge the Gaps per City
WITH minmax AS (
    SELECT location,
           min(timestamp) AS mintstamp,
           max(timestamp) AS maxtstamp
    FROM weather_data
    WHERE location = 'CITY'
    GROUP BY location
)
SELECT a.timestamp,
       a.location,
       a.temperature,
       LAG(a.temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS prev_temp,
       LEAD(a.temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS next_temp
FROM weather_data a, minmax b
WHERE a.location = b.location
AND a.timestamp BETWEEN b.mintstamp AND b.maxtstamp
ORDER BY 1;

-- Upsert the Aggr per week
INSERT INTO weekly_aggr_weather_data (week, location, avgtemp, maxhumid, minwind, lastupdated)
(SELECT DISTINCT(date_trunc('week', timestamp)) AS week,
        location,
        avg(temperature),
        max(humidity),
        min(wind_speed),
        now()
 FROM weather_data
 GROUP BY 1, location)
ON CONFLICT (week, location) DO UPDATE SET
    avgtemp = excluded.avgtemp,
    maxhumid = excluded.maxhumid,
    minwind = excluded.minwind,
    lastupdated = excluded.lastupdated;

We create a list of different cities to randomize the city used; each time one of these queries executes, a city is chosen at random.

cities = ["Berlin", "Dornbirn", "Redwood City", "Vienna", "Zurich"]

This will be used in queries 01 and 04.

These queries in a locustfile.py will look like this:

import random
import warnings

from locust import HttpUser, task, tag, between, constant_throughput

cities = ["Berlin", "Dornbirn", "Redwood City", "Vienna", "Zurich"]


class QuickstartUser(HttpUser):
    # wait_time = between(1, 5)
    # Using constant_throughput in combination with the number of users gives you control over the pace.
    # A constant_throughput of 1 makes each user run (at most) 1 task per second.
    # https://docs.locust.io/en/stable/writing-a-locustfile.html#wait-time-attribute
    # Starting with 200 users will therefore end up at 200 queries/sec.

    # Suppress urllib3's InsecureRequestWarning triggered by verify = False below.
    warnings.filterwarnings("ignore")
    wait_time = constant_throughput(1.0)

    # Disable TLS certificate verification (useful with self-signed certificates).
    def on_start(self):
        self.client.verify = False

    @tag("query01")
    @task(5)
    def query01(self):
        city = random.choice(cities)
        # print(f"Randomly chosen city: {city}")
        self.client.post(
            "/_sql",
            json={
                "stmt": f"SELECT location, round(AVG(temperature)) AS avg_temp FROM weather_data WHERE location = '{city}' GROUP BY location ORDER BY 2 DESC"
            },
            auth=("locust", "loadtest"),
            name="Avg Temperature per City",
        )

    @tag("query02")
    @task(1)
    def query02(self):
        self.client.post(
            "/_sql",
            json={
                "stmt": "SELECT location, max(temperature) AS highest_temp, max_by(timestamp, temperature) AS time_of_highest_temp FROM weather_data GROUP BY location"
            },
            auth=("locust", "loadtest"),
            name="When was Max Temp",
        )

    @tag("query03")
    @task(1)
    def query03(self):
        self.client.post(
            "/_sql",
            json={
                "stmt": "WITH OrderedData AS (SELECT timestamp,location,temperature,LAG(temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS prev_temp,LEAD(temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS next_temp FROM weather_data) SELECT timestamp,location,temperature,(prev_temp + next_temp) / 2 AS interpolated_temperature FROM OrderedData ORDER BY location, timestamp LIMIT 1000"
            },
            auth=("locust", "loadtest"),
            name="Bridge the Gaps",
        )

    @tag("query04")
    @task(5)
    def query04(self):
        city = random.choice(cities)
        # print(f"Randomly chosen city: {city}")
        self.client.post(
            "/_sql",
            json={
                "stmt": f"WITH minmax AS (SELECT location, min(timestamp) AS mintstamp, max(timestamp) AS maxtstamp FROM weather_data WHERE location = '{city}' GROUP BY location) SELECT a.timestamp, a.location, a.temperature, LAG(a.temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS prev_temp, LEAD(a.temperature, 1) IGNORE NULLS OVER (ORDER BY timestamp) AS next_temp FROM weather_data a, minmax b WHERE a.location = b.location AND a.timestamp BETWEEN b.mintstamp AND b.maxtstamp ORDER BY 1"
            },
            auth=("locust", "loadtest"),
            name="Bridge the Gaps per City",
        )

    @tag("query05")
    @task(1)
    def query05(self):
        self.client.post(
            "/_sql",
            json={
                "stmt": "insert into weekly_aggr_weather_data (week,location,avgtemp,maxhumid,minwind,lastupdated) (select distinct(date_trunc('week', timestamp)) as week,location,avg(temperature),max(humidity),min(wind_speed),now() FROM weather_data group by 1, location) ON CONFLICT (week,location) DO UPDATE SET  avgtemp = excluded.avgtemp, maxhumid = excluded.maxhumid, minwind = excluded.minwind, lastupdated = excluded.lastupdated"
            },
            auth=("locust", "loadtest"),
            name="Upsert the Aggr per week",
        )

Note that the weight of query01 and query04 is 5, while the rest have a weight of 1, meaning those two queries are five times as likely to be executed as the others. This shows how you can influence the relative frequency of the different queries.
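
As a side note, the tasks above interpolate the city directly into the SQL string. CrateDB’s HTTP endpoint also accepts bound parameters through an "args" list, which keeps values out of the statement text. Below is a sketch of query01 rewritten this way, to drop into the QuickstartUser class; the tag and name are made up for illustration:

    # Variant of query01 using bound parameters: CrateDB binds the values in
    # "args" to the ? placeholders in "stmt".
    @tag("query01_args")
    @task(5)
    def query01_args(self):
        city = random.choice(cities)
        self.client.post(
            "/_sql",
            json={
                "stmt": "SELECT location, round(AVG(temperature)) AS avg_temp FROM weather_data WHERE location = ? GROUP BY location ORDER BY 2 DESC",
                "args": [city],
            },
            auth=("locust", "loadtest"),
            name="Avg Temperature per City (parameterized)",
        )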

Let’s run this load test and see what happens.

I started the run with 100 users.

You can see that 100 users running these five queries result in 100 requests per second (because we set wait_time = constant_throughput(1.0)). The two “per City” queries are executed about five times as often as the others: with task weights of 5, 1, 1, 5, 1 (13 in total), each of the two heavier queries receives 5/13 of the requests and each of the others 1/13.

On the second tab in Locust, you can see charts of the same data.

If you want to download the Locust data, you can do that from the last tab.

[Screenshot: downloading statistics for the 100-user run]
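
If you prefer scripted runs over the web UI, Locust can also run headless and write these statistics to CSV files directly. A sketch using standard Locust 2.x flags (adjust the host to your cluster):

# 100 users, spawned at 10/s, running for 5 minutes,
# writing the statistics to results_*.csv files
locust --headless -u 100 -r 10 --run-time 5m --host http://localhost:4200 --csv results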

Conclusion

When you want to run a load test with multiple queries against a CrateDB cluster, Locust is a great and flexible tool that lets you quickly define the test and see what numbers of users and RPS are possible for that particular setup.
