LogoLogo
HomeDocsAPI & SDKsProjectsForumStudio
  • Welcome
    • Featured Machine Learning Projects
      • Getting Started with the Edge Impulse Nvidia TAO Pipeline - Renesas EK-RA8D1
      • Smart City Traffic Analysis - NVIDIA TAO + Jetson Orin Nano
      • ROS 2 Pick and Place System - Arduino Braccio++ Robotic Arm and Luxonis OAK-D
      • Optimize a cloud-based Visual Anomaly Detection Model for Edge Deployments
      • Rooftop Ice Detection with Things Network Visualization - Nvidia Omniverse Replicator
      • Surgery Inventory Object Detection - Synthetic Data - Nvidia Omniverse Replicator
      • NVIDIA Omniverse - Synthetic Data Generation For Edge Impulse Projects
      • Community Guide – Using Edge Impulse with Nvidia DeepStream
      • Computer Vision Object Counting - Avnet RZBoard V2L
      • Gesture Appliances Control with Pose Detection - BrainChip AKD1000
      • Counting for Inspection and Quality Control - Nvidia Jetson Nano (TensorRT)
      • High-resolution, High-speed Object Counting - Nvidia Jetson Nano (TensorRT)
    • Prototype and Concept Projects
      • Renesas CK-RA6M5 Cloud Kit - Getting Started with Machine Learning
      • TI CC1352P Launchpad - Getting Started with Machine Learning
      • OpenMV Cam RT1062 - Getting Started with Machine Learning
      • Getting Started with Edge Impulse Experiments
  • Computer Vision Projects
    • Workplace Organizer - Nvidia Jetson Nano
    • Recyclable Materials Sorter - Nvidia Jetson Nano
    • Analog Meter Reading - Arduino Nicla Vision
    • Creating Synthetic Data with Nvidia Omniverse Replicator
    • SonicSight AR - Sound Classification with Feedback on an Augmented Reality Display
    • Traffic Monitoring - Brainchip Akida
    • Multi-camera Video Stream Inference - Brainchip Akida
    • Industrial Inspection Line - Brainchip Akida
    • X-Ray Classification and Analysis - Brainchip Akida
    • Inventory Stock Tracker - FOMO - BrainChip Akida
    • Container Counting - Arduino Nicla Vision
    • Smart Smoke Alarm - Arduino Nano 33
    • Shield Bot Autonomous Security Robot
    • Cyclist Blind Spot Detection - Himax WE-I Plus
    • IV Drip Fluid-Level Monitoring - Arduino Portenta H7
    • Worker PPE Safety Monitoring - Nvidia Jetson Nano
    • Delivered Package Detection - ESP-EYE
    • Bean Leaf Disease Classification - Sony Spresense
    • Oil Tank Measurement Using Computer Vision - Sony Spresense
    • Object Counting for Smart Industries - Raspberry Pi
    • Smart Cashier with FOMO - Raspberry Pi
    • PCB Defect Detection with Computer Vision - Raspberry Pi
    • Bicycle Counting - Sony Spresense
    • Counting Eggs with Computer Vision - OpenMV Cam H7
    • Elevator Passenger Counting - Arduino Nicla Vision
    • ESD Protection using Computer Vision - Seeed ReComputer
    • Solar Panel Defect Detection - Arduino Portenta H7
    • Label Defect Detection - Raspberry Pi
    • Dials and Knob Monitoring with Computer Vision - Raspberry Pi
    • Digital Character Recognition on Electric Meter System - OpenMV Cam H7
    • Corrosion Detection with Computer Vision - Seeed reTerminal
    • Inventory Management with Computer Vision - Raspberry Pi
    • Monitoring Retail Checkout Lines with Computer Vision - Renesas RZ/V2L
    • Counting Retail Inventory with Computer Vision - Renesas RZ/V2L
    • Pose Detection - Renesas RZ/V2L
    • Product Quality Inspection - Renesas RZ/V2L
    • Smart Grocery Cart Using Computer Vision - OpenMV Cam H7
    • Driver Drowsiness Detection With FOMO - Arduino Nicla Vision
    • Gastroscopic Image Processing - OpenMV Cam H7
    • Pharmaceutical Pill Quality Control and Defect Detection
    • Deter Shoplifting with Computer Vision - Texas Instruments TDA4VM
    • Smart Factory Prototype - Texas Instruments TDA4VM
    • Correct Posture Detection and Enforcement - Texas Instruments TDA4VM
    • Visual Anomaly Detection with FOMO-AD - Texas Instruments TDA4VM
    • Surface Crack Detection and Localization - Texas Instruments TDA4VM
    • Surface Crack Detection - Seeed reTerminal
    • Retail Image Classification - Nvidia Jetson Nano
    • SiLabs xG24 Plus Arducam - Sorting Objects with Computer Vision and Robotics - Part 1
    • SiLabs xG24 Plus Arducam - Sorting Objects with Computer Vision and Robotics - Part 2
    • Object Detection and Visualization - Seeed Grove Vision AI Module
    • Bike Rearview Radar - Raspberry Pi
    • Build a Self-Driving RC Vehicle - Arduino Portenta H7 and Computer Vision
    • "Bring Your Own Model" Image Classifier for Wound Identification
    • Acute Lymphoblastic Leukemia Classifier - Nvidia Jetson Nano
    • Hardhat Detection in Industrial Settings - Alif Ensemble E7
    • Motorcycle Helmet Identification and Traffic Light Control - Texas Instruments AM62A
    • Import a Pretrained Model with "Bring Your Own Model" - Texas Instruments AM62A
    • Product Inspection with Visual Anomaly Detection - FOMO-AD - Sony Spresense
    • Visual Anomaly Detection in Fabric using FOMO-AD - Raspberry Pi 5
    • Car Detection and Tracking System for Toll Plazas - Raspberry Pi AI Kit
    • Visual Anomaly Detection - Seeed Grove Vision AI Module V2
    • Object Counting with FOMO - OpenMV Cam RT1062
    • Visitor Heatmap with FOMO Object Detection - Jetson Orin Nano
    • Vehicle Security Camera - Arduino Portenta H7
  • Audio Projects
    • Occupancy Sensing - SiLabs xG24
    • Smart Appliance Control Using Voice Commands - Nordic Thingy:53
    • Glass Window Break Detection - Nordic Thingy:53
    • Illegal Logging Detection - Nordic Thingy:53
    • Illegal Logging Detection - Syntiant TinyML
    • Wearable Cough Sensor and Monitoring - Arduino Nano 33 BLE Sense
    • Collect Data for Keyword Spotting - Raspberry Pi Pico
    • Voice-Activated LED Strip - Raspberry Pi Pico
    • Snoring Detection on a Smart Phone
    • Gunshot Audio Classification - Arduino Nano 33 + Portenta H7
    • AI-Powered Patient Assistance - Arduino Nano 33 BLE Sense
    • Acoustic Pipe Leakage Detection - Arduino Portenta H7
    • Location Identification using Sound - Syntiant TinyML
    • Environmental Noise Classification - Nordic Thingy:53
    • Running Faucet Detection - Seeed XIAO Sense + Blues Cellular
    • Vandalism Detection via Audio Classification - Arduino Nano 33 BLE Sense
    • Predictive Maintenance Using Audio Classification - Arduino Nano 33 BLE Sense
    • Porting an Audio Project from the SiLabs Thunderboard Sense 2 to xG24
    • Environmental Audio Monitoring Wearable - Syntiant TinyML - Part 1
    • Environmental Audio Monitoring Wearable - Syntiant TinyML - Part 2
    • Keyword Spotting - Nordic Thingy:53
    • Detecting Worker Accidents with Audio Classification - Syntiant TinyML
    • Snoring Detection with Syntiant NDP120 Neural Decision Processor - Arduino Nicla Voice
    • Recognize Voice Commands with the Particle Photon 2
    • Voice Controlled Power Plug with Syntiant NDP120 (Nicla Voice)
    • Determining Compressor State with Audio Classification - Avnet RaSynBoard
    • Developing a Voice-Activated Product with Edge Impulse's Synthetic Data Pipeline
    • Enhancing Worker Safety using Synthetic Audio to Create a Dog Bark Classifier
  • Predictive Maintenance and Defect Detection Projects
    • Predictive Maintenance - Nordic Thingy:91
    • Brushless DC Motor Anomaly Detection
    • Industrial Compressor Predictive Maintenance - Nordic Thingy:53
    • Anticipate Power Outages with Machine Learning - Arduino Nano 33 BLE Sense
    • Faulty Lithium-Ion Cell Identification in Battery Packs - Seeed Wio Terminal
    • Weight Scale Predictive Maintenance - Arduino Nano 33 BLE Sense
    • Fluid Leak Detection With a Flowmeter and AI - Seeed Wio Terminal
    • Pipeline Clog Detection with a Flowmeter and AI - Seeed Wio Terminal
    • Refrigerator Predictive Maintenance - Arduino Nano 33 BLE Sense
    • Motor Pump Predictive Maintenance - Infineon PSoC 6 WiFi-BT Pioneer Kit + CN0549
    • BrickML Demo Project - 3D Printer Anomaly Detection
    • Condition Monitoring - Syntiant TinyML Board
    • Predictive Maintenance - Commercial Printer - Sony Spresense + CommonSense
    • Vibration Classification with BrainChip's Akida
    • AI-driven Audio and Thermal HVAC Monitoring - SeeedStudio XIAO ESP32
  • Accelerometer and Activity Projects
    • Arduino x K-Way - Outdoor Activity Tracker
    • Arduino x K-Way - Gesture Recognition for Hiking
    • Arduino x K-Way - TinyML Fall Detection
    • Posture Detection for Worker Safety - SiLabs Thunderboard Sense 2
    • Hand Gesture Recognition - OpenMV Cam H7
    • Arduin-Row, a TinyML Rowing Machine Coach - Arduino Nicla Sense ME
    • Fall Detection using a Transformer Model – Arduino Giga R1 WiFi
    • Bluetooth Fall Detection - Arduino Nano 33 BLE Sense
    • Monitor Packages During Transit with AI - Arduino Nano 33 BLE Sense
    • Smart Baby Swing - Arduino Portenta H7
    • Warehouse Shipment Monitoring - SiLabs Thunderboard Sense 2
    • Gesture Recognition - Bangle.js Smartwatch
    • Gesture Recognition for Patient Communication - SiLabs Thunderboard Sense 2
    • Hospital Bed Occupancy Detection - Arduino Nano 33 BLE Sense
    • Porting a Posture Detection Project from the SiLabs Thunderboard Sense 2 to xG24
    • Porting a Gesture Recognition Project from the SiLabs Thunderboard Sense 2 to xG24
    • Continuous Gait Monitor (Anomaly Detection) - Nordic Thingy:53
    • Classifying Exercise Activities on a BangleJS Smartwatch
  • Air Quality and Environmental Projects
    • Arduino x K-Way - Environmental Asthma Risk Assessment
    • Gas Detection in the Oil and Gas Industry - Nordic Thingy:91
    • Smart HVAC System with a Sony Spresense
    • Smart HVAC System with an Arduino Nicla Vision
    • Indoor CO2 Level Estimation - Arduino Portenta H7
    • Harmful Gases Detection - Arduino Nano 33 BLE Sense
    • Fire Detection Using Sensor Fusion and TinyML - Arduino Nano 33 BLE Sense
    • AI-Assisted Monitoring of Dairy Manufacturing Conditions - Seeed XIAO ESP32C3
    • AI-Assisted Air Quality Monitoring - DFRobot Firebeetle ESP32
    • Air Quality Monitoring with Sipeed Longan Nano - RISC-V Gigadevice
    • Methane Monitoring in Mines - Silabs xG24 Dev Kit
    • Smart Building Ventilation with Environmental Sensor Fusion
    • Sensor Data Fusion with Spresense and CommonSense
    • Water Pollution Detection - Arduino Nano ESP32 + Ultrasonic Scan
    • Fire Detection Using Sensor Fusion - Arduino Nano 33 BLE Sense
  • Novel Sensor Projects
    • 8x8 ToF Gesture Classification - Arduino RP2040 Connect
    • Food Irradiation Dose Detection - DFRobot Beetle ESP32C3
    • Applying EEG Data to Machine Learning, Part 1
    • Applying EEG Data to Machine Learning, Part 2
    • Applying EEG Data to Machine Learning, Part 3
    • Liquid Classification with TinyML - Seeed Wio Terminal + TDS Sensor
    • AI-Assisted Pipeline Diagnostics and Inspection with mmWave Radar
    • Soil Quality Detection Using AI and LoRaWAN - Seeed Sensecap A1101
    • Smart Diaper Prototype - Arduino Nicla Sense ME
    • DIY Smart Glove with Flex Sensors
    • EdgeML Energy Monitoring - Particle Photon 2
    • Wearable for Monitoring Worker Stress using HR/HRV DSP Block - Arduino Portenta
  • Software Integration Demos
    • Azure Machine Learning with Kubernetes Compute and Edge Impulse
    • ROS2 + Edge Impulse, Part 1: Pub/Sub Node in Python
    • ROS2 + Edge Impulse, Part 2: MicroROS
    • Using Hugging Face Datasets in Edge Impulse
    • Using Hugging Face Image Classification Datasets with Edge Impulse
    • Edge Impulse API Usage Sample Application - Jetson Nano Trainer
    • MLOps with Edge Impulse and Azure IoT Edge
    • A Federated Approach to Train and Deploy Machine Learning Models
    • DIY Model Weight Update for Continuous AI Deployments
    • Automate the CI/CD Pipeline of your Models with Edge Impulse and GitHub Actions
    • Deploying Edge Impulse Models on ZEDEDA Cloud Devices
