Program Listing for File client_base.hpp
↰ Return to documentation for file (include/jazzy/rtest/client_base.hpp)
// Copyright 2025 Spyrosoft Limited.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// @file client_base.hpp
// @author Mariusz Szczepanik (mua@spyro-soft.com)
// @date 2025-05-28
#pragma once
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
#include "rcl/client.h"
#include "rcl/error_handling.h"
#include "rcl/event_callback.h"
#include "rcl/service_introspection.h"
#include "rcl/wait.h"
#include "rclcpp/clock.hpp"
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"
namespace rclcpp
{
namespace detail
{
/// Pairs a future-like object with the id of the request it belongs to.
///
/// The wrapper is move-only. It converts implicitly to a reference to the wrapped
/// object and forwards `get`, `valid`, `wait`, `wait_for` and `wait_until` to it.
///
/// @tparam FutureT type of the wrapped future-like object.
template <typename FutureT>
struct FutureAndRequestId
{
  /// The wrapped future-like object.
  FutureT future;
  /// Request id stored next to the future.
  int64_t request_id;

  /// Takes ownership of `impl` (moved into `future`) and stores `req_id`.
  FutureAndRequestId(FutureT impl, int64_t req_id)
  : future(std::move(impl)), request_id(req_id)
  {
  }

  // All five special members are spelled out on purpose: copying is disabled,
  // moving and destruction use the compiler-generated versions.
  FutureAndRequestId(const FutureAndRequestId & other) = delete;
  FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete;
  FutureAndRequestId(FutureAndRequestId && other) noexcept = default;
  FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default;
  ~FutureAndRequestId() = default;

  /// Implicit access to the wrapped object, so the wrapper can be passed where a
  /// `FutureT &` is expected.
  operator FutureT &() { return future; }

  // Forwarders for the future-like interface of the wrapped object.

  /// Returns whatever `future.get()` returns (by value, because of `auto`).
  auto get() { return future.get(); }

  /// Forwards to `future.valid()`.
  bool valid() const noexcept { return future.valid(); }

  /// Forwards to `future.wait()`.
  void wait() const { future.wait(); }

  /// Forwards to `future.wait_for(timeout_duration)`.
  template <class Rep, class Period>
  std::future_status wait_for(const std::chrono::duration<Rep, Period> & timeout_duration) const
  {
    return future.wait_for(timeout_duration);
  }

