使用 Spring 整合功能和 Google Cloud Pub/Sub 傳送訊息

Spring Integration 提供您可傳遞訊息的機制 (透過 MessageChannels 交換 Messages)。它使用頻道轉接程式與外部系統進行通訊。

在本練習中,我們會建立兩個使用 Spring Cloud GCP 提供的 Spring 整合通道轉接器進行溝通的應用程式。這些轉接器可讓 Spring 整合使用 Google Cloud Pub/Sub 做為訊息交換後端。

您將瞭解如何使用 Cloud Shell 和 Cloud SDK gcloud 指令。

本教學課程使用的是 Spring Boot 入門指南中的範例程式碼。

您將會瞭解的內容

  • 如何使用 Spring 整合功能和 Spring Cloud GCP,在 Google Cloud Pub/Sub 與應用程式之間交換訊息

軟硬體需求

  • Google Cloud Platform 專案
  • 瀏覽器,例如 ChromeFirefox
  • 熟悉標準 Linux 文字編輯器,例如 Vim、EMAC 或 Nano

您要如何使用本教學課程?

唯讀閱讀 閱讀內容並完成練習

您對於建立 HTML/CSS 網路應用程式體驗有什麼評價?

初級 中級 熟練

您對於 Google Cloud Platform 服務的使用體驗評價如何?

初級 中級 專業

自行調整環境設定

如果您還沒有 Google 帳戶 (Gmail 或 Google Apps),請先建立帳戶。登入 Google Cloud Platform 主控台 (console.cloud.google.com),然後建立新專案:

2016-02-10 12:45:26.png 的螢幕擷取畫面

提醒您,專案編號是所有 Google Cloud 專案的不重複名稱 (使用上述名稱後就無法使用,敬請見諒!)此程式碼研究室稍後將稱為 PROJECT_ID

接著,您必須在 Cloud Console 中啟用計費功能,才能使用 Google Cloud 資源。

完成這個程式碼研究室的成本應該不會超過新臺幣 $300 元,但如果您決定繼續使用更多資源,或是讓資源繼續運作 (請參閱本文件結尾的「清除設定」一節),就有可能需要更多成本。

新加入 Google Cloud Platform 的使用者可免費試用 $300 美元

Google Cloud Shell

雖然 Google Cloud 可透過筆記型電腦進行遠端作業,但在這個程式碼研究室中,我們會使用 Google Cloud Shell,這是一個在 Cloud 中執行的指令列環境。

啟用 Google Cloud Shell

在 GCP 主控台中,按一下右上角的工具列的 Cloud Shell 圖示:

然後按一下 [Start Cloud Shell]。

佈建和連線至環境只需要幾分鐘的時間:

這部虛擬機器已載入所有您需要的開發工具。這項服務提供永久性的 5GB 主目錄,可在 Google Cloud 中運作,大幅提升網路效能和驗證效能。在這個研究室中,您可以透過瀏覽器或 Google Chromebook 完成大部分的工作。

連線至 Cloud Shell 之後,您應該會看到您已經通過驗證,且專案已經設定為 PROJECT_ID

在 Cloud Shell 中執行下列指令,確認您的身分已通過驗證:

gcloud auth list

指令輸出

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

指令輸出

[core]
project = <PROJECT_ID>

如果沒有,則可使用下列指令設定:

gcloud config set project <PROJECT_ID>

指令輸出

Updated property [core/project].

前往 Google Cloud Pub/Sub 主題頁面並啟用 API。

按一下 [Create Topic] (建立主題)

輸入 exampleTopic 做為主題名稱,然後按一下 [Create]

主題建立完成後,請前往「主題」頁面。找到您剛建立的主題,按下行尾的三點圖示,然後點選 [新增訂閱]

在訂閱名稱文字方塊中輸入 exampleSubscription,然後按一下 [建立]。

Cloud Shell 啟動後,您可以使用指令列使用 Spring Initializr 產生兩個新的 Spring Boot 應用程式:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d dependencies=web \
  -d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d baseDir=spring-integration-receiver | tar -xzvf -

現在,讓我們建立訊息傳送應用程式。請變更為傳送應用程式的目錄。

$ cd spring-integration-sender

我們希望應用程式能將訊息寫入頻道。訊息在管道中之後,即會由傳出管道轉接程式接收,該訊息會從一般的 Spring 訊息轉換為 Google Cloud Pub/Sub 訊息,並發布至 Google Cloud Pub/Sub 主題。

