Обмен сообщениями с помощью Spring Integration и Google Cloud Pub/Sub

Spring Integration предоставляет механизм обмена Messages через MessageChannels . Он использует адаптеры каналов для взаимодействия с внешними системами.

В этом упражнении мы создадим два приложения, которые взаимодействуют друг с другом через адаптеры каналов Spring Integration, предоставляемые Spring Cloud GCP. Эти адаптеры позволяют Spring Integration использовать Google Cloud Pub/Sub в качестве бэкенда для обмена сообщениями.

Вы узнаете, как использовать Cloud Shell и команду Cloud SDK gcloud.

В этом руководстве используется пример кода из руководства Spring Boot Getting Started .

Чему вы научитесь

  • Как обмениваться сообщениями между приложениями с Google Cloud Pub/Sub с помощью Spring Integration и Spring Cloud GCP

Что вам понадобится

  • Проект облачной платформы Google
  • Браузер, например Chrome или Firefox
  • Знакомство со стандартными текстовыми редакторами Linux, такими как Vim, EMAC или Nano

Как вы будете использовать это руководство?

Прочитайте это только до конца Прочитайте и выполните упражнения.

Как бы вы оценили свой опыт создания веб-приложений HTML/CSS?

Новичок Средний Опытный

Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?

Новичок Средний Опытный

Настройка среды для самостоятельного обучения

Если у вас ещё нет учётной записи Google (Gmail или Google Apps), необходимо её создать . Войдите в консоль Google Cloud Platform ( console.cloud.google.com ) и создайте новый проект:

Скриншот от 2016-02-10 12:45:26.png

Запомните идентификатор проекта — уникальное имя для всех проектов Google Cloud (имя, указанное выше, уже занято и не будет вам работать, извините!). Далее в этой практической работе он будет обозначаться как PROJECT_ID .

Далее вам необходимо включить биллинг в Cloud Console, чтобы использовать ресурсы Google Cloud.

Выполнение этой лабораторной работы не должно обойтись вам дороже нескольких долларов, но может обойтись дороже, если вы решите использовать больше ресурсов или оставите их запущенными (см. раздел «Очистка» в конце этого документа).

Новые пользователи Google Cloud Platform имеют право на бесплатную пробную версию стоимостью 300 долларов США .

Google Cloud Shell

Хотя Google Cloud можно управлять удаленно с вашего ноутбука, в этой лабораторной работе мы будем использовать Google Cloud Shell — среду командной строки, работающую в облаке.

Активировать Google Cloud Shell

В консоли GCP щелкните значок Cloud Shell на верхней правой панели инструментов:

Затем нажмите «Запустить Cloud Shell»:

Подготовка и подключение к среде займет всего несколько минут:

Эта виртуальная машина оснащена всеми необходимыми инструментами разработки. Она предлагает постоянный домашний каталог объёмом 5 ГБ и работает в облаке Google Cloud, что значительно повышает производительность сети и аутентификацию. Значительную часть работы в этой лаборатории, если не всю, можно выполнить, просто используя браузер или Chromebook от Google.

После подключения к Cloud Shell вы должны увидеть, что вы уже аутентифицированы и что проекту уже присвоен ваш PROJECT_ID .

Выполните следующую команду в Cloud Shell, чтобы подтвердить, что вы прошли аутентификацию:

gcloud auth list

Вывод команды

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

Вывод команды

[core]
project = <PROJECT_ID>

Если это не так, вы можете установить его с помощью этой команды:

gcloud config set project <PROJECT_ID>

Вывод команды

Updated property [core/project].

Перейдите на страницу тем Google Cloud Pub/Sub и включите API.

Нажмите «Создать тему» .

Введите exampleTopic в качестве названия темы и нажмите «Создать» .

После создания темы оставайтесь на странице «Темы». Найдите только что созданную тему, нажмите на три вертикальные точки в конце строки и нажмите «Новая подписка» .

Введите exampleSubscription в текстовое поле имени подписки и нажмите Создать .

После запуска Cloud Shell вы можете использовать командную строку для создания двух новых приложений Spring Boot с помощью Spring Initializr:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d dependencies=web \
  -d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d baseDir=spring-integration-receiver | tar -xzvf -

Теперь давайте создадим приложение для отправки сообщений. Перейдите в каталог приложения-отправителя.

$ cd spring-integration-sender

Мы хотим, чтобы наше приложение записывало сообщения в канал. После того, как сообщение попадает в канал, его обрабатывает адаптер исходящего канала, который преобразует его из стандартного сообщения Spring в сообщение Google Cloud Pub/Sub и публикует в теме Google Cloud Pub/Sub.

