如何通过拉取订阅加快 RBM 代理开发

RBM 代理通过与 Google Cloud Pub/Sub 的发布/订阅关系接收消息和事件。当用户回复代理的消息时,RBM 平台会将这些消息发布到只有您的代理才能访问的唯一 Pub/Sub 主题。您的代理通过订阅其唯一主题来访问这些消息和事件。

Pub/Sub 支持两种类型的订阅:推送拉取。使用推送订阅时,Cloud Pub/Sub 会将消息发送到您配置的网络钩子网址。使用拉取订阅时,您负责编写代码,以创建长时间运行的消息监听器,并在收到消息时确认消息。

对大多数开发者而言,推送订阅更常见且更常用。如果您使用第三方 API,那么您可能已经使用过回调/网络钩子网址。虽然这个方法很简单,但它需要一个可公开访问的网址,因此开发者每次要测试某项更改时,都必须强制部署到公共网络服务器。

在本文中,我将向大家介绍如何设置本地订阅的拉取订阅,以及如何在 Google Cloud App Engine 的生产环境中使用此订阅。

为拉取订阅配置 Pub/Sub

如果您尚未为代理配置 Pub/Sub,请按照 Cloud Pub/Sub 中的说明创建您的初始订阅。

创建订阅后,您可以轻松地从推送模型切换为拉取模型。导航到 Google Cloud Console,然后转到 Pub/Sub > 订阅部分。点击您为代理创建的订阅旁边的溢出菜单,然后选择修改。您应该会看到类似于下图的配置屏幕。

订阅详情

传送类型设置为拉取,然后点击保存

设置异步拉取订阅处理程序

接下来,您需要更新 RBM 代理以从订阅拉取消息。您可以参阅异步拉取一文,了解如何使用各种编程语言实现这一点,或者查看一些 RBM 代理示例,其中很多示例都使用拉取模型。

以下代码演示了如何在 Node.js 中设置异步拉取订阅监听器:

function initPubsub() {
    let pubsub = new PubSub({
        projectId: REPLACE_WITH_GCP_PROJECT_ID,
        keyFilename: REPLACE_WITH_SERVICE_ACCOUNT_KEY_FILE_LOCATION,
    });

    // references an existing subscription, (e.g. rbm-agent-sub)
    let subscription = pubsub.subscription(PUB_SUB_SUBSCRIPTION_NAME);

    // create an event handler to handle messages
    let messageHandler = (message) => {
        console.log(`Received message ${message.id}:`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${message.attributes}`);

        let userEvent = JSON.parse(message.data);

        // TODO: process the userEvent to create another RBM message
        // "Ack" (acknowledge receipt of) the message
        message.ack();
    };

    // Listen for new messages
    subscription.on('message', messageHandler);

    return { messageHandler: messageHandler, subscription: subscription };
}

如需在本地测试代理,您只需在 Express 应用启动时调用 initPubsub,即可在控制台中看到 messageHandler 输出响应。

配置并部署到 Google 的 App Engine

在生产环境中,您需要确保订阅始终可用。一种简单的方法是定期依靠 Cron 作业重新初始化 Pub/Sub 消息监听器。

在 App Engine 中,cron 作业可用于安排具有不同间隔的任务。Cron 作业配置了说明、网址和间隔。在 Node.js 应用中,您可以在 cron.yaml 文件中配置这些文件,您可以使用 Google Cloud SDK 将其部署到 App Engine。您可以在使用 cron.yaml 安排作业中了解其他语言配置设置。

由于 Cron 任务需要一个网址,因此您需要添加一个网址端点,以便由 Cron 调用 Express 应用路由器,而该路由器反过来会调用上一部分中的 initPubsub 方法启动监听器。

router.get('/pubsubCallback', function(req, res, next) {
  let pubsubConfig = initPubsub();

      // Stop listening once the timeout is hit
      setTimeout(() => {
        pubsubConfig.subscription.removeListener('message', pubsubConfig.messageHandler);
      }, CRON_JOB_TIMEOUT * 1000);

  res.status(200).send();
});

在该回调函数内,您还需要移除监听器,然后才可以再次执行计划任务。例如,如果 Cron 作业每分钟运行,您可将 CRON_JOB_TIMEOUT 参数配置为 60

以下是用于每分钟执行此端点的 cron.yaml 文件配置。

cron:
- description: "Processing Pub/Sub messages"
  url: /pubsubCallback
  schedule: every 1 mins

如需将 Cron 任务部署到 App Engine,请运行以下命令:

gcloud app deploy cron.yaml

部署完成后,App Engine 会自动配置 Cron 任务,您可以在 App Engine > Cron 作业下查看此任务,如下所示。

配置的 Cron 作业

总结与要点

通过拉取订阅,您可以在本地进行测试和调试,这有助于缩短开发周期,并缩短创建 RBM 代理所需的时间。虽然拉取模型需要一些额外的配置和代码(与推送模型相比),但设置过程很简单,只需配置一次。借助 Cron 作业,您可以轻松确保拉取订阅始终可用于处理代理的消息流,并且 App Engine 可以使设置和维护费用非常低。

祝您好运,编码愉快!