How to speed up RBM agent development with a pull subscription

RBM agents receive messages and events through a publish/subscribe relationship with Google Cloud Pub/Sub. When a user responds to your agent's messages, the RBM platform publishes those messages to a unique Pub/Sub topic that only your agent has access to. Your agent accesses those messages and events through a subscription to its unique topic.

Pub/Sub supports two types of subscriptions: push and pull. With a push subscription, Cloud Pub/Sub sends messages to a webhook URL that you configure. With a pull subscription, you're responsible for writing code to create a long-running message listener and to acknowledge messages as you receive them.

The more familiar and popular approach for most developers is the push subscription. If you've used third-party APIs, then you've likely worked with callback/webhook URLs. Although this approach is simple, it requires a publicly available URL, which forces developers to deploy to a public web server each time they want to test a change.

In this article, I'll show you how to set up a pull subscription for local testing and how to use this subscription in a production environment with Google Cloud's App Engine.

Configuring Pub/Sub for a pull subscription

If you haven't already configured Pub/Sub for your agent, follow the instructions in Cloud Pub/Sub to create your initial subscription.

Once you've created your subscription, it's simple to switch from a push to a pull model. Navigate to the Google Cloud Console, then to the Pub/Sub > Subscriptions section. Click the overflow menu next to the subscription you created for your agent, and select Edit. You should see a configuration screen similar to the image below.

Subscription details

Set Delivery Type to Pull, and click Save.

Setting up an asynchronous pull subscription handler

Next, you need to update your RBM agent to pull messages from the subscription. You can read about how to accomplish this with various programming languages in Asynchronous pull or take a look at some of the RBM agent samples, many of which use a pull model.

The code below demonstrates how to set up an asynchronous pull subscription listener in Node.js:

const { PubSub } = require('@google-cloud/pubsub');

function initPubsub() {
    // Client authenticated with the agent's service account key
    const pubsub = new PubSub({
        projectId: REPLACE_WITH_GCP_PROJECT_ID,
        keyFilename: REPLACE_WITH_SERVICE_ACCOUNT_KEY_FILE_LOCATION,
    });

    // Reference an existing subscription (e.g. rbm-agent-sub)
    const subscription = pubsub.subscription(PUB_SUB_SUBSCRIPTION_NAME);

    // Create an event handler to handle messages
    const messageHandler = (message) => {
        console.log(`Received message ${message.id}:`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);

        // message.data is a Buffer containing the JSON payload
        const userEvent = JSON.parse(message.data.toString());

        // TODO: process the userEvent to create another RBM message

        // "Ack" (acknowledge receipt of) the message
        message.ack();
    };

    // Listen for new messages
    subscription.on('message', messageHandler);

    // Log stream errors instead of letting them go unhandled
    subscription.on('error', (error) => console.error('Pub/Sub error:', error));

    return { messageHandler: messageHandler, subscription: subscription };
}

To test agents locally, call initPubsub when your Express app starts. The messageHandler then prints each incoming message and event to your console.
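The TODO in the handler is where the agent's own logic goes. As a minimal sketch, the payload could be parsed defensively and sorted into a message or an event before the agent decides how to reply. The helper name classifyPayload and the field names it checks (text, suggestionResponse, eventType) are assumptions for illustration; verify them against the payloads your agent actually receives.

```javascript
// Hypothetical helper: turns the raw Pub/Sub payload into a tagged result.
// The field names checked here are assumptions about the RBM payload shape.
function classifyPayload(data) {
  let payload;
  try {
    // data may be a Buffer (as delivered by Pub/Sub) or a string
    payload = JSON.parse(data.toString());
  } catch (err) {
    return { kind: 'invalid', error: err.message };
  }
  if (payload === null || typeof payload !== 'object') {
    return { kind: 'invalid', error: 'payload is not a JSON object' };
  }
  if (typeof payload.eventType === 'string') {
    // Assumed: status events (for example, read receipts) carry eventType
    return { kind: 'event', payload };
  }
  if ('text' in payload || 'suggestionResponse' in payload) {
    // Assumed: user replies carry text or a suggestion response
    return { kind: 'message', payload };
  }
  return { kind: 'unknown', payload };
}
```

Inside messageHandler, a result of kind 'invalid' can still be acknowledged so that a malformed payload is not redelivered repeatedly, while 'message' results are passed on to the code that builds the agent's reply.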

Configuring and deploying to Google's App Engine

In a production environment, you need to make sure the subscription is always available. One simple approach is to rely on a cron job to reinitialize the Pub/Sub message listener periodically.

In App Engine, cron jobs schedule tasks to run at defined intervals. A cron job is configured with a description, URL, and interval. In Node.js apps, you configure these in a cron.yaml file, which you can deploy to App Engine using the Google Cloud SDK. You can read about other language configuration setups in Scheduling jobs with cron.yaml.

Because the cron task needs a URL, you need to add an endpoint to the Express app router for cron to call. That endpoint in turn calls the initPubsub method from the prior section to start the listener.

router.get('/pubsubCallback', function(req, res, next) {
  let pubsubConfig = initPubsub();

  // Stop listening once the timeout is hit
  setTimeout(() => {
    pubsubConfig.subscription.removeListener('message', pubsubConfig.messageHandler);
  }, CRON_JOB_TIMEOUT * 1000);

  res.status(200).send();
});

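Within the callback, you also need to remove the listener before the scheduled task executes again; otherwise, listeners from successive runs accumulate. The timeout is multiplied by 1000 to convert it to milliseconds, so CRON_JOB_TIMEOUT is expressed in seconds. For example, if the cron job runs every minute, set CRON_JOB_TIMEOUT to 60.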
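The listen-then-detach pattern can be tried without a Pub/Sub connection. The sketch below uses Node's built-in EventEmitter as a stand-in for the subscription, since the callback relies only on its on and removeListener methods. The helper name listenFor is hypothetical and not part of any library.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: attach a handler, detach it after timeoutMs,
// and return a function that detaches it early.
function listenFor(emitter, eventName, handler, timeoutMs) {
  let stopped = false;
  let timer = null;

  const stop = () => {
    if (stopped) return; // safe to call more than once
    stopped = true;
    clearTimeout(timer);
    emitter.removeListener(eventName, handler);
  };

  emitter.on(eventName, handler);
  timer = setTimeout(stop, timeoutMs);
  return stop;
}

// Usage with a stand-in emitter instead of a real subscription
const fakeSubscription = new EventEmitter();
const stopListening = listenFor(
  fakeSubscription,
  'message',
  (message) => console.log(`Received ${message.id}`),
  60 * 1000 // one cron interval, in milliseconds
);

fakeSubscription.emit('message', { id: 'demo-1' }); // handler runs
stopListening(); // detach early, as a shutdown hook might
fakeSubscription.emit('message', { id: 'demo-2' }); // no handler attached
```

Returning a stop function is a design choice of this sketch: it lets the same code path serve both the timed detach and an early shutdown.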

Below is the cron.yaml file configuration to execute this endpoint every minute.

cron:
- description: "Processing Pub/Sub messages"
  url: /pubsubCallback
  schedule: every 1 mins

To deploy the cron tasks to App Engine, run the following:

gcloud app deploy cron.yaml

After deployment, App Engine automatically configures the cron task, and the task is viewable under App Engine > Cron jobs, as seen below.

Configured cron job

Wrap-up & TL;DR

Using a pull subscription lets you test and debug locally, which helps speed up development cycles and reduce the time it takes to create RBM agents. Although the pull model requires some additional configuration and code versus the push model, the setup is simple and only needs to be configured once. Cron jobs are an easy way to make sure your pull subscription is always available to handle your agent's message flow, and App Engine makes the setup and maintenance very low cost.

Good luck and happy coding!