Spring Integration と Google Cloud Pub/Sub を使用したメッセージング

Spring Integration には、MessageChannels を介して Messages を交換するためのメッセージング メカニズムが用意されています。チャンネル アダプターを使用して外部システムと通信します。

この演習では、Spring Cloud GCP が提供する Spring Integration チャネル アダプターを使用して通信する 2 つのアプリを作成します。これらのアダプタを使用すると、Spring Integration でメッセージ交換バックエンドとして Google Cloud Pub/Sub を使用できます。

Cloud Shell と Cloud SDK の gcloud コマンドを使用する方法について学習します。

このチュートリアルでは、Spring Boot スタートガイドのサンプルコードを使用します。

ラボの内容

  • Spring Integration と Spring Cloud GCP を使用して Google Cloud Pub/Sub とアプリ間でメッセージを交換する方法

必要なもの

  • Google Cloud Platform プロジェクト
  • ChromeFirefox などのブラウザ
  • Linux の標準的なテキスト エディタ(vim、emacs、nano など)を使い慣れていること

このチュートリアルの利用方法をお選びください。

通読するのみ 通読し、演習を行う

HTML/CSS ウェブアプリ作成のご経験についてお答えください。

初心者 中級者 上級者

Google Cloud Platform サービスのご利用経験についてどのように評価されますか?

初心者 中級者 上級者

セルフペース型の環境設定

Google アカウント(Gmail または Google Apps)をまだお持ちでない場合は、アカウントを作成する必要があります。Google Cloud Platform Console(console.cloud.google.com)にログインして、新しいプロジェクトを作成します。

2016-02-10 12:45:26.png のスクリーンショット

プロジェクト ID を忘れないようにしてください。プロジェクト ID はすべての Google Cloud プロジェクトを通じて一意の名前にする必要があります(上記の名前はすでに使用されているので使用できません)。以降、このコードラボでは PROJECT_ID と呼びます。

次に、Google Cloud リソースを使用するために、Cloud Console で課金を有効にする必要があります。

この Codelab を実施した場合、費用は数ドルを超えることはありませんが、より多くのリソースを使用する場合や、実行したままにしておくとさらにコストがかかる場合があります(このドキュメントの最後にある「クリーンアップ」セクションをご覧ください)。

Google Cloud Platform の新規ユーザーは 300 ドル分の無料トライアルをご利用いただけます。

Google Cloud Shell

Google Cloud はノートパソコンからリモートで操作できますが、この Codelab では Cloud 上で動作するコマンドライン環境である Google Cloud Shell を使用します。

Google Cloud Shell をアクティブにする

GCP Console で右上のツールバーにある Cloud Shell アイコンをクリックします。

[Cloud Shell の起動] をクリックします。

プロビジョニングと環境への接続にはそれほど時間はかかりません。

この仮想マシンには、必要な開発ツールがすべて準備されています。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働するため、ネットワーク パフォーマンスが充実しており認証もスムーズです。このラボでの作業のほとんどは、ブラウザまたは Google Chromebook から実行できます。

Cloud Shell に接続されると、認証が完了し、プロジェクトが PROJECT_ID に設定されていることが確認できます。

Cloud Shell で次のコマンドを実行して、認証されたことを確認します。

gcloud auth list

コマンド出力

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

Google Cloud Pub/Sub トピックのページに移動し、API を有効にします。

[トピックを作成] をクリックします。

トピック名として「exampleTopic」と入力し、[作成] をクリックします。

トピックの作成後も、[トピック] ページを表示します。作成したトピックを探し、行の末尾にあるその他アイコンをクリックして [新規サブスクリプション] をクリックします。

サブスクリプション名のテキスト ボックスに「exampleSubscription」と入力し、[作成] をクリックします。

Cloud Shell が起動したら、コマンドラインを使用して、Spring Initializr で 2 つの新しい Spring Boot アプリケーションを生成できます。

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d dependencies=web \
  -d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d baseDir=spring-integration-receiver | tar -xzvf -

次に、メッセージ アプリを作成します。送信アプリのディレクトリに移動します。

$ cd spring-integration-sender

アプリではチャネルにメッセージを書き込んでください。メッセージがチャネルに送られると、送信チャネル アダプタによって取り込まれ、汎用的な Spring メッセージから Google Cloud Pub/Sub メッセージに変換され、Google Cloud Pub/Sub トピックにパブリッシュされます。

アプリがチャネルに書き込むには、Spring Integration メッセージング ゲートウェイを使用します。vimemacsnano のテキスト エディタを使用して、DemoApplication クラス内で PubsubOutboundGateway インターフェースを宣言します。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

        ...

        @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
        public interface PubsubOutboundGateway {

                void sendToPubsub(String text);
        }
}

