如何处理 RBM 广告系列中的离线用户

代理消息传送时间的一个主要因素是,您尝试联系的用户在代理发送消息时是否有数据连接。如果用户处于离线状态,RBM 平台会存储消息,并尝试传送消息(最长 30 天)。如果届时无法传送消息,系统会将其从系统中移除。

当代理尝试联系用户时,用户可能会因为许多原因而无法连接。他们可能已经关闭数据以节省移动套餐费用,也可能是在没有 Wi-Fi 连接的飞机上,或者可能正在隧道中的地铁上。根据消息到达的紧急程度,您的代理需要撤消未传送的 RBM 消息并通过其他通道重新路由这些消息,以便妥善处理离线用户。

在本文中,我将介绍如何使用 Google Cloud Datastore 跟踪您发送和传送的消息、如何使用 Cron revoke未传送的消息,以及如何通过短信重新转送这些消息。

跟踪已发邮件

您的 RBM 代理发送的每一条消息都必须包含唯一的消息 ID。 如需跟踪代理发送的消息,您需要保存每条消息的消息 ID、电话号码和时间戳。

您可以使用各种技术来存储这些信息,但在本文中,我将使用 Google Cloud Datastore。Cloud Datastore 是一个扩容能力极强的 NoSQL 数据库,可自动处理分片和复制操作。它非常适合存储非关系型数据(例如代理发送的消息)。Cloud Datastore 依赖于活跃的 Google App Engine 实例,因此我使用 App Engine 来托管我的 RBM 代理并配置 Cron 作业。

Cloud Datastore 的客户端库提供多种语言版本。在此示例中,我使用 Node.js,并以 RBM 开发者网站上提供的第一个代理 Node.js 示例为基础。我首先运行以下命令来为 Node.js 项目安装 Cloud Datastore:

npm install --save @google-cloud/datastore

在代理源代码中,我添加了对 Cloud Datastore 客户端库的全局引用。

// Imports the Google Cloud client library
const Datastore = require('@google-cloud/datastore');

// Creates a client
const datastore = new Datastore({
    projectId: PROJECT_ID,
});

创建数据存储区对象后,我引入了一个函数来存储每条消息的 msisdnmessage idsent timedelivery 状态。

/**
 *   Records an entry in the Cloud Datastore to keep track of the
 *   messageIds sent to users and the delivery state.
 *
 *   @property {string} msisdn The user's phone number in E.164 format.
 *   @property {string} messageId The unique message identifier.
 *   @property {boolean} delivered True if message has been delivered.
 */
function saveMessage(msisdn, messageId, delivered) {
    const messageKey = datastore.key(['Message', messageId]);

    const dataForMessage = {
        key: messageKey,
        data: {
            id: messageId,
            msisdn: msisdn,
            lastUpdated: new Date().getTime(),
            delivered: delivered
        },
    };

    // Record that the message was sent.
    datastore
        .save(dataForMessage)
        .then(function() {
            console.log('saved message successfully');
        })
        .catch((err) => {
            console.error('ERROR:', err);
        });
}

有了此函数,每当代理向用户发送消息时,您都需要调用此方法。当 RBM Node.js 客户端库发送 RBM 消息时,该库会在回调方法中提供响应对象,其中包含已发送给用户的消息的 messageId

以下示例展示了如何在成功与 RBM API 通信后向用户发送纯文本消息并记录消息信息。

let params = {
    messageText: 'Hello, World!',
    msisdn:'+12223334444',
};

// Send "Hello, World!" to the user.
rbmApiHelper.sendMessage(params,
function(response) {
    // Extract the message id from the response
    let messageId = response.config.params.messageId;

    // Store the sent state in the Datastore
    saveMessage(phoneNumber, messageId, false);
});

运行上述代码后,您可以从 Google Cloud 控制台Datastore 实体视图中检查 Datastore。

Datastore

更新消息的递送状态

现在您的代理将已发送的消息请求存储在 Datastore 中,接下来,在消息成功传送到用户设备后,我们需要更新传送状态。

用户的设备通过 Cloud Pub/Sub 向 RBM 代理发送 DELIVEREDREADIS_TYPING 事件。在 Pub/Sub 的处理程序中,检查是否已传送事件,并将已传送标志的 Datastore 设置更新为 true。

/**
 *   Uses the event received by the Pub/Sub subscription to send a
 *   response to the client's device.
 *   @param {object} userEvent The JSON object of a message
 *   received by the subscription.
 */
