Program Listing for File client_base.hpp

Return to documentation for file (include/lyrical/rtest/client_base.hpp)

// Copyright 2026 Spyrosoft Limited.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// @file      client_base.hpp
// @author    Mariusz Szczepanik (mua@spyro-soft.com)
// @date      2026-05-19

#pragma once

#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>

#include "rcl/client.h"
#include "rcl/error_handling.h"
#include "rcl/event_callback.h"
#include "rcl/service_introspection.h"
#include "rcl/wait.h"

#include "rclcpp/clock.hpp"
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"

namespace rclcpp
{

namespace detail
{
/// Pairs a future-like object with the id of the request it was created for.
///
/// The wrapper can be moved but not copied. It forwards `get()`, `valid()` and the `wait*()`
/// family to the wrapped future and converts implicitly to `FutureT &`.
///
/// \tparam FutureT type of the wrapped future; it must provide the members forwarded below.
template <typename FutureT>
struct FutureAndRequestId
{
  /// The wrapped future object.
  FutureT future;
  /// Id of the request that `future` belongs to.
  int64_t request_id;

  /// Store `wrapped` (moved in) together with `id`.
  FutureAndRequestId(FutureT wrapped, int64_t id)
  : future(std::move(wrapped)), request_id(id)
  {
  }

  // All five special members are spelled out on purpose (instead of relying on the rule of
  // zero) so that the deleted copy operations are visible at a glance.
  ~FutureAndRequestId() = default;
  FutureAndRequestId(const FutureAndRequestId &) = delete;
  FutureAndRequestId & operator=(const FutureAndRequestId &) = delete;
  FutureAndRequestId(FutureAndRequestId &&) noexcept = default;
  FutureAndRequestId & operator=(FutureAndRequestId &&) noexcept = default;

  /// Implicit access to the wrapped future, so the wrapper can be handed to code that
  /// expects a `FutureT &`.
  operator FutureT &() { return future; }

  // Everything below simply forwards to the wrapped future.

  /// Forward to `future.get()`.
  auto get() { return future.get(); }

  /// Forward to `future.valid()`.
  bool valid() const noexcept { return future.valid(); }

  /// Forward to `future.wait()`.
  void wait() const { future.wait(); }

  /// Forward to `future.wait_for(timeout_duration)`.
  template <class Rep, class Period>
  std::future_status wait_for(const std::chrono::duration<Rep, Period> & timeout_duration) const
  {
    return future.wait_for(timeout_duration);
  }

  /// Forward to `future.wait_until(timeout_time)`.
  template <class Clock, class Duration>
  std::future_status wait_until(const std::chrono::time_point<Clock, Duration> & timeout_time) const
  {
    return future.wait_until(timeout_time);
  }
};

/// Erase every pending request whose stored timestamp is strictly earlier than `time_point`.
///
/// `pending_requests_mutex` is held for the duration of the scan.
///
/// \tparam PendingRequestsT associative container type; for each element, `first` is used as
///   the request id and `second.first` as the timestamp compared against `time_point`.
/// \tparam AllocatorT allocator of the optional output vector.
/// \param[in,out] pending_requests container to prune in place.
/// \param[in] pending_requests_mutex mutex locked while the container is read and modified.
/// \param[in] time_point cut-off; elements with a strictly earlier timestamp are removed.
/// \param[out] pruned_requests if not null, the ids of the removed elements are appended to it.
/// \return number of elements that were removed.
template <typename PendingRequestsT, typename AllocatorT = std::allocator<int64_t>>
size_t prune_requests_older_than_impl(
  PendingRequestsT & pending_requests,
  std::mutex & pending_requests_mutex,
  std::chrono::time_point<std::chrono::system_clock> time_point,
  std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
{
  std::lock_guard<std::mutex> lock(pending_requests_mutex);

  const auto size_before = pending_requests.size();
  const auto end = pending_requests.end();
  auto entry = pending_requests.begin();

  while (entry != end) {
    // Entries at or after the cut-off are kept.
    if (!(entry->second.first < time_point)) {
      ++entry;
      continue;
    }

    if (pruned_requests != nullptr) {
      pruned_requests->push_back(entry->first);
    }
    // erase() hands back the iterator to continue from.
    entry = pending_requests.erase(entry);
  }

  return size_before - pending_requests.size();
}
}  // namespace detail

// Forward declaration only: this header refers to NodeBaseInterface solely through a pointer
// parameter of the ClientBase constructor, so the complete type is not required here.
namespace node_interfaces
{
class NodeBaseInterface;
}  // namespace node_interfaces

/// Abstract base class holding the rcl client handle and the type-independent part of a
/// service client's interface.
///
/// Derived classes supply the typed pieces through the pure virtual functions
/// create_response(), create_request_header() and handle_response().
/// Most member functions are only declared here and are defined out of line, so their exact
/// behavior is not visible in this header.
class ClientBase
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientBase)