  /// Forwards to `future.wait_until(timeout_time)`.
  template <class Clock, class Duration>
  std::future_status wait_until(const std::chrono::time_point<Clock, Duration> & timeout_time) const
  {
    return future.wait_until(timeout_time);
  }
};
/// Removes every pending request whose timestamp is strictly older than `time_point`.
///
/// `pending_requests_mutex` is held for the whole scan. Entries whose timestamp is
/// equal to `time_point` are kept.
///
/// @tparam PendingRequestsT map-like container whose elements expose the request id
///   as `first` and the timestamp as `second.first`, and whose `erase(iterator)`
///   returns the iterator following the erased element.
/// @tparam AllocatorT allocator of the optional output vector.
/// @param pending_requests container to prune in place.
/// @param pending_requests_mutex mutex locked while `pending_requests` is modified.
/// @param time_point cut-off; entries with an earlier timestamp are removed.
/// @param pruned_requests if not null, the ids of the removed entries are appended
///   to it in iteration order.
/// @return number of entries removed.
template <typename PendingRequestsT, typename AllocatorT = std::allocator<int64_t>>
size_t prune_requests_older_than_impl(
  PendingRequestsT & pending_requests,
  std::mutex & pending_requests_mutex,
  std::chrono::time_point<std::chrono::system_clock> time_point,
  std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
{
  std::lock_guard guard(pending_requests_mutex);
  const auto old_size = pending_requests.size();
  // end() is re-read on every iteration instead of being cached: erase() leaves the
  // end iterator intact for node-based maps, but invalidates it for contiguous
  // containers, and PendingRequestsT is not restricted to the former.
  for (auto it = pending_requests.begin(); it != pending_requests.end();) {
    if (it->second.first < time_point) {
      if (pruned_requests != nullptr) {
        pruned_requests->push_back(it->first);
      }
      // erase() hands back the next valid position, so the loop must not advance.
      it = pending_requests.erase(it);
    } else {
      ++it;
    }
  }
  return old_size - pending_requests.size();
}
} // namespace detail
namespace node_interfaces
{
// Forward declaration only: this header refers to NodeBaseInterface through a
// pointer (ClientBase constructor parameter), which does not need the full type.
class NodeBaseInterface;
} // namespace node_interfaces
/// Abstract base for service clients.
///
/// Declares the type-independent part of a client (handle access, readiness checks,
/// QoS queries, "on new response" callback management) and three pure-virtual hooks
/// (`create_response`, `create_request_header`, `handle_response`) that derived
/// classes must implement. Most member functions are only declared in this header;
/// their behavior is defined elsewhere.
class ClientBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientBase)
/// Constructs the base from a node base interface pointer and a node graph interface.
/// Defined outside this header.
RCLCPP_PUBLIC
ClientBase(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);
RCLCPP_PUBLIC
virtual ~ClientBase() = default;
/// Declared only.
/// NOTE(review): presumably takes the next response into `response_out`, fills
/// `request_header_out`, and returns whether a response was taken -- confirm
/// against the definition.
RCLCPP_PUBLIC
bool take_type_erased_response(void * response_out, rmw_request_id_t & request_header_out);
/// Returns the service name as a C string. Declared only.
RCLCPP_PUBLIC
const char * get_service_name() const;
/// Returns the shared `rcl_client_t` handle. Declared only.
RCLCPP_PUBLIC
std::shared_ptr<rcl_client_t> get_client_handle();
/// Const overload of `get_client_handle()`. Declared only.
RCLCPP_PUBLIC
std::shared_ptr<const rcl_client_t> get_client_handle() const;
/// Declared only.
/// NOTE(review): presumably reports whether the service is currently available -- confirm.
RCLCPP_PUBLIC
bool service_is_ready() const;
/// Converts `timeout` to nanoseconds and forwards to `wait_for_service_nanoseconds`.
///
/// @param timeout duration to wait; defaults to a duration of -1.
///   NOTE(review): a negative timeout presumably means "wait indefinitely" -- confirm
///   against `wait_for_service_nanoseconds`.
/// @return the value returned by `wait_for_service_nanoseconds`.
template <typename RepT = int64_t, typename RatioT = std::milli>
bool wait_for_service(
std::chrono::duration<RepT, RatioT> timeout = std::chrono::duration<RepT, RatioT>(-1))
{
return wait_for_service_nanoseconds(
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
}
// Hooks implemented by derived classes; responses and headers are type-erased here.
virtual std::shared_ptr<void> create_response() = 0;
virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
virtual void handle_response(
std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response) = 0;
/// Declared only.
/// NOTE(review): presumably swaps the `in_use_by_wait_set_` flag with `in_use_state`
/// and returns the previous value -- confirm against the definition.
RCLCPP_PUBLIC
bool exchange_in_use_by_wait_set_state(bool in_use_state);
/// Returns the QoS reported for the request publisher. Declared only.
RCLCPP_PUBLIC
rclcpp::QoS get_request_publisher_actual_qos() const;
/// Returns the QoS reported for the response subscription. Declared only.
RCLCPP_PUBLIC
rclcpp::QoS get_response_subscription_actual_qos() const;
/// Installs a user callback that receives a count of new responses.
///
/// The user callback is wrapped so that any exception it throws is caught and logged
/// through `node_logger_` instead of propagating. The wrapper is stored in
/// `on_new_response_callback_`, replacing any previously stored callback, and is
/// registered through the protected two-argument overload. `callback_mutex_` is held
/// during registration.
///
/// @param callback callable invoked with the number of responses.
/// @throws std::invalid_argument if `callback` is empty.
void set_on_new_response_callback(std::function<void(size_t)> callback)
{
if (!callback) {
throw std::invalid_argument(
"The callback passed to set_on_new_response_callback "
"is not callable.");
}
// Wrapper around the user callback: logs and swallows anything it throws.
auto new_callback = [callback, this](size_t number_of_responses) {
try {
callback(number_of_responses);
} catch (const std::exception & exception) {
RCLCPP_ERROR_STREAM(
node_logger_,
"rclcpp::ClientBase@"
<< this << " caught " << rmw::impl::cpp::demangle(exception)
<< " exception in user-provided callback for the 'on new response' callback: "
<< exception.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
node_logger_,
"rclcpp::ClientBase@" << this << " caught unhandled exception in user-provided callback "
<< "for the 'on new response' callback");
}
};
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
// Set it temporarily to the new callback, while we replace the old one.
// This two-step setting, prevents a gap where the old std::function has
// been replaced but the middleware hasn't been told about the new one yet.
set_on_new_response_callback(
rclcpp::detail::cpp_callback_trampoline<decltype(new_callback), const void *, size_t>,
static_cast<const void *>(&new_callback));
// Store the std::function to keep it in scope, also overwrites the existing one.
on_new_response_callback_ = new_callback;
// Set it again, now using the permanent storage.
set_on_new_response_callback(
rclcpp::detail::
cpp_callback_trampoline<decltype(on_new_response_callback_), const void *, size_t>,
static_cast<const void *>(&on_new_response_callback_));
}
/// Removes the stored "on new response" callback, if there is one.
///
/// Under `callback_mutex_`, unregisters the callback by passing null pointers to the
/// protected overload first and only then resets `on_new_response_callback_`.
/// Does nothing when no callback is stored.
void clear_on_new_response_callback()
{
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
if (on_new_response_callback_) {
set_on_new_response_callback(nullptr, nullptr);
on_new_response_callback_ = nullptr;
}
}
protected:
RCLCPP_DISABLE_COPY(ClientBase)
/// Nanosecond-resolution implementation behind `wait_for_service`. Declared only.
RCLCPP_PUBLIC
bool wait_for_service_nanoseconds(std::chrono::nanoseconds timeout);
/// Returns a raw pointer to the rcl node. Declared only.
RCLCPP_PUBLIC
rcl_node_t * get_rcl_node_handle();
/// Const overload of `get_rcl_node_handle()`. Declared only.
RCLCPP_PUBLIC
const rcl_node_t * get_rcl_node_handle() const;
/// Low-level registration used by the public overload and by
/// `clear_on_new_response_callback`; takes a C-style callback plus user data.
/// Declared only.
RCLCPP_PUBLIC
void set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);
rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
std::shared_ptr<rcl_node_t> node_handle_;
std::shared_ptr<rclcpp::Context> context_;
// Logger used to report exceptions thrown by the user-provided callback.
rclcpp::Logger node_logger_;
// Guards on_new_response_callback_ and its (un)registration.
std::recursive_mutex callback_mutex_;
// It is important to declare on_new_response_callback_ before
// client_handle_, so on destruction the client is
// destroyed first. Otherwise, the rmw client callback
// would point briefly to a destroyed function.
std::function<void(size_t)> on_new_response_callback_{nullptr};
// Declare client_handle_ after callback
std::shared_ptr<rcl_client_t> client_handle_;
// Initially false; see exchange_in_use_by_wait_set_state().
std::atomic<bool> in_use_by_wait_set_{false};
};
template <typename ServiceT>
struct ClientTypes
{
using Request = typename ServiceT::Request;
using Response = typename ServiceT::Response;
using SharedRequest = typename ServiceT::Request::SharedPtr;
using SharedResponse = typename ServiceT::Response::SharedPtr;
using Future = std::future<SharedResponse>;
using SharedFuture = std::shared_future<SharedResponse>;
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
using FutureResponseAndId = struct FutureAndRequestId : detail::FutureAndRequestId<Future>
{
using detail::FutureAndRequestId<Future>::FutureAndRequestId;
SharedFuture share() noexcept { return this->future.share(); }
};
using SharedFutureResponseAndId =
struct SharedFutureAndRequestId : detail::FutureAndRequestId<SharedFuture>
{
using detail::FutureAndRequestId<SharedFuture>::FutureAndRequestId;
};
using SharedFutureWithRequestAndId =
struct SharedFutureWithRequestAndRequestId : detail::FutureAndRequestId<SharedFutureWithRequest>
{
using detail::FutureAndRequestId<SharedFutureWithRequest>::FutureAndRequestId;
};
using CallbackType = std::function<void(SharedFuture)>;
using CallbackWithRequestType = std::function<void(SharedFutureWithRequest)>;
};
} // namespace rclcpp