Skip to content

簡単なパブリッシャーとサブスクライバーの作成 (Python)

目標: Pythonを使用してパブリッシャーとサブスクライバーノードを作成して実行する。

チュートリアルレベル: 初心者

所要時間: 20分

背景

このチュートリアルでは、トピックを介して文字列メッセージの形で情報を互いに渡すノードを作成します。 ここで使用される例は、シンプルな「talker」と「listener」システムです。 一つのノードがデータをパブリッシュし、もう一つのノードがそのデータを受信するためにトピックをサブスクライブします。

これらの例で使用されるコードはこちらで確認できます。

前提条件

前のチュートリアルで、ワークスペースの作成パッケージの作成方法を学びました。

Pythonの基本的な理解が推奨されますが、必須ではありません。

タスク

1 パッケージの作成

新しいターミナルを開き、ROS 2インストールをソースしてros2コマンドが動作するようにします。

前のチュートリアルで作成したros2_wsディレクトリに移動します。

パッケージはワークスペースのルートではなく、srcディレクトリに作成する必要があることを思い出してください。 したがって、ros2_ws/srcに移動し、パッケージ作成コマンドを実行します:

bash
ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsub

ターミナルがパッケージpy_pubsubとそのすべての必要なファイルとフォルダの作成を確認するメッセージを返します。

2 パブリッシャーノードの作成

ros2_ws/src/py_pubsub/py_pubsubに移動します。 このディレクトリは、ネストされているROS 2パッケージと同じ名前のPythonパッケージであることを思い出してください。

以下のコマンドを入力して、サンプルのtalkerコードをダウンロードします:

bash
wget https://raw.githubusercontent.com/ros2/examples/humble/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py

これで__init__.pyに隣接してpublisher_member_function.pyという名前の新しいファイルができます。

お好みのテキストエディタでファイルを開きます。

python
import rclpy
from rclpy.node import Node

from std_msgs.msg import String

class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1

def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    # ノードを明示的に破棄
    # (オプション - そうでなければガベージコレクタが
    # ノードオブジェクトを破棄する際に自動的に実行されます)
    minimal_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

2.1 コードの確認

コメントの後の最初のコード行は、Nodeクラスを使用できるようにrclpyをインポートします。

python
import rclpy
from rclpy.node import Node

次の文は、ノードがトピックで渡すデータの構造化に使用する組み込み文字列メッセージタイプをインポートします。

python
from std_msgs.msg import String

これらの行はノードの依存関係を表しています。 依存関係はpackage.xmlに追加する必要があることを思い出してください。これは次のセクションで行います。

次に、Nodeから継承する(またはそのサブクラスである)MinimalPublisherクラスが作成されます。

python
class MinimalPublisher(Node):

続いて、クラスのコンストラクタの定義があります。 super().__init__Nodeクラスのコンストラクタを呼び出し、この場合はminimal_publisherというノード名を与えます。

create_publisherは、ノードが(std_msgs.msgモジュールからインポートされた)Stringタイプのメッセージをtopicという名前のトピック上でパブリッシュし、「キューサイズ」が10であることを宣言します。 キューサイズは、サブスクライバーがメッセージを十分に速く受信していない場合にキューに入れられるメッセージの量を制限するために必要なQoS(Quality of Service)設定です。

次に、0.5秒ごとに実行するコールバックを持つタイマーが作成されます。 self.iはコールバックで使用されるカウンターです。

python
def __init__(self):
    super().__init__('minimal_publisher')
    self.publisher_ = self.create_publisher(String, 'topic', 10)
    timer_period = 0.5  # seconds
    self.timer = self.create_timer(timer_period, self.timer_callback)
    self.i = 0

timer_callbackは、カウンター値が追加されたメッセージを作成し、get_logger().infoでコンソールにパブリッシュします。

python
def timer_callback(self):
    msg = String()
    msg.data = 'Hello World: %d' % self.i
    self.publisher_.publish(msg)
    self.get_logger().info('Publishing: "%s"' % msg.data)
    self.i += 1

最後に、main関数が定義されます。

python
def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    # ノードを明示的に破棄
    # (オプション - そうでなければガベージコレクタが
    # ノードオブジェクトを破棄する際に自動的に実行されます)
    minimal_publisher.destroy_node()
    rclpy.shutdown()

まずrclpyライブラリが初期化され、次にノードが作成され、その後コールバックが呼び出されるようにノードを「スピン」します。

2.2 依存関係の追加

setup.pysetup.cfg、およびpackage.xmlファイルが作成されているros2_ws/src/py_pubsubディレクトリに一つ上のレベルに移動します。

お好みのテキストエディタでpackage.xmlを開きます。

前のチュートリアルで述べたように、<description><maintainer>、および<license>タグを必ず記入してください:

xml
<description>rclpyを使用した最小限のパブリッシャー/サブスクライバーの例</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache License 2.0</license>

上記の行の後に、ノードのインポート文に対応する以下の依存関係を追加します:

xml
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>

これは、パッケージがコードの実行時にrclpystd_msgsを必要とすることを宣言します。

ファイルを保存することを忘れないでください。

2.3 エントリーポイントの追加

setup.pyファイルを開きます。 再び、maintainermaintainer_emaildescription、およびlicenseフィールドをpackage.xmlに合わせます:

python
maintainer='YourName',
maintainer_email='you@email.com',
description='rclpyを使用した最小限のパブリッシャー/サブスクライバーの例',
license='Apache License 2.0',

entry_pointsフィールドのconsole_scriptsブラケット内に以下の行を追加します:

python
entry_points={
        'console_scripts': [
                'talker = py_pubsub.publisher_member_function:main',
        ],
},

