簡単なパブリッシャーとサブスクライバーの作成 (Python)
目標: Pythonを使用してパブリッシャーとサブスクライバーノードを作成して実行する。
チュートリアルレベル: 初心者
所要時間: 20分
背景
このチュートリアルでは、トピックを介して文字列メッセージの形で情報を互いに渡すノードを作成します。 ここで使用される例は、シンプルな「talker」と「listener」システムです。 一つのノードがデータをパブリッシュし、もう一つのノードがそのデータを受信するためにトピックをサブスクライブします。
これらの例で使用されるコードはこちらで確認できます。
前提条件
前のチュートリアルで、ワークスペースの作成とパッケージの作成方法を学びました。
Pythonの基本的な理解が推奨されますが、必須ではありません。
タスク
1 パッケージの作成
新しいターミナルを開き、ROS 2インストールをソースしてros2コマンドが動作するようにします。
前のチュートリアルで作成したros2_wsディレクトリに移動します。
パッケージはワークスペースのルートではなく、srcディレクトリに作成する必要があることを思い出してください。 したがって、ros2_ws/srcに移動し、パッケージ作成コマンドを実行します:
ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsubターミナルがパッケージpy_pubsubとそのすべての必要なファイルとフォルダの作成を確認するメッセージを返します。
2 パブリッシャーノードの作成
ros2_ws/src/py_pubsub/py_pubsubに移動します。 このディレクトリは、ネストされているROS 2パッケージと同じ名前のPythonパッケージであることを思い出してください。
以下のコマンドを入力して、サンプルのtalkerコードをダウンロードします:
wget https://raw.githubusercontent.com/ros2/examples/humble/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.pyこれで__init__.pyに隣接してpublisher_member_function.pyという名前の新しいファイルができます。
お好みのテキストエディタでファイルを開きます。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
# ノードを明示的に破棄
# (オプション - そうでなければガベージコレクタが
# ノードオブジェクトを破棄する際に自動的に実行されます)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()2.1 コードの確認
コメントの後の最初のコード行は、Nodeクラスを使用できるようにrclpyをインポートします。
import rclpy
from rclpy.node import Node次の文は、ノードがトピックで渡すデータの構造化に使用する組み込み文字列メッセージタイプをインポートします。
from std_msgs.msg import Stringこれらの行はノードの依存関係を表しています。 依存関係はpackage.xmlに追加する必要があることを思い出してください。これは次のセクションで行います。
次に、Nodeから継承する(またはそのサブクラスである)MinimalPublisherクラスが作成されます。
class MinimalPublisher(Node):続いて、クラスのコンストラクタの定義があります。 super().__init__はNodeクラスのコンストラクタを呼び出し、この場合はminimal_publisherというノード名を与えます。
create_publisherは、ノードが(std_msgs.msgモジュールからインポートされた)Stringタイプのメッセージをtopicという名前のトピック上でパブリッシュし、「キューサイズ」が10であることを宣言します。 キューサイズは、サブスクライバーがメッセージを十分に速く受信していない場合にキューに入れられるメッセージの量を制限するために必要なQoS(Quality of Service)設定です。
次に、0.5秒ごとに実行するコールバックを持つタイマーが作成されます。 self.iはコールバックで使用されるカウンターです。
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0timer_callbackは、カウンター値が追加されたメッセージを作成し、get_logger().infoでコンソールにパブリッシュします。
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1最後に、main関数が定義されます。
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
# ノードを明示的に破棄
# (オプション - そうでなければガベージコレクタが
# ノードオブジェクトを破棄する際に自動的に実行されます)
minimal_publisher.destroy_node()
rclpy.shutdown()まずrclpyライブラリが初期化され、次にノードが作成され、その後コールバックが呼び出されるようにノードを「スピン」します。
2.2 依存関係の追加
setup.py、setup.cfg、およびpackage.xmlファイルが作成されているros2_ws/src/py_pubsubディレクトリに一つ上のレベルに移動します。
お好みのテキストエディタでpackage.xmlを開きます。
前のチュートリアルで述べたように、<description>、<maintainer>、および<license>タグを必ず記入してください:
<description>rclpyを使用した最小限のパブリッシャー/サブスクライバーの例</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache License 2.0</license>上記の行の後に、ノードのインポート文に対応する以下の依存関係を追加します:
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>これは、パッケージがコードの実行時にrclpyとstd_msgsを必要とすることを宣言します。
ファイルを保存することを忘れないでください。
2.3 エントリーポイントの追加
setup.pyファイルを開きます。 再び、maintainer、maintainer_email、description、およびlicenseフィールドをpackage.xmlに合わせます:
maintainer='YourName',
maintainer_email='you@email.com',
description='rclpyを使用した最小限のパブリッシャー/サブスクライバーの例',
license='Apache License 2.0',entry_pointsフィールドのconsole_scriptsブラケット内に以下の行を追加します:
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main',
],
},保存することを忘れないでください。
2.4 setup.cfgの確認
setup.cfgファイルの内容は、以下のように自動的に正しく設定されているはずです:
[develop]
script_dir=$base/lib/py_pubsub
[install]
install_scripts=$base/lib/py_pubsubこれは単に、ros2 runがそこを探すので、実行可能ファイルをlibに配置するようにsetuptoolsに指示しています。
今パッケージをビルドし、ローカルセットアップファイルをソースして実行することもできますが、完全なシステムの動作を確認できるように、まずサブスクライバーノードを作成しましょう。
3 サブスクライバーノードの作成
ros2_ws/src/py_pubsub/py_pubsubに戻って次のノードを作成します。 ターミナルで以下のコードを入力します:
wget https://raw.githubusercontent.com/ros2/examples/humble/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.pyこれで、ディレクトリにこれらのファイルが含まれているはずです:
__init__.py publisher_member_function.py subscriber_member_function.py3.1 コードの確認
お好みのテキストエディタでsubscriber_member_function.pyを開きます。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # 未使用変数の警告を防ぐ
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
# ノードを明示的に破棄
# (オプション - そうでなければガベージコレクタが
# ノードオブジェクトを破棄する際に自動的に実行されます)
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()サブスクライバーノードのコードは、パブリッシャーのコードとほぼ同じです。 コンストラクタは、パブリッシャーと同じ引数でサブスクライバーを作成します。 トピックチュートリアルから、パブリッシャーとサブスクライバーが通信できるようにするには、使用するトピック名とメッセージタイプが一致する必要があることを思い出してください。
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)サブスクライバーのコンストラクタとコールバックには、タイマーが必要ないため、タイマー定義は含まれていません。 そのコールバックは、メッセージを受信するとすぐに呼び出されます。
コールバック定義は、受信したデータとともに情報メッセージをコンソールに印刷するだけです。 パブリッシャーがmsg.data = 'Hello World: %d' % self.iを定義していることを思い出してください。
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)main定義は、パブリッシャーの作成とスピンをサブスクライバーに置き換えることを除けば、ほぼ全く同じです。
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)このノードはパブリッシャーと同じ依存関係を持つため、package.xmlに新しく追加するものはありません。 setup.cfgファイルもそのままにしておくことができます。
3.2 エントリーポイントの追加
setup.pyを再度開き、パブリッシャーのエントリーポイントの下にサブスクライバーノードのエントリーポイントを追加します。 entry_pointsフィールドは以下のようになるはずです:
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main',
'listener = py_pubsub.subscriber_member_function:main',
],
},ファイルを保存することを忘れずに、これでpub/subシステムの準備ができているはずです。
4 ビルドと実行
おそらくROS 2システムの一部としてrclpyとstd_msgsパッケージがすでにインストールされているでしょう。 ビルド前に、ワークスペースのルート(ros2_ws)でrosdepを実行して、不足している依存関係をチェックするのが良い習慣です:
rosdep install -i --from-path src --rosdistro humble -yまだワークスペースのルートros2_wsで、新しいパッケージをビルドします:
colcon build --packages-select py_pubsub新しいターミナルを開き、ros2_wsに移動して、セットアップファイルをソースします:
source install/setup.bashtalkerノードを実行します。 ターミナルは以下のように0.5秒ごとに情報メッセージのパブリッシュを開始するはずです:
ros2 run py_pubsub talker出力:
[info] [minimal_publisher]: publishing: "hello world: 0"
[info] [minimal_publisher]: publishing: "hello world: 1"
[info] [minimal_publisher]: publishing: "hello world: 2"
[info] [minimal_publisher]: publishing: "hello world: 3"
[info] [minimal_publisher]: publishing: "hello world: 4"
...別のターミナルを開き、ros2_ws内から再度セットアップファイルをソースし、listenerノードを起動します。 リスナーは、その時点でパブリッシャーがあるメッセージカウントから始まって、以下のようにコンソールへのメッセージの印刷を開始します:
ros2 run py_pubsub listener出力:
[INFO] [minimal_subscriber]: I heard: "Hello World: 10"
[INFO] [minimal_subscriber]: I heard: "Hello World: 11"
[INFO] [minimal_subscriber]: I heard: "Hello World: 12"
[INFO] [minimal_subscriber]: I heard: "Hello World: 13"
[INFO] [minimal_subscriber]: I heard: "Hello World: 14"各ターミナルでCtrl+Cを入力して、ノードのスピンを停止します。
まとめ
トピック上でデータをパブリッシュおよびサブスクライブする2つのノードを作成しました。 それらを実行する前に、パッケージ設定ファイルに依存関係とエントリーポイントを追加しました。
次のステップ
次に、サービス/クライアントモデルを使用した別のシンプルなROS 2パッケージを作成します。 再び、C++またはPythonのいずれかで書くことを選択できます。
関連コンテンツ
Pythonでパブリッシャーとサブスクライバーを書く方法はいくつかあります。ros2/examplesリポジトリのminimal_publisherとminimal_subscriberパッケージをチェックしてください。