Skip to content

アクションサーバーとクライアントの作成 (Python)

目標: Pythonでアクションサーバーとクライアントを実装する。

チュートリアルレベル: 中級

所要時間: 15分

背景

アクションはROS 2における非同期通信の一形態です。 アクションクライアントアクションサーバーにゴールリクエストを送信します。 アクションサーバーはゴールのフィードバックと結果をアクションクライアントに送信します。

前提条件

前のチュートリアル「アクションの作成」で定義したaction_tutorials_interfacesパッケージとFibonacci.actionインターフェースが必要です。

タスク

1 アクションサーバーの作成

アクションの作成」チュートリアルで作成したアクションを使用してフィボナッチ数列を計算するアクションサーバーの作成に焦点を当てましょう。

これまでは、パッケージを作成してros2 runを使用してノードを実行してきました。 しかし、このチュートリアルではシンプルにするために、アクションサーバーを単一ファイルにスコープします。 アクションチュートリアルの完全なパッケージがどのようなものかを確認したい場合は、action_tutorialsをチェックしてください。

ホームディレクトリに新しいファイルを開き、fibonacci_action_server.pyと呼び、以下のコードを追加します:

python
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci

class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)

if __name__ == '__main__':
    main()

8行目では、NodeのサブクラスであるFibonacciActionServerクラスを定義しています。 クラスはNodeコンストラクタを呼び出すことで初期化され、ノード名をfibonacci_action_serverとします:

python
        super().__init__('fibonacci_action_server')

コンストラクタでは、新しいアクションサーバーもインスタンス化します:

python
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

アクションサーバーには4つの引数が必要です:

  1. アクションクライアントを追加するROS 2ノード:self

  2. アクションのタイプ:Fibonacci(5行目でインポート)。

  3. アクション名:'fibonacci'

  4. 受け入れられたゴールを実行するためのコールバック関数:self.execute_callback。 このコールバックは、アクションタイプの結果メッセージを返さなければなりません

クラスでexecute_callbackメソッドも定義します:

python
    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

これは、ゴールが受け入れられた後に実行されるメソッドです。

アクションサーバーを実行してみましょう:

Linux/macOS:

bash
$ python3 fibonacci_action_server.py

Windows:

bash
$ python fibonacci_action_server.py

別のターミナルで、コマンドラインインターフェースを使用してゴールを送信できます:

bash
$ ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

アクションサーバーが実行されているターミナルに、「Executing goal...」というログメッセージが表示され、その後ゴール状態が設定されていないという警告が表示されるはずです。 デフォルトでは、実行コールバックでゴールハンドル状態が設定されていない場合、中断状態と見なされます。

ゴールハンドルのsucceed()メソッドを使用してゴールが成功したことを示すことができます:

python
    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        goal_handle.succeed()
        result = Fibonacci.Result()
        return result

アクションサーバーを再起動して別のゴールを送信すると、ゴールがSUCCEEDEDステータスで完了することがわかるはずです。

では、ゴール実行で実際に要求されたフィボナッチ数列を計算して返すようにしましょう:

python
    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result

数列を計算した後、返す前に結果メッセージフィールドに割り当てます。

再度、アクションサーバーを再起動して別のゴールを送信してください。 適切な結果数列でゴールが完了することがわかるはずです。

1.2 フィードバックの公開

アクションの良い点の一つは、ゴール実行中にアクションクライアントにフィードバックを提供する機能です。 ゴールハンドルのpublish_feedback()メソッドを呼び出すことで、アクションサーバーがアクションクライアント用のフィードバックを公開できるようにします。

sequence変数を置き換え、代わりにフィードバックメッセージを使用して数列を格納します。 forループでフィードバックメッセージを更新するたびに、フィードバックメッセージを公開し、劇的な効果のためにスリープします:

python
import time

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci

class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result

def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)

if __name__ == '__main__':
    main()

アクションサーバーを再起動した後、--feedbackオプションを使用してコマンドラインツールを使用することで、フィードバックが公開されていることを確認できます:

bash
$ ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

2 アクションクライアントの作成

アクションクライアントも単一ファイルにスコープします。 新しいファイルを開き、fibonacci_action_client.pyと呼び、以下のボイラープレートコードを追加します:

python
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci

class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    future = action_client.send_goal(10)

    rclpy.spin_until_future_complete(action_client, future)

if __name__ == '__main__':
    main()

rclpy::NodeのサブクラスであるFibonacciActionClientクラスを定義しました。 クラスはNodeコンストラクタを呼び出すことで初期化され、ノード名をfibonacci_action_clientとします:

