Study/[ROS2] ROS2 Basic

ROS2 with python - Understanding ROS2 Services

soohwan_justin 2023. 7. 27. 23:46

본 포스트는 ROS1에 대한 전반적인 지식이 있다는 가정 하에 작성되었으며, The Construct Youtube 영상을 참고하였습니다.

또한, 시뮬레이션 환경은 turtlebot3 waffle_pi를 사용합니다 : https://github.com/ROBOTIS-GIT/turtlebot3_simulations

 

 

 

https://docs.ros.org/en/humble/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Services/Understanding-ROS2-Services.html

 

이전 포스트에서도 보셨다시피, ROS2 Topic이나 ROS1 Topic이나 별 차이가 없었습니다. Service 또한 마찬가지입니다. 따라서 내용 설명은 생략하고, 바로 예제로 들어가겠습니다.

 

 

 

1. Service Client 만들기

1. client_pkg 라는 이름으로 간단한 서비스 클라이언트 노드를 만들 패키지를 생성합니다.

$ cd ~/ros2_ws/src
$ ros2 pkg create client_pkg --build-type ament_python --dependencies rclpy std_srvs

 

2. service_client.py 파일을 작성합니다

service_client.py

from std_srvs.srv import Empty
import rclpy
from rclpy.node import Node


class ClientAsync(Node):

    def __init__(self):
        super().__init__('service_client')
        self.client = self.create_client(Empty, 'reset_world')
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        
        self.req = Empty.Request()
        

    def send_request(self):
        self.future = self.client.call_async(self.req)


def main(args=None):
    rclpy.init(args=args)
    client = ClientAsync()
    client.send_request()

    while rclpy.ok():
        rclpy.spin_once(client)
        if client.future.done():
            try:
                response = client.future.result()
            except Exception as e:
                client.get_logger().info(f'Service call failed {e}')
            else:
                client.get_logger().info('Succeeded to get a service' ) 
            break

    client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

 

3. launch 파일을 만듭니다.

service_client_launch_file.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='client_pkg',
            executable='service_client',
            output='screen'),
    ])

 

 

4. setup.py 파일을 수정합니다.

setup.py

from setuptools import setup
import os
from glob import glob

package_name = 'client_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
        'service_client = client_pkg.service_client:main',
        ],
    },
)

 

5. 컴파일 후, 실행합니다.(turtlebot3 gazebo 시뮬레이션이 켜진 상태에서 진행합니다)

$ cb
$ sb
$ ros2 launch client_pkg service_client_launch_file.launch.py

 

위 명령을 실행하면 가제보 상에서 로봇의 위치가 초기 위치로 돌아가며, 파이썬 코드에서 작성했던 메시지가 보입니다.

 

 

2. 동기 vs 비동기 서비스 클라이언트

ROS1에서는 Service는 Client가 request를 보내면 Sever에서 response를 받을 때까지 아무것도 안하고 그냥 대기 상태가 됩니다. 그러나 ROS2에서는 비동기 방식을 사용하면 응답이 올 때까지 해당 node에서 다른 작업을 수행할 수 있습니다(예를 들어, 로그 메시지를 보낸다든지...)

 

즉, Service Client는 Service Server node에 쓰레드를 호출하여 request를 보냅니다. 아래의 코드 예시를 보겠습니다.

rclpy.init(args=args)
# 비동기 방식 client 객체 생성
client = ClientAsync()
# request 보내기
client.send_request()
while rclpy.ok():
        rclpy.spin_once(client)
        if client.future.done():
            try:
                # Service로부터 오는 응답 데이터를 future로 확인
                # 만약 response가 오면, 해당 데이터 저장
                response = client.future.result()
            except Exception as e:
                client.get_logger().info(f'Service call failed {e}')
            else:
                client.get_logger().info('Pretty message') 
            break
            
client.destroy_node()
rclpy.shutdown()

 

비동기 방식으로 Service에다가 request를 보내면, 그 즉시 "future"를 리턴합니다. 이 값을 확인하면 response가 들어왔는지의 여부를 확인할 수 있습니다.

 

동기 방식은 ROS1과 같다고 보시면 되겠습니다. response가 오기 전까지 node에서 아무것도 하지 않겠다는 의도가 있으면 동기식, 그게 아니면 비동기식을 사용하면 됩니다.

 

아래는 동기 방식 Client 예제 코드입니다.

rclpy.init(args=args)
client = ClientSync()
# 통신 쓰레드 시작
spin_thread = Thread(target=rclpy.spin, args=(client,))
spin_thread.start()
response = client.send_request()
client.get_logger().info('Pretty message') 

minimal_client.destroy_node()
rclpy.shutdown()

 

 

3. Service Server 만들기

이번에는 Service를 받아서 로봇을 움직이거나 멈추도록 하는 Service Server를 만들어보겠습니다. std_srvs/srv/SetBool을 사용할 것이며, True를 보내면 로봇이 움직이고, False를 보내면 로봇이 멈출 것입니다.

해당 서비스는 다음과 같이 생겼습니다.

$ ros2 interface show std_srvs/srv/SetBool

 

 

 

1. 새로운 패키지를 만듭니다.

$ ros2 pkg create server_pkg --build-type ament_python --dependencies rclpy std_msgs geometry_msgs sensor_msgs std_srvs

 

2. service_server.py 파일을 만듭니다.

service_server.py

from std_srvs.srv import SetBool
from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node


class Service(Node):

    def __init__(self):
        super().__init__('service_moving_right')
        self.srv = self.create_service(SetBool, 'moving_right', self.SetBool_callback)
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.cmd = Twist()    
        

    def SetBool_callback(self, request, response):
        if request.data == True:           
            self.cmd.linear.x = 0.3
            self.cmd.angular.z =-0.3
            
            self.publisher_.publish(self.cmd)
            response.success = True
            response.message = '@@@@@ MOVING TO THE RIGHT @@@@@'

        if request.data == False:
            self.cmd.linear.x = 0.0
            self.cmd.angular.z =0.0
            
            self.publisher_.publish(self.cmd)
            response.success = False
            response.message = 'Robot is stopping'       
                
        return response


def main(args=None):
    rclpy.init(args=args)
    moving_right_service = Service()
    rclpy.spin(moving_right_service)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

3. launch 파일을 만듭니다.(실행 권한을 부여해주세요)

service_server_launch_file.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='server_pkg',
            executable='service_server',
            output='screen'),
    ])

 

4. setup.py 파일을 수정합니다

setup.py

from std_srvs.srv import SetBool
from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node


class Service(Node):

    def __init__(self):
        super().__init__('service_moving_right')
        self.srv = self.create_service(SetBool, 'moving_right', self.SetBool_callback)
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.cmd = Twist()    
        

    def SetBool_callback(self, request, response):
        if request.data == True:           
            self.cmd.linear.x = 0.1
            self.cmd.angular.z =-0.3
            
            self.publisher_.publish(self.cmd)
            response.success = True
            response.message = '@@@@@ MOVING TO THE RIGHT @@@@@'

        if request.data == False:
            self.cmd.linear.x = 0.0
            self.cmd.angular.z =0.0
            
            self.publisher_.publish(self.cmd)
            response.success = False
            response.message = 'Robot is stopping'       
                
        return response


def main(args=None):
    rclpy.init(args=args)
    moving_right_service = Service()
    rclpy.spin(moving_right_service)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

5. 컴파일 후, 런치파일을 실행하고, Service를 보내봅니다.(이제부터 컴파일이라고 하면, colcon build 후, source하는 과정은 생략하도록 하겠습니다. 언급하지 않아도 이 과정을 진행해주세요)

$ ros2 service call /moving_right std_srvs/srv/SetBool data:\ true

 

 

4. Custom Service 만들기

순서는 Custom Topic을 만드는 것과 같습니다. 또한 ROS1과 마찬가지로 srv 디렉토리를 만들고, 그에 맞게 srv를 정의하고... 하면 됩니다.

 

패키지는 이전 포스트에서 만들었던 custom_interfaces 패키지를 그대로 사용하겠습니다

$ cd ~/ros2_ws/src/custom_interfaces
$ mkdir srv
$ cd srv

 

1. custom service 파일 만들기

MyCustomServiceMessage.srv

# "Turn right" 입력 시 오른쪽으로 회전
# "Turn left" 입력 시 왼쪽으로 회전
# "Stop" 입력 시 정지
string move 
---
bool success

 

2. CMakeLists.txt 수정하기

아래와 같은 부분이 추가되어야 합니다.

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Age.msg"
  "srv/MyCustomServiceMessage.srv"
)

 

최종적으로, 아래와 같습니다.

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(custom_interfaces)
 
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()
 
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
 
if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()
 
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Age.msg"
  "srv/MyCustomServiceMessage.srv"
)
 
ament_package()

 

3. package.xml 파일 수정하기

이전 포스트에서 사용했던 것을 그대로 사용하면 됩니다.

package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>custom_interfaces</name> 
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
  <license>TODO: License declaration</license>
 
  <buildtool_depend>ament_cmake</buildtool_depend>
 
  <depend>rclcpp</depend>
  <depend>std_msgs</depend>
 
  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>
 
  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>
 
  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

 

4. 컴파일 후, 확인

$ ros2 interface show custom_interfaces/srv/MyCustomServiceMessage

 

 

 

5. Custom service 사용해보기

방금 만든 custom service를 사용하여 로봇을 움직도록 하기 위한 server와 client를 만들어보겠습니다.

다만 좀 생각해봐야 할 부분은, client 노드를 만드는 경우, 이전 예제처럼 만들면, 처음에 코드상에서 만들어둔 초깃값이 들어간 service를 보내고 그냥 바로 끝나버립니다. 따라서, 터미널 창에서 원하는 값을 보낼 수 있도록 sys.argv를 사용하겠습니다.

 

1. 패키지 생성

$ ros2 pkg create --build-type ament_python movement_pkg --dependencies rclpy custom_interfaces std_msgs geometry_msgs sensor_msgs

 

2. server를 위한 파일 작성

movement_server.py

from geometry_msgs.msg import Twist
from custom_interfaces.srv import MyCustomServiceMessage
import rclpy
from rclpy.node import Node

class Service(Node):

    def __init__(self):
        super().__init__('movement_server')
        self.srv = self.create_service(MyCustomServiceMessage, 'movement', self.custom_service_callback)
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        

    def custom_service_callback(self, request, response):
        msg = Twist()
        
        if request.move == "Turn Right":
            msg.linear.x = 0.1
            msg.angular.z = -0.3
            self.publisher_.publish(msg)
            self.get_logger().info('@@@ Turning to right direction @@@')
            response.success = True

        elif request.move == "Turn Left":
            msg.linear.x = 0.1
            msg.angular.z = 0.3
            self.publisher_.publish(msg)
            self.get_logger().info('@@@ Turning to left direction @@@')
            response.success = True

        elif request.move == "Stop":
            msg.linear.x = 0.0
            msg.angular.z = 0.0
            self.publisher_.publish(msg)
            self.get_logger().info('@@@ Robot is stopping @@@')
            response.success = True
        else:
            response.success = False
        
        return response


def main(args=None):
    rclpy.init(args=args)
    service = Service()
    rclpy.spin(service)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

3. client 파일 작성

movement_client.py

import rclpy
from custom_interfaces.srv import MyCustomServiceMessage
from rclpy.node import Node
import sys


class ClientAsync(Node):

    def __init__(self):
        super().__init__('movement_client')
        self.client = self.create_client(MyCustomServiceMessage, 'movement')
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        
        self.req = MyCustomServiceMessage.Request()
        

    def send_request(self):       
        self.future = self.client.call_async(self.req)


def main(args=None):
    rclpy.init(args=args)
    client = ClientAsync()
    client.send_request()

    while rclpy.ok():
        rclpy.spin_once(client)
        if client.future.done():
            try:

                response = client.future.result()
            except Exception as e:
                client.get_logger().info(f'Service call failed {e}')
            else:
                client.get_logger().info(f'Response state {response.success}')
            break

    client.destroy_node()
    # shutdown the ROS communication
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

4. launch 파일 작성

movement_pkg_launch_file.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='movement_pkg',
            executable='movement_server',
            output='screen'),
        Node(
            package='movement_pkg',
            executable='movement_client',
            output='screen'),
    ])

 

5. setup.py 파일 수정

from setuptools import setup
import os
from glob import glob

package_name = 'movement_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
        'movement_server = movement_pkg.movement_server:main',
        'movement_client = movement_pkg.movement_client:main'
        ],
    },
)

 

6. 빌드 후, 실행

$ ros2 launch movement_pkg movement_pkg_launch_file.launch.py
$ ros2 run movement_pkg movement_client "Turn Right"
$ ros2 run movement_pkg movement_client "Turn Left"
$ ros2 run movement_pkg movement_client "Stop"