Study/[ROS2] ROS2 Basic

ROS2 with python - Understanding ROS2 Topics

soohwan_justin 2023. 7. 26. 21:57

본 포스트는 ROS1에 대한 전반적인 지식이 있다는 가정 하에 작성되었으며, The Construct Youtube 영상을 참고하였습니다.

또한, 시뮬레이션 환경은 turtlebot3 waffle_pi를 사용합니다 : https://github.com/ROBOTIS-GIT/turtlebot3_simulations

 

 

 

1. ROS2 Topic 확인해보기

 

다음 명령으로 turtlebot 시뮬레이션 실행 후, topic들을 확인해봅니다.

$ ros2 launch turtlebot3_bringup gazebo.launch.py
$ ros2 topic list

 

 

ros2 topic info 명령으로 해당 topic의 정보를 확인해봅니다

$ ros2 topic info /cmd_vel

ROS1을 하셨으니 무슨 의미인지 이름만 봐도 아실 수 있을것입니다.

다른 topic들도 한번 확인해보겠습니다

 

$ ros2 topic info /camera/image_raw
$ ros2 topic info /scan

 

ROS2에서는 ROS1에서 rostopic의 대부분의 기능을 같은 이름으로 지원합니다. rostopic echo = ros2 topic echo, rostopic list = ros2 topic list 이런식입니다.

 

그러나 새로 추가된 기능이 하나 있는데, ROS1에서 rosmsg와 비슷한 명령입니다.

$ ros2 interface list

 

실행 시, 사용 가능한 topic, service, action 리스트가 나옵니다.

해당 메시지가 어떻게 정의되었는지 확인하고 싶으면 아래 명령을 실행하면 됩니다.

$ ros2 interface show geometry_msgs/msg/Twist

 

$ ros2 interface -h 를 입력하면 도움말을 확인할 수 있습니다.

 

다음 명령으로 해당 topic을 어떤 식으로 사용하는지 확인할 수 있습니다.

$ ros2 interface proto geometry_msgs/msg/Twist

 

 

그리고 다음 명령으로 topic을 publish 할 수 있습니다.

$ ros2 topic pub /cmd_vel geometry_msgs/msg/Twist "linear:
  x: 0.1
  y: 0.0
  z: 0.0
angular:
  x: 0.0
  y: 0.0
  z: 0.0"

 

근데 이게... ros1에서는 "rostopc pub /cmd_vel geometry_msgs/msg/Twist + [TAB] + [TAB]" 이게 가능했었는데, ROS2에서 버그가 좀 있는 것 같습니다. 수정되었다고는 하는데... 저는 여전히 해당 문제가 남아있는 것으로 보이네요. github issue

TAB을 사용한 자동완성 버그

근데 이걸 해결하는 편법이 있는데... proto명령을 사용해서 보면 "linear" 로 시작하죠? 그러면 "l 까지만 치고 탭 누르면 자동완성됩니다.

 

아무튼, 위 명령으로 속도 값을 보내면 로봇이 움직이는걸 확인할 수 있습니다. 멈추려면 속도값 0을 보내세요

 

그 외 ros2 topic echo 등 ROS1만 알면 추측할 수 있는 내용들은 에 대한 명령들은 생략하겠습니다.

 

topic을 1번만 pub하려면 다음 명령을 사용하세요

$ ros2 topic --once pub /cmd_vel geometry_msgs/msg/Twist "linear:
  x: 0.1
  y: 0.0
  z: 0.0
angular:
  x: 0.0
  y: 0.0
  z: 0.0"

 

 

 

2. Topic Publisher

 

간단한 publisher node를 실행할 패키지를 만들어보겠습니다. 커맨드 창에 다음 명령을 입력합니다

$ cs
$ ros2 pkg create --build-type ament_python publisher_pkg --dependencies rclpy std_msgs geometry_msgs

 

 

simple_publisher패키지 내부의 simple_publisher 디렉토리에서 simple_publisher.py 파일을 만듭니다.

 

~/ros2_ws/src/publisher_pkg/publisher_pkg/simple_publisher.py

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

class SimplePublisher(Node):

    def __init__(self):
        super().__init__('simple_publisher')
        # publisher object 생성
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        timer_period = 0.5
        self.timer = self.create_timer(timer_period, self.timer_callback)

    def timer_callback(self):
        msg = Twist()
        msg.linear.x = 0.5
        msg.angular.z = 0.5
        self.publisher_.publish(msg)
        # 터미널에 로그 메시지 출력
        self.get_logger().info('Publishing: "%s"' % msg)
            
def main(args=None):
    rclpy.init(args=args)
    simple_publisher = SimplePublisher()
    rclpy.spin(simple_publisher)
    # ctrl + c 입력시 노드 종료
    simple_publisher.destroy_node()
    # shutdown the ROS communication
    rclpy.shutdown()

if __name__ == '__main__':
    main()

 

 

launch 디렉토리를 생성하고, 해당 디렉토리 내부에 launch file을 만들고, 실행 권한을 부여한 후, 코드를 작성합니다.

