アクションサーバーとクライアントの作成 (Python)
目標: Pythonでアクションサーバーとクライアントを実装する。
チュートリアルレベル: 中級
所要時間: 15分
背景
アクションはROS 2における非同期通信の一形態です。 アクションクライアントはアクションサーバーにゴールリクエストを送信します。 アクションサーバーはゴールのフィードバックと結果をアクションクライアントに送信します。
前提条件
前のチュートリアル「アクションの作成」で定義したaction_tutorials_interfacesパッケージとFibonacci.actionインターフェースが必要です。
タスク
1 アクションサーバーの作成
「アクションの作成」チュートリアルで作成したアクションを使用してフィボナッチ数列を計算するアクションサーバーの作成に焦点を当てましょう。
これまでは、パッケージを作成してros2 runを使用してノードを実行してきました。 しかし、このチュートリアルではシンプルにするために、アクションサーバーを単一ファイルにスコープします。 アクションチュートリアルの完全なパッケージがどのようなものかを確認したい場合は、action_tutorialsをチェックしてください。
ホームディレクトリに新しいファイルを開き、fibonacci_action_server.pyと呼び、以下のコードを追加します:
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()8行目では、NodeのサブクラスであるFibonacciActionServerクラスを定義しています。 クラスはNodeコンストラクタを呼び出すことで初期化され、ノード名をfibonacci_action_serverとします:
super().__init__('fibonacci_action_server')コンストラクタでは、新しいアクションサーバーもインスタンス化します:
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)アクションサーバーには4つの引数が必要です:
アクションクライアントを追加するROS 2ノード:
self。アクションのタイプ:
Fibonacci(5行目でインポート)。アクション名:
'fibonacci'。受け入れられたゴールを実行するためのコールバック関数:
self.execute_callback。 このコールバックは、アクションタイプの結果メッセージを返さなければなりません。
クラスでexecute_callbackメソッドも定義します:
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return resultこれは、ゴールが受け入れられた後に実行されるメソッドです。
アクションサーバーを実行してみましょう:
Linux/macOS:
$ python3 fibonacci_action_server.pyWindows:
$ python fibonacci_action_server.py別のターミナルで、コマンドラインインターフェースを使用してゴールを送信できます:
$ ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"アクションサーバーが実行されているターミナルに、「Executing goal...」というログメッセージが表示され、その後ゴール状態が設定されていないという警告が表示されるはずです。 デフォルトでは、実行コールバックでゴールハンドル状態が設定されていない場合、中断状態と見なされます。
ゴールハンドルのsucceed()メソッドを使用してゴールが成功したことを示すことができます:
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
goal_handle.succeed()
result = Fibonacci.Result()
return resultアクションサーバーを再起動して別のゴールを送信すると、ゴールがSUCCEEDEDステータスで完了することがわかるはずです。
では、ゴール実行で実際に要求されたフィボナッチ数列を計算して返すようにしましょう:
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
sequence = [0, 1]
for i in range(1, goal_handle.request.order):
sequence.append(sequence[i] + sequence[i-1])
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = sequence
return result数列を計算した後、返す前に結果メッセージフィールドに割り当てます。
再度、アクションサーバーを再起動して別のゴールを送信してください。 適切な結果数列でゴールが完了することがわかるはずです。
1.2 フィードバックの公開
アクションの良い点の一つは、ゴール実行中にアクションクライアントにフィードバックを提供する機能です。 ゴールハンドルのpublish_feedback()メソッドを呼び出すことで、アクションサーバーがアクションクライアント用のフィードバックを公開できるようにします。
sequence変数を置き換え、代わりにフィードバックメッセージを使用して数列を格納します。 forループでフィードバックメッセージを更新するたびに、フィードバックメッセージを公開し、劇的な効果のためにスリープします:
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()アクションサーバーを再起動した後、--feedbackオプションを使用してコマンドラインツールを使用することで、フィードバックが公開されていることを確認できます:
$ ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"2 アクションクライアントの作成
アクションクライアントも単一ファイルにスコープします。 新しいファイルを開き、fibonacci_action_client.pyと呼び、以下のボイラープレートコードを追加します:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
future = action_client.send_goal(10)
rclpy.spin_until_future_complete(action_client, future)
if __name__ == '__main__':
main()rclpy::NodeのサブクラスであるFibonacciActionClientクラスを定義しました。 クラスはNodeコンストラクタを呼び出すことで初期化され、ノード名をfibonacci_action_clientとします:
super().__init__('fibonacci_action_client')またクラスコンストラクタで、前のチュートリアル「アクションの作成」のカスタムアクション定義を使用してアクションクライアントを作成します:
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')3つの引数を渡すことでActionClientを作成します:
アクションクライアントを追加するROS 2ノード:
selfアクションのタイプ:
Fibonacciアクション名:
'fibonacci'
アクションクライアントは、同じアクション名とタイプのアクションサーバーと通信できるようになります。
FibonacciActionClientクラスでsend_goalメソッドも定義します:
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)このメソッドはアクションサーバーが利用可能になるまで待機し、その後サーバーにゴールを送信します。 後で待機できるフューチャーを返します。
クラス定義の後、ROS 2を初期化し、FibonacciActionClientノードのインスタンスを作成する関数main()を定義します。 その後、ゴールを送信し、そのゴールが完了するまで待機します。
最後に、Pythonプログラムのエントリポイントでmain()を呼び出します。
先ほど構築したアクションサーバーを最初に実行してアクションクライアントをテストしましょう:
Linux/macOS:
$ python3 fibonacci_action_server.pyWindows:
$ python fibonacci_action_server.py別のターミナルで、アクションクライアントを実行します。 アクションサーバーがゴールを正常に実行しているときに印刷されるメッセージが表示されるはずです:
Linux/macOS:
$ python3 fibonacci_action_client.py
[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
~ などWindows:
$ python fibonacci_action_client.pyアクションクライアントは起動し、その後すぐに終了するはずです。 この時点で、機能するアクションクライアントがありますが、結果が表示されず、フィードバックも取得できません。
2.1 結果の取得
ゴールを送信できますが、いつ完了したかをどのように知るのでしょうか? いくつかのステップで結果情報を取得できます。 まず、送信したゴールのゴールハンドルを取得する必要があります。 その後、ゴールハンドルを使用して結果をリクエストできます。
この例の完全なコードは次のとおりです:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()ActionClient.send_goal_async()メソッドは、ゴールハンドルへのフューチャーを返します。 まず、フューチャーが完了したときのコールバックを登録します:
self._send_goal_future.add_done_callback(self.goal_response_callback)フューチャーは、アクションサーバーがゴールリクエストを受け入れるか拒否するときに完了することに注意してください。 goal_response_callbackをより詳しく見てみましょう。 ゴールが拒否されたかどうかを確認し、結果がないことがわかっているので早期に戻ります:
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')ゴールハンドルを取得したので、get_result_async()メソッドを使用して結果をリクエストできます。 ゴールの送信と同様に、結果の準備ができたときに完了するフューチャーを取得します。 ゴールレスポンスと同じようにコールバックを登録しましょう:
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)コールバックで、結果の数列をログに記録し、クリーンな終了のためにROS 2をシャットダウンします:
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()別のターミナルでアクションサーバーが実行されている状態で、フィボナッチアクションクライアントを実行してみてください!
Linux/macOS:
$ python3 fibonacci_action_client.pyWindows:
$ python fibonacci_action_client.pyゴールが受け入れられ、最終結果のログメッセージが表示されるはずです。
2.2 フィードバックの取得
アクションクライアントはゴールを送信できます。 素晴らしい! しかし、アクションサーバーから送信したゴールについてのフィードバックを取得できたら素晴らしいでしょう。
この例の完全なコードは次のとおりです:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()フィードバックメッセージのコールバック関数は次のとおりです:
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))コールバックで、メッセージのフィードバック部分を取得し、partial_sequenceフィールドを画面に印刷します。
アクションクライアントでコールバックを登録する必要があります。 これは、ゴールを送信するときにアクションクライアントにコールバックを追加で渡すことで実現されます:
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)準備完了です。 アクションクライアントを実行すると、フィードバックが画面に印刷されるのが表示されるはずです。
概要
このチュートリアルでは、Pythonアクションサーバーとアクションクライアントを一行ずつ組み立て、ゴール、フィードバック、結果を交換するように設定しました。
関連コンテンツ
Pythonでアクションサーバーとクライアントを書く方法はいくつかあります。ros2/examplesリポジトリの
minimal_action_serverとminimal_action_clientパッケージをチェックしてください。ROSアクションについてのより詳細な情報については、設計記事を参照してください。