python
        super().__init__('fibonacci_action_client')

またクラスコンストラクタで、前のチュートリアル「アクションの作成」のカスタムアクション定義を使用してアクションクライアントを作成します:

python
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

3つの引数を渡すことでActionClientを作成します:

  1. アクションクライアントを追加するROS 2ノード:self

  2. アクションのタイプ:Fibonacci

  3. アクション名:'fibonacci'

アクションクライアントは、同じアクション名とタイプのアクションサーバーと通信できるようになります。

FibonacciActionClientクラスでsend_goalメソッドも定義します:

python
    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

このメソッドはアクションサーバーが利用可能になるまで待機し、その後サーバーにゴールを送信します。 後で待機できるフューチャーを返します。

クラス定義の後、ROS 2を初期化し、FibonacciActionClientノードのインスタンスを作成する関数main()を定義します。 その後、ゴールを送信し、そのゴールが完了するまで待機します。

最後に、Pythonプログラムのエントリポイントでmain()を呼び出します。

先ほど構築したアクションサーバーを最初に実行してアクションクライアントをテストしましょう:

Linux/macOS:

bash
$ python3 fibonacci_action_server.py

Windows:

bash
$ python fibonacci_action_server.py

別のターミナルで、アクションクライアントを実行します。 アクションサーバーがゴールを正常に実行しているときに印刷されるメッセージが表示されるはずです:

Linux/macOS:

bash
$ python3 fibonacci_action_client.py
[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
~ など

Windows:

bash
$ python fibonacci_action_client.py

アクションクライアントは起動し、その後すぐに終了するはずです。 この時点で、機能するアクションクライアントがありますが、結果が表示されず、フィードバックも取得できません。

2.1 結果の取得

ゴールを送信できますが、いつ完了したかをどのように知るのでしょうか? いくつかのステップで結果情報を取得できます。 まず、送信したゴールのゴールハンドルを取得する必要があります。 その後、ゴールハンドルを使用して結果をリクエストできます。

この例の完全なコードは次のとおりです:

python
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci

class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)

if __name__ == '__main__':
    main()

ActionClient.send_goal_async()メソッドは、ゴールハンドルへのフューチャーを返します。 まず、フューチャーが完了したときのコールバックを登録します:

python
        self._send_goal_future.add_done_callback(self.goal_response_callback)

フューチャーは、アクションサーバーがゴールリクエストを受け入れるか拒否するときに完了することに注意してください。 goal_response_callbackをより詳しく見てみましょう。 ゴールが拒否されたかどうかを確認し、結果がないことがわかっているので早期に戻ります:

python
    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

ゴールハンドルを取得したので、get_result_async()メソッドを使用して結果をリクエストできます。 ゴールの送信と同様に、結果の準備ができたときに完了するフューチャーを取得します。 ゴールレスポンスと同じようにコールバックを登録しましょう:

python
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

コールバックで、結果の数列をログに記録し、クリーンな終了のためにROS 2をシャットダウンします:

python
    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

別のターミナルでアクションサーバーが実行されている状態で、フィボナッチアクションクライアントを実行してみてください!

Linux/macOS:

bash
$ python3 fibonacci_action_client.py

Windows:

bash
$ python fibonacci_action_client.py

ゴールが受け入れられ、最終結果のログメッセージが表示されるはずです。

2.2 フィードバックの取得

アクションクライアントはゴールを送信できます。 素晴らしい! しかし、アクションサーバーから送信したゴールについてのフィードバックを取得できたら素晴らしいでしょう。

この例の完全なコードは次のとおりです:

python
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci

class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)

if __name__ == '__main__':
    main()

フィードバックメッセージのコールバック関数は次のとおりです:

python
    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

コールバックで、メッセージのフィードバック部分を取得し、partial_sequenceフィールドを画面に印刷します。

アクションクライアントでコールバックを登録する必要があります。 これは、ゴールを送信するときにアクションクライアントにコールバックを追加で渡すことで実現されます:

python
        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

準備完了です。 アクションクライアントを実行すると、フィードバックが画面に印刷されるのが表示されるはずです。

概要

このチュートリアルでは、Pythonアクションサーバーとアクションクライアントを一行ずつ組み立て、ゴール、フィードバック、結果を交換するように設定しました。

関連コンテンツ

  • Pythonでアクションサーバーとクライアントを書く方法はいくつかあります。ros2/examplesリポジトリのminimal_action_serverminimal_action_clientパッケージをチェックしてください。

  • ROSアクションについてのより詳細な情報については、設計記事を参照してください。

関連リンク

Released under the MIT License.