使用 Spring Integration 和 Google Cloud Pub/Sub 进行消息传递

Spring Integration 为您提供了通过 MessageChannels 交换 Messages 的消息传递机制。它使用通道适配器与外部系统进行通信。

在本练习中,我们将创建两个应用,它们使用 Spring Cloud GCP 提供的 Spring Integration 通道适配器进行通信。Spring Integration 通过这些适配器使用 Google Cloud Pub/Sub 作为消息交换后端。

您将学习如何使用 Cloud Shell 和 Cloud SDK gcloud 命令。

本教程使用 Spring Boot 入门指南中的示例代码。

学习内容

  • 如何使用 Spring Integration 和 Spring Cloud GCP 在应用与 Google Cloud Pub/Sub 之间交换消息

所需条件

  • 一个 Google Cloud Platform 项目
  • 一个浏览器,例如 ChromeFirefox
  • 熟悉标准的 Linux 文本编辑器,例如 Vim、EMACs 或 Nano

您将如何使用本教程?

仅阅读教程内容 阅读并完成练习

您如何评价自己在构建 HTML/CSS Web 应用方面的经验水平?

新手水平 中等水平 熟练水平

您如何评价自己在使用 Google Cloud Platform 服务方面的经验水平?

新手水平 中等水平 熟练水平

自定进度的环境设置

如果您还没有 Google 帐号(Gmail 或 Google Apps),则必须创建一个。登录 Google Cloud Platform Console (console.cloud.google.com) 并创建一个新项目:

2016-02-10 12:45:26 的屏幕截图.png

请记住项目 ID,它在所有 Google Cloud 项目中都是唯一名称(很抱歉,上述名称已被占用,您无法使用!)。它稍后将在此 Codelab 中被称为 PROJECT_ID

接下来,您需要在 Cloud Console 中启用结算功能,才能使用 Google Cloud 资源。

在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高(请参阅本文档末尾的“清理”部分)。

Google Cloud Platform 的新用户有资格获享 $300 免费试用

Google Cloud Shell

虽然 Google Cloud 可以从笔记本电脑远程操作,但在此 Codelab 中,我们将使用 Google Cloud Shell,这是一个在云端运行的命令行环境。

激活 Google Cloud Shell

在 GCP 控制台中,点击右上角工具栏上的 Cloud Shell 图标:

然后点击“启动 Cloud Shell”:

配置和连接到环境应该只需要片刻时间:

这个虚拟机已加载了您需要的所有开发工具。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行,从而大大增强了网络性能和身份验证功能。只需使用一个浏览器或 Google Chromebook 即可完成本实验中的大部分(甚至全部)工作。

在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的 PROJECT_ID。

在 Cloud Shell 中运行以下命令以确认您已通过身份验证:

gcloud auth list

命令输出

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

命令输出

[core]
project = <PROJECT_ID>

如果不是上述结果,您可以使用以下命令进行设置:

gcloud config set project <PROJECT_ID>

命令输出

Updated property [core/project].

导航到 Google Cloud Pub/Sub 主题页面并启用该 API。

点击创建主题

输入 exampleTopic 作为主题名称,然后点击创建

创建主题后,留在“主题”页面中。找到您刚刚创建的主题,按行末的三个垂直点,然后点击新建订阅

在订阅名称文本框中输入 exampleSubscription,然后点击创建

在 Cloud Shell 启动后,您可以使用 Spring Initializr 通过命令行生成两个新的 Spring Boot 应用:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d dependencies=web \
  -d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d baseDir=spring-integration-receiver | tar -xzvf -

现在,我们来创建消息发送应用。切换到正在发送的应用的目录。

$ cd spring-integration-sender

我们希望应用向通道写入消息。消息进入通道后,将由出站通道适配器提取。该适配器将消息从常规 Spring 消息转换为 Google Cloud Pub/Sub 消息,并将其发布到 Google Cloud Pub/Sub 主题。

为了让我们的应用向通道写入数据,可以使用 Spring Integration 消息传递网关。使用来自 vimemacsnano 的文本编辑器在 DemoApplication 类中声明 PubsubOutboundGateway 接口。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

        ...

        @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
        public interface PubsubOutboundGateway {

                void sendToPubsub(String text);
        }
}

