ROS2 + Edge Impulse, Part 1: Pub/Sub Node in Python
Build an AI-driven ROS2 node for robotics using an Edge Impulse model and a 3-axis accelerometer.
Last updated
Build an AI-driven ROS2 node for robotics using an Edge Impulse model and a 3-axis accelerometer.
Last updated
Created By: Avi Brown
Public Project Link: https://studio.edgeimpulse.com/public/108508/latest
GitHub Repository: https://github.com/avielbr/edge-impulse/tree/main/ros2/ei_ros2
ROS2 is the world’s most popular robotics development framework. It enables developers to build organized, modular, and scalable robotics platforms with the full force of the open source community behind it. When developing a new robot, it’s common to recycle existing ROS2 packages instead of writing new ones, saving precious time in development.
In this tutorial we will build a recyclable ROS2 node based around an Edge Impulse machine learning model. This node will draw sensor data from a sensor topic, run the data through an Edge Impulse model, and then publish the results of the machine learning to another topic, to which other nodes in the system can subscribe.
For the sake of demonstration, we’ll be using an accelerometer-based machine learning model trained to recognize “circle”, “up_down”, and “side_side” movements.
In this project we’ll learn how to:
Build a pub/sub Edge Impulse node
Fill a sensor data buffer using a subscriber
Import and use a machine learning model from within a ROS2 node
Publish the inferences made by the machine learning node to a topic
Raspberry Pi 4
Adafruit MPU6050 accelerometer + gyroscope module (*)
Ubuntu 20.04 server
ROS2 Foxy Fitzroy
Edge Impulse Linux CLI
VSCode Remote Development extension
(*) BYO sensor For this tutorial we’ll be using a 3-axis accelerometer, but it is general enough to be adapted for just about any type of sensor.
This tutorial is meant to be as general as possible, and as such will not go in depth on building an Edge Impulse project / model. If you need help getting started you can find many high quality examples spanning a variety of sensors, boards, and use cases here.
You should also have the Edge Impulse Linux CLI up and running on your Linux board (RPi, Jetson Nano, etc.). Read about installing the CLI here. Test that it is indeed installed by running:
You should see the CLI initialize.
Navigate to your workspace, and from within ros2_ws/src
run:
This will create a package called “ei_ros2”. Adjust the dependencies to suit whatever libraries your package requires (or edit them using the package.xml
file).
Now we need to build the node that will publish sensor data that in turn will be fed to the machine learning model. Navigate to the “Impulse design” section of your Edge Impulse project, and take a loot at the first block:
This block contains a lot of important information. Let’s break it down:
First, we see that the block expects to receive three input axes (since it is a 3-axis accelerometer).
Next, we see that the model expects 2000ms (2 seconds) worth of data each time it is called.
And finally, the frequency tells us how many times per second the sensor is sampled. In this case, 98 times / second (Hz).
We’ll return to these numbers a couple of times, but for now let’s start building our sensor node!
Make a file called mpu6050_node.py
(or whatever your sensor is called) under /ros2_ws/src/ei_ros2/ei_ros2
.
Here are the libraries we need for this node. Depending on which sensor you’ll need your imports may look different.
I’m using the Float32MultiArray
as the message type, because we want to send an array with three readings (accelerometer X, Y, Z) which are float values.
Now we need a class with a publisher to handle our sensor. A lot of this code will be familiar to you if you have experience with ROS2.
Notice the frequency: This should be the same frequency as the one in the “Impulse design” section of your EI project. This will make it so that sensor data is published at the required rate (1 / frequency).
98 times per second the read_mpu6050()
method is called. Each time the 3 accelerometer axes are read, and sent together in an array to the mpu6050_stream
topic. Once it arrives to the topic it becomes available for our machine learning node to subscribe to.
Adding the main()
function, the full publisher node for the MPU6050 accelerometer is:
Now just run chmod +x mpu6050_node.py
to make your node file executable, since we're using --symlink-install
We’ve arrived to the fun part! Effort was made to make this next chunk of code as reusable as possible. Let’s go through it piece by piece.
First let’s download our model from Edge Impulse.
From the terminal, run:
After entering your credentials you should see:
Select the project you want, and hit Enter. Now you should see this message:
Take note of the file location and name. You should now copy and paste this file to the same directory where the sensor / Edge Impulse nodes are located (/ros2_ws/src/ei_ros2/ei_ros2
).
Now let’s build the node!
Make a file called ei_node.py
(or something) under /ros2_ws/src/ei_ros2/ei_ros2
.
Note that in this case we need both String
and Float32MultiArray
:
Float32MultiArray
for subscribing to the sensor node we just created
And String
, for publishing the machine learning results to an inference stream
There’s a lot going on here, but the important parts have numbered comments beside them, so let’s take them one by one:
“Change subscription” - Make sure you are subscribed to the correct topic (the same one that the sensor node is publishing to)
“Adjust timing” - This timer calls the function that runs the machine learning model. Basically, the function will only run the model once the buffer (which we’ll discuss in a moment) is full of data, so this timer essentially means: How often do you want to check if the buffer is full. If time isn’t an issue, you can set it to check every couple of seconds. Here I want to run the model as soon as I have a full buffer, so I set it to check every 0.01 seconds.
“Check file name” - The name of the .eim file here should match the name of the file downloaded via the CLI.
“Set max length” - The model expects to receive a specific amount of data, so we need to set the buffer (the array of sensor data that is passed to the model) to a specific length. How do we find the length? If we return to the “Impulse design” section of our Edge Impulse project, we’ll see that the window length is 2 seconds, where the sensor is sampled 98 times per second, each time giving 3 values. So: 2 * 98 * 3 = 588.
Don’t stress too much about this part — if you put the wrong number in, you’ll get an error telling you what the correct number is 🙂
This class handles passing the sensor data to the Edge Impulse model file we downloaded.
Note: The classify()
method returns a string with the name of the most likely result. Machine learning inferences look like: {'dog': 0.8, 'cat': 0.2, 'fish': 0.1}
. This function returns only the string with the highest probability.
Now we can add the main()
function and we’re ready to go!
Full Edge Impulse Pub/Sub code:
Once again run chmod +x ei_node.py
to make your node file executable.
In your [setup.py](http://setup.py)
file add entry points for your nodes:
Navigate to ros2_ws
(or your main ROS2 workspace directory) and build the package using:
Once that finishes, we’re ready to for tests!
Let’s run both the sensor node and the Edge Impulse machine learning node. From two separate terminals run:
And now in a third terminal let’s listen on the inference_stream
topic using:
Now let’s move our accelerometer around and see what appears on the inference_stream
topic:
Looking good!
In this tutorial we looked at how to make a AI-powered pub/sub node in ROS2. This tutorial is part of an Edge Impulse + ROS2 series, and in Part 2 we’ll look at how to use a service / client architecture. The contents of this tutorial can be generalized to suit just about any sensor and use case, so please don’t hesitate to try it out yourself! Feel free to ask questions at forum.edgeimpulse.com, or on the YouTube video version of this tutorial here: