Set up HiveMQ with CrateDB as a consumer

What is HiveMQ?

HiveMQ is the proven enterprise MQTT platform because it’s reliable under real-world stress and built for flexibility, security, and scale.

HiveMQ website
HiveMQ Install guide

Goal of this blog

The goal of this blog is to give you an overview of how to set up HiveMQ using CrateDB as a consumer.

Prerequisites and Assumptions

  • This installation was executed on an Ubuntu VM.
  • You have a CrateDB instance or cluster running.
  • You have a user in CrateDB that the MQTT broker can use to connect and write data.

Install HiveMQ

  • mkdir hivemq
  • cd hivemq
  • curl -L https://www.hivemq.com/releases/hivemq-4.27.0.zip -o hivemq-4.27.0.zip
  • unzip hivemq-*.zip

A couple of directories are important.

  • bin => Where the executables are located.
  • conf => Where the configuration files are located.
  • conf/examples => Contains example configurations for different options, such as setting up a cluster and configuring WebSockets.
  • extensions => Where the extensions are located, including those that connect to different databases.
  • tools => Where tools such as Swarm and the MQTT CLI are located.

Run HiveMQ

To run HiveMQ, execute ./bin/run.sh. This starts both the Control Center (CC) and the MQTT broker. When you run HiveMQ on a VM, as I do, you need to make sure that the CC is reachable on all IPs (0.0.0.0) instead of only on the loopback address (127.0.0.1). Use the following conf/config.xml to make both the CC and the broker listen on all IPs. If you want them to listen on one specific IP, replace 0.0.0.0 in the bind-address elements with that IP:

<?xml version="1.0" encoding="UTF-8" ?>
<hivemq xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="config.xsd">

    <listeners>
        <tcp-listener>
            <port>1883</port>
            <bind-address>0.0.0.0</bind-address>
        </tcp-listener>
    </listeners>

    <control-center>
        <enabled>true</enabled>
        <listeners>
            <http>
                <port>8080</port>
                <bind-address>0.0.0.0</bind-address>
            </http>

        </listeners>
    </control-center>

</hivemq>

Execute ./bin/run.sh to start HiveMQ.

This starts HiveMQ, and you should see output similar to this:

2024-04-09 08:59:46,122 INFO  - Starting HiveMQ Enterprise Server
2024-04-09 08:59:46,123 INFO  - HiveMQ version: 4.27.0
2024-04-09 08:59:46,123 INFO  - HiveMQ home directory: hivemq/hivemq-4.27.0
2024-04-09 08:59:46,322 WARN  - Data Hub Transformation and Scripting is currently not supported on ARM64 Linux and has been disabled.
2024-04-09 08:59:46,322 INFO  - Successfully loaded configuration from 'hivemq/hivemq-4.27.0/conf/config.xml'.
2024-04-09 08:59:46,378 INFO  - This node's ID is wm0bK
2024-04-09 08:59:46,378 INFO  - Clustering is disabled
2024-04-09 08:59:48,318 INFO  - No valid license file found. Using trial license, restricted to 25 connections.
2024-04-09 08:59:48,455 INFO  - No valid license file for Data Hub found. Using free license, restricted to 1 policy.
2024-04-09 08:59:48,529 INFO  - This node uses '4' CPU cores.
2024-04-09 08:59:48,539 INFO  - Starting HiveMQ extension system.
2024-04-09 08:59:48,572 INFO  - Starting extension with id "hivemq-allow-all-extension" at hivemq/hivemq-4.27.0/extensions/hivemq-allow-all-extension
2024-04-09 08:59:48,576 WARN  -
################################################################################################################
# This HiveMQ deployment is not secure! You are lacking Authentication and Authorization.                      #
# Right now any MQTT client can connect to the broker with a full set of permissions.                          #
# For production usage, add an appropriate security extension and remove the hivemq-allow-all extension.       #
# You can download security extensions from the HiveMQ Marketplace (https://www.hivemq.com/extensions/).       #
################################################################################################################
2024-04-09 08:59:48,576 INFO  - Extension "Allow All Extension" version 1.0.0 started successfully.
2024-04-09 08:59:50,632 INFO  - wm0bK: no members discovered after 2000 ms: creating cluster as first member
2024-04-09 08:59:50,682 INFO  - No user for HiveMQ Control Center configured. Starting with default user
2024-04-09 08:59:50,682 INFO  - Starting HiveMQ Control Center on address 0.0.0.0 and port 8080
2024-04-09 08:59:50,766 INFO  - Control Center Audit Logging started.
2024-04-09 08:59:50,766 INFO  - Started HiveMQ Control Center in 84ms
2024-04-09 08:59:50,773 INFO  - Starting TCP listener on address 0.0.0.0 and port 1883
2024-04-09 08:59:50,781 INFO  - Started TCP Listener on address 0.0.0.0 and on port 1883.
2024-04-09 08:59:50,781 INFO  - Started HiveMQ in 4171ms

You should now be able to connect to the CC at http://<use.your.ip.here>:8080 (the listener configured above is a plain HTTP listener).
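If the page does not load, it can help to check first whether the two ports from config.xml are reachable at all. The following is a minimal sketch using only the Python standard library. The function name port_is_open and the host value are my own assumptions; replace the host with the IP of your VM.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    broker_host = "127.0.0.1"  # assumption: replace with the IP of your VM
    for name, port in (("MQTT listener", 1883), ("Control Center", 8080)):
        state = "reachable" if port_is_open(broker_host, port) else "NOT reachable"
        print(f"{name} on {broker_host}:{port} is {state}")
```

This only tells you that something accepts TCP connections on the port. It does not verify that it is HiveMQ that answers.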

Log in using the default username and password. Please change these as soon as possible.

Username: admin
Password: hivemq

Be aware that CC doesn’t give you much control, but you can use it as an overview tool.

The MQTT CLI

This is an extensive CLI that you can use not only to run MQTT commands but also to run Swarm and to configure the broker.

Mostly, though, it is used as an MQTT tool to publish messages to the broker.

Running it will give you some insight into the options:

~/hivemq/hivemq-4.27.0/tools/mqtt-cli/bin/mqtt
Usage:  mqtt [-hV] { pub | sub | shell | test | hivemq | swarm }

MQTT Command Line Interpreter.

Options:
  -h, --help      Show this help message and exit.
  -V, --version   Print version information and exit.

Commands:
  pub, publish    Publish a message to a list of topics.
  sub, subscribe  Subscribe an MQTT client to a list of topics.
  shell, sh       Starts MqttCLI in shell mode, to enable interactive mode with further sub commands.
  test            Tests the specified broker on different MQTT feature support and prints the results.
  hivemq          HiveMQ Command Line Interpreter.
  swarm           HiveMQ Swarm Command Line Interpreter.

The simplest way is to publish a message to the broker using:

~/hivemq/hivemq-4.27.0/tools/mqtt-cli/bin/mqtt pub -t test -m test

The options and flags explained:

  • pub => Publish
  • -t => Topic
  • -m => Message
  • -h => Host. Not used here; when it is not specified, the CLI connects to the local broker.

Running this will have no visible effect because nothing is subscribed to this topic yet. To subscribe to this topic, or to any topic, run the following command using the CLI. Note that this command runs in the foreground. If you want to run it in the background, you can use screen or nohup.

~/hivemq/hivemq-4.27.0/tools/mqtt-cli/bin/mqtt sub -t test -J

The options and flags explained:

  • sub => Subscribe
  • -t => Topic. In this case, I used test, but if you want to subscribe to all topics, you can use the wildcard “#”.
  • -J => Gives you JSON output
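To get a feeling for what a topic filter such as “#” will match before subscribing, the matching rules can be sketched in a few lines of Python. This is my own illustration of the wildcard rules from the MQTT specification ("#" for multiple levels, "+" for a single level), not code taken from HiveMQ. The function name topic_matches is hypothetical.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    # Filters that start with a wildcard do not match topics that begin
    # with '$' (for example $SYS/... topics).
    if topic.startswith("$") and topic_filter[:1] in ("#", "+"):
        return False

    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level; it matches
            # the parent level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False

    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("#", "test"))                            # all topics
print(topic_matches("sensors/+/temp", "sensors/room1/temp"))  # one level
print(topic_matches("test", "test/a"))                        # no wildcard
```

The sensors/... topic names are made up for the example; the post itself only uses the topic test.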

Now, when running a pub command from another window again, you should see the message coming through.

~/hivemq/hivemq-4.27.0/tools/mqtt-cli/bin/mqtt pub -t test -m "this is a test"

In the subscribe window you should see the message:

{
  "topic": "test",
  "payload": "this is a test",
  "qos": "AT_MOST_ONCE",
  "receivedAt": "2024-04-09 09:45:29",
  "retain": false
}
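If you capture the output of the subscriber (for example by redirecting it to a file), you end up with several of these JSON objects one after another. Below is a minimal sketch of how such a capture could be split into individual messages with the Python standard library. It assumes the capture contains nothing but JSON objects in the format shown above; the second object in the sample is made up for the example.

```python
import json


def iter_json_messages(text):
    """Yield each JSON object found in a string of concatenated JSON documents."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so skip it here.
        if text[pos].isspace():
            pos += 1
            continue
        message, pos = decoder.raw_decode(text, pos)
        yield message


# Sample capture: the first object is the output shown above,
# the second one is an invented example.
captured = """
{
  "topic": "test",
  "payload": "this is a test",
  "qos": "AT_MOST_ONCE",
  "receivedAt": "2024-04-09 09:45:29",
  "retain": false
}
{
  "topic": "test",
  "payload": "message 01",
  "qos": "AT_MOST_ONCE",
  "receivedAt": "2024-04-09 09:46:02",
  "retain": false
}
"""

for message in iter_json_messages(captured):
    print(message["topic"], "->", message["payload"])
```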

So, those are the basics for running a broker, subscribing to a topic, and generating a message. When you look at the CC, you will also see some of these actions reflected in connections/subscriptions. Also, when you navigate to clients, you will see that currently, there is one client connected.

When you click on the client id you will get more details about this client.

Store the messages in CrateDB

Within HiveMQ, you can use multiple extensions. Extensions are also used to make database connections (see the extensions directory).

Before you can start using extensions, a security extension must be enabled. A number of these are available. Out of the box, the allow-all extension (hivemq-allow-all-extension) is used. In production you would normally go for the enterprise-security-extension, but for testing purposes the allow-all extension is a good one to start with. As you can see in the logs when starting the broker, it is already active, so unless you want to change this, you do not need to do anything at this point.

The extensions are hot-loadable: if you delete the DISABLED file in the directory of an extension, you will see on the broker console that the extension is being activated. If you have a license, the extension stays activated; otherwise, the DISABLED file will reappear for that extension.

So enabling the PostgreSQL extension is as easy as removing the DISABLED file. But in order to connect to a CrateDB database, you first need to edit the config.xml found in the ./extensions/hivemq-postgresql-extension/conf/ directory. Open it and configure the following:

    <postgresqls>
        <postgresql>
            <id>my-postgresql-id</id>
            <host>localhost</host>
            <port>5432</port>
            <database>crate</database>
            <username>mqtt</username>
            <password>welcome</password>
        </postgresql>
    </postgresqls>

If you use the “INSERT STATEMENT processor”, you need to add the schema name before the table name. For example, if you want to ingest the data into mqtt.mqtt_to_postgresql_table, change the <table> element accordingly:

                    <table>mqtt.mqtt_to_postgresql_table</table>

Note: In the database, I created a user mqtt with the needed privileges. As a quick test, you can test your connectivity using psql.

sudo apt-get install -y postgresql-client
psql -h localhost -U mqtt -W
Password:
psql (14.11 (Ubuntu 14.11-0ubuntu0.22.04.1), server 14.0)
mqtt=>

In the config file, you can also configure things like the topic filter, batching, the table name, the column names, and so on. We will leave them at their defaults for now. Now that we have the basic connectivity defined, we can move on to the next stage. As this is a PostgreSQL extension, we need to make sure the table is created using the correct data types.

Before enabling the extension, make sure you have the table created.

CREATE TABLE mqtt.mqtt_to_postgresql_table
(
    topic                    TEXT,
    payload_utf8             TEXT
);

Now, you can enable the extension by deleting the DISABLED file in the hivemq-postgresql-extension directory.

If the config file is set up correctly, you should see messages like these on the broker console:

2024-04-09 12:32:55,175 INFO  - Starting extension with id "hivemq-postgresql-extension" at hivemq/hivemq-4.27.0/extensions/hivemq-postgresql-extension
2024-04-09 12:32:55,226 INFO  - HiveMQ Enterprise Extension for PostgreSQL: Successfully loaded configuration from 'hivemq/hivemq-4.27.0/extensions/hivemq-postgresql-extension/conf/config.xml'.
2024-04-09 12:32:55,240 INFO  - jasync selected transport - nio
2024-04-09 12:32:55,257 INFO  - registering pool for periodic connection tests com.github.jasync.sql.db.pool.ActorBasedObjectPool@17b76599 - PoolConfiguration(maxObjects=1, maxIdle=60000, maxQueueSize=2147483647, validationInterval=5000, createTimeout=10000, testTimeout=5000, queryTimeout=null, coroutineDispatcher=Dispatchers.Default, maxObjectTtl=null, minIdleObjects=null)
2024-04-09 12:32:55,280 INFO  - HiveMQ Enterprise Extension for PostgreSQL: Successfully registered consumer for <mqtt-to-postgresql-route> 'my-mqtt-to-postgresql-route-insert'.
2024-04-09 12:32:55,281 INFO  - Extension "HiveMQ Enterprise Extension for PostgreSQL" version 4.27.0 started successfully.

Now, when you send a message to the broker, it should be ingested into CrateDB.

~/hivemq/hivemq-4.27.0/tools/mqtt-cli/bin/mqtt pub -t test -m "message 01"

When querying the database you will see this:

topic	payload_utf8
test	 message 01
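Besides psql, the rows can also be fetched over CrateDB's HTTP endpoint (POST to /_sql with a JSON body containing a stmt key). The snippet below is a minimal sketch with the Python standard library. It assumes CrateDB's HTTP port is the default 4200 on localhost and reuses the mqtt / welcome credentials from the extension config; the function names are my own.

```python
import base64
import json
import urllib.request


def build_sql_request(stmt, base_url="http://localhost:4200",
                      user="mqtt", password="welcome"):
    """Build a POST request for CrateDB's /_sql HTTP endpoint."""
    body = json.dumps({"stmt": stmt}).encode("utf-8")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{base_url}/_sql",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


def fetch_rows(stmt, **kwargs):
    """Run a statement and return (column names, rows)."""
    with urllib.request.urlopen(build_sql_request(stmt, **kwargs), timeout=10) as resp:
        result = json.load(resp)
    return result["cols"], result["rows"]


# Example call (needs a running CrateDB, so it is left commented out):
# cols, rows = fetch_rows(
#     "SELECT topic, payload_utf8 FROM mqtt.mqtt_to_postgresql_table LIMIT 10"
# )
# print(cols, rows)
```

Building the request separately from sending it makes it easy to inspect what will go over the wire before pointing the script at a real cluster.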

This simple test uses the “INSERT STATEMENT processor”; the other option is to use the “STATEMENT TEMPLATE processor” instead. For this, follow these steps:

  • Disable the “INSERT STATEMENT processor” by commenting out that block.
  • Uncomment the “STATEMENT TEMPLATE processor” block.
  • Copy statement-template.sql from the examples directory to the root of the hivemq-postgresql-extension directory.
  • Create a matching table in the CrateDB database. The default statement-template uses this format:

create-table

CREATE TABLE mqtt.mqtt_to_postgresql_table
(
    topic                    TEXT,
    payload_utf8             TEXT,
    qos                      TEXT,
    retain                   BOOLEAN,
    packet_id                INT,
    payload_format_indicator TEXT,
    response_topic           TEXT,
    correlation_data_utf8    TEXT,
    user_properties          ARRAY(TEXT),
    arrival_timestamp        TIMESTAMP
);

  • Edit the statement-template.sql file as in the example below.

statement-template

INSERT INTO mqtt.mqtt_to_postgresql_table(topic, payload_utf8, qos, retain, packet_id, payload_format_indicator, response_topic, correlation_data_utf8, user_properties, arrival_timestamp)
VALUES (${mqtt-topic},
        ${mqtt-payload-utf8},
        ${mqtt-qos},
        ${mqtt-retain},
        ${mqtt-packet-id},
        ${mqtt-payload-format-indicator},
        ${mqtt-response-topic},
        ${mqtt-correlation-data-utf8},
        ([${mqtt-user-properties-json}]::ARRAY(OBJECT)),
        ${timestamp-iso-8601});

Note: The mqtt-user-properties-json value needs to be cast to an ARRAY in order to be ingested into the mqtt.mqtt_to_postgresql_table table!
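When editing the template, it is easy to end up with a column list and a VALUES list that no longer line up. A quick sanity check can be sketched in Python with a couple of regular expressions. This is only a helper I made up for this post (the function names are hypothetical) and it says nothing about how the extension itself processes the template.

```python
import re

TEMPLATE = """INSERT INTO mqtt.mqtt_to_postgresql_table(topic, payload_utf8, qos, retain, packet_id, payload_format_indicator, response_topic, correlation_data_utf8, user_properties, arrival_timestamp)
VALUES (${mqtt-topic},
        ${mqtt-payload-utf8},
        ${mqtt-qos},
        ${mqtt-retain},
        ${mqtt-packet-id},
        ${mqtt-payload-format-indicator},
        ${mqtt-response-topic},
        ${mqtt-correlation-data-utf8},
        ([${mqtt-user-properties-json}]::ARRAY(OBJECT)),
        ${timestamp-iso-8601});
"""


def template_columns(template):
    """Return the column names listed after INSERT INTO <table>(...)."""
    match = re.search(r"INSERT\s+INTO\s+[\w.]+\s*\(([^)]*)\)", template, re.IGNORECASE)
    if match is None:
        raise ValueError("no INSERT INTO ... (columns) clause found")
    return [column.strip() for column in match.group(1).split(",")]


def template_placeholders(template):
    """Return the names of all ${...} placeholders in order of appearance."""
    return re.findall(r"\$\{([a-z0-9-]+)\}", template)


columns = template_columns(TEMPLATE)
placeholders = template_placeholders(TEMPLATE)

# Print each column next to the placeholder that fills it.
for column, placeholder in zip(columns, placeholders):
    print(f"{column:26} <- {placeholder}")

if len(columns) != len(placeholders):
    print("Column count and placeholder count differ!")
```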

Reload the PostgreSQL extension by creating a file named DISABLED in the extension directory and watching the broker console until the extension is reported as stopped. Then delete the DISABLED file again to re-enable the extension.
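The disable/enable cycle can also be scripted. Here is a minimal sketch in Python that creates and removes the DISABLED marker file. The function names and the wait time are my own assumptions; the broker console remains the place to confirm that the extension really stopped and started again.

```python
import time
from pathlib import Path


def disable_extension(extension_dir):
    """Create the DISABLED marker file in the extension directory."""
    marker = Path(extension_dir) / "DISABLED"
    marker.touch(exist_ok=True)
    return marker


def enable_extension(extension_dir):
    """Remove the DISABLED marker file if it exists (Python 3.8+)."""
    marker = Path(extension_dir) / "DISABLED"
    marker.unlink(missing_ok=True)
    return marker


def reload_extension(extension_dir, wait_seconds=5.0):
    """Disable the extension, wait, and enable it again.

    wait_seconds is a guess at how long the broker needs to notice the
    marker file; adjust it to what you see on the broker console.
    """
    disable_extension(extension_dir)
    time.sleep(wait_seconds)
    enable_extension(extension_dir)


# Example call, using the path from the broker log above (commented out
# so that nothing is touched by accident):
# reload_extension("hivemq/hivemq-4.27.0/extensions/hivemq-postgresql-extension")
```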

HiveMQ-Swarm

Load and reliability testing of MQTT systems are imperative for any business-critical IoT solution. HiveMQ Swarm provides the distributed simulation environment to successfully test millions of MQTT clients, millions of MQTT messages, and hundreds of thousands of MQTT topic names.

Before you start, make sure that you have created the table using the CREATE TABLE statement from the “STATEMENT TEMPLATE processor” steps in the previous section.

Make sure you have also changed the statement-template.sql file as described there.

In the tools directory, you will find a directory named hivemq-swarm. It has more or less the same directory structure as the broker itself. In the scenarios directory, you will find a scenario.xml. This file defines a couple of things:

  • brokers => The default assumes 1 broker, but if you have multiple brokers or a cluster, you can define them here.
  • clientGroups => Defines the number of clients (count) and their naming (clientIdPattern).
  • topicGroups => Defines the number of topics (count) and their naming (topicNamePattern).
  • stages => Defines the stages that every client group will follow.

In the basic config, you have 10 clients publishing to 10 topics, and that publish stage is repeated 10 times, so this Swarm scenario will produce 10 × 10 × 10 = 1000 messages in total.

To start this make sure your MQTT Broker is running and then execute:

hivemq/hivemq-4.27.0/tools/hivemq-swarm/bin/hivemq-swarm

This will kick off the Swarm process. It is normally done within a few seconds, and if all goes well, you should see something like this:

-------------------------------------------------------------------------

  _   _ _           __  __  ___
 | | | (_)_   _____|  \/  |/ _ \
 | |_| | \ \ / / _ \ |\/| | | | |
 |  _  | |\ V /  __/ |  | | |_| |
 |_|_|_|_| \_/ \___|_|  |_|\__\_\
 / ___|_      ____ _ _ __ _ __ ___
 \___ \ \ /\ / / _` | '__| '_ ` _ \
  ___) \ V  V / (_| | |  | | | | | |
 |____/ \_/\_/ \__,_|_|  |_| |_| |_|

-------------------------------------------------------------------------
2024-04-09 17:38:23,559 INFO   - HiveMQ Swarm Version: 4.27.0
2024-04-09 17:38:23,563 INFO   - Log Configuration was overridden by hivemq/hivemq-4.27.0/tools/hivemq-swarm/config/logback.xml
2024-04-09 17:38:23,730 INFO   - Discovered configuration at hivemq/hivemq-4.27.0/tools/hivemq-swarm/config/config.xml
2024-04-09 17:38:23,931 INFO   - Successfully loaded ExtensionMain 'class com.hivemq.swarm.standard.security.SecurityExtensionMain' from 'hivemq/hivemq-4.27.0/tools/hivemq-swarm/extensions/standard-security-extension-4.27.0.jar'
2024-04-09 17:38:23,975 INFO   - Waiting for Commander connection on localhost:3881
2024-04-09 17:38:23,984 INFO   - Checking license.
2024-04-09 17:38:23,986 INFO   - No valid license file found. Using trial license, restricted to 25 clients and 1 agent.
2024-04-09 17:38:23,987 INFO   - Attempting to connect to agents 0/1.
2024-04-09 17:38:23,988 INFO   - Connecting to agent localhost:3881
2024-04-09 17:38:23,996 INFO   - Commander successfully connected to Agent localhost:3881
2024-04-09 17:38:23,997 INFO   - All Agents connected.
2024-04-09 17:38:24,006 INFO   - Commander connected on localhost:3881
2024-04-09 17:38:24,016 INFO   - Scenario data distributed.
2024-04-09 17:38:24,017 INFO   - Scenario in progress: Stage with id 's1' (1/3).
2024-04-09 17:38:24,137 INFO   - Scenario in progress: Stage with id 's2' (2/3).
2024-04-09 17:38:24,170 INFO   - Scenario in progress: Stage with id 's3' (3/3).
2024-04-09 17:38:24,181 INFO   - Scenario finished.
2024-04-09 17:38:24,181 INFO   - Connection to Commander lost.
2024-04-09 17:38:24,181 INFO   - Agent localhost/<use.your.ip.here>:3881 connection lost.
2024-04-09 17:38:24,209 INFO   - Shutting down agent.
2024-04-09 17:38:24,209 INFO   - Closed connection to Agent localhost/<use.your.ip.here>:3881

Switching back to the database, you will see that you have 100 rows in your mqtt.mqtt_to_postgresql_table table. Even though we configured 10 clients, because I don’t have a valid license, I’m only able to use 1 client. If you want more messages, you can change, for example, the count of the publish stage in the stages:

        <stage id="s2">
            <lifeCycle id="s2.l1" clientGroup="cg1">
                <publish topicGroup="tg1" count="100"/>
            </lifeCycle>
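The row counts mentioned above fit a simple multiplication. The sketch below assumes that every connected client publishes count messages to every topic in the topic group and that the topic group holds 10 topics. Both are assumptions derived from the numbers in this post, not from the Swarm documentation.

```python
def expected_messages(clients: int, topics: int, publish_count: int) -> int:
    """Estimate the number of published messages for one publish stage.

    Assumes every client publishes publish_count messages to every topic.
    """
    return clients * topics * publish_count


# Scenario as configured: 10 clients, 10 topics, count="10".
print(expected_messages(clients=10, topics=10, publish_count=10))

# Only 1 client connects in my setup, with count="10".
print(expected_messages(clients=1, topics=10, publish_count=10))

# Same single client with count="100" as in the snippet above.
print(expected_messages(clients=1, topics=10, publish_count=100))
```

Comparing this estimate with a SELECT COUNT(*) on the table is a quick way to see whether all messages actually arrived in CrateDB.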

Conclusion

This howto was created to give you a quick start to using MQTT with CrateDB.