我们现在有一种向通道发送消息的机制,但是这些消息在进入通道后去了哪里?

我们需要一个出站通道适配器来接收通道中的新消息,并将其发布到 Google Cloud Pub/Sub 主题。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        @ServiceActivator(inputChannel = "pubsubOutputChannel")
        public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
                return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        }
}

@ServiceActivator 注释可将此 MessageHandler 应用于 inputChannel 中的任何新消息。在本例中,我们将调用出站通道适配器 PubSubMessageHandler,将消息发布到 Google Cloud Pub/Sub 的 exampleTopic 主题。

设置好通道适配器后,我们现在可以自动连接 PubsubOutboundGateway 对象,并使用它向通道写入消息。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

...

        @Autowired
        private PubsubOutboundGateway messagingGateway;

        @PostMapping("/postMessage")
        public RedirectView postMessage(@RequestParam("message") String message) {
                this.messagingGateway.sendToPubsub(message);
                return new RedirectView("/");
        }
}

得益于 @PostMapping 注释,我们现在有了一个端点来侦听 HTTP POST 请求,但还要向 DemoApplication 类添加 @RestController 注释来将其标记为 REST 控制器。

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

我们只需添加所需的依赖项就能让应用运行。

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

运行发送者应用。

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
  --format 'value(core.project)'`
$ ./mvnw spring-boot:run

该应用正在侦听包含端口 8080 和端点 /postMessage 上消息的 POST 请求,但我们稍后会对此进行说明。

我们刚刚创建了一个通过 Google Cloud Pub/Sub 发送消息的应用。现在,我们将再创建一个接收这些消息并进行处理的应用。

点击 + 打开新的 Cloud Shell 会话。

然后,在新的 Cloud Shell 会话中,将目录更改为接收者应用的目录:

$ cd spring-integration-receiver

在上一个应用中,消息传递网关声明为我们创建了出站通道。由于我们不使用消息传递网关来接收消息,因此需要声明自己的 MessageChannel(收到的消息将到达此处)。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        public MessageChannel pubsubInputChannel() {
                return new DirectChannel();
        }
}

我们需要入站通道适配器接收来自 Google Cloud Pub/Sub 的消息,并将其中继到 pubsubInputChannel

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

@SpringBootApplication
public class DemoApplication {
        ...

        @Bean
        public PubSubInboundChannelAdapter messageChannelAdapter(
                        @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
                        PubSubTemplate pubSubTemplate) {
                PubSubInboundChannelAdapter adapter =
                                new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
                adapter.setOutputChannel(inputChannel);

                return adapter;
        }
}

此适配器会将其自身绑定到 pubsubInputChannel,并侦听 Google Cloud Pub/Sub exampleSubscription 订阅中的新消息。

我们有一个通道,收到的消息会发布到其中,但该如何处理这些消息呢?

让我们使用在 pubsubInputChannel 收到新消息时触发的 @ServiceActivator 来处理它们。

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

        ...

        private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

        @ServiceActivator(inputChannel = "pubsubInputChannel")
        public void messageReceiver(String payload) {
                LOGGER.info("Message arrived! Payload: " + payload);
        }
}

在本例中,我们只需记录消息载荷。

我们需要添加必要的依赖项。

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

运行接收者应用。

$ ./mvnw spring-boot:run

现在,您发送到发送者应用的任何消息都会记录在接收者应用中。如需进行测试,请打开新的 Cloud Shell 会话,并向发送者应用发出 HTTP POST 请求。

$ curl --data "message=Hello world!" localhost:8080/postMessage

然后,验证接收者应用是否记录了您发送的消息!

INFO: Message arrived! Payload: Hello world!

删除在此练习中创建的订阅和主题。

$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic

您设置了两个 Spring Boot 应用,它们使用适用于 Google Cloud Pub/Sub 的 Spring Integration 通道适配器。它们彼此之间交换消息,而无需与 Google Cloud Pub/Sub API 进行交互。

您已经了解如何使用适用于 Google Cloud Pub/Sub 的 Spring Integration 通道适配器!

了解详情

许可

此作品已获得 Creative Commons Attribution 2.0 通用许可授权。