Powered by GitBook
On this page
  • Background
  • Equipment & software
  • Getting started
  • Creating a ROS2 package
  • Building a node for the accelerometer (or other sensor)
  • 1: Imports
  • 2: Node class
  • Building an AI powered Edge Impulse pub/sub node
  • 1: Imports:
  • 2: Edge Impulse node class
  • 3: Classifier class
  • Adding entry points
  • We’re ready to build!
  • Testing our Edge Impulse node
  • To summarize

Was this helpful?

Edit on GitHub
Export as PDF
  1. Software Integration Demos

ROS2 + Edge Impulse, Part 1: Pub/Sub Node in Python

Build an AI-driven ROS2 node for robotics using an Edge Impulse model and a 3-axis accelerometer.

PreviousAzure Machine Learning with Kubernetes Compute and Edge ImpulseNextROS2 + Edge Impulse, Part 2: MicroROS

Last updated 1 year ago

Was this helpful?

Created By: Avi Brown

Public Project Link:

GitHub Repository:

Background

ROS2 is the world’s most popular robotics development framework. It enables developers to build organized, modular, and scalable robotics platforms with the full force of the open source community behind it. When developing a new robot, it’s common to recycle existing ROS2 packages instead of writing new ones, saving precious time in development.

In this tutorial we will build a recyclable ROS2 node based around an Edge Impulse machine learning model. This node will draw sensor data from a sensor topic, run the data through an Edge Impulse model, and then publish the results of the machine learning to another topic, to which other nodes in the system can subscribe.

