Study/[ROS2] ROS2 Basic

ROS2 with C++ - Quality of Service

soohwan_justin 2023. 8. 4. 00:31

f본 포스트는 ROS1에 대한 전반적인 내용 및 ROS2 Python에 대한 지식이 있다는 가정 하에 작성되었으며, The Construct Youtube 영상을 참고하였습니다.

또한, 시뮬레이션 환경은 turtlebot3 waffle_pi를 사용합니다 : https://github.com/ROBOTIS-GIT/turtlebot3_simulations

 

 

 

 

 

1. QoS란?

ROS1에서는 TCP를 사용해서 메시지를 전달했었습니다. 그러나 이 방식은 Wifi나 불안정한 시스템 같이 데이터가 유실될 수 있는 그런 네트워크에서 사용되는 것을 고려하지는 않았습니다. 

 

ROS2에서는 데이터 전송을 위해 UDP와 DDS를 사용합니다. 이를 통해 우리는 node의 신뢰도(reliability) 레벨을 결정할 수 있습니다.

 

ROS2에서 지원되는 모든 DDS들은 QoS를 세밀하게 설정할 수 있습니다. 예를 들어, publisher나 subscribers에서 설정값으로 적용할 수 있습니다.

 

그러나 publisher와 subscriber가 서로 통신하기 위해서는 QoS가 반드시 호환되어야 합니다.

 

 

 

 

2. QoS를 바꾸는 법과 호환성

1. 다음 명령으로 패키지를 만듭니다.

$ ros2 pkg create --build-type ament_cmake qos_tests_cpp_pkg --dependencies rclcpp std_msgs

 

2. subscriber, publisher 코드 작성

subscriber_custom_minimal_qos.cpp

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using std::placeholders::_1;

class SubscriberQos : public rclcpp::Node {
public:
  SubscriberQos() : Node("subscriber_qos_obj") {
    rclcpp::QoS qos_profile(10);
    qos_profile.reliability(RMW_QOS_POLICY_RELIABILITY_RELIABLE);
    subscription_ = this->create_subscription<std_msgs::msg::String>(
        "qos_test", qos_profile,
        std::bind(&SubscriberQos::listener_callback, this, _1));
  }

private:
  void listener_callback(const std_msgs::msg::String::SharedPtr msg) {
    RCLCPP_INFO(this->get_logger(), "Data Received = '%s'", msg->data.c_str());
  }

  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<SubscriberQos>());
  rclcpp::shutdown();
  return 0;
}

위 코드에서 subscriber 객체를 만들 때, reliability=ReliabilityPolicy.RELIABLE 이라는 값을 넣어줬는데, 이는 subscriber가 publisher에서 보내는 모든 메시지를 다 받아들이도록 합니다.

 

 

publisher_custom_minimal_qos.cpp

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <thread>

using namespace std::chrono_literals;
using std::placeholders::_1;

class PublisherQoS : public rclcpp::Node {
public:
  PublisherQoS(int &argc, char **argv) : Node("publisher_qos_obj") {

    reliability = argv[2];

    rclcpp::QoS qos_profile_publisher(10);

    if (reliability == "reliable") {
      qos_profile_publisher.reliability(RMW_QOS_POLICY_RELIABILITY_RELIABLE);
    } else {
      qos_profile_publisher.reliability(RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT);
    }

    rclcpp::PublisherOptions publisher_options;
    publisher_options.event_callbacks.incompatible_qos_callback =
        std::bind(&PublisherQoS::incompatible_qos_info_callback, this, _1);

    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/qos_test", qos_profile_publisher, publisher_options);

    timer_ = this->create_wall_timer(
        1000ms, std::bind(&PublisherQoS::timer_callback, this));

    msgs_id = 0;
  }

  void
  incompatible_qos_info_callback(rclcpp::QOSOfferedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "A subscriber is asking for an INCOMPATIBLE QoS Triggered!");
    RCLCPP_ERROR(
        this->get_logger(),
        "Offered incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void timer_callback() {
    auto msg = std_msgs::msg::String();
    auto current_time = this->now();
    auto current_time_s = current_time.seconds();
    auto current_time_ns = current_time.nanoseconds();
    time_str =
        std::to_string(current_time_s) + "," + std::to_string(current_time_ns);
    dds_msg_str = std::to_string(msgs_id) + ":" + time_str;
    msg.data = dds_msg_str;
    publisher_->publish(msg);
    RCLCPP_INFO(this->get_logger(), "Publishing: %s ", msg.data.c_str());
    ++msgs_id;
  }

private:
  std::string reliability;
  int msgs_id;
  std::string time_str;
  std::string dds_msg_str;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<PublisherQoS>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(qos_tests_cpp_pkg)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

add_executable(publisher_custom_minimal_qos_exe src/publisher_custom_minimal_qos.cpp)
ament_target_dependencies(publisher_custom_minimal_qos_exe rclcpp)

add_executable(subscriber_custom_minimal_qos_exe src/subscriber_custom_minimal_qos.cpp)
ament_target_dependencies(subscriber_custom_minimal_qos_exe rclcpp)

install(TARGETS
   publisher_custom_minimal_qos_exe
   subscriber_custom_minimal_qos_exe
   DESTINATION lib/${PROJECT_NAME}
)

ament_package()

 

빌드 후, 커맨드 창에 다음 명령으로 실행합니다.

$ export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
$ ros2 run qos_tests_pkg publisher_custom_minimal_qos_exe -reliability reliable
$ ros2 run qos_tests_pkg subscriber_custom_minimal_qos_exe

첫 번째 명령은, 간단하게 비유하자면... 어떤 통신사를 사용할거냐 하는 것과 비슷하다고 보시면 됩니다. KT, SKT, LG 중에 고르는 셈입니다.

 

실행 결과는 다음과 같습니다

 

 

만약 publisher의 reliability를 바꿔서 다음과 같이 실행하면 에러가 뜹니다.

$ ros2 run qos_tests_pkg publisher_custom_minimal_qos_exe -reliability best_effort

 

즉, 호환되지 않는 QoS로 통신을 시도하는 경우에는

- publisher에서는 incompatible_qos가 트리거되며, incompatible_qos_clb 함수가 호출됩니다.

- subscriber에서는 incompatible QoS에 대한 경고가 나옵니다.

 

위 예제에서 QoS가 호환되지 않았던 이유는

- publisher Qos Reliability = Best_Effort

- subscriber Qos Reliability = Reliable

이었기 때문입니다.

 

그러면 호환되게 하려면 QoS를 어떤 값으로 설정해야 하나..?

 

다음 표는 QoS 호환성 관련 테이블 입니다.

 

일단 기본적으로, subscriber랑 publisher를 만들면 아래와 같은 상태의 default policies로 생성됩니다.