$ cd ~/ros2_ws/src/publisher_pkg
$ mkdir launch
$ cd launch
$ touch publisher_pkg_launch_file.launch.py
$ chmod +x publisher_pkg_launch_file.launch.py

 

~/ros2_ws/src/publisher_pkg/launch/publisher_pkg_launch_file.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='publisher_pkg',
            executable='simple_publisher',
            output='screen'),
    ])

 

 

setup.py 파일을 다음과 같이 수정합니다.

~/ros2_ws/src/publisher_pkg/setup.py

from setuptools import setup
import os
from glob import glob

package_name = 'publisher_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'simple_publisher = publisher_pkg.simple_publisher:main'
        ],
    },
)

 

 

패키지를 컴파일 후, launch 파일을 실행해봅니다.

$ cb
$ sb
$ ros2 launch publisher_pkg publisher_pkg_launch_file.launch.py

 

 

 

3. Topic Subscriber

 

다음 명령으로 간단한 subscriber를 만들 패키지를 생성합니다.

$ ros2 pkg create --build-type ament_python subscriber_pkg --dependencies rclpy std_msgs sensor_msgs

 

이제는 어떤 디렉토리에 어떤 파일을 만들어야 하는지 감이 오실겁니다. 이제부터는 디렉토리 생략하고 파일 이름만 쓰겠습니다.

 

simple_subscriber.py

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from rclpy.qos import ReliabilityPolicy, QoSProfile


class SimpleSubscriber(Node):

    def __init__(self):
        super().__init__('simple_subscriber')

        self.subscriber = self.create_subscription(
            LaserScan,
            '/scan',
            self.listener_callback,
            QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))  # Laser scan 및 여러 센서에서 사용되는 QOS

    def listener_callback(self, msg):
        self.get_logger().info(f'I have received a message : {msg}')


def main(args=None):
    rclpy.init(args=args)
    simple_subscriber = SimpleSubscriber()
    rclpy.spin(simple_subscriber)
    simple_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

ROS1의 subscriber node와 다른점은 line 16인데, 해당 부분은 나중에 설명하겠습니다. 해당 관련 글을 쓰게되면 링크 달아두겠습니다.

 

 

launch 디렉토리 생성하고, 파일을 만듭니다.

$ cd ~/ros2_ws/src/subscriber_pkg
$ mkdir launch
$ cd launch
$ touch subscriber_pkg_launch_file.launch.py
$ chmod +x subscriber_pkg_launch_file.launch.py

 

subscriber_pkg_launch_file.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='subscriber_pkg',
            executable='simple_subscriber',
            output='screen'),
    ])

 

 

setup.py 파일을 수정합니다

setup.py

from setuptools import setup
import os
from glob import glob

package_name = 'subscriber_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='somebody very awesome',
    maintainer_email='user@user.com',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'simple_subscriber = subscriber_pkg.simple_subscriber:main'
        ],
    },
)

 

 

빌드 후, 실행해봅니다.(turtlebot gazebo가 켜져있어야 합니다)

$ cb
$ sb
$ ros2 launch subscriber_pkg subscriber_pkg_launch_file.launch.py

 

 

4. 또다른 예제

이번엔 파이썬 파일 하나에 publisher, subscriber를 모두 선언하고, 라이다 데이터를 받아서 turtlebot이 움직이도록 하는 간단한 예제를 만들어보겠습니다

 

- exercise31_pkg라는 이름의 패키지를 만들고, 의존 패키지로는 rclpy, std_msgs, sensor_msgs, geometry_msgs 추가

- LaserScan 데이터를 받고, 다음과 같이 움직이게 하기

  - 로봇의 전방 라이다 데이터가 3m될 때까지 선속도 0.2, 각속도 1.0으로 움직이게 하기(turtlebot 시뮬레이션의 라이다 데이터는 1도마다 하나씩, 총 360개가 들어옵니다. 또한, [0] = 1도, [359] = 0도 입니다.)

  - 그 후, 해당 값이 0.3m가 될때까지 직선으로 움직이게 하기

  - 해당 값이 0.3m가 되면 정지

 

 

1. 패키지 생성

$ ros2 pkg create --build-type ament_python exercise31_pkg --dependencies rclpy std_msgs sensor_msgs geometry_msgs

 

2. exercise31.py 코드 작성

exercise31.py

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from rclpy.qos import ReliabilityPolicy, QoSProfile

class Exercise31(Node):

    def __init__(self):
        super().__init__('exercise31')
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.subscriber = self.create_subscription(LaserScan, '/scan', self.laser_callback,
                                                   QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))
        self.timer_period = 0.2
        self.laser_forward = 0
        self.cmd = Twist()
        self.timer = self.create_timer(self.timer_period, self.motion)

    def laser_callback(self,msg):
        # Save the frontal laser scan info at 0°
        self.laser_forward = msg.ranges[359] 
        
        
    def motion(self):
        self.get_logger().info(f'Detected range : {self.laser_forward}')
        # Logic of move
        if self.laser_forward > 3.0:
            self.cmd.linear.x = 0.2
            self.cmd.angular.z = 1.0
        elif self.laser_forward < 3.0 and self.laser_forward >= 0.3:
            self.cmd.linear.x = 0.2
            self.cmd.angular.z = 0.0         
        else:
            self.cmd.linear.x = 0.0
            self.cmd.angular.z = 0.0
            
        self.publisher_.publish(self.cmd)


            