Чтобы наше приложение могло записывать данные в канал, мы можем использовать шлюз обмена сообщениями Spring Integration . Используя текстовый редактор vim , emacs или nano , объявите интерфейс PubsubOutboundGateway внутри класса DemoApplication .

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

        ...

        @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
        public interface PubsubOutboundGateway {

                void sendToPubsub(String text);
        }
}

Теперь у нас есть механизм отправки сообщений в канал, но куда попадают эти сообщения после того, как они попадают в канал?

Нам нужен адаптер исходящего канала для приема новых сообщений в канале и публикации их в теме Google Cloud Pub/Sub.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        @ServiceActivator(inputChannel = "pubsubOutputChannel")
        public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
                return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        }
}

Аннотация @ServiceActivator приводит к применению этого MessageHandler ко всем новым сообщениям в inputChannel . В данном случае мы вызываем наш адаптер исходящего канала PubSubMessageHandler , чтобы опубликовать сообщение в теме exampleTopic сервиса Google Cloud Pub/Sub.

После установки адаптера канала мы можем автоматически подключить объект PubsubOutboundGateway и использовать его для записи сообщения в канал.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

...

        @Autowired
        private PubsubOutboundGateway messagingGateway;

        @PostMapping("/postMessage")
        public RedirectView postMessage(@RequestParam("message") String message) {
                this.messagingGateway.sendToPubsub(message);
                return new RedirectView("/");
        }
}

Благодаря аннотации @PostMapping у нас теперь есть конечная точка, которая прослушивает запросы HTTP POST, но не без добавления аннотации @RestController к классу DemoApplication , чтобы отметить его как контроллер REST.

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

Чтобы приложение заработало, нам просто нужно добавить необходимые зависимости.

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Запустите приложение-отправитель.

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
  --format 'value(core.project)'`
$ ./mvnw spring-boot:run

Приложение прослушивает POST-запросы, содержащие сообщение, на порту 8080 и конечной точке /postMessage , но мы вернемся к этому позже.

Мы только что создали приложение, которое отправляет сообщения через Google Cloud Pub/Sub. Теперь создадим ещё одно приложение, которое будет получать эти сообщения и обрабатывать их.

Нажмите + , чтобы открыть новый сеанс Cloud Shell.

Затем в новом сеансе Cloud Shell измените каталоги на каталог приложения-получателя:

$ cd spring-integration-receiver

В предыдущем приложении объявление шлюза сообщений создавало для нас исходящий канал. Поскольку мы не используем шлюз сообщений для получения сообщений, нам необходимо объявить собственный MessageChannel , куда будут приходить входящие сообщения.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        public MessageChannel pubsubInputChannel() {
                return new DirectChannel();
        }
}

Нам понадобится адаптер входящего канала для получения сообщений от Google Cloud Pub/Sub и ретрансляции их в pubsubInputChannel .

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

@SpringBootApplication
public class DemoApplication {
        ...

        @Bean
        public PubSubInboundChannelAdapter messageChannelAdapter(
                        @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
                        PubSubTemplate pubSubTemplate) {
                PubSubInboundChannelAdapter adapter =
                                new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
                adapter.setOutputChannel(inputChannel);

                return adapter;
        }
}

Этот адаптер привязывается к pubsubInputChannel и прослушивает новые сообщения из подписки Google Cloud Pub/Sub exampleSubscription .

У нас есть канал, на котором публикуются входящие сообщения, но что с этими сообщениями делать?

Давайте обработаем их с помощью @ServiceActivator , который срабатывает при поступлении новых сообщений в pubsubInputChannel .

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

        ...

        private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

        @ServiceActivator(inputChannel = "pubsubInputChannel")
        public void messageReceiver(String payload) {
                LOGGER.info("Message arrived! Payload: " + payload);
        }
}

В этом случае мы просто запишем полезную нагрузку сообщения.

Нам необходимо добавить необходимые зависимости.

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Запустите приложение-приемник.

$ ./mvnw spring-boot:run

Теперь все сообщения, отправляемые в приложение-отправитель, будут регистрироваться в приложении-получателе. Чтобы проверить это, откройте новый сеанс Cloud Shell и отправьте HTTP-запрос POST к приложению-отправителю.

$ curl --data "message=Hello world!" localhost:8080/postMessage

Затем убедитесь, что приложение-получатель зарегистрировало отправленное вами сообщение!

INFO: Message arrived! Payload: Hello world!

Удалите подписку и тему, созданные в рамках этого упражнения.

$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic

Вы настроили два приложения Spring Boot, использующих адаптеры Spring Integration Channel для Google Cloud Pub/Sub. Они обмениваются сообщениями между собой, не взаимодействуя с API Google Cloud Pub/Sub.

Вы узнали, как использовать адаптеры каналов интеграции Spring для Google Cloud Pub/Sub!

Узнать больше

Лицензия

Данная работа распространяется по лицензии Creative Commons Attribution 2.0 Generic License.