pull サブスクリプションで RBM エージェントの開発を高速化する方法

RBM エージェントは、Google Cloud Pub/Sub とのパブリッシュ/サブスクライブ関係を介してメッセージとイベントを受信します。ユーザーがエージェントのメッセージに応答すると、RBM プラットフォームはそれらのエージェントのみがアクセスできる一意の Pub/Sub トピックにメッセージをパブリッシュします。エージェントは、固有のトピックへのサブスクリプションを介して、これらのメッセージとイベントにアクセスします。

Pub/Sub は、pushpull の 2 種類のサブスクリプションをサポートしています。push サブスクリプションでは、Cloud Pub/Sub は構成した Webhook URL にメッセージを送信します。pull サブスクリプションでは、長時間実行メッセージ リスナーを作成し、メッセージを受け取るたびに確認応答するコードを記述する必要があります。

多くのデベロッパーに親しまれている一般的なアプローチは、push サブスクリプションです。サードパーティの API を使用していた場合、コールバック URL または Webhook URL を扱ったことがあるかもしれません。この方法ではシンプルですが、一般公開されている URL が必要であり、デベロッパーは変更をテストするたびに公開ウェブサーバーにデプロイする必要があります。

この記事では、ローカルテスト用に pull サブスクリプションを設定する方法と、Google Cloud の App Engine を使用して本番環境でこのサブスクリプションを使用する方法について説明します。

Pub/Sub を pull サブスクリプション用に構成する

エージェントに Pub/Sub をまだ構成していない場合は、Cloud Pub/Sub の手順に沿って最初のサブスクリプションを作成します。

サブスクリプションの作成後は、push から pull モデルに簡単に切り替えることができます。Google Cloud Console に移動し、[Pub/Sub] > [サブスクリプション] セクションに移動します。エージェント用に作成したサブスクリプションの横にあるオーバーフロー メニューをクリックし、[Edit] を選択します。次の画像のような構成画面が表示されます。

サブスクリプションの詳細

[配信タイプ] を [pull] に設定して、[保存] をクリックします。

非同期 pull サブスクリプション ハンドラの設定

次に、サブスクリプションからメッセージを pull するように RBM エージェントを更新する必要があります。さまざまなプログラミング言語でこれを行う方法については、非同期 pull をご覧ください。また、RBM エージェントサンプルの多くで pull モデルを使用しています。

次のコードは、Node.js で非同期 pull サブスクリプション リスナーを設定する方法を示しています。

function initPubsub() {
    let pubsub = new PubSub({
        projectId: REPLACE_WITH_GCP_PROJECT_ID,
        keyFilename: REPLACE_WITH_SERVICE_ACCOUNT_KEY_FILE_LOCATION,
    });

    // references an existing subscription, (e.g. rbm-agent-sub)
    let subscription = pubsub.subscription(PUB_SUB_SUBSCRIPTION_NAME);

    // create an event handler to handle messages
    let messageHandler = (message) => {
        console.log(`Received message ${message.id}:`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${message.attributes}`);

        let userEvent = JSON.parse(message.data);

        // TODO: process the userEvent to create another RBM message
        // "Ack" (acknowledge receipt of) the message
        message.ack();
    };

    // Listen for new messages
    subscription.on('message', messageHandler);

    return { messageHandler: messageHandler, subscription: subscription };
}

エージェントをローカルでテストするには、Express アプリの起動時に initPubsub を呼び出すと、messageHandler のレスポンスがコンソールに表示されます。

Google の App Engine の構成とデプロイ

本番環境では、サブスクリプションが常に利用可能であることを確認する必要があります。シンプルな方法の一つは、cron ジョブを使用して Pub/Sub メッセージ リスナーを定期的に再初期化することです。

App Engine では、cron ジョブをさまざまな間隔のタスクのスケジューリングに使用できます。cron ジョブは、説明、URL、間隔で構成されます。Node.js アプリでは、cron.yaml ファイルで構成します。このファイルは、Google Cloud SDK を使用して App Engine にデプロイできます。その他の言語の設定については、cron.yaml を使用したジョブのスケジュール設定をご覧ください。

cron タスクには URL が必要なため、cron によって呼び出される Express アプリ ルーターに URL エンドポイントを追加する必要があります。これにより、前のセクションの initPubsub メソッドを呼び出してリスナーを起動できます。

router.get('/pubsubCallback', function(req, res, next) {
  let pubsubConfig = initPubsub();

      // Stop listening once the timeout is hit
      setTimeout(() => {
        pubsubConfig.subscription.removeListener('message', pubsubConfig.messageHandler);
      }, CRON_JOB_TIMEOUT * 1000);

  res.status(200).send();
});

コールバック内でも、スケジュール設定されたタスクが再び実行される前にリスナーを削除する必要もあります。たとえば、cron ジョブが毎分実行される場合は、CRON_JOB_TIMEOUT パラメータが 60 と等しくなるように設定します。

以下は、このエンドポイントを 1 分ごとに実行するための cron.yaml ファイルの構成です。

cron:
- description: "Processing Pub/Sub messages"
  url: /pubsubCallback
  schedule: every 1 mins

cron タスクを App Engine にデプロイするには、次のコマンドを実行します。

gcloud app deploy cron.yaml

デプロイ後、App Engine は cron タスクを自動的に構成します。タスクは、以下に示すように [App Engine] > [Cron ジョブ] で表示できます。

構成された cron ジョブ

まとめと要約(DR)

pull サブスクリプションを使用すると、ローカルでテストおよびデバッグできるため、開発サイクルを高速化し、RBM エージェントの作成時間を短縮できます。pull モデルには、push モデルに対して追加の構成とコードが必要ですが、設定はシンプルで、一度構成するだけで済みます。cron ジョブを使用すると、pull サブスクリプションが常にエージェントのメッセージ フローを処理できる簡単な状態になります。App Engine を使用すると、セットアップとメンテナンスのコストが非常に低くなります。

それでは始めましょう。