保存することを忘れないでください。

2.4 setup.cfgの確認

setup.cfgファイルの内容は、以下のように自動的に正しく設定されているはずです:

[develop]
script_dir=$base/lib/py_pubsub
[install]
install_scripts=$base/lib/py_pubsub

これは単に、ros2 runがそこを探すので、実行可能ファイルをlibに配置するようにsetuptoolsに指示しています。

今パッケージをビルドし、ローカルセットアップファイルをソースして実行することもできますが、完全なシステムの動作を確認できるように、まずサブスクライバーノードを作成しましょう。

3 サブスクライバーノードの作成

ros2_ws/src/py_pubsub/py_pubsubに戻って次のノードを作成します。 ターミナルで以下のコードを入力します:

bash
wget https://raw.githubusercontent.com/ros2/examples/humble/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py

これで、ディレクトリにこれらのファイルが含まれているはずです:

__init__.py  publisher_member_function.py  subscriber_member_function.py

3.1 コードの確認

お好みのテキストエディタでsubscriber_member_function.pyを開きます。

python
import rclpy
from rclpy.node import Node

from std_msgs.msg import String

class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # 未使用変数の警告を防ぐ

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)

def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    # ノードを明示的に破棄
    # (オプション - そうでなければガベージコレクタが
    # ノードオブジェクトを破棄する際に自動的に実行されます)
    minimal_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

サブスクライバーノードのコードは、パブリッシャーのコードとほぼ同じです。 コンストラクタは、パブリッシャーと同じ引数でサブスクライバーを作成します。 トピックチュートリアルから、パブリッシャーとサブスクライバーが通信できるようにするには、使用するトピック名とメッセージタイプが一致する必要があることを思い出してください。

python
self.subscription = self.create_subscription(
    String,
    'topic',
    self.listener_callback,
    10)

サブスクライバーのコンストラクタとコールバックには、タイマーが必要ないため、タイマー定義は含まれていません。 そのコールバックは、メッセージを受信するとすぐに呼び出されます。

コールバック定義は、受信したデータとともに情報メッセージをコンソールに印刷するだけです。 パブリッシャーがmsg.data = 'Hello World: %d' % self.iを定義していることを思い出してください。

python
def listener_callback(self, msg):
    self.get_logger().info('I heard: "%s"' % msg.data)

main定義は、パブリッシャーの作成とスピンをサブスクライバーに置き換えることを除けば、ほぼ全く同じです。

python
minimal_subscriber = MinimalSubscriber()

rclpy.spin(minimal_subscriber)

このノードはパブリッシャーと同じ依存関係を持つため、package.xmlに新しく追加するものはありません。 setup.cfgファイルもそのままにしておくことができます。

3.2 エントリーポイントの追加

setup.pyを再度開き、パブリッシャーのエントリーポイントの下にサブスクライバーノードのエントリーポイントを追加します。 entry_pointsフィールドは以下のようになるはずです:

python
entry_points={
        'console_scripts': [
                'talker = py_pubsub.publisher_member_function:main',
                'listener = py_pubsub.subscriber_member_function:main',
        ],
},

ファイルを保存することを忘れずに、これでpub/subシステムの準備ができているはずです。

4 ビルドと実行

おそらくROS 2システムの一部としてrclpystd_msgsパッケージがすでにインストールされているでしょう。 ビルド前に、ワークスペースのルート(ros2_ws)でrosdepを実行して、不足している依存関係をチェックするのが良い習慣です:

bash
rosdep install -i --from-path src --rosdistro humble -y

まだワークスペースのルートros2_wsで、新しいパッケージをビルドします:

bash
colcon build --packages-select py_pubsub

新しいターミナルを開き、ros2_wsに移動して、セットアップファイルをソースします:

bash
source install/setup.bash

talkerノードを実行します。 ターミナルは以下のように0.5秒ごとに情報メッセージのパブリッシュを開始するはずです:

bash
ros2 run py_pubsub talker

出力:

[info] [minimal_publisher]: publishing: "hello world: 0"
[info] [minimal_publisher]: publishing: "hello world: 1"
[info] [minimal_publisher]: publishing: "hello world: 2"
[info] [minimal_publisher]: publishing: "hello world: 3"
[info] [minimal_publisher]: publishing: "hello world: 4"
...

別のターミナルを開き、ros2_ws内から再度セットアップファイルをソースし、listenerノードを起動します。 リスナーは、その時点でパブリッシャーがあるメッセージカウントから始まって、以下のようにコンソールへのメッセージの印刷を開始します:

bash
ros2 run py_pubsub listener

出力:

[INFO] [minimal_subscriber]: I heard: "Hello World: 10"
[INFO] [minimal_subscriber]: I heard: "Hello World: 11"
[INFO] [minimal_subscriber]: I heard: "Hello World: 12"
[INFO] [minimal_subscriber]: I heard: "Hello World: 13"
[INFO] [minimal_subscriber]: I heard: "Hello World: 14"

各ターミナルでCtrl+Cを入力して、ノードのスピンを停止します。

まとめ

トピック上でデータをパブリッシュおよびサブスクライブする2つのノードを作成しました。 それらを実行する前に、パッケージ設定ファイルに依存関係とエントリーポイントを追加しました。

次のステップ

次に、サービス/クライアントモデルを使用した別のシンプルなROS 2パッケージを作成します。 再び、C++またはPythonのいずれかで書くことを選択できます。

関連コンテンツ

Pythonでパブリッシャーとサブスクライバーを書く方法はいくつかあります。ros2/examplesリポジトリのminimal_publisherminimal_subscriberパッケージをチェックしてください。

Released under the MIT License.