For the sake of demonstration, we’ll be using an accelerometer-based machine learning model trained to recognize “circle”, “up_down”, and “side_side” movements.

In this project we’ll learn how to:

  • Build a pub/sub Edge Impulse node

  • Fill a sensor data buffer using a subscriber

  • Import and use a machine learning model from within a ROS2 node

  • Publish the inferences made by the machine learning node to a topic


Equipment & software

  • Raspberry Pi 4

  • Adafruit MPU6050 accelerometer + gyroscope module (*)

  • Ubuntu 20.04 server

  • ROS2 Foxy Fitzroy

  • Edge Impulse Linux CLI

  • VSCode Remote Development extension

(*) BYO sensor For this tutorial we’ll be using a 3-axis accelerometer, but it is general enough to be adapted for just about any type of sensor.


Getting started

edge-impulse-linux

You should see the CLI initialize.


Creating a ROS2 package

Navigate to your workspace, and from within ros2_ws/src run:

ros2 pkg create ei_ros2 --build-type ament_python --dependencies rclpy std_msgs board os adafruit_mpu6050

This will create a package called “ei_ros2”. Adjust the dependencies to suit whatever libraries your package requires (or edit them using the package.xml file).


Building a node for the accelerometer (or other sensor)

Now we need to build the node that will publish sensor data that in turn will be fed to the machine learning model. Navigate to the “Impulse design” section of your Edge Impulse project, and take a loot at the first block:

