Study/[ROS2] ROS2 Basic

ROS2 with python - Actions

soohwan_justin 2023. 7. 28. 21:36

본 포스트는 ROS1에 대한 전반적인 지식이 있다는 가정 하에 작성되었으며, The Construct Youtube 영상을 참고하였습니다.

또한, 시뮬레이션 환경은 turtlebot3 waffle_pi를 사용합니다 : https://github.com/ROBOTIS-GIT/turtlebot3_simulations

 

 

 

https://docs.ros.org/en/foxy/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Actions/Understanding-ROS2-Actions.html

ROS2의 Action 또한... Topic, Service와 마찬가지로 ROS1과 별 차이가 없습니다. 바로 예제로 들어가보겠습니다.

 

 

1. Action Server, Action Service, Custom Action 만들기

ROS1 과 마찬가지로... Action은 Topic과 Service를 합친 것입니다. 따라서 자세한 내용은 생략하고, 코드만 올리겠습니다.

 

1. 다음 명령으로 패키지를 만듭니다.

$ ros2 pkg create my_action --build-type ament_python --dependencies rclpy rclpy.action custom_interfaces

 

 

2. 먼저, Action Client 먼저 만들어보겠습니다.

action_client.py

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from custom_interfaces.action import Move


class MyActionClient(Node):

    def __init__(self):
        super().__init__('my_action_client')
        self._action_client = ActionClient(self, Move, 'turtlebot3_as')

    def send_goal(self, seconds):
        goal_msg = Move.Goal()
        goal_msg.secs = seconds

        self._action_client.wait_for_server()
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.status))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info(f'Received feedback: {feedback.feedback}')


def main(args=None):
    rclpy.init(args=args)
    action_client = MyActionClient()
    action_client.send_goal(5)
    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

 

 

2. Action server를 만듭니다

action_server.py

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from custom_interfaces.action import Move

from geometry_msgs.msg import Twist
import time
class MyActionServer(Node):

    def __init__(self):
        super().__init__('my_action_server')
        self._action_server = ActionServer(self, Move, 'turtlebot3_as',self.execute_callback) 
        self.cmd = Twist()
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
    
    def execute_callback(self, goal_handle):
        
        self.get_logger().info('Executing goal...')

        feedback_msg = Move.Feedback()
        feedback_msg.feedback = "Moving to the left left left..."

        for i in range(1, goal_handle.request.secs):
            
            self.get_logger().info(f'Feedback: {feedback_msg.feedback}')

            goal_handle.publish_feedback(feedback_msg)
            self.cmd.linear.x = 0.3
            self.cmd.angular.z =0.3
            
            self.publisher_.publish(self.cmd)
            time.sleep(1)

        goal_handle.succeed()

        self.cmd.linear.x = 0.0
        self.cmd.angular.z = 0.0
            
        self.publisher_.publish(self.cmd)
        result = Move.Result()
        result.status = "Finished action server. Robot moved during 5 seconds"
        self.get_logger().info(f'Result: {result.status}')
        return result

def main(args=None):
    rclpy.init(args=args)

    my_action_server = MyActionServer()

    rclpy.spin(my_action_server)


if __name__ == '__main__':
    main()

 

3. launch 파일들을 만듭니다.

my_action_client.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='my_action',
            executable='action_client',
            output='screen'),
    ])

 

my_action_server.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='my_action',
            executable='action_server',
            output='screen'),
    ])

 

 

4. setup.py 파일을 수정합니다.

setup.py

from setuptools import find_packages, setup

package_name = 'my_action'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='justin',
    maintainer_email='justin@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'action_client = my_action.action_client:main',
            'action_server = my_action.action_server:main',
        ],
    },
)

 

5. 이전에 만들어뒀던 custom_interface 패키지에 custom action을 만듭니다.

custom_interfaces/action/Move.action

int32 secs
---
string status
---
string feedback

 

6. custom_interface 패키지의 CMakeLists.txt를 수정합니다.

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(custom_interfaces)
 
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()
 
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(action_msgs REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
 
if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()
 
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Age.msg"
  "srv/MyCustomServiceMessage.srv"
  "action/Move.action"
)
 
ament_package()

 

7. 빌드 후, 실행합니다.

$ ros2 launch my_action my_action_server.launch.py
$ ros2 launch my_action my_action_client.launch.py