Working with the MQTT message bus
In this section, we will briefly cover the basics of message buses and how to work with them, in the context of the ToMCAT project.
A message bus is a system that allows communication between software components. There are other ways to achieve this kind of communication (e.g. production and consumption of files, a shared database, remote procedure invocations) - however, an architecture built around a message bus provides a great deal of flexibility and enables asynchronous communication between the components.
It might be useful to visualize the bus as a hierarchical collection of ‘streams’ of data, called ‘topics’. The data on the bus is in the form of ‘messages’, which are usually in a lightweight machine-readable plain text format (in our case, we use JSON messages). Client applications can then ‘subscribe’ and ‘publish’ to specific topics, thus having to deal with only a subset of the information on the bus rather than all of it.
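A message broker is a program that handles the flow of messages, routing them as necessary between the publishers and subscribers. It enables asynchronicity by decoupling the two sides: a publisher hands its message to the broker and moves on, and the broker takes care of delivering it to the clients subscribed to that topic. Depending on the quality-of-service level and session settings, the broker can also hold messages until the subscribers have received them.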
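To make the routing idea concrete, here is a minimal in-memory sketch of what a broker does. It is an illustration only, not how Mosquitto is implemented: the ToyBroker class and its methods are hypothetical names, delivery is a plain synchronous function call, and there is no network, wildcard matching, or message storage.

```python
from typing import Callable, Dict, List, Tuple

# A subscriber is modelled as a callback taking (topic, payload).
Callback = Callable[[str, str], None]


class ToyBroker:
    """Hypothetical in-memory stand-in for a message broker."""

    def __init__(self) -> None:
        # Maps each topic name to the callbacks subscribed to it.
        self._subscribers: Dict[str, List[Callback]] = {}

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subscribers.setdefault(topic, []).append(callback)

    def publish(self, topic: str, payload: str) -> int:
        """Deliver payload to every subscriber of topic; return how many got it."""
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)


broker = ToyBroker()
received: List[Tuple[str, str]] = []

# The subscriber only sees messages on the topic it asked for.
broker.subscribe("example_topic", lambda topic, payload: received.append((topic, payload)))

delivered = broker.publish("example_topic", '{"key": "value"}')
ignored = broker.publish("other_topic", "nobody is listening")

print(received)
print(delivered, ignored)
```

Note that the publisher never refers to a subscriber directly; it only names a topic, which is what lets components be added or removed independently.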
There are a number of different types of messaging protocols. In ToMCAT, we use MQTT, since it is the protocol being used in the ASIST cloud testbed being developed by the SIM2 collaboration (Aptima + ASU + CoS). MQTT is a lightweight messaging protocol suitable for embedded computers and IoT (internet of things) systems.
The message broker that we use in ToMCAT is Mosquitto. Communication with the message broker can be done using an executable or a client library. We will go over both these methods.
Starting up the Mosquitto message broker
The invocation to start up the Mosquitto message broker depends on the operating system and the package manager you’re using (keep in mind that running the tools/install script will automatically install Mosquitto for you).
macOS
If you installed Mosquitto using MacPorts, you can start it up in the background with the default settings by invoking::
port load mosquitto
If you used Homebrew, you can do::
brew services start mosquitto
Ubuntu
If you’re on Ubuntu, Mosquitto will automatically start up as a background service after installation, so you don’t need to start it up manually.
Message topics and topic hierarchies
Message brokers route messages to and from client applications based on the topics that the applications publish or subscribe to. Topics follow a hierarchical pattern, with levels in the hierarchy separated by slashes (/). For example, consider the topics shown below (a subset of the topics implemented in ToMCAT).
observations/events/entity_death
observations/events/mob_attacked
If a client application subscribed to observations/events/#, where # is the multi-level wildcard, it would receive the messages from both the observations/events/entity_death and observations/events/mob_attacked topics.
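The matching rule can be sketched in a few lines of Python. This is a simplified illustration rather than the broker's actual code: topic_matches is a hypothetical helper, it handles the multi-level wildcard (#) and the single-level wildcard (+), and it ignores special cases such as topics beginning with $.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Return True if an MQTT-style topic filter matches a concrete topic."""
    filter_levels = subscription.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level, where it
            # matches everything at this depth and below.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter is deeper than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any one level.
            return False

    # Without a trailing "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


both = [
    topic_matches("observations/events/#", "observations/events/entity_death"),
    topic_matches("observations/events/#", "observations/events/mob_attacked"),
]
exact_only = topic_matches(
    "observations/events/entity_death", "observations/events/mob_attacked"
)
print(both, exact_only)
```

A subscription without wildcards matches only the topic with exactly that name, which is why the last check fails.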
Publishing/subscribing
There are two ways to publish/subscribe to topics on a message bus - using client executables, or using a client library in your own program.
Using client executables
Using a client executable is a simple, no-frills method to work with a message bus. Let’s go through a small exercise in publishing/subscribing using client executables.
Once you’ve started up the broker, you can subscribe to a topic called example_topic using the mosquitto_sub client::
mosquitto_sub -t example_topic
Then, in a separate terminal window, use the mosquitto_pub client to publish a message containing the string Hello world to example_topic::
mosquitto_pub -t example_topic -m "Hello world"
You should see Hello world printed to standard output in the terminal window in which mosquitto_sub was run (see screenshot below for example).
Now, let’s try an example where the message published is not supplied using the -m command line flag, but using a pipe instead. Since the messages we will be producing and consuming are in JSON format, we will use JSON for this example.
In the same window where you ran mosquitto_pub last time, run the following invocation::
echo '{"key": "value"}' | mosquitto_pub -t example_topic -l
You should see the JSON object printed to standard output in the window in which mosquitto_sub was run.
The string {"key": "value"} represents a JSON object. It is written to standard output using echo, and subsequently piped into mosquitto_pub using |. The -l command line flag tells mosquitto_pub to read from standard input and treat each incoming line (terminated by a newline character) as a separate message.
Let us now put all these components together and learn a bit more about another useful tool called jq, a program for working with JSON at the command line (it can be obtained using your package manager: port/brew/apt-get install jq). jq can be a helpful tool when debugging JSON message publishing. When used as a filter with no arguments, jq pretty-prints any JSON piped into it to the standard output stream::
$ echo '{"key": "value"}' | jq
{
  "key": "value"
}
We will publish the following two JSON messages to topic1::
{"key": "value1"}
{"key": "value2"}
For each one, we will extract the value stored under the key ‘key’ using jq, and publish that value to topic2.
In one terminal window, run the following invocation to set up the jq processing ‘stream’::
mosquitto_sub -t topic1 | jq --unbuffered .key | mosquitto_pub -t topic2 -l
In another window, subscribe to topic2::
mosquitto_sub -t topic2
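Then, in a third window, publish two JSON messages to topic1, one per line (printf is used here because echo would join both objects onto a single line, which -l would publish as one message)::
printf '%s\n' '{"key": "value1"}' '{"key": "value2"}' | mosquitto_pub -t topic1 -l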
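In the second window, you’ll see "value1" and "value2" being printed to standard output. The double quotes are part of the output, since jq emits JSON strings by default; pass -r to jq to print the raw strings instead.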
This example illustrates an elegant stream processing pipeline setup with MQTT client executables and pipes. Hopefully, this simple exercise has helped you get an intuitive sense of how a message-based communication system works.
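If it helps to see the middle stage of such a pipeline spelled out, the following Python sketch mimics what jq .key does to a stream of newline-delimited JSON messages. The extract_key function is a hypothetical name, and the input is an in-memory stream standing in for the output of mosquitto_sub; in a real pipeline you could iterate over sys.stdin instead.

```python
import io
import json
from typing import Iterable, Iterator


def extract_key(lines: Iterable[str], key: str = "key") -> Iterator[str]:
    """Yield the JSON-encoded value stored under `key` for each input line."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between messages
        message = json.loads(line)  # one JSON message per line
        # Re-encode as JSON, so strings keep their double quotes.
        yield json.dumps(message[key])


# Stand-in for the messages arriving from the first topic, one per line.
incoming = io.StringIO('{"key": "value1"}\n{"key": "value2"}\n')

outgoing = list(extract_key(incoming))
print("\n".join(outgoing))
```

Each element of outgoing corresponds to one line that would be handed to the publishing stage, and therefore to one published message.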
Using a client library
For more complex scenarios, it may be necessary to use a client library instead of a client executable. One example of such a scenario would be when a client application needs to publish a variety of messages to different topics. While mosquitto_sub is capable of subscribing to multiple topics, mosquitto_pub is designed to publish to a single topic.
Using a client library is also a good idea when your client program needs to process messages from different topics differently. While you could technically design your client to inspect each incoming message and route it based on its contents, it is probably simpler to call out to different functions within a program by inspecting the topic that the message arrived on (which can be done with a client library).
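The topic-based routing described above might look like the following sketch. It is library-agnostic and deliberately does not use a real MQTT client: dispatch stands in for the message callback that a client library would invoke with the topic and payload, and the handler names and the payload field (entity) are hypothetical.

```python
import json
from typing import Callable, Dict, List, Tuple

# Records what each handler saw, so the routing is easy to inspect.
log: List[Tuple[str, dict]] = []


def on_entity_death(message: dict) -> None:
    log.append(("death", message))


def on_mob_attacked(message: dict) -> None:
    log.append(("attack", message))


# One handler function per topic of interest.
HANDLERS: Dict[str, Callable[[dict], None]] = {
    "observations/events/entity_death": on_entity_death,
    "observations/events/mob_attacked": on_mob_attacked,
}


def dispatch(topic: str, payload: str) -> bool:
    """Route a message by the topic it arrived on; return False if unhandled."""
    handler = HANDLERS.get(topic)
    if handler is None:
        return False
    handler(json.loads(payload))
    return True


handled = dispatch("observations/events/entity_death", '{"entity": "zombie"}')
unhandled = dispatch("observations/events/unknown_event", "{}")
print(handled, unhandled, log)
```

The handlers never need to inspect the message to find out what kind of event it is, since the topic already carries that information.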
This is the case with the ToMCAT Java mod (which extends the Malmo mod), which uses the Eclipse Paho Java client library. The functionality is wrapped in the MqttService singleton class. Eclipse Paho provides client libraries in a number of languages. However, for our purposes, when we are writing non-Java programs, we’ll use the Eclipse Paho C++ client library.
For an example program that uses the client library, see the C++ agent template.
When to use a client executable versus a client library
When a client application does not need to publish to more than one topic, using the mosquitto_pub and mosquitto_sub executables provides a couple of advantages over using a client library. For one, it makes it easier to switch message brokers in the future - say, if you wanted to use Apache Kafka for your streaming architecture rather than Mosquitto. Other advantages include not needing to compile/link C/C++ programs, manage Java dependencies, or create a Python virtual environment for development.
On the other hand, a client library offers numerous advantages as your use case becomes more complex - e.g., if you need to publish to multiple topics, or use multithreading/multiprocessing.