為了讓應用程式寫入到頻道,我們可以使用 Spring Integration 訊息閘道。使用 vimemacsnano 的文字編輯器,宣告 DemoApplication 類別內的 PubsubOutboundGateway 介面。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

        ...

        @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
        public interface PubsubOutboundGateway {

                void sendToPubsub(String text);
        }
}

目前我們有機制可向頻道傳送訊息,不過這些訊息是否在頻道中之後會在哪裡?

我們需要傳出管道轉接程式來使用管道中的新訊息,並發布至 Google Cloud Pub/Sub 主題。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        @ServiceActivator(inputChannel = "pubsubOutputChannel")
        public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
                return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        }
}

@ServiceActivator 註解會導致這個 MessageHandler 套用到 inputChannel 中的任何新訊息。在這個案例中,我們呼叫了傳出通道轉接程式 PubSubMessageHandler,以便將訊息發布至 Google Cloud Pub/Sub 的 exampleTopic 主題。

使用頻道轉接頭後,我們現在會自動抹除 PubsubOutboundGateway 物件,並用來將訊息寫入頻道。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

...

        @Autowired
        private PubsubOutboundGateway messagingGateway;

        @PostMapping("/postMessage")
        public RedirectView postMessage(@RequestParam("message") String message) {
                this.messagingGateway.sendToPubsub(message);
                return new RedirectView("/");
        }
}

由於 @PostMapping 註冊,我們現在有一個端點收聽 HTTP POST 請求,但除了不同時添加 @RestController 註冊在 DemoApplication 類中請為它為 REST 控制器。

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

應用程式需要先新增必要依附元件,應用程式才能運作。

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

執行寄件者應用程式。

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
  --format 'value(core.project)'`
$ ./mvnw spring-boot:run

應用程式正在聽取 POST 要求,其中包含通訊埠 8080 和端點 /postMessage 的訊息,不過我們稍後會處理這個問題。

我們剛建立了能透過 Google Cloud Pub/Sub 傳送訊息的應用程式。現在,我們會建立另一個應用程式,以接收並處理這些訊息。

按一下 [+] 開啟新的 Cloud Shell 工作階段。

然後在新的 Cloud Shell 工作階段中,將目錄變更為接收器 app 的目錄:

$ cd spring-integration-receiver

在先前的應用程式中,訊息閘道宣告為我們建立了外連管道。由於我們不是使用簡訊閘道來接收郵件,因此我們必須宣告自己的 MessageChannel,才會收到收到的郵件。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        public MessageChannel pubsubInputChannel() {
                return new DirectChannel();
        }
}

我們需要傳入管道轉接器才能接收 Google Cloud Pub/Sub 傳送的訊息,並將這些訊息轉送至 pubsubInputChannel

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

@SpringBootApplication
public class DemoApplication {
        ...

        @Bean
        public PubSubInboundChannelAdapter messageChannelAdapter(
                        @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
                        PubSubTemplate pubSubTemplate) {
                PubSubInboundChannelAdapter adapter =
                                new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
                adapter.setOutputChannel(inputChannel);

                return adapter;
        }
}

這個轉接器本身會繫結至 pubsubInputChannel,並監聽來自 Google Cloud Pub/Sub exampleSubscription 訂閱項目的新訊息。

我們有收到留言的訊息管道,但該如何處理?

讓我們使用 @ServiceActivator 處理這些郵件,並在新郵件抵達 pubsubInputChannel 時觸發。

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

        ...

        private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

        @ServiceActivator(inputChannel = "pubsubInputChannel")
        public void messageReceiver(String payload) {
                LOGGER.info("Message arrived! Payload: " + payload);
        }
}

在這種情況下,我們只會記錄郵件酬載。

我們需要新增必要的依附元件。

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

執行接收器應用程式。

$ ./mvnw spring-boot:run

現在,系統會將傳送到您寄件者應用程式的所有訊息記錄在接收端應用程式中。如要進行測試,請開啟新的 Cloud Shell 工作階段,並向寄件者傳送 HTTP POST 要求。

$ curl --data "message=Hello world!" localhost:8080/postMessage

然後確認接收端應用程式是否記錄了您所傳送的郵件!

INFO: Message arrived! Payload: Hello world!

刪除此練習中建立的訂閱項目和主題。

$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic

您設定了兩個 Spring Boot 應用程式,以使用 Spring Integration Channel Adapters for Google Cloud Pub/Sub。這類訊息不需與 Google Cloud Pub/Sub API 互動,就能相互交換訊息。

您已經瞭解如何使用 Google Cloud Pub/Sub 的 Spring Integration Channel 轉接器!

瞭解詳情

授權

本作品採用創用 CC 姓名標示 2.0 一般授權。