This block contains a lot of important information. Let’s break it down:

  • First, we see that the block expects to receive three input axes (since it is a 3-axis accelerometer).

  • Next, we see that the model expects 2000ms (2 seconds) worth of data each time it is called.

  • And finally, the frequency tells us how many times per second the sensor is sampled. In this case, 98 times / second (Hz).

We’ll return to these numbers a couple of times, but for now let’s start building our sensor node!

Make a file called mpu6050_node.py (or whatever your sensor is called) under /ros2_ws/src/ei_ros2/ei_ros2.

1: Imports

Here are the libraries we need for this node. Depending on which sensor you’ll need your imports may look different.

I’m using the Float32MultiArray as the message type, because we want to send an array with three readings (accelerometer X, Y, Z) which are float values.

import board # Sensor specific
import rclpy # Always
from rclpy.node import Node # Always
from std_msgs.msg import Float32MultiArray # Sensor specific
import adafruit_mpu6050 # Sensor specific

2: Node class

Now we need a class with a publisher to handle our sensor. A lot of this code will be familiar to you if you have experience with ROS2.

  • Notice the frequency: This should be the same frequency as the one in the “Impulse design” section of your EI project. This will make it so that sensor data is published at the required rate (1 / frequency).

  • 98 times per second the read_mpu6050() method is called. Each time the 3 accelerometer axes are read, and sent together in an array to the mpu6050_stream topic. Once it arrives to the topic it becomes available for our machine learning node to subscribe to.

class MPU6050Node(Node):
    def __init__(self):
        super().__init__("mpu6050")
        self.i2c = board.I2C()
        self.mpu = adafruit_mpu6050.MPU6050(self.i2c)
        self.publisher_ = self.create_publisher(Float32MultiArray, "mpu6050_stream", 15)
        self.frequency = 98 # See "Impulse design" block in EI project
        self.timer_ = self.create_timer((1 / self.frequency), self.read_mpu6050)
        self.get_logger().info("MP6050 stream opened.")

    def read_mpu6050(self):
        msg = Float32MultiArray()
        msg.data = [round(self.mpu.acceleration[0], 2), # accX
                    round(self.mpu.acceleration[1], 2), # accY
                    round(self.mpu.acceleration[2], 2)] # accZ
        self.publisher_.publish(msg)

Adding the main() function, the full publisher node for the MPU6050 accelerometer is:

import board
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
import adafruit_mpu6050

class MPU6050Node(Node):
    def __init__(self):
        super().__init__("mpu6050")
        self.i2c = board.I2C()
        self.mpu = adafruit_mpu6050.MPU6050(self.i2c)
        self.publisher_ = self.create_publisher(Float32MultiArray, "mpu6050_stream", 15)
        self.frequency = 98
        self.timer_ = self.create_timer((1 / self.frequency), self.read_mpu6050)
        self.get_logger().info("MP6050 stream opened.")

    def read_mpu6050(self):
        msg = Float32MultiArray()
        msg.data = [round(self.mpu.acceleration[0], 2),
                    round(self.mpu.acceleration[1], 2),
                    round(self.mpu.acceleration[2], 2)]
        self.publisher_.publish(msg)

 
def main(args=None):
    rclpy.init(args=args)
    node = MPU6050Node()
    rclpy.spin(node)
    rclpy.shutdown()

Now just run chmod +x mpu6050_node.py to make your node file executable, since we're using --symlink-install


Building an AI powered Edge Impulse pub/sub node

We’ve arrived to the fun part! Effort was made to make this next chunk of code as reusable as possible. Let’s go through it piece by piece.