  /// Construct the base from a node base interface and a node graph interface.
  /**
   * Defined out of line.
   * NOTE(review): presumably `node_graph` ends up in `node_graph_` (held as a WeakPtr) and
   * the node handle / context / logger members are derived from `node_base` - confirm in the
   * implementation file.
   */
  RCLCPP_PUBLIC
  ClientBase(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const rclcpp::node_interfaces::NodeGraphInterface::SharedPtr & node_graph);

  /// Virtual (defaulted) destructor, so derived clients can be destroyed through a base pointer.
  RCLCPP_PUBLIC
  virtual ~ClientBase() = default;


  /// Take a response into untyped, caller-provided storage.
  /**
   * \param[out] response_out untyped pointer to the storage receiving the response.
   * \param[out] request_header_out request id/header associated with the response.
   * \return a bool; presumably true when a response was taken - defined out of line, confirm
   *   there.
   */
  RCLCPP_PUBLIC
  bool take_type_erased_response(void * response_out, rmw_request_id_t & request_header_out);


  /// Return the service name as a C string (defined out of line).
  RCLCPP_PUBLIC
  const char * get_service_name() const;


  /// Return a shared pointer to the rcl client handle (presumably `client_handle_`).
  RCLCPP_PUBLIC
  std::shared_ptr<rcl_client_t> get_client_handle();


  /// Const overload: return a shared pointer to the const rcl client handle.
  RCLCPP_PUBLIC
  std::shared_ptr<const rcl_client_t> get_client_handle() const;


  /// Report whether the service is ready (defined out of line).
  RCLCPP_PUBLIC
  bool service_is_ready() const;


  /// Wait for the service, giving the timeout as any `std::chrono::duration`.
  /**
   * The timeout is converted to nanoseconds with `duration_cast` and forwarded to
   * wait_for_service_nanoseconds().
   *
   * \param[in] timeout how long to wait; defaults to -1 (in milliseconds with the default
   *   template arguments). NOTE(review): a negative value presumably means "wait without a
   *   time limit" - confirm against wait_for_service_nanoseconds().
   * \return whatever wait_for_service_nanoseconds() returns.
   */
  template <typename RepT = int64_t, typename RatioT = std::milli>
  bool wait_for_service(
    std::chrono::duration<RepT, RatioT> timeout = std::chrono::duration<RepT, RatioT>(-1))
  {
    return wait_for_service_nanoseconds(
      std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
  }

  // Type-specific hooks that derived classes must implement.
  /// Create an (untyped) response object.
  virtual std::shared_ptr<void> create_response() = 0;
  /// Create a request header object.
  virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
  /// Process a response together with the request header it arrived with.
  virtual void handle_response(
    const std::shared_ptr<rmw_request_id_t> & request_header,
    const std::shared_ptr<void> & response) = 0;


  /// Exchange the "in use by wait set" state.
  /**
   * \param[in] in_use_state the new state.
   * \return a bool; presumably the previous value of `in_use_by_wait_set_` - defined out of
   *   line, confirm there.
   */
  RCLCPP_PUBLIC
  bool exchange_in_use_by_wait_set_state(bool in_use_state);


  /// Return the actual QoS of the request publisher (defined out of line).
  RCLCPP_PUBLIC
  rclcpp::QoS get_request_publisher_actual_qos() const;


  /// Return the actual QoS of the response subscription (defined out of line).
  RCLCPP_PUBLIC
  rclcpp::QoS get_response_subscription_actual_qos() const;


  /// Install a callback that is invoked with a number of new responses.
  /**
   * The user callback is wrapped so that any exception it throws is caught and logged through
   * `node_logger_` instead of propagating out of the wrapper.
   * The wrapper is stored in `on_new_response_callback_`, replacing any previous one, and is
   * registered via the protected (rcl_event_callback_t, user_data) overload.
   * `callback_mutex_` is held while the callback is installed.
   *
   * \param[in] callback functor to call; receives the number of responses as a `size_t`.
   * \throws std::invalid_argument if `callback` is empty (not callable).
   */
  void set_on_new_response_callback(std::function<void(size_t)> callback)
  {
    if (!callback) {
      throw std::invalid_argument(
        "The callback passed to set_on_new_response_callback "
        "is not callable.");
    }

    // Wrapper around the user callback: exceptions are logged here rather than escaping.
    auto new_callback = [callback, this](size_t number_of_responses) {
      try {
        callback(number_of_responses);
      } catch (const std::exception & exception) {
        RCLCPP_ERROR_STREAM(
          node_logger_,
          "rclcpp::ClientBase@"
            << this << " caught " << rmw::impl::cpp::demangle(exception)
            << " exception in user-provided callback for the 'on new response' callback: "
            << exception.what());
      } catch (...) {
        RCLCPP_ERROR_STREAM(
          node_logger_,
          "rclcpp::ClientBase@" << this << " caught unhandled exception in user-provided callback "
                                << "for the 'on new response' callback");
      }
    };

    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);

    // Set it temporarily to the new callback, while we replace the old one.
    // This two-step setting prevents a gap where the old std::function has
    // been replaced but the middleware hasn't been told about the new one yet.
    // The user data passed here is the address of the local `new_callback`; it is replaced
    // below before this function returns.
    set_on_new_response_callback(
      rclcpp::detail::cpp_callback_trampoline<decltype(new_callback), const void *, size_t>,
      static_cast<const void *>(&new_callback));

    // Store the std::function to keep it in scope, also overwrites the existing one.
    on_new_response_callback_ = new_callback;

    // Set it again, now using the permanent storage.
    set_on_new_response_callback(
      rclcpp::detail::
        cpp_callback_trampoline<decltype(on_new_response_callback_), const void *, size_t>,
      static_cast<const void *>(&on_new_response_callback_));
  }

