.. _program_listing_file_include_rtest_subscription_mock.hpp: Program Listing for File subscription_mock.hpp ============================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rtest/subscription_mock.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2024 Beam Limited. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // @file subscription_mock.hpp // @author Sławomir Cielepak (slawomir.cielepak@gmail.com) // @date 2024-11-26 // // @brief Mock header for ROS 2 Subscriber. #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include "rcl/error_handling.h" #include "rcl/subscription.h" #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_info.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/waitable.hpp" #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "tracetools/tracetools.h" // Helper to detect if a type has _data_type template struct has_data_type : std::false_type { }; template struct has_data_type> : std::true_type { }; namespace rclcpp { namespace node_interfaces { class NodeTopicsInterface; } // namespace node_interfaces template < typename MessageT, typename AllocatorT = std::allocator, typename SubscribedT = typename rclcpp::TypeAdapter::custom_type, typename ROSMessageT = typename rclcpp::TypeAdapter::ros_message_type, typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy> class Subscription : public SubscriptionBase { friend class rclcpp::node_interfaces::NodeTopicsInterface; public: using SubscribedType = SubscribedT; using ROSMessageType = ROSMessageT; using MessageMemoryStrategyType = MessageMemoryStrategyT; using SubscribedTypeAllocatorTraits = allocator::AllocRebind; using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; using SubscribedTypeDeleter = allocator::Deleter; using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; using ROSMessageTypeDeleter = allocator::Deleter; using MessageAllocatorTraits [[deprecated("use ROSMessageTypeAllocatorTraits")]] = ROSMessageTypeAllocatorTraits; using MessageAllocator [[deprecated("use ROSMessageTypeAllocator")]] = ROSMessageTypeAllocator; using MessageDeleter [[deprecated("use ROSMessageTypeDeleter")]] = ROSMessageTypeDeleter; using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr; using MessageUniquePtr [[deprecated("use std::unique_ptr instead")]] = std::unique_ptr; private: using SubscriptionTopicStatisticsSharedPtr = std::shared_ptr; public: RCLCPP_SMART_PTR_DEFINITIONS(Subscription) Subscription( rclcpp::node_interfaces::NodeBaseInterface * node_base, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, const rclcpp::QoS & qos, AnySubscriptionCallback callback, const rclcpp::SubscriptionOptionsWithAllocator & options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) : SubscriptionBase( node_base, type_support_handle, topic_name, options.to_rcl_subscription_options(qos), options.event_callbacks, options.use_default_callbacks, callback.is_serialized_message_callback() ? DeliveredMessageKind::SERIALIZED_MESSAGE : DeliveredMessageKind::ROS_MESSAGE), // NOLINT any_callback_(callback), options_(options), message_memory_strategy_(message_memory_strategy) { // Setup intra process publishing if requested. if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { using rclcpp::detail::resolve_intra_process_buffer_type; auto qos_profile = get_actual_qos(); if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { throw std::invalid_argument( "intraprocess communication allowed only with keep last history qos policy"); } if (qos_profile.depth() == 0) { throw std::invalid_argument( "intraprocess communication is not allowed with 0 depth qos policy"); } } if (subscription_topic_statistics != nullptr) { this->subscription_topic_statistics_ = std::move(subscription_topic_statistics); } } void post_init_setup( rclcpp::node_interfaces::NodeBaseInterface * node_base, const rclcpp::QoS & qos, const rclcpp::SubscriptionOptionsWithAllocator & options) { (void)node_base; (void)qos; (void)options; rtest::StaticMocksRegistry::instance().registerSubscription( node_base->get_fully_qualified_name(), get_topic_name(), weak_from_this()); } bool take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out) { return this->take_type_erased(static_cast(&message_out), message_info_out); } template std::enable_if_t< !rosidl_generator_traits::is_message::value && std::is_same_v, bool> take(TakeT & message_out, rclcpp::MessageInfo & message_info_out) { ROSMessageType local_message; bool taken = this->take_type_erased(static_cast(&local_message), message_info_out); if (taken) { rclcpp::TypeAdapter::convert_to_custom(local_message, message_out); } return taken; } std::shared_ptr create_message() override { /* The default message memory strategy provides a dynamically allocated message on each call to * create_message, though alternative memory strategies that re-use a preallocated message may be * used (see rclcpp/strategies/message_pool_memory_strategy.hpp). */ return message_memory_strategy_->borrow_message(); } std::shared_ptr create_serialized_message() override { return message_memory_strategy_->borrow_serialized_message(); } // Only enabled for std_msgs which have _data_type and data member template typename std::enable_if::value>::type handle_message( const typename T::_data_type & data) { auto sptr = std::make_shared(); sptr->set__data(data); handle_message(sptr); } void handle_message(MessageT & message) { // The message is referenced from outside, so make non-deleting shared_ptr auto sptr = std::shared_ptr(&message, [](MessageT * msg) { (void)msg; }); handle_message(sptr); } void handle_message(std::shared_ptr message) { auto voidMsg = std::static_pointer_cast(message); this->handle_message(voidMsg, rclcpp::MessageInfo{}); } void handle_message(std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override { auto typed_message = std::static_pointer_cast(message); any_callback_.dispatch(typed_message, message_info); } void handle_serialized_message( const std::shared_ptr & serialized_message, const rclcpp::MessageInfo & message_info) override { std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to // exclude callback duration from topic statistics result. now = std::chrono::system_clock::now(); } any_callback_.dispatch(serialized_message, message_info); if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); } } void handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) override { if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { // In this case, the message will be delivered via intra process and // we should ignore this copy of the message. return; } auto typed_message = static_cast(loaned_message); auto sptr = std::shared_ptr(typed_message, [](ROSMessageType * msg) { (void)msg; }); std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to // exclude callback duration from topic statistics result. now = std::chrono::system_clock::now(); } any_callback_.dispatch(sptr, message_info); if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); } } void return_message(std::shared_ptr & message) override { auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); } void return_serialized_message(std::shared_ptr & message) override { message_memory_strategy_->return_serialized_message(message); } bool use_take_shared_method() const { return any_callback_.use_take_shared_method(); } rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr get_shared_dynamic_message_type() override { throw rclcpp::exceptions::UnimplementedError( "get_shared_dynamic_message_type is not implemented for Subscription"); } rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr get_shared_dynamic_message() override { throw rclcpp::exceptions::UnimplementedError( "get_shared_dynamic_message is not implemented for Subscription"); } rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr get_shared_dynamic_serialization_support() override { throw rclcpp::exceptions::UnimplementedError( "get_shared_dynamic_serialization_support is not implemented for Subscription"); } rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override { throw rclcpp::exceptions::UnimplementedError( "create_dynamic_message is not implemented for Subscription"); } void return_dynamic_message( rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) override { (void)message; throw rclcpp::exceptions::UnimplementedError( "return_dynamic_message is not implemented for Subscription"); } void handle_dynamic_message( const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message, const rclcpp::MessageInfo & message_info) override { (void)message; (void)message_info; throw rclcpp::exceptions::UnimplementedError( "handle_dynamic_message is not implemented for Subscription"); } private: RCLCPP_DISABLE_COPY(Subscription) AnySubscriptionCallback any_callback_; const rclcpp::SubscriptionOptionsWithAllocator options_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr}; }; } // namespace rclcpp namespace rtest { template std::shared_ptr> findSubscription( const std::string & fullyQualifiedNodeName, std::string topicName) { if (topicName.empty()) { throw std::invalid_argument{"Topic name must not be empty"}; } if (topicName.front() != '/') { topicName.insert(topicName.begin(), '/'); } auto sub = StaticMocksRegistry::instance().getSubscription(fullyQualifiedNodeName, topicName).lock(); return std::dynamic_pointer_cast>(sub); } template std::shared_ptr> findSubscription( const std::shared_ptr nodePtr, const std::string & topicName) { return findSubscription(nodePtr->get_fully_qualified_name(), topicName); } } // namespace rtest