  • history = “keep last”
  • queue size = 10
  • reliability = reliable
  • durability = volatile
  • liveliness = system default.
  • Deadline, lifespan, and lease durations 은 매우 큰 숫자 = 9223372036854775807 nanoseconds 로 설정됩니다.

 

 

이번엔 각 policies를 바꿔가면서 테스트해보겠습니다.

 

 

Durability

Durability는 publish된 메시지가 영구적으로 남아있도록 할지의 여부를 결정합니다. 예를 들어, robot_description같은 전역 파라미터에 적용됩니다.

 

durability에는 2가지 옵션이 있는데,

- Transient local : publisher가 메시지를 publish하고 시간이 지난 뒤에도 subscribers가 해당 값을 읽을 수 있도록 메시지가 지속되게 합니다.

- Volatile : 메시지가 지속되도록 하기 위한 그 어떠한 것도 하지 않습니다.

 

subscriber_durability.cpp

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using std::placeholders::_1;

class SubscriberQos : public rclcpp::Node {
public:
  SubscriberQos(int &argc, char **argv) : Node("subscriber_qos_obj") {

    durability = argv[2];
    rclcpp::QoS qos_profile_subscriber(10);
    if (durability == "transient_local") {
      qos_profile_subscriber.durability(
          RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
    } else {
      // Leave the one by default, which is VOLATILE
    }

    rclcpp::SubscriptionOptions subscription_options;
    subscription_options.event_callbacks.incompatible_qos_callback =
        std::bind(&SubscriberQos::incompatible_qos_info_callback, this, _1);

    subscription_ = this->create_subscription<std_msgs::msg::String>(
        "qos_test", qos_profile_subscriber,
        std::bind(&SubscriberQos::listener_callback, this, _1),
        subscription_options);
  }

private:
  void incompatible_qos_info_callback(
      rclcpp::QOSRequestedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "SUBSCRIBER::: INCOMPATIBLE QoS Triggered!");
    RCLCPP_INFO(
        this->get_logger(),
        "Requested incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void listener_callback(const std_msgs::msg::String::SharedPtr msg) {
    RCLCPP_INFO(this->get_logger(), "Data Received = '%s'", msg->data.c_str());
  }

  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  std::string durability;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<SubscriberQos>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

publisher_durability.cpp

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <thread>

using namespace std::chrono_literals;
using std::placeholders::_1;

class PublisherQoS : public rclcpp::Node {
public:
  PublisherQoS(int &argc, char **argv) : Node("publisher_qos_obj") {

    durability = argv[2];

    rclcpp::QoS qos_profile_publisher(10);

    if (durability == "transient_local") {
      qos_profile_publisher.durability(
          RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
    } else {
      // Leave the one by default, which is VOLATILE
    }

    rclcpp::PublisherOptions publisher_options;
    publisher_options.event_callbacks.incompatible_qos_callback =
        std::bind(&PublisherQoS::incompatible_qos_info_callback, this, _1);

    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/qos_test", qos_profile_publisher, publisher_options);

    timer_ = this->create_wall_timer(
        1000ms, std::bind(&PublisherQoS::publish_one_message, this));

    msgs_id = 0;
  }

  void
  incompatible_qos_info_callback(rclcpp::QOSOfferedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "A subscriber is asking for an INCOMPATIBLE QoS Triggered!");
    RCLCPP_ERROR(
        this->get_logger(),
        "Offered incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void publish_one_message() {
    this->timer_->cancel();
    auto msg = std_msgs::msg::String();
    auto current_time = this->now();
    auto current_time_s = current_time.seconds();
    auto current_time_ns = current_time.nanoseconds();
    time_str =
        std::to_string(current_time_s) + "," + std::to_string(current_time_ns);
    dds_msg_str = std::to_string(msgs_id) + ":" + time_str;
    msg.data = dds_msg_str;
    publisher_->publish(msg);
    RCLCPP_INFO(this->get_logger(), "Publishing: %s ", msg.data.c_str());
  }

private:
  std::string durability;
  int msgs_id;
  std::string time_str;
  std::string dds_msg_str;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<PublisherQoS>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(qos_tests_cpp_pkg)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

add_executable(publisher_custom_minimal_qos_exe src/publisher_custom_minimal_qos.cpp)
ament_target_dependencies(publisher_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_custom_minimal_qos_exe src/subscriber_custom_minimal_qos.cpp)
ament_target_dependencies(subscriber_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_durability_exe src/subscriber_durability.cpp)
ament_target_dependencies(subscriber_durability_exe rclcpp)

add_executable(publisher_durability_exe src/publisher_durability.cpp)
ament_target_dependencies(publisher_durability_exe rclcpp)

install(TARGETS
   publisher_custom_minimal_qos_exe
   subscriber_custom_minimal_qos_exe
   subscriber_durability_exe
   publisher_durability_exe
   DESTINATION lib/${PROJECT_NAME}
)

ament_package()

 

빌드 후, 실행해봅니다.

$ ros2 run qos_tests_cpp_pkg publisher_durability_exe -durability transient_local
$ ros2 run qos_tests_cpp_pkg subscriber_durability_exe -durability transient_local

위 명령으로 실행하면 transient_local이기 때문에 publisher를 실행하고, 시간이 좀 지나고 나서 subscriber를 실행해도 topic을 받을 수 있습니다.

 

 

이번엔 다음과 같이 실행해봅니다.

$ ros2 run qos_tests_cpp_pkg publisher_durability_exe -durability volatile
$ ros2 run qos_tests_cpp_pkg subscriber_durability_exe -durability volatile

이렇게 실행하면 아무 결과도 볼 수 없는데, volatile이기 때문에 publish하는 순간 미리 subscribe하고 있지 않으면 바로 사라집니다. 하지만, subscriber를 먼저 실행하면 메시지를 받을 수 있습니다.

 

아래 명령으로도 메시지를 확인할 수 있습니다.

$ ros2 topic echo --qos-durability transient_local --qos-reliability reliable /qos_test

 

Deadline

이 값은 하나의 메시지로부터 다음 메시지를 안정적으로 받을 수 있을 때 까지의 최대 시간입니다. cmd_vel이나 장애물 감지같이 정기적이어야 하고, 중요한 값들에 대한 명령에 유용하게 쓰입니다.

 

subscriber_deadline.cpp

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <thread>

using namespace std::chrono_literals;
using std::placeholders::_1;

class PublisherQoS : public rclcpp::Node {
public:
  PublisherQoS(int &argc, char **argv) : Node("publisher_qos_obj") {

    deadline = std::stof(argv[2]);
    deadline_ms = deadline * 1000;
    rclcpp::QoS qos_profile_publisher(1);
    qos_profile_publisher.deadline(std::chrono::milliseconds(deadline_ms));

    rclcpp::PublisherOptions publisher_options;
    publisher_options.event_callbacks.incompatible_qos_callback =
        std::bind(&PublisherQoS::incompatible_qos_info_callback, this, _1);
    publisher_options.event_callbacks.deadline_callback =
        std::bind(&PublisherQoS::incompatible_deadline_callback, this);

    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/qos_test", qos_profile_publisher, publisher_options);

    timer_ = this->create_wall_timer(
        1000ms, std::bind(&PublisherQoS::timer_callback, this));

    msgs_id = 0;
    timer_period = 1.0;
    swap_state_time = 5.0;
    time_pause = 2.0;
    counter = 0;
  }

  void incompatible_deadline_callback() {
    RCLCPP_ERROR(this->get_logger(), "PUBLISHER:::  Deadline Triggered!");
  }

  void
  incompatible_qos_info_callback(rclcpp::QOSOfferedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "A subscriber is asking for an INCOMPATIBLE QoS Triggered!");
    RCLCPP_ERROR(
        this->get_logger(),
        "Offered incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void publish_one_message() {
    auto msg = std_msgs::msg::String();
    auto current_time = this->now();
    auto current_time_s = current_time.seconds();
    auto current_time_ns = current_time.nanoseconds();
    time_str =
        std::to_string(current_time_s) + "," + std::to_string(current_time_ns);
    dds_msg_str = std::to_string(msgs_id) + ":" + time_str;
    msg.data = dds_msg_str;
    publisher_->publish(msg);
    RCLCPP_INFO(this->get_logger(), "Publishing: %s ", msg.data.c_str());
  }

  void timer_callback() {
    if (counter > int(swap_state_time / timer_period)) {
      delta = 0.1;
      delta_ms = delta * 1000;
      range_steps = int(time_pause / delta);
      for (int i = 0; i < range_steps; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(delta_ms));
        RCLCPP_INFO(this->get_logger(), "Paused = %f / %f ", (i * delta),
                    time_pause);
      }
      counter = 0;
    } else {
      publish_one_message();
      ++counter;
      RCLCPP_INFO(this->get_logger(), "Counter = %d", counter);
    }
  }

private:
  float deadline;
  float timer_period;
  float swap_state_time;
  float time_pause;
  float delta;
  int counter;
  int range_steps;
  int delta_ms;
  int deadline_ms;
  int msgs_id;
  std::string time_str;
  std::string dds_msg_str;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<PublisherQoS>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

publisher_deadline.cpp

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <thread>

using namespace std::chrono_literals;
using std::placeholders::_1;

class PublisherQoS : public rclcpp::Node {
public:
  PublisherQoS(int &argc, char **argv) : Node("publisher_qos_obj") {

    deadline = std::stof(argv[2]);
    deadline_ms = deadline * 1000;
    rclcpp::QoS qos_profile_publisher(1);
    qos_profile_publisher.deadline(std::chrono::milliseconds(deadline_ms));

    rclcpp::PublisherOptions publisher_options;
    publisher_options.event_callbacks.incompatible_qos_callback =
        std::bind(&PublisherQoS::incompatible_qos_info_callback, this, _1);
    publisher_options.event_callbacks.deadline_callback =
        std::bind(&PublisherQoS::incompatible_deadline_callback, this);

    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/qos_test", qos_profile_publisher, publisher_options);

    timer_ = this->create_wall_timer(
        1000ms, std::bind(&PublisherQoS::timer_callback, this));

    msgs_id = 0;
    timer_period = 1.0;
    swap_state_time = 5.0;
    time_pause = 2.0;
    counter = 0;
  }

  void incompatible_deadline_callback() {
    RCLCPP_ERROR(this->get_logger(), "PUBLISHER:::  Deadline Triggered!");
  }

  void
  incompatible_qos_info_callback(rclcpp::QOSOfferedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "A subscriber is asking for an INCOMPATIBLE QoS Triggered!");
    RCLCPP_ERROR(
        this->get_logger(),
        "Offered incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void publish_one_message() {
    auto msg = std_msgs::msg::String();
    auto current_time = this->now();
    auto current_time_s = current_time.seconds();
    auto current_time_ns = current_time.nanoseconds();
    time_str =
        std::to_string(current_time_s) + "," + std::to_string(current_time_ns);
    dds_msg_str = std::to_string(msgs_id) + ":" + time_str;
    msg.data = dds_msg_str;
    publisher_->publish(msg);
    RCLCPP_INFO(this->get_logger(), "Publishing: %s ", msg.data.c_str());
  }

  void timer_callback() {
    if (counter > int(swap_state_time / timer_period)) {
      delta = 0.1;
      delta_ms = delta * 1000;
      range_steps = int(time_pause / delta);
      for (int i = 0; i < range_steps; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(delta_ms));
        RCLCPP_INFO(this->get_logger(), "Paused = %f / %f ", (i * delta),
                    time_pause);
      }
      counter = 0;
    } else {
      publish_one_message();
      ++counter;
      RCLCPP_INFO(this->get_logger(), "Counter = %d", counter);
    }
  }

private:
  float deadline;
  float timer_period;
  float swap_state_time;
  float time_pause;
  float delta;
  int counter;
  int range_steps;
  int delta_ms;
  int deadline_ms;
  int msgs_id;
  std::string time_str;
  std::string dds_msg_str;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<PublisherQoS>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(qos_tests_cpp_pkg)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

add_executable(publisher_custom_minimal_qos_exe src/publisher_custom_minimal_qos.cpp)
ament_target_dependencies(publisher_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_custom_minimal_qos_exe src/subscriber_custom_minimal_qos.cpp)
ament_target_dependencies(subscriber_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_durability_exe src/subscriber_durability.cpp)
ament_target_dependencies(subscriber_durability_exe rclcpp std_msgs)

add_executable(publisher_durability_exe src/publisher_durability.cpp)
ament_target_dependencies(publisher_durability_exe rclcpp std_msgs)

add_executable(subscriber_deadline_exe src/subscriber_deadline.cpp)
ament_target_dependencies(subscriber_deadline_exe rclcpp std_msgs)

add_executable(publisher_deadline_exe src/publisher_deadline.cpp)
ament_target_dependencies(publisher_deadline_exe rclcpp std_msgs)

install(TARGETS
   publisher_custom_minimal_qos_exe
   subscriber_custom_minimal_qos_exe
   subscriber_durability_exe
   publisher_durability_exe
   subscriber_deadline_exe
   publisher_deadline_exe
   DESTINATION lib/${PROJECT_NAME}
)

ament_package()

 

빌드 후, 실행해봅니다

$ ros2 run qos_tests_cpp_pkg publisher_deadline_exe -deadline 10.0
$ ros2 run qos_tests_cpp_pkg subscriber_deadline_exe -deadline 10.0

publisher가 일정 주기로 데이터를 publish하다가, 2초동안 데이터를 보내지 않습니다. 지금은 deadline이 10초로 되어있는 상태이기 때문에, 딱히 이상이 없습니다.

 

 

deadline 값을 바꿔서 실행해봅니다.

$ ros2 run qos_tests_cpp_pkg publisher_deadline_exe -deadline 0.99

콛드에서 1초 주기로 데이터를 publisher하게 했으므로, 매번 deadline callback이 호출됩니다.

근데 중간에 2초간 데이터 publish를 멈추는 동안에 callback도 멈추게 되는데, 이는 multi threading을 적용하지 않았기 때문입니다.

 

 

이번엔 subscriber가 제 시간 안에 메시지를 받지 못하는 경우를 확인해보겠습니다

$ ros2 run qos_tests_cpp_pkg publisher_deadline_exe -deadline 1.2
$ ros2 run qos_tests_cpp_pkg subscriber_deadline_exe -deadline 1.8

 

 

이번엔 호환되지 않는 QoS 값을 넣어봅니다. subscriber의 deadline이 publisher의 deadline 값 보다 작으면 호환되지 않습니다

$ ros2 run qos_tests_cpp_pkg publisher_deadline_exe -deadline 1.5
$ ros2 run qos_tests_cpp_pkg subscriber_deadline_exe -deadline 1.0

 

Lifespan

이 값은 duration으로 정해지며, 데이터가 publish되고 나서, 이 값을 받을 때 까지의 시간입니다. 즉, 이 시간이 지나고 나서는 오래된 데이터니 사용하지 않겠다는 용도로 사용됩니다. 일반적으로 가장 최근의 데이터만을 사용하기 위해 센서 데이터에 많이 적용됩니다.

 

먼저 알아둬야 할 점은, lifespan duration은 reliable, 그리고 transient_local일때만 쓸모가 있다는 것입니다. 만약 데이터를 받았다는 것을 보장할 수 없다면(reliable이 아니라면), 그리고 데이터가 들어올 때까지 기다리지 않는다면(transient_local) publish와 subscribe 사이의 간격을 측정할 수 없습니다.

 

subscriber_lifespan.cpp

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <math.h>

using std::placeholders::_1;

class SubscriberQos : public rclcpp::Node {
public:
  SubscriberQos(int &argc, char **argv) : Node("subscriber_qos_obj") {

    lifespan = std::stof(argv[2]);
    lifespan_ms = lifespan * 1000;
    rclcpp::QoS qos_profile_subscriber(1);
    qos_profile_subscriber.lifespan(std::chrono::milliseconds(lifespan_ms));

    qos_profile_subscriber.reliability(RMW_QOS_POLICY_RELIABILITY_RELIABLE);
    qos_profile_subscriber.durability(
        RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);

    rclcpp::SubscriptionOptions subscription_options;
    subscription_options.event_callbacks.incompatible_qos_callback =
        std::bind(&SubscriberQos::incompatible_qos_info_callback, this, _1);

    subscription_ = this->create_subscription<std_msgs::msg::String>(
        "qos_test", qos_profile_subscriber,
        std::bind(&SubscriberQos::listener_callback, this, _1),
        subscription_options);
  }

private:
  void incompatible_qos_info_callback(
      rclcpp::QOSRequestedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "SUBSCRIBER::: INCOMPATIBLE QoS Triggered!");
    RCLCPP_INFO(
        this->get_logger(),
        "Requested incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void listener_callback(const std_msgs::msg::String::SharedPtr msg) {
    // The data has the format Seconds,NanoSeconds

    raw_data = msg->data;
    RCLCPP_INFO(this->get_logger(), "Data Received = %s seconds",
                raw_data.c_str());
    delimiter = ",";
    last = 0;
    next = 0;
    while ((next = raw_data.find(delimiter, last)) != std::string::npos) {
      seconds = std::stod(raw_data.substr(last, next - last));
      last = next + 1;
    }
    nanoseconds = std::stod(raw_data.substr(last));
    RCLCPP_INFO(this->get_logger(), "SPLIT = [%lf,%lf]", seconds, nanoseconds);
    RCLCPP_INFO(this->get_logger(), "seconds = %lf, nseconds = %lf", seconds,
                nanoseconds);

    total_seconds = seconds + pow(nanoseconds, -9);
    RCLCPP_INFO(this->get_logger(), "total_seconds = %lf", total_seconds);

    rclcpp::Time time_now_obj = this->now();
    total_current_time =
        time_now_obj.seconds() + pow((time_now_obj.nanoseconds()), -9);
    RCLCPP_INFO(this->get_logger(), "total_current_time = %lf",
                total_current_time);
    delta = total_current_time - total_seconds;
    RCLCPP_INFO(this->get_logger(), "Message Age = %lf seconds", delta);
  }

  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  float lifespan;
  int lifespan_ms;
  double total_seconds;
  double total_current_time;
  double seconds;
  double nanoseconds;
  size_t last;
  size_t next;
  double delta;
  std::string raw_data;
  std::string delimiter;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<SubscriberQos>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

publisher_lifespan.cpp

#include "rclcpp/rclcpp.hpp"
#include "rmw/types.h"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <thread>

using namespace std::chrono_literals;
using std::placeholders::_1;

class PublisherQoS : public rclcpp::Node {
public:
  PublisherQoS(int &argc, char **argv) : Node("publisher_qos_obj") {

    lifespan = std::stof(argv[2]);
    lifespan_ms = lifespan * 1000;
    rclcpp::QoS qos_profile_publisher(1);
    qos_profile_publisher.lifespan(std::chrono::milliseconds(lifespan_ms));

    qos_profile_publisher.reliability(RMW_QOS_POLICY_RELIABILITY_RELIABLE);
    qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);

    rclcpp::PublisherOptions publisher_options;
    publisher_options.event_callbacks.incompatible_qos_callback =
        std::bind(&PublisherQoS::incompatible_qos_info_callback, this, _1);

    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/qos_test", qos_profile_publisher, publisher_options);

    timer_ = this->create_wall_timer(
        1000ms, std::bind(&PublisherQoS::publish_one_message, this));
  }

  void
  incompatible_qos_info_callback(rclcpp::QOSOfferedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "A subscriber is asking for an INCOMPATIBLE QoS Triggered!");
    RCLCPP_ERROR(
        this->get_logger(),
        "Offered incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void publish_one_message() {
    this->timer_->cancel();
    auto msg = std_msgs::msg::String();
    rclcpp::Time current_time = this->now();
    double seconds = current_time.seconds();
    double nanoseconds = current_time.nanoseconds();
    time_str = std::to_string(seconds) + "," + std::to_string(nanoseconds);
    msg.data = time_str;
    publisher_->publish(msg);
    RCLCPP_INFO(this->get_logger(), "Publishing: %s ", msg.data.c_str());
  }

private:
  float lifespan;
  int lifespan_ms;
  std::string time_str;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<PublisherQoS>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(qos_tests_cpp_pkg)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

add_executable(publisher_custom_minimal_qos_exe src/publisher_custom_minimal_qos.cpp)
ament_target_dependencies(publisher_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_custom_minimal_qos_exe src/subscriber_custom_minimal_qos.cpp)
ament_target_dependencies(subscriber_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_durability_exe src/subscriber_durability.cpp)
ament_target_dependencies(subscriber_durability_exe rclcpp std_msgs)

add_executable(publisher_durability_exe src/publisher_durability.cpp)
ament_target_dependencies(publisher_durability_exe rclcpp std_msgs)

add_executable(subscriber_deadline_exe src/subscriber_deadline.cpp)
ament_target_dependencies(subscriber_deadline_exe rclcpp std_msgs)

add_executable(publisher_deadline_exe src/publisher_deadline.cpp)
ament_target_dependencies(publisher_deadline_exe rclcpp std_msgs)

add_executable(subscriber_lifespan_exe src/subscriber_lifespan.cpp)
ament_target_dependencies(subscriber_lifespan_exe rclcpp std_msgs)

add_executable(publisher_lifespan_exe src/publisher_lifespan.cpp)
ament_target_dependencies(publisher_lifespan_exe rclcpp std_msgs)

install(TARGETS
   publisher_custom_minimal_qos_exe
   subscriber_custom_minimal_qos_exe
   subscriber_durability_exe
   publisher_durability_exe
   subscriber_deadline_exe
   publisher_deadline_exe
   subscriber_lifespan_exe
   publisher_lifespan_exe
   DESTINATION lib/${PROJECT_NAME}
)

ament_package()

 

빌드 후, 다음 명령으로 확인

$ ros2 run qos_tests_cpp_pkg publisher_lifespan_exe -lifespan 20.0
$ ros2 run qos_tests_cpp_pkg subscriber_lifespan_exe -lifespan 1.0

 

위와 같이 실행하면, publish된지 1초가 지나지 않은 데이터만 subscribe하게 됩니다. 즉, 미리 subscriber를 실행하고 pub하거나, pub 하고 나서 1초 안에 subscriber를 실행해야 합니다.

 

따라서, 만약 아래와 같이 극단적으로 lifespan을 줄이면 아무 데이터도 받을 수 없습니다. subscribe에서 데이터를 받고, Callback 을 호출하고... 시간 계산하고... 이런 과정을 저 시간 안에 처리할 수 없기 때문입니다.

$ ros2 run qos_tests_cpp_pkg publisher_lifespan_exe -lifespan 0.00001

 

 

 

Liveliness and LeaseDuration

Liveliness는 publisher node가 살아있는지 확인하기 위해 사용됩니다. 즉, 일정한 주기로 데이터가 publish되었는지를 확인합니다.

leaseDuration은 publisher가 자기가 살아있다는 보내는 어떤 메시지나 신호를 판단하기 위한 주기입니다.

 

subscriber_liveliness.cpp

#include "rclcpp/rclcpp.hpp"
#include "rmw/types.h"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <math.h>

using std::placeholders::_1;

class SubscriberQos : public rclcpp::Node {
public:
  SubscriberQos(int &argc, char **argv) : Node("subscriber_qos_obj") {

    liveliness_lease_duration = std::stoi(argv[2]);
    rclcpp::QoS qos_profile_subscriber(1);
    qos_profile_subscriber.liveliness_lease_duration(
        std::chrono::milliseconds(liveliness_lease_duration));

    liveliness_policy = argv[4];
    if (liveliness_policy == "MANUAL_BY_TOPIC") {
      qos_profile_subscriber.liveliness(
          RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC);
    } else {
      qos_profile_subscriber.liveliness(RMW_QOS_POLICY_LIVELINESS_AUTOMATIC);
    }

    rclcpp::SubscriptionOptions subscription_options;
    subscription_options.event_callbacks.incompatible_qos_callback =
        std::bind(&SubscriberQos::incompatible_qos_info_callback, this, _1);
    subscription_options.event_callbacks.liveliness_callback =
        std::bind(&SubscriberQos::incompatible_liveliness_callback, this, _1);

    subscription_ = this->create_subscription<std_msgs::msg::String>(
        "qos_test", qos_profile_subscriber,
        std::bind(&SubscriberQos::listener_callback, this, _1),
        subscription_options);
  }

private:
  void
  incompatible_liveliness_callback(rclcpp::QOSLivelinessChangedInfo &event) {
    RCLCPP_ERROR(this->get_logger(), "SUBSCRIBER::: Liveliness Triggered!");
    RCLCPP_ERROR(this->get_logger(), "alive_count = %d", event.alive_count);
    RCLCPP_ERROR(this->get_logger(), "not_alive_count = %d",
                 event.not_alive_count);
    RCLCPP_ERROR(this->get_logger(), "alive_count_change = %d",
                 event.alive_count_change);
    RCLCPP_ERROR(this->get_logger(), "not_alive_count_change = %d",
                 event.not_alive_count_change);
    RCLCPP_ERROR(this->get_logger(), "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
  }

  void incompatible_qos_info_callback(
      rclcpp::QOSRequestedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "SUBSCRIBER::: INCOMPATIBLE QoS Triggered!");
    RCLCPP_INFO(
        this->get_logger(),
        "Requested incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void listener_callback(const std_msgs::msg::String::SharedPtr msg) {
    RCLCPP_INFO(this->get_logger(), "Data Received = '%s'", msg->data.c_str());
  }

  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  int liveliness_lease_duration;
  std::string liveliness_policy;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<SubscriberQos>(argc, argv));
  rclcpp::shutdown();
  return 0;
}

 

publisher_liveliness.cpp

#include "rclcpp/rclcpp.hpp"
#include "rmw/types.h"
#include "std_msgs/msg/string.hpp"
#include <chrono>
#include <thread>

using namespace std::chrono_literals;
using std::placeholders::_1;
using std::placeholders::_2;

class PublisherQoS : public rclcpp::Node {
public:
  PublisherQoS(int &argc, char **argv) : Node("publisher_qos_obj") {

    group_timer_publisher = this->create_callback_group(
        rclcpp::CallbackGroupType::MutuallyExclusive);
    group_alive_timer = this->create_callback_group(
        rclcpp::CallbackGroupType::MutuallyExclusive);
    group_events_clb = this->create_callback_group(
        rclcpp::CallbackGroupType::MutuallyExclusive);

    liveliness_lease_duration = std::stoi(argv[2]);
    rclcpp::QoS qos_profile_publisher(1);
    qos_profile_publisher.liveliness_lease_duration(
        std::chrono::milliseconds(liveliness_lease_duration));

    liveliness_policy = argv[10];
    if (liveliness_policy == "MANUAL_BY_TOPIC") {
      qos_profile_publisher.liveliness(
          RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC);
    } else {
      qos_profile_publisher.liveliness(RMW_QOS_POLICY_LIVELINESS_AUTOMATIC);
    }

    publish_period = std::stoi(argv[4]);
    topic_assert_period = std::stoi(argv[6]);
    publish_assert = argv[8];
    if (publish_assert == "yes") {
      publish_assert_bool = true;
    } else {
      publish_assert_bool = false;
    }

    rclcpp::PublisherOptions publisher_options;
    publisher_options.event_callbacks.incompatible_qos_callback =
        std::bind(&PublisherQoS::incompatible_qos_info_callback, this, _1);
    publisher_options.event_callbacks.liveliness_callback =
        std::bind(&PublisherQoS::incompatible_liveliness_callback, this);
    publisher_options.callback_group = group_events_clb;

    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/qos_test", qos_profile_publisher, publisher_options);

    swap_state_time = 5.0;
    time_pause = 5.0;
    counter = 0;

    timer1_ = this->create_wall_timer(
        std::chrono::milliseconds(publish_period),
        std::bind(&PublisherQoS::timer_callback, this), group_timer_publisher);
    timer2_ = this->create_wall_timer(
        std::chrono::milliseconds(topic_assert_period),
        std::bind(&PublisherQoS::alive_callback, this), group_alive_timer);
  }

  void incompatible_liveliness_callback() {
    RCLCPP_ERROR(this->get_logger(), "PUBLISHER::: Liveliness Triggered!");
  }

  void
  incompatible_qos_info_callback(rclcpp::QOSOfferedIncompatibleQoSInfo &event) {
    RCLCPP_ERROR(this->get_logger(),
                 "A subscriber is asking for an INCOMPATIBLE QoS Triggered!");
    RCLCPP_ERROR(
        this->get_logger(),
        "Offered incompatible qos - total %d delta %d last_policy_kind: %d",
        event.total_count, event.total_count_change, event.last_policy_kind);
  }

  void publish_one_message() {
    auto msg = std_msgs::msg::String();
    rclcpp::Time current_time = this->now();
    double seconds = current_time.seconds();
    double nanoseconds = current_time.nanoseconds();
    time_str = std::to_string(seconds) + "," + std::to_string(nanoseconds);
    msg.data = time_str;
    publisher_->publish(msg);
    RCLCPP_INFO(this->get_logger(), "Publishing: %s ", msg.data.c_str());
  }

  void timer_callback() {
    if (counter > int(swap_state_time / (publish_period / 1000))) {
      delta = 0.1;
      delta_ms = delta * 1000;
      range_steps = int(time_pause / delta);
      for (int i = 0; i < range_steps; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(delta_ms));
        RCLCPP_INFO(this->get_logger(), "Paused = %f / %f ", (i * delta),
                    time_pause);
      }
      counter = 0;
    } else {
      publish_one_message();
      ++counter;
      RCLCPP_INFO(this->get_logger(), "Counter = %d", counter);
    }
  }

  void alive_callback() { i_am_alive(); }

  void i_am_alive() {
    if (publish_assert_bool) {
      if (publisher_->assert_liveliness()) {
        RCLCPP_INFO(this->get_logger(), "Publisher Alive...");
      } else {
        RCLCPP_INFO(this->get_logger(), "Publisher Dead...");
      }
    }
  }

private:
  int liveliness_lease_duration;
  std::string liveliness_policy;
  std::string publish_assert;
  bool publish_assert_bool;
  int publish_period;
  int topic_assert_period;
  float swap_state_time;
  float time_pause;
  float delta;
  int counter;
  int range_steps;
  int delta_ms;
  std::string time_str;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer1_;
  rclcpp::TimerBase::SharedPtr timer2_;
  rclcpp::CallbackGroup::SharedPtr group_timer_publisher;
  rclcpp::CallbackGroup::SharedPtr group_alive_timer;
  rclcpp::CallbackGroup::SharedPtr group_events_clb;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  std::shared_ptr<PublisherQoS> publisher_qos =
      std::make_shared<PublisherQoS>(argc, argv);
  rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(),
                                                    3);
  executor.add_node(publisher_qos);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}

 

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(qos_tests_cpp_pkg)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

add_executable(publisher_custom_minimal_qos_exe src/publisher_custom_minimal_qos.cpp)
ament_target_dependencies(publisher_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_custom_minimal_qos_exe src/subscriber_custom_minimal_qos.cpp)
ament_target_dependencies(subscriber_custom_minimal_qos_exe rclcpp std_msgs)

add_executable(subscriber_durability_exe src/subscriber_durability.cpp)
ament_target_dependencies(subscriber_durability_exe rclcpp std_msgs)

add_executable(publisher_durability_exe src/publisher_durability.cpp)
ament_target_dependencies(publisher_durability_exe rclcpp std_msgs)

add_executable(subscriber_deadline_exe src/subscriber_deadline.cpp)
ament_target_dependencies(subscriber_deadline_exe rclcpp std_msgs)

add_executable(publisher_deadline_exe src/publisher_deadline.cpp)
ament_target_dependencies(publisher_deadline_exe rclcpp std_msgs)

add_executable(subscriber_lifespan_exe src/subscriber_lifespan.cpp)
ament_target_dependencies(subscriber_lifespan_exe rclcpp std_msgs)

add_executable(publisher_lifespan_exe src/publisher_lifespan.cpp)
ament_target_dependencies(publisher_lifespan_exe rclcpp std_msgs)

add_executable(publisher_liveliness_exe src/publisher_liveliness.cpp)
ament_target_dependencies(publisher_liveliness_exe rclcpp std_msgs)

add_executable(subscriber_liveliness_exe src/subscriber_liveliness.cpp)
ament_target_dependencies(subscriber_liveliness_exe rclcpp std_msgs)

install(TARGETS
   publisher_custom_minimal_qos_exe
   subscriber_custom_minimal_qos_exe
   subscriber_durability_exe
   publisher_durability_exe
   subscriber_deadline_exe
   publisher_deadline_exe
   subscriber_lifespan_exe
   publisher_lifespan_exe
   publisher_liveliness_exe
   subscriber_liveliness_exe
   DESTINATION lib/${PROJECT_NAME}
)

ament_package()

 

AUTOMATIC liveliness를 사용하는 경우

alive message는 publisher가 살아있는지 확인하는 용도의 메시지 입니다. 데이터용 메시지가 따로 있고, 생존신고용 메시지가 따로 있는 것이라고 보시면 됩니다.

 

이번 예시에서는 alive message를 publish하지 않고(--publish_assert no 옵션), 2초짜리 leaseDuration을 3초마다 publish합니다. 따라서 이론상으로는 항상 liveliness Callback error가 떠야 하는데, 막상 실행해보면 그렇지 않습니다.

$ ros2 run qos_tests_cpp_pkg publisher_liveliness_exe -liveliness_lease_duration 2000 -publish_period 3000 -topic_assert_period 3000 --publish_assert no --policy AUTOMATIC
$ ros2 run qos_tests_pkg subscriber_liveliness_exe -liveliness_lease_duration 3000 --policy AUTOMATIC

 

subscriber의 liveliness_lease_duratoin이 publisher의 값보다 작으면 QoS호환성 오류가 납니다

$ ros2 run qos_tests_pkg subscriber_liveliness_exe -liveliness_lease_duration 3000 --policy AUTOMATIC

 

이 예제에서 subscriber의 출력을 보면 아래와 같은 출력을 볼 수 있습니다.

publisher를 실행하는 경우
publisher를 종료하는 경우

 

결론적으로,

- LeaseDuration value는 MANUAL_BY_TOPIC을 사용할 때만 유효합니다.

- AUTOMATIC을 사용하는 경우, LeaseDuration이 사용되었다면 이는 QoS 호환성 검사를 위해 사용된 것입니다.

- liveliness Callback을 호출할 수 있는 또다른 경우는 subscriber가 동작하는 도중에 publisher node를 닫아버리는 것입니다.

 

 

MANUAL_BY_TOPIC을 사용하는 경우

publisher의 주기가 Alive message의 주기보다 빠른 경우

이번 예시에서는 Alive message의 주기를 LeaseDuration보다 길게 해볼 것입니다. 즉, 이 말은 항상 에러가 발생해야 한다는 것인데, 유일하게 발생하지 않는 순간은 일반 메시지가 publish되는 순간입니다.

일반 메시지는 생존신고 메시지보다 우선순위가 높기 때문에, 일반 메시지가 들어왔다면 굳이 생존신고 메시지를확인해서 이 녀석이 살아있는지 확인할 필요가 없습니다.

 

$ ros2 run qos_tests_cpp_pkg publisher_liveliness_exe -liveliness_lease_duration 2000 -publish_period 1000 -topic_assert_period 3000 --publish_assert yes --policy MANUAL_BY_TOPIC
$ ros2 run qos_tests_cpp_pkg subscriber_liveliness_exe -liveliness_lease_duration 3000 --policy AUTOMATIC

 

publisher의 주기보다 Alive message의 주기가 빠른 경우

이 경우에는 당연히 별 문제가 없습니다. publisher의 메시지가 들어오지 않아도, 생존 신고 메시지가 계속 들어오므로 해당 publisher가 죽지 않았다는 것을 알 수 있습니다.

$ ros2 run qos_tests_cpp_pkg publisher_liveliness_exe -liveliness_lease_duration 2000 -publish_period 4000 -topic_assert_period 1000 --publish_assert yes --policy MANUAL_BY_TOPIC
$ ros2 run qos_tests_cpp_pkg subscriber_liveliness_exe -liveliness_lease_duration 3000 --policy AUTOMATIC

 

publisher를 종료할 때, 다음과 같은 결과를 볼 수 있습니다.

 

 

 

 

3. QoS in ROS2 Bags

ROS2 Bags파일의 QoS를 오버라이드 하고 싶은 상황이 있을 수도 있습니다.

 

ROS Bags QoS 기록하기

1. topic 정보 확인

$ ros2 topic info /scan --verbose

 

다음 명령으로 scan topic 기록(원하는 디렉토리에서 작업하세요)

$ ros2 bag record /scan -o turtlebot3_scan_bag

 

기록이 잘 되었나 확인해보겠습니다. 이미 /scan이라는 topic이 시뮬레이션에서 나오고 있으므로, /scan2로 바꿔서 확인해봅니다

$ ros2 bag play --remap /scan:=/scan2 -l turtlebot3_scan_bag
$ ros2 topic info /scan2 --verbose

 

 

bag파일의 QoS 바꾸기

다음과 같이 yaml파일을 하나 만듭니다. 아까 ros2 bag record 했던 디렉토리에 만드시면 됩니다.

qos_override.yaml

# qos_override.yaml
/scan:
  reliability: reliable
  history: keep_all
  deadline:
    sec: 1
    nsec: 100

즉, 이전과 달라지는 점은

  • reliability -> RELIABLE
  • history -> KEEP_ALL
  • deadline -> 1 second and 100 nanoseconds

 

이제 저 yaml파일을 사용해서 바뀐 QoS를 확인해봅니다

$ ros2 bag play --remap /scan:=/scan2 --qos-profile-overrides-path qos_override.yaml -l turtlebot3_scan_bag
$ ros2 topic info /scan2 --verbose

 

 

rviz에서도 확인해봅니다.