
ROS noetic YOLO V3

soohwan_justin 2020. 12. 29. 14:52

This post explains how to run object detection on ROS noetic using YOLO V3 implemented in TensorFlow v2.

 

The setup used here is Ubuntu 20.04, ROS noetic, and an Intel RealSense D415 camera.

YOLO V3-tf2 Download

 

1. Go to the following link and clone the repository with git clone.

github.com/zzh8829/yolov3-tf2

 

$ git clone https://github.com/zzh8829/yolov3-tf2.git

2. In the directory you just cloned, create and activate the conda virtual environment with the following commands.

 

$ conda env create -f conda-gpu.yml

$ conda activate yolov3-tf2-gpu
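
To quickly confirm the environment works, you can check that TensorFlow imports cleanly and sees the GPU:

$ python -c "import tensorflow as tf; print(tf.__version__); print(tf.config.experimental.list_physical_devices('GPU'))"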

3. For reasons I haven't been able to pin down, attempting object detection in this environment raises an OpenCV error. Reinstall OpenCV with the following commands.

Reference: stackoverflow.com/questions/40207011/opencv-not-working-properly-with-python-on-linux-with-anaconda-getting-error-th/43526627

$ conda remove opencv

$ conda update conda

$ conda install --channel menpo opencv

$ pip install opencv-contrib-python
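
After reinstalling, a quick import check should now succeed without the earlier error:

$ python -c "import cv2; print(cv2.__version__)"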

4. Install the following packages so the ROS client libraries can be used from the conda environment.


$ pip install rospkg catkin_pkg

$ pip3 install netifaces

Code for the ROS environment

1. Create a catkin package, inside the src directory of your catkin workspace (e.g. ~/catkin_ws/src). (Strictly speaking, the Python code alone would be enough, but a package is convenient.)

 

$ catkin_create_pkg my_tf_course_pkg

$ cd my_tf_course_pkg

$ mkdir script

$ cd script
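
For roscd to find the new package later, the workspace has to be built and sourced once; assuming the standard ~/catkin_ws layout:

$ cd ~/catkin_ws
$ catkin_make
$ source devel/setup.bash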

2. Write the following code.

 

object_detection.py

#!/usr/bin/env python3
import rospy
from sensor_msgs.msg import Image
from std_msgs.msg import String

import time
from absl import app, flags, logging
from absl.flags import FLAGS
import numpy as np
import tensorflow as tf
from yolov3_tf2.models import (
    YoloV3, YoloV3Tiny
)
from yolov3_tf2.dataset import transform_images, load_tfrecord_dataset
from yolov3_tf2.utils import draw_outputs

import sys

import cv2
from cv_bridge import CvBridge

flags.DEFINE_string('classes', '/home/justin/yolov3-tf2/data/coco.names', 'path to classes file')
flags.DEFINE_string('weights', '/home/justin/yolov3-tf2/checkpoints/yolov3.tf',
                    'path to weights file')
flags.DEFINE_boolean('tiny', False, 'yolov3 or yolov3-tiny')
flags.DEFINE_integer('size', 416, 'resize images to')
flags.DEFINE_string('output', './data/output.jpg', 'path to output image')
flags.DEFINE_integer('num_classes', 80, 'number of classes in the model')
flags.DEFINE_string("f", "", "kernel")


class TensorFlowImageRecognition(object):

    def __init__(self):

        """
        python scripts/detect.py --image ./data/yolo_test_images/person_ignisbot.png
        """
        physical_devices = tf.config.experimental.list_physical_devices('GPU')
        if len(physical_devices) > 0:
            tf.config.experimental.set_memory_growth(physical_devices[0], True)


        if FLAGS.tiny:
            self.yolo = YoloV3Tiny(classes=FLAGS.num_classes)
        else:
            self.yolo = YoloV3(classes=FLAGS.num_classes)

        self.yolo.load_weights(FLAGS.weights).expect_partial()
        logging.info('weights loaded')

        self.class_names = [c.strip() for c in open(FLAGS.classes).readlines()]
        logging.info('classes loaded')

        self.times = []

    def detect_objects_from_image(self, img_raw, save_detection=False):

        img = tf.expand_dims(img_raw, 0)
        img = transform_images(img, FLAGS.size)

        t1 = time.time()
        boxes, scores, classes, nums = self.yolo.predict(img)
        t2 = time.time()
        logging.info('time: {}'.format(t2 - t1))
        rospy.loginfo('time per frame: {}'.format(t2 - t1))

        logging.info('detections:')
        objects_detected_list = []
        for i in range(nums[0]):
            logging.info('\t{}, {}, {}'.format(self.class_names[int(classes[0][i])],
                                            np.array(scores[0][i]),
                                            np.array(boxes[0][i])))

            objects_detected_list.append(self.class_names[int(classes[0][i])])


        rospy.logdebug("Result-Detection="+str(objects_detected_list))
        #img = cv2.cvtColor(img_raw.numpy(), cv2.COLOR_RGB2BGR)
        img = cv2.cvtColor(img_raw, cv2.COLOR_BGR2RGB)
        img_detection = draw_outputs(img, (boxes, scores, classes, nums), self.class_names)
        if save_detection:
            cv2.imwrite(FLAGS.output, img)
            logging.info('output saved to: {}'.format(FLAGS.output))

        return img_detection, objects_detected_list


class RosTensorFlow(object):
    def __init__(self, save_detections=False, image_rostopic=True):
        # Flag used to process only every other frame, to lower the load
        self._save_detections = save_detections
        self._process_this_frame = True
        self._image_rostopic = image_rostopic
        self._cv_bridge = CvBridge()

        # Start Tensorflow Class
        self.tensorflow_object = TensorFlowImageRecognition()

        self._camera_image_topic = '/camera/color/image_raw'
        self.check_image_topic_ready()
        self._sub = rospy.Subscriber(self._camera_image_topic, Image, self.callback, queue_size=1)
        self._result_pub = rospy.Publisher('result', String, queue_size=1)

        self.image_detection = rospy.Publisher('/image_detection', Image, queue_size=10)




    def check_image_topic_ready(self):
        camera_image_data = None
        while camera_image_data is None and not rospy.is_shutdown():
            try:
                camera_image_data = rospy.wait_for_message(self._camera_image_topic, Image, timeout=1.0)
                rospy.loginfo("Current "+str(self._camera_image_topic)+" READY=>")
            except Exception as ex:
                print(ex)
                rospy.logerr("Current "+str(self._camera_image_topic)+" not ready yet, retrying...")
        rospy.loginfo("Camera Sensor READY")

    def publish_results_objects_list(self, objects_detected_list):
        result_msg = String()
        for object_name in objects_detected_list:
            result_msg.data = object_name
            self._result_pub.publish(result_msg)

    def callback(self, image_msg):
        if (self._process_this_frame):
            rospy.logdebug("Image processing....")

            image_np = self._cv_bridge.imgmsg_to_cv2(image_msg, "bgr8")

            img_detection, objects_detected_list = self.tensorflow_object.detect_objects_from_image(img_raw=image_np,
                                                            save_detection=self._save_detections)

            self.publish_results_objects_list(objects_detected_list)

            # TODO: Publish in a ROS image topic, not in a GUI
            if self._image_rostopic:
                self.image_detection.publish(self._cv_bridge.cv2_to_imgmsg(cv2.cvtColor(img_detection, cv2.COLOR_BGR2RGB), "bgr8"))
            else:
                cv2.imshow("Image window", img_detection)
                cv2.waitKey(1)
            rospy.logdebug("Image processing....DONE")
        else:
            pass
        # Toggle the flag so only every other frame is processed
        self._process_this_frame = not self._process_this_frame

    def main(self):
        rospy.spin()

def main_action(_argv):
    rospy.init_node('p3dx_robot_node', log_level=rospy.DEBUG)
    tensor = RosTensorFlow()
    tensor.main()

if __name__ == '__main__':
    try:
        app.run(main_action)
    except SystemExit:
        pass

 

3. Run roscore, the camera node, and the code written in step 2, each in a separate terminal.

 

$ roscore

$ roslaunch realsense2_camera rs_camera.launch

$ roscd my_tf_course_pkg/script

$ python object_detection.py
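
Once the node is running, you can check its output topics from another terminal (assuming the default topic names used in the code above):

$ rostopic echo /result
$ rosrun image_view image_view image:=/image_detection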

With an i7-9750 and a GTX 1660 Ti, this runs at about 70 ms per frame (roughly 14 FPS).

Code explanation

 

flags.DEFINE_string('classes', '/home/justin/yolov3-tf2/data/coco.names', 'path to classes file')
flags.DEFINE_string('weights', '/home/justin/yolov3-tf2/checkpoints/yolov3.tf',
                    'path to weights file')
flags.DEFINE_boolean('tiny', False, 'yolov3 or yolov3-tiny')
flags.DEFINE_integer('size', 416, 'resize images to')
flags.DEFINE_string('output', './data/output.jpg', 'path to output image')
flags.DEFINE_integer('num_classes', 80, 'number of classes in the model')
flags.DEFINE_string("f", "", "kernel")

The classes flag takes the path to the file listing the class names that the pre-trained yolov3-tf2 model recognizes.

The weights flag takes the path to the weights the network will use.
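
Note that checkpoints/yolov3.tf is not included in the repository; the pretrained Darknet weights have to be downloaded and converted first. Following the yolov3-tf2 README, this looks roughly like the following, run inside the cloned repository (the paths should match the flags above):

$ wget https://pjreddie.com/media/files/yolov3.weights -O data/yolov3.weights
$ python convert.py --weights ./data/yolov3.weights --output ./checkpoints/yolov3.tf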

class TensorFlowImageRecognition(object):

    def __init__(self):

        """
        python scripts/detect.py --image ./data/yolo_test_images/person_ignisbot.png
        """
        physical_devices = tf.config.experimental.list_physical_devices('GPU')
        if len(physical_devices) > 0:
            tf.config.experimental.set_memory_growth(physical_devices[0], True)


        if FLAGS.tiny:
            self.yolo = YoloV3Tiny(classes=FLAGS.num_classes)
        else:
            self.yolo = YoloV3(classes=FLAGS.num_classes)

        self.yolo.load_weights(FLAGS.weights).expect_partial()
        logging.info('weights loaded')

        self.class_names = [c.strip() for c in open(FLAGS.classes).readlines()]
        logging.info('classes loaded')

        self.times = []

This detects the GPU so TensorFlow can use it, builds the network, and loads the weights and class labels.

    def detect_objects_from_image(self, img_raw, save_detection=False):

        img = tf.expand_dims(img_raw, 0)
        img = transform_images(img, FLAGS.size)

        t1 = time.time()
        boxes, scores, classes, nums = self.yolo.predict(img)
        t2 = time.time()
        logging.info('time: {}'.format(t2 - t1))
        rospy.loginfo('time per frame: {}'.format(t2 - t1))

        logging.info('detections:')
        objects_detected_list = []
        for i in range(nums[0]):
            logging.info('\t{}, {}, {}'.format(self.class_names[int(classes[0][i])],
                                            np.array(scores[0][i]),
                                            np.array(boxes[0][i])))

            objects_detected_list.append(self.class_names[int(classes[0][i])])


        rospy.logdebug("Result-Detection="+str(objects_detected_list))
        #img = cv2.cvtColor(img_raw.numpy(), cv2.COLOR_RGB2BGR)
        img = cv2.cvtColor(img_raw, cv2.COLOR_BGR2RGB)
        img_detection = draw_outputs(img, (boxes, scores, classes, nums), self.class_names)
        if save_detection:
            cv2.imwrite(FLAGS.output, img)
            logging.info('output saved to: {}'.format(FLAGS.output))

        return img_detection, objects_detected_list

This receives an image, runs the object detection algorithm on it, draws a bounding box around each detected object, and displays the class label and confidence score.
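
For reference, transform_images from yolov3-tf2 only resizes the batch to the network's input resolution and rescales pixel values to [0, 1]; it is conceptually equivalent to:

import tensorflow as tf

def transform_images(x_train, size):
    # resize the batch to the network input resolution (size x size)
    x_train = tf.image.resize(x_train, (size, size))
    # scale pixel values from [0, 255] to [0, 1]
    x_train = x_train / 255
    return x_train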

class RosTensorFlow(object):
    def __init__(self, save_detections=False, image_rostopic=True):
        # Flag used to process only every other frame, to lower the load
        self._save_detections = save_detections
        self._process_this_frame = True
        self._image_rostopic = image_rostopic
        self._cv_bridge = CvBridge()

        # Start Tensorflow Class
        self.tensorflow_object = TensorFlowImageRecognition()

        self._camera_image_topic = '/camera/color/image_raw'
        self.check_image_topic_ready()
        self._sub = rospy.Subscriber(self._camera_image_topic, Image, self.callback, queue_size=1)
        self._result_pub = rospy.Publisher('result', String, queue_size=1)

        self.image_detection = rospy.Publisher('/image_detection', Image, queue_size=10)

This declares the publisher and subscriber used for object detection in the ROS environment. Set self._camera_image_topic to the name of the topic your own camera node actually publishes.
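
If you are not sure which topic that is, you can list the image topics while your camera node is running:

$ rostopic list | grep image
$ rostopic info /camera/color/image_raw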

 

    def callback(self, image_msg):
        if (self._process_this_frame):
            rospy.logdebug("Image processing....")

            image_np = self._cv_bridge.imgmsg_to_cv2(image_msg, "bgr8")

            img_detection, objects_detected_list = self.tensorflow_object.detect_objects_from_image(img_raw=image_np,
                                                            save_detection=self._save_detections)

            self.publish_results_objects_list(objects_detected_list)

            # TODO: Publish in a ROS image topic, not in a GUI
            if self._image_rostopic:
                self.image_detection.publish(self._cv_bridge.cv2_to_imgmsg(cv2.cvtColor(img_detection, cv2.COLOR_BGR2RGB), "bgr8"))
            else:
                cv2.imshow("Image window", img_detection)
                cv2.waitKey(1)
            rospy.logdebug("Image processing....DONE")
        else:
            pass
        # Toggle the flag so only every other frame is processed
        self._process_this_frame = not self._process_this_frame

    def main(self):
        rospy.spin()

When image data arrives from the camera on the specified topic, the callback above is invoked; it processes the image with YOLO via detect_objects_from_image() and publishes the results.
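
To consume the detection results from another node, a minimal subscriber is enough. A sketch (the node name here is arbitrary):

#!/usr/bin/env python3
import rospy
from std_msgs.msg import String

def on_result(msg):
    # each detected class name arrives as a separate String message
    rospy.loginfo("detected: %s", msg.data)

if __name__ == '__main__':
    rospy.init_node('result_listener')
    rospy.Subscriber('result', String, on_result)
    rospy.spin()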