def main(args=None):
    rclpy.init(args=args)
    exercise31 = Exercise31()       
    rclpy.spin(exercise31)
    exercise31.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

 

3. launch 파일 작성(실행 권한을 부여해주세요)

exercise31_pkg_launch_file.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='exercise31_pkg',
            executable='exercise31',
            output='screen'),
    ])

 

4. setup.py 파일 수정

setup.py

from setuptools import setup
import os
from glob import glob

package_name = 'exercise31_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='somebody very awesome',
    maintainer_email='user@user.com',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'exercise31 = exercise31_pkg.exercise31:main'
        ],
    },
)

 

5. 빌드 후, 실행

$ cb
$ sb
$ ros2 launch exercise31_pkg exercise31_pkg_launch_file.launch.py

 

 

 

5. Custom Interface 만들기

이 부분은 ROS1에서 custom msg만드는 부분에 대응됩니다. custom interface를 만들기 위해서는 순수하게 파이썬 만으로는 불가능하고, CMake package가 필요합니다. 따라서, 다음 명령과 같이 패키지를 만들어봅니다.

$ ros2 pkg create --build-type ament_cmake custom_interfaces --dependencies rclcpp std_msgs

 

만드는 과정은 ROS1과 비슷합니다.

1. msg 디렉토리 만들기

2. msg 파일 만들기

3. CMakeLists.txt 파일 수정하기

4. package.xml 파일 수정하기

5. 컴파일 하기

 

 

1. 다음과 같이 msg 디렉토리를 만들고, msg 파일도 만듭니다.

$ cd ~/ros2_ws/src/custom_interfaces
$ mkdir msg
$ cd msg
$ code Age.msg

 

Age.msg

int32 year
int32 month
int32 day

 

2. CMakeLists.txt 파일을 수정합니다

 

find_package : topics, services, actions를 컴파일하기 위해 필요한 패키지들입니다. 여기 있는 패키지들은 package.xml에 build_depend, exec_depend 태그로 기술되어야 합니다.

 

rosidl_generate_interfaces() : 이 패키지에서 컴파일 되어야 할 모든 메시지들을 담아야 합니다.

 

전체 코드는 다음과 같습니다.

CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(custom_interfaces)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Age.msg"
)

ament_package()

 

3. package.xml 파일을 수정합니다.

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>custom_interfaces</name> 
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
  <license>TODO: License declaration</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <depend>rclcpp</depend>
  <depend>std_msgs</depend>

  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

 

4. 컴파일 후, 제대로 만들어졌는지 확인해봅니다.

$ cb
$ sb
$ ros2 interface show custom_interfaces/msg/Age

 

 

 

6. Custom Interface 사용해보기

1. example36_pkg라는 패키지를 만들고, 3.5에서 만들었던 메시지를 사용해보겠습니다.

$ ros2 pkg create --build-type ament_python example36_pkg --dependencies rclpy std_msgs geometry_msgs custom_interfaces

 

참고로, 만약 패키지를 만들때 의존성 부분에 어떤 패키지를 잊어버렸다면, 해당 부분을 아래와 같이 package.xml에 추가하면 됩니다. 빌드 타입을 파이썬으로 하는 경우에는 CMakeLists.txt에는 추가할 필요 없습니다.

<depend>custom_interfaces</depend>

 

2. example36.py 코드를 작성합니다.

example36.py

import rclpy
from rclpy.node import Node
from rclpy.qos import ReliabilityPolicy, QoSProfile
from custom_interfaces.msg import Age


class Example36(Node):

    def __init__(self):
        super().__init__('example36')
        self.publisher_ = self.create_publisher(Age, 'age', 10)
        self.age = Age()
        self.timer_period = 0.5
        self.timer = self.create_timer(self.timer_period, self.timer_callback)

    def timer_callback(self):
        self.age.year = 2031
        self.age.month = 5
        self.age.day = 21
        self.publisher_.publish(self.age)


def main(args=None):
    rclpy.init(args=args)
    example36 = Example36()
    rclpy.spin(example36)
    example36.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

3. launch 파일을 만듭니다.(실행 권한을 부여해주세요)

example36.launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='example36_pkg',
            executable='example36',
            output='screen'),
    ])

 

4. setup.py 파일을 수정합니다

setup.py

from setuptools import setup
import os
from glob import glob

package_name = 'example36_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
        'example36 = example36_pkg.example36:main'
        ],
    },
)

 

5. 컴파일 후, 실행해봅니다

$ cb
$ sb
$ ros2 launch example36_pkg example36.launch.py
$ ros2 topic echo /age