function handleMessage(userEvent) {
    if (userEvent.senderPhoneNumber != undefined) {
        let msisdn = userEvent.senderPhoneNumber;
        let messageId = userEvent.messageId;
        let eventType = userEvent.eventType;

        if(eventType === 'DELIVERED') {
            saveMessage(msisdn, messageId, true);
        }

        // TODO: Process message and create RBM response
    }
}

代理 (Agent) 会将传出消息保存到 Datastore 中,并在收到传送通知时更新数据。在下一部分中,我们将在 Google 的 App Engine 上设置一个 Cron 作业,使其每 10 分钟运行一次以监控未传送的消息。

Google App Engine 上的 Cron

您可以使用 Cron 作业按不同的时间间隔安排任务。在 Google 的 App Engine 中,Cron 作业配置了说明、网址和间隔。

在 Node.js 应用中,您可以在 cron.yaml 文件中配置这些配置,然后使用 Google Cloud SDK 将其部署到 App Engine。参阅使用 cron.yaml 安排作业,了解其他语言配置设置。

由于 Cron 任务需要一个网址,因此您需要向 Express 应用路由器添加一个网址端点,以供 Cron 调用。此 webhook 负责在 Datastore 中查询旧消息,将其从 RBM 平台中删除,并通过短信向用户发送这些消息。

router.get('/expireMessages', function(req, res, next) {
    // TOOD: Query the Datastore for undelivered messages,
    // remove them from the RBM platform, and send them over SMS

    res.status(200).send();
});

以下是每 10 分钟执行一次此端点的 cron.yaml 文件配置。

cron:
-   description: "Processing expired RBM messages"
  url: /expireMessages
  schedule: every 10 mins

如需将 Cron 任务部署到 App Engine,请运行以下命令:

gcloud app deploy cron.yaml

部署后,App Engine 会自动配置 Cron 任务,您可以在 App Engine > Cron 作业下查看该任务。

Cron 作业

查询数据存储区中是否有未传送的消息

在上一部分中设置的 Cron 作业的网络钩子中,您需要添加逻辑来查找 delivered 状态等于 false 且 lastUpdated 时间超过预定义超时(对于我们的用例而言)的所有已发送消息。在此示例中,超过 1 小时的邮件会过期。

为了支持此类复合查询,Datastore 需要一个同时包含 deliveredlastUpdated 属性的复合索引。为此,您可以在项目中创建一个名为 index.yaml 的文件,其中包含以下信息:

indexes:
-   kind: Message
  properties:
  -   name: delivered
    direction: asc
  -   name: lastUpdated
    direction: desc

与部署您之前定义的 Cron 作业类似,请使用 Google Cloud SDK 部署您使用以下命令定义的复合索引:

gcloud datastore create-indexes index.yaml

部署后,App Engine 会自动配置索引,您可以在数据存储区 > 索引下查看索引。

数据存储区索引

定义索引后,我们就可以返回到您为 Cron 作业创建的 webhook,并完成消息到期逻辑:

router.get('/expireMessages', function(req, res, next) {
    // Milliseconds in an hour
    const TIMEOUT = 3600000;

    // Threshold is current time minus one hour
    const OLD_MESSAGE_THRESHOLD = new Date().getTime() - TIMEOUT;

    // Create a query to find old undelivered messages
    const query = datastore
        .createQuery('Message')
        .filter('delivered', '=', false)
        .filter('lastUpdated', '<', OLD_MESSAGE_THRESHOLD);

    // Execute the query
    datastore.runQuery(query).then((results) => {
        for(var i = 0; i < results[0].length; i++) {
            let msisdn = results[0][i].msisdn;
            let messageId = results[0][i].id;

            // Stop the message from being sent
            rbmApiHelper.revokeMessage(msisdn, messageId);

            // Remove the message from the Datastore
            datastore.delete(results[0][i][datastore.KEY]);

            // TODO: Send the user the message as SMS
        }
    });

    res.status(200).send();
});

RBM 本身不支持短信回退,因此您需要实现相应逻辑,以通过短信发送未递送的信息。

总结和要点

为了处理离线用户,您可以为未传送的 RBM 消息构建撤消逻辑。您在消息过期之前所用的时间取决于您传输信息的时效性。对于时间敏感型信息,我们建议将超时时间控制在两小时以内。

此示例使用 Cloud Datastore 和 Google App Engine 来管理存储、查询和撤消过程,但用于跟踪已发送和已传送消息的任何存储机制都应该有效。

祝您编程顺利,编程顺利!