Program Listing for File publisher_mock.hpp

Return to documentation for file (include/rtest/publisher_mock.hpp)

// Copyright 2024 Beam Limited.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// @file      publisher_mock.hpp
// @author    Sławomir Cielepak (slawomir.cielepak@gmail.com)
// @date      2024-11-26
//
// @brief     Mock header for ROS 2 Publisher.

#pragma once

#include <gmock/gmock.h>
#include <rtest/static_registry.hpp>

#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>

#include "rcl/error_handling.h"
#include "rcl/publisher.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rosidl_runtime_cpp/traits.hpp"

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/get_message_type_support_handle.hpp"
#include "rclcpp/is_ros_compatible_type.hpp"
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/type_adapter.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rtest
{

template <typename MessageT>
class PublisherMock : public MockBase
{
public:
  PublisherMock(rclcpp::PublisherBase * pub) : pub_(pub) {}

  ~PublisherMock() { StaticMocksRegistry::instance().detachMock(pub_); }

  RCLCPP_SMART_PTR_DEFINITIONS(PublisherMock<MessageT>)


  MOCK_METHOD(void, publish, (const MessageT & msg));

  void setSubscriptionCount(size_t count) { subscriptions_count_ = count; }

  size_t subscriptions_count_{0UL};

  rclcpp::PublisherBase * pub_{nullptr};
};

}  // namespace rtest

namespace rclcpp
{

template <typename MessageT, typename AllocatorT>
class LoanedMessage;

template <typename MessageT, typename AllocatorT = std::allocator<void>>
class Publisher : public PublisherBase
{
public:
  using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
  using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;

  using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>;
  using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
  using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;

  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

  using BufferSharedPtr = typename rclcpp::experimental::buffers::
    IntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>::SharedPtr;

  RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)


  Publisher(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const std::string & topic,
    const rclcpp::QoS & qos,
    const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
  : PublisherBase(
      node_base,
      topic,
      rclcpp::get_message_type_support_handle<MessageT>(),
      options.template to_rcl_publisher_options<MessageT>(qos),
      options.event_callbacks,
      options.use_default_callbacks),
    options_(options),
    published_type_allocator_(*options.get_allocator()),
    ros_message_type_allocator_(*options.get_allocator())
  {
    allocator::set_allocator_for_deleter(&published_type_deleter_, &published_type_allocator_);
    allocator::set_allocator_for_deleter(&ros_message_type_deleter_, &ros_message_type_allocator_);
  }

  virtual void post_init_setup(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const std::string & topic,
    const rclcpp::QoS & qos,
    const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
  {
    (void)node_base;
    (void)topic;
    (void)qos;
    (void)options;

    rtest::StaticMocksRegistry::instance().registerPublisher<MessageT>(
      node_base->get_fully_qualified_name(), get_topic_name(), weak_from_this());
  }

  virtual ~Publisher() {}

  void publish_(const MessageT & msg)
  {
    auto mock = rtest::StaticMocksRegistry::instance().getMock(this).lock();
    if (mock) {
      (std::static_pointer_cast<rtest::PublisherMock<MessageT>>(mock))->publish(msg);
    }
  }

  size_t get_subscription_count()
  {
    auto mock = rtest::StaticMocksRegistry::instance().getMock(this).lock();
    if (mock) {
      return PublisherBase::get_subscription_count() +
             (std::static_pointer_cast<rtest::PublisherMock<MessageT>>(mock))->subscriptions_count_;
    }

    return PublisherBase::get_subscription_count();
  }

  rclcpp::LoanedMessage<ROSMessageType, AllocatorT> borrow_loaned_message()
  {
    return rclcpp::LoanedMessage<ROSMessageType, AllocatorT>(*this, std::allocator<void>{});
  }

  template <typename T>
  typename std::enable_if_t<
    rosidl_generator_traits::is_message<T>::value && std::is_same<T, ROSMessageType>::value>
  publish(std::unique_ptr<T, ROSMessageTypeDeleter> msg)
  {
    this->publish_(*msg);
  }

  template <typename T>
  typename std::enable_if_t<
    rosidl_generator_traits::is_message<T>::value && std::is_same<T, ROSMessageType>::value>
  publish(const T & msg)
  {
    this->publish_(msg);
  }

  template <typename T>
  typename std::enable_if_t<
    rclcpp::TypeAdapter<MessageT>::is_specialized::value && std::is_same<T, PublishedType>::value>
  publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
  {
    this->publish_(*msg);
  }

  template <typename T>
  typename std::enable_if_t<
    rclcpp::TypeAdapter<MessageT>::is_specialized::value && std::is_same<T, PublishedType>::value>
  publish(const T & msg)
  {
    this->publish_(*msg);
  }

  void publish(const rcl_serialized_message_t & serialized_msg)
  {
    (void)serialized_msg;
    throw std::runtime_error{"Publisher::publish(rcl_serialized_message_t) is not implemented"};
  }

  void publish(const SerializedMessage & serialized_msg)
  {
    (void)serialized_msg;
    throw std::runtime_error{"Publisher::publish(SerializedMessage) is not implemented"};
  }

  void publish(rclcpp::LoanedMessage<ROSMessageType, AllocatorT> && loaned_msg)
  {
    if (!loaned_msg.is_valid()) {
      throw std::runtime_error("loaned message is not valid");
    }
    this->publish_(loaned_msg.get());
  }

  std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> duplicate_ros_message_as_unique_ptr(
    const ROSMessageType & msg)
  {
    auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
    ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr, msg);
    return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
  }

  const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;

  PublishedTypeAllocator published_type_allocator_;
  PublishedTypeDeleter published_type_deleter_;
  ROSMessageTypeAllocator ros_message_type_allocator_;
  ROSMessageTypeDeleter ros_message_type_deleter_;

  BufferSharedPtr buffer_{nullptr};
};

}  // namespace rclcpp

namespace rtest
{

template <typename MessageT>
std::shared_ptr<PublisherMock<MessageT>> findPublisher(
  const std::string & fullyQualifiedNodeName,
  std::string topicName)
{
  if (topicName.empty()) {
    throw std::invalid_argument{"Topic name must not be empty"};
  }
  if (topicName.front() != '/') {
    topicName.insert(topicName.begin(), '/');
  }
  std::shared_ptr<PublisherMock<MessageT>> pub_mock{};
  auto pub_base =
    StaticMocksRegistry::instance().getPublisher(fullyQualifiedNodeName, topicName).lock();
  auto publisher = std::dynamic_pointer_cast<rclcpp::Publisher<MessageT>>(pub_base);
  if (publisher) {
    if (StaticMocksRegistry::instance().getMock(publisher.get()).lock()) {
      std::cerr << "rtest::findPublisher() WARNING: PublisherMock already attached "
                   "to the Publisher\n";
    } else {
      pub_mock = std::make_shared<PublisherMock<MessageT>>(publisher.get());
      StaticMocksRegistry::instance().attachMock(publisher.get(), pub_mock);
    }
  }
  return pub_mock;
}

template <typename MessageT, typename NodeT>
std::shared_ptr<PublisherMock<MessageT>> findPublisher(
  const std::shared_ptr<NodeT> nodePtr,
  const std::string & topicName)
{
  return findPublisher<MessageT>(nodePtr->get_fully_qualified_name(), topicName);
}

}  // namespace rtest