今度はチャンネルにメッセージを送信するためのメカニズムができましたが、メッセージがチャンネル内に送信された後は、どこにそのメッセージを送ればよいでしょうか。

チャネル内の新しいメッセージを使用して Google Cloud Pub/Sub トピックにパブリッシュするには、送信チャネル アダプタが必要です。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        @ServiceActivator(inputChannel = "pubsubOutputChannel")
        public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
                return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        }
}

@ServiceActivator アノテーションにより、この MessageHandlerinputChannel のすべての新規メッセージに適用されます。この例では、送信チャネル アダプタ PubSubMessageHandler を呼び出して、Google Cloud Pub/Sub の exampleTopic トピックにメッセージを公開します。

チャンネル アダプターがあれば、PubsubOutboundGateway オブジェクトを自動で有線接続し、そのチャンネルを使ってメッセージを書き込むことができます。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

...

        @Autowired
        private PubsubOutboundGateway messagingGateway;

        @PostMapping("/postMessage")
        public RedirectView postMessage(@RequestParam("message") String message) {
                this.messagingGateway.sendToPubsub(message);
                return new RedirectView("/");
        }
}

@PostMapping アノテーションにより、HTTP POST リクエストをリッスンするエンドポイントが作成されましたが、DemoApplication クラスに @RestController アノテーションを追加して、これを REST コントローラとしてマークするわけではありません。

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

アプリを実行するには、必要な依存関係を追加するだけです。

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

センダーアプリを実行します。

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
  --format 'value(core.project)'`
$ ./mvnw spring-boot:run

アプリはポート 8080 とエンドポイント /postMessage でメッセージを含む POST リクエストをリッスンしますが、これについては後で説明します。

Google Cloud Pub/Sub でメッセージを送信するアプリを作成しました。次に、これらのメッセージを受信して処理する別のアプリを作成します。

[+] をクリックして、新しい Cloud Shell セッションを開きます。

次に、新しい Cloud Shell セッションで、ディレクトリをレシーバー アプリのディレクトリに変更します。

$ cd spring-integration-receiver

以前のアプリでは、メッセージ ゲートウェイ宣言によって送信チャンネルが作成されました。メッセージを受信するためにメッセージ ゲートウェイを使用しないため、着信したメッセージを受信する独自の MessageChannel を宣言する必要があります。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        public MessageChannel pubsubInputChannel() {
                return new DirectChannel();
        }
}

Google Cloud Pub/Sub からメッセージを受信して pubsubInputChannel にリレーするには、受信チャネル アダプタが必要です。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

@SpringBootApplication
public class DemoApplication {
        ...

        @Bean
        public PubSubInboundChannelAdapter messageChannelAdapter(
                        @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
                        PubSubTemplate pubSubTemplate) {
                PubSubInboundChannelAdapter adapter =
                                new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
                adapter.setOutputChannel(inputChannel);

                return adapter;
        }
}

このアダプタは自身を pubsubInputChannel にバインドし、Google Cloud Pub/Sub exampleSubscription サブスクリプションからの新しいメッセージをリッスンします。

受信メールを投稿するチャンネルがありますが、そのメールはどのように処理すればよいですか。

それらを @ServiceActivator で処理しましょう。これは pubsubInputChannel に新しいメッセージが届いたときにトリガーされます。

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

        ...

        private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

        @ServiceActivator(inputChannel = "pubsubInputChannel")
        public void messageReceiver(String payload) {
                LOGGER.info("Message arrived! Payload: " + payload);
        }
}

この場合、メッセージ ペイロードだけが記録されます。

必要な依存関係を追加する必要があります。

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

レシーバー アプリを実行します。

$ ./mvnw spring-boot:run

これで、送信者アプリに送信されたすべてのメッセージが受信者アプリに記録されるようになります。それをテストするには、新しい Cloud Shell セッションを開き、送信側アプリに対して HTTP POST リクエストを送信します。

$ curl --data "message=Hello world!" localhost:8080/postMessage

次に、送信したメッセージをレシーバー アプリが記録したことを確認します。

INFO: Message arrived! Payload: Hello world!

この演習で作成したサブスクリプションとトピックを削除します。

$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic

Google Cloud Pub/Sub の Spring Integration Channel アダプタを使用する 2 つの Spring Boot アプリを設定しました。ユーザーは、Google Cloud Pub/Sub API とやり取りすることなくメッセージを交換できます。

Google Cloud Pub/Sub 用の Spring Integration チャネル アダプタの使い方を学習しました。

詳細

ライセンス

この作業はクリエイティブ・コモンズの表示 2.0 汎用ライセンスにより使用許諾されています。