  /// Remove the callback installed with set_on_new_response_callback(), if there is one.
  /**
   * Under `callback_mutex_`: when a callback is currently stored, the registration is reset
   * with (nullptr, nullptr) first and the stored std::function is cleared afterwards.
   * Does nothing when no callback is stored.
   */
  void clear_on_new_response_callback()
  {
    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
    if (on_new_response_callback_) {
      set_on_new_response_callback(nullptr, nullptr);
      on_new_response_callback_ = nullptr;
    }
  }

protected:
  // As the macro name indicates, copying of ClientBase is disabled.
  RCLCPP_DISABLE_COPY(ClientBase)

  /// Nanosecond-based implementation behind wait_for_service() (defined out of line).
  RCLCPP_PUBLIC
  bool wait_for_service_nanoseconds(std::chrono::nanoseconds timeout);

  /// Return the rcl node handle as a raw pointer (defined out of line).
  RCLCPP_PUBLIC
  rcl_node_t * get_rcl_node_handle();

  /// Const overload of get_rcl_node_handle().
  RCLCPP_PUBLIC
  const rcl_node_t * get_rcl_node_handle() const;

  /// Low-level overload: register a C event callback with its user data (defined out of line).
  /// Passing (nullptr, nullptr) is how clear_on_new_response_callback() resets it.
  RCLCPP_PUBLIC
  void set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);

  // Node graph interface, held weakly.
  rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
  // Shared ownership of the rcl node handle and of the context.
  std::shared_ptr<rcl_node_t> node_handle_;
  std::shared_ptr<rclcpp::Context> context_;
  // Logger used when reporting exceptions thrown by the user's "on new response" callback.
  rclcpp::Logger node_logger_;

  // Locked while on_new_response_callback_ is installed or cleared.
  std::recursive_mutex callback_mutex_;
  // It is important to declare on_new_response_callback_ before
  // client_handle_, so on destruction the client is
  // destroyed first. Otherwise, the rmw client callback
  // would point briefly to a destroyed function.
  std::function<void(size_t)> on_new_response_callback_{nullptr};
  // Declare client_handle_ after callback
  std::shared_ptr<rcl_client_t> client_handle_;

  // Atomic flag, initially false.
  // NOTE(review): presumably updated through exchange_in_use_by_wait_set_state() - confirm.
  std::atomic<bool> in_use_by_wait_set_{false};
};

template <typename ServiceT>
struct ClientTypes
{
  using Request = typename ServiceT::Request;
  using Response = typename ServiceT::Response;
  using SharedRequest = typename ServiceT::Request::SharedPtr;
  using SharedResponse = typename ServiceT::Response::SharedPtr;

  using Future = std::future<SharedResponse>;
  using SharedFuture = std::shared_future<SharedResponse>;
  using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;

  using FutureResponseAndId = struct FutureAndRequestId : detail::FutureAndRequestId<Future>
  {
    using detail::FutureAndRequestId<Future>::FutureAndRequestId;
    SharedFuture share() noexcept { return this->future.share(); }
  };

  using SharedFutureResponseAndId =
    struct SharedFutureAndRequestId : detail::FutureAndRequestId<SharedFuture>
  {
    using detail::FutureAndRequestId<SharedFuture>::FutureAndRequestId;
  };

  using SharedFutureWithRequestAndId =
    struct SharedFutureWithRequestAndRequestId : detail::FutureAndRequestId<SharedFutureWithRequest>
  {
    using detail::FutureAndRequestId<SharedFutureWithRequest>::FutureAndRequestId;
  };

  using CallbackType = std::function<void(SharedFuture)>;
  using CallbackWithRequestType = std::function<void(SharedFutureWithRequest)>;
};

}  // namespace rclcpp