Writing a Publisher test
Background
In this tutorial we will write a test suite that verifies a ROS 2 Node implementation that publishes messages.
The rtest framework provides white-box access to publishers via the findPublisher API. Combined with GoogleMock (EXPECT_CALL), this allows tests to assert exactly which messages are published, without spinning executors or relying on the ROS 2 middleware.
In this example, we will:
Demonstrate a publisher node that publishes to ‘/test_topic’ on a timer.
Use the ‘rtest’ publisher interface to verify that publishing occurs as expected.
Inspect the timer and simulate the timer callback to deterministically test behavior.
Prerequisites
Tasks
1 Create a package
Navigate to your ROS 2 workspace sources, e.g. ros2_ws/src, and run the package creation command:
$ ros2 pkg create example_app --dependencies rclcpp std_msgs
Navigate to example_app.
2 Write the Publisher node
Add the Publisher class definition in include/example_app/publisher.hpp with the following code:
#pragma once
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
class Publisher : public rclcpp::Node
{
public:
explicit Publisher(const rclcpp::NodeOptions & options = rclcpp::NodeOptions());
private:
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;
};
And add a class implementation in src/publisher.cpp:
#include "example_app/publisher.hpp"
#include <memory>
#include <chrono>
using namespace std::chrono_literals;
Publisher::Publisher(const rclcpp::NodeOptions & options)
: rclcpp::Node("test_publisher", options)
{
publisher_ = create_publisher<std_msgs::msg::String>("test_topic", rclcpp::QoS{5UL});
timer_ = create_wall_timer(500ms, [this]() {
auto msg = std::make_unique<std_msgs::msg::String>();
msg->set__data("timer");
publisher_->publish(std::move(msg));
});
}
Open the CMakeLists.txt and add the Publisher as a library:
Note:
ament_target_dependenciesis depricated so we use plain CMaketarget_link_libraries
cmake_minimum_required(VERSION 3.8)
project(example_app)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
# Add Publisher
add_library(publisher src/publisher.cpp)
target_include_directories(publisher PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/include
)
target_link_libraries(publisher
rclcpp::rclcpp
${std_msgs_TARGETS}
)
3 Examine the code
publisher_ = create_publisher<std_msgs::msg::String>("test_topic", rclcpp::QoS{5UL});
The Node creates a publisher with the msg type std_msgs::msg::String and topic name test_topic.
timer_ = create_wall_timer(500ms, [this]() {
auto msg = std::make_unique<std_msgs::msg::String>();
msg->set__data("timer");
publisher_->publish(std::move(msg));
});
The timer callback is a lambda that publishes a message when triggered
4 Add unit tests
4.1 Add dependency to rtest
Open the package.xml and add the rtest test dependency:
<package format="3">
...
<test_depend>rtest</test_depend>
</package>
NOTE Currently rtest supports writing tests with GTest/GMock only. There’s no need to add that dependencies explicitly.
4.2 Implement a simple unit test
Create the test directory and add a C++ tests implementation file test/publisher_test.cpp
#include <gtest/gtest.h>
#include "example_app/publisher.hpp"
class PubSubTest : public ::testing::Test
{
protected:
rclcpp::NodeOptions opts;
};
TEST_F(PubSubTest, PublisherTest)
{
auto node = std::make_shared<Publisher>(opts);
/// Retrieve the publisher created by the Node
auto publisher = rtest::findPublisher<std_msgs::msg::String>(node, "/test_topic");
/// Check that the Node actually created the Publisher with topic: "/test_topic"
ASSERT_TRUE(publisher);
/// Retrieve the timers created by the Node
auto nodeTimers = rtest::findTimers(node);
/// There should be just one timer
ASSERT_EQ(nodeTimers.size(), 1UL);
/// Set up expectation that the Node will publish a message when the timer callback is fired
auto expectedMsg = std_msgs::msg::String{};
expectedMsg.set__data("timer");
EXPECT_CALL(*publisher, publish(expectedMsg)).Times(1);
/// Fire the timer callback
nodeTimers[0]->execute_callback(nullptr);
}
Or with triggering callback by time advancing:
TEST_F(PubSubTest, WhenTheTimeIsMovedByTimerPeriodCallbackShouldBeExecuted)
{
// set use sim timer for mocked timers
opts = rclcpp::NodeOptions().parameter_overrides({rclcpp::Parameter("use_sim_time", true)});
auto node = std::make_shared<test_composition::Publisher>(opts);
auto triggering_test_clock = rtest::TriggeringTestClock{node};
/// Retrieve the publisher created by the Node
auto publisher = rtest::findPublisher<std_msgs::msg::String>(node, "/test_topic");
// Check that the Node actually created the Publisher with topic: "/test_topic"
ASSERT_TRUE(publisher);
/// Set up expectation that the Node will publish a message when the timer callback is fired
auto expectedMsg = std_msgs::msg::String{};
expectedMsg.set__data("timer");
// We do not expect the timer to trigger shortly before it reaches 500ms
EXPECT_CALL(*publisher, publish(expectedMsg)).Times(0);
triggering_test_clock.advance(std::chrono::milliseconds(499));
// We expect the timer to trigger every 500ms
EXPECT_CALL(*publisher, publish(expectedMsg)).Times(1);
triggering_test_clock.advance(std::chrono::milliseconds(1));
// We do not expect the timer to trigger after one period expires but before the next begins
EXPECT_CALL(*publisher, publish(expectedMsg)).Times(0);
triggering_test_clock.advance(std::chrono::milliseconds(499));
// We expect the timer to trigger every 500ms, so when the expiry time passes, the callback should fire
EXPECT_CALL(*publisher, publish(expectedMsg)).Times(1);
triggering_test_clock.advance(std::chrono::milliseconds(50));
}
Create the main test runner in test/main.cpp:
#include <gmock/gmock.h>
#include <rclcpp/rclcpp.hpp>
int main(int argc, char ** argv)
{
// Initialize Google Test and ROS2
::testing::InitGoogleMock(&argc, argv);
rclcpp::init(argc, argv);
// Run all the tests
int result = RUN_ALL_TESTS();
// Shutdown ROS2
rclcpp::shutdown();
return result;
}
4.3 Add tests to CMakeLists.txt
Create the test/CMakeLists.txt file:
find_package(rtest REQUIRED)
find_package(ament_cmake_gmock REQUIRED)
# Use add_executable + `ament_add_gmock_test` instead of `ament_add_gmock`.
add_executable(${PROJECT_NAME}-test
main.cpp
${CMAKE_SOURCE_DIR}/src/publisher.cpp
publisher_test.cpp
)
target_include_directories(${PROJECT_NAME}-test PRIVATE
${CMAKE_SOURCE_DIR}/include
)
target_link_libraries(${PROJECT_NAME}-test
rtest::publisher_mock
rtest::timer_mock
rclcpp::rclcpp
${std_msgs_TARGETS}
)
ament_add_gmock_test(${PROJECT_NAME}-test)
Update the root CMakeLists.txt with:
if(BUILD_TESTING)
add_subdirectory(test)
endif()
ament_package() # Must be the last statement
4.4 Build and run the tests
Build the example_app package:
$ colcon build --packages-up-to example_app --cmake-args -DBUILD_TESTING=On
Run the tests:
$ colcon test --packages-select example_app --event-handlers console_cohesion+
Key Concepts
rtest::findPublisherlocates a Publisher instance for testing.rtest::findTimerslocates timers, andexecute_callbacktriggers them deterministically. The callbacks can be triggered by the advancing the time as well.GoogleMock’s
EXPECT_CALL(...).Times(...)allows verifying publish behavior precisely without a running ROS 2 system.Note: Other test frameworks (e.g., Catch2) are not currently supported.
Try It Yourself!