First let’s download our model from Edge Impulse.

  • From the terminal, run:

edge-impulse-linux-runner --clean
  • After entering your credentials you should see:

  • Select the project you want, and hit Enter. Now you should see this message:

Take note of the file location and name. You should now copy and paste this file to the same directory where the sensor / Edge Impulse nodes are located (/ros2_ws/src/ei_ros2/ei_ros2).

Now let’s build the node!

Make a file called ei_node.py (or something) under /ros2_ws/src/ei_ros2/ei_ros2.

1: Imports:

Note that in this case we need both String and Float32MultiArray:

  • Float32MultiArray for subscribing to the sensor node we just created

  • And String, for publishing the machine learning results to an inference stream

import os
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32MultiArray
from edge_impulse_linux.runner import ImpulseRunner

2: Edge Impulse node class

There’s a lot going on here, but the important parts have numbered comments beside them, so let’s take them one by one:

  1. “Change subscription” - Make sure you are subscribed to the correct topic (the same one that the sensor node is publishing to)

  2. “Adjust timing” - This timer calls the function that runs the machine learning model. Basically, the function will only run the model once the buffer (which we’ll discuss in a moment) is full of data, so this timer essentially means: How often do you want to check if the buffer is full. If time isn’t an issue, you can set it to check every couple of seconds. Here I want to run the model as soon as I have a full buffer, so I set it to check every 0.01 seconds.

  3. “Check file name” - The name of the .eim file here should match the name of the file downloaded via the CLI.

  4. “Set max length” - The model expects to receive a specific amount of data, so we need to set the buffer (the array of sensor data that is passed to the model) to a specific length. How do we find the length? If we return to the “Impulse design” section of our Edge Impulse project, we’ll see that the window length is 2 seconds, where the sensor is sampled 98 times per second, each time giving 3 values. So: 2 * 98 * 3 = 588.

Don’t stress too much about this part — if you put the wrong number in, you’ll get an error telling you what the correct number is 🙂

class EdgeImpulseNode(Node):
    def __init__(self):
        super().__init__("edge_impulse_classifier")

        # Sensor data subscriber
        self.subscriber_ = self.create_subscription(Float32MultiArray, "mpu6050_stream", self.callback_fill_buffer, 10) # 1: Change subscription
        # Inference publisher
        self.publisher_ = self.create_publisher(String, "inference_stream", 10)
        self.timer_ = self.create_timer(0.01, self.classify) # 2: Adjust timing if desired

        # Create Classifier object based on Edge Impulse model file
        self.classifier = Classifier('model.eim') # 3: Check file name
        
        self.buffer = []
        self.buffer_full_len = 588 #4: Set max length of buffer array

        self.get_logger().info("Edge Impulse node opened.")

    # If buffer is full, classify and publish result!
    def classify(self):
        # String since we're publishing the class name
        msg = String()

        if len(self.buffer) == self.buffer_full_len:
            # Pass buffer to classifier
            msg.data = self.classifier.classify(self.buffer)
            self.publisher_.publish(msg)

            # Reset buffer
            self.buffer = []

    # Adds to buffer whenever new sensor data is published
    def callback_fill_buffer(self, msg):
        if len(self.buffer) < self.buffer_full_len:
            for val in msg.data:
                self.buffer.append(val)

3: Classifier class

This class handles passing the sensor data to the Edge Impulse model file we downloaded.

Note: The classify() method returns a string with the name of the most likely result. Machine learning inferences look like: {'dog': 0.8, 'cat': 0.2, 'fish': 0.1}. This function returns only the string with the highest probability.

class Classifier:
    def __init__(self, model_path):
        self.runner = None
        self.model_path = model_path

    def classify(self, features):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        modelfile = os.path.join(dir_path, self.model_path)
        self.runner = ImpulseRunner(modelfile)

        try:
            self.runner.init()
            res = self.runner.classify(features)

            # Return classification with highest probability
            return max(res["result"]["classification"], key=res["result"]["classification"].get)

        finally:
            if (self.runner):
                self.runner.stop()

Now we can add the main() function and we’re ready to go!

Full Edge Impulse Pub/Sub code:

# Imports
import os
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32MultiArray
from edge_impulse_linux.runner import ImpulseRunner

# Classificiation node that subs to sensor stream and publishes to inference topic
class EdgeImpulseNode(Node):
    def __init__(self):
        super().__init__("edge_impulse_classifier")

        # Sensor data subscriber
        self.subscriber_ = self.create_subscription(Float32MultiArray, "mpu6050_stream", self.callback_fill_buffer, 10) # 1: Change subscription
        # Inference publisher
        self.publisher_ = self.create_publisher(String, "inference_stream", 10)
        self.timer_ = self.create_timer(0.01, self.classify) # 2: Adjust timing

        # Create Classifier object based on Edge Impulse model file
        self.classifier = Classifier('model.eim') # 3: Check file name
        
        self.buffer = []
        self.buffer_full_len = 588 #4: Set max length of buffer array

        self.get_logger().info("Edge Impulse node opened.")

    # If buffer is full, classify and publish result!
    def classify(self):
        # String since we're publishing the class name
        msg = String()

        if len(self.buffer) == self.buffer_full_len:
            # Pass buffer to classifier
            msg.data = self.classifier.classify(self.buffer)
            self.publisher_.publish(msg)

            # Reset buffer
            self.buffer = []

    # Adds to buffer whenever new sensor data is published
    def callback_fill_buffer(self, msg):
        if len(self.buffer) < self.buffer_full_len:
            for val in msg.data:
                self.buffer.append(val)

# Edge Impulse classification class
class Classifier:
    def __init__(self, model_path):
        self.runner = None
        self.model_path = model_path

    def classify(self, features):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        modelfile = os.path.join(dir_path, self.model_path)
        self.runner = ImpulseRunner(modelfile)

        try:
            self.runner.init()
            res = self.runner.classify(features)

            # Return classification with highest probability
            return max(res["result"]["classification"], key=res["result"]["classification"].get)

        finally:
            if (self.runner):
                self.runner.stop()

def main(args=None):
    rclpy.init(args=args)
    node = EdgeImpulseNode()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == "__main__":
    main()

Once again run chmod +x ei_node.py to make your node file executable.


Adding entry points

In your [setup.py](http://setup.py) file add entry points for your nodes:

.
.
.

entry_points={
        'console_scripts': [
            "mpu6050 = ei_ros2.mpu6050_node:main",
            "ei = ei_ros2.ei_node:main"
        ],
    }

We’re ready to build!

Navigate to ros2_ws (or your main ROS2 workspace directory) and build the package using:

colcon build --packages-select ei_ros2 --symlink-install

Once that finishes, we’re ready to for tests!


Testing our Edge Impulse node

Let’s run both the sensor node and the Edge Impulse machine learning node. From two separate terminals run:

ros2 run ei_ros2 mpu6050
ros2 run ei_ros2 ei

And now in a third terminal let’s listen on the inference_stream topic using:

ros2 topic echo /inference_stream

Now let’s move our accelerometer around and see what appears on the inference_stream topic:

Looking good!


To summarize

In this tutorial we looked at how to make a AI-powered pub/sub node in ROS2. This tutorial is part of an Edge Impulse + ROS2 series, and in Part 2 we’ll look at how to use a service / client architecture. The contents of this tutorial can be generalized to suit just about any sensor and use case, so please don’t hesitate to try it out yourself! Feel free to ask questions at forum.edgeimpulse.com, or on the YouTube video version of this tutorial here:

This tutorial is meant to be as general as possible, and as such will not go in depth on building an Edge Impulse project / model. If you need help getting started you can find many high quality examples spanning a variety of sensors, boards, and use cases .

You should also have the Edge Impulse Linux CLI up and running on your Linux board (RPi, Jetson Nano, etc.). Read about installing the CLI . Test that it is indeed installed by running:

Untitled
Untitled
Untitled
nodes.gif

here
here
https://youtu.be/0SabLvJqSaM
https://studio.edgeimpulse.com/public/108508/latest
https://github.com/avielbr/edge-impulse/tree/main/ros2/ei_ros2