Обмен сообщениями с Spring Integration и Google Cloud Pub/Sub

Spring Integration предоставляет вам механизм обмена сообщениями для обмена Messages через MessageChannels . Он использует адаптеры каналов для связи с внешними системами.

В этом упражнении мы создадим два приложения, которые обмениваются данными с помощью адаптеров канала Spring Integration, предоставляемых Spring Cloud GCP. Эти адаптеры заставляют Spring Integration использовать Google Cloud Pub/Sub в качестве сервера обмена сообщениями.

Вы узнаете, как использовать Cloud Shell и команду gcloud Cloud SDK.

В этом руководстве используется пример кода из руководства Spring Boot Getting Started .

Что вы узнаете

  • Как обмениваться сообщениями между приложениями с Google Cloud Pub/Sub, используя Spring Integration и Spring Cloud GCP

Что вам понадобится

  • Проект облачной платформы Google
  • Браузер, такой как Chrome или Firefox
  • Знакомство со стандартными текстовыми редакторами Linux, такими как Vim, EMACs или Nano

Как вы будете использовать этот учебник?

Прочитайте только это Прочтите его и выполните упражнения

Как бы вы оценили свой опыт создания веб-приложений HTML/CSS?

Новичок Средний Опытный

Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?

Новичок Средний Опытный

Самостоятельная настройка среды

Если у вас еще нет учетной записи Google (Gmail или Google Apps), вы должны создать ее. Войдите в консоль Google Cloud Platform ( console.cloud.google.com ) и создайте новый проект:

Скриншот от 10 февраля 2016 г., 12:45:26.png

Запомните идентификатор проекта, уникальное имя для всех проектов Google Cloud (имя выше уже занято и не будет работать для вас, извините!). Позже в этой кодовой лаборатории он будет упоминаться как PROJECT_ID .

Затем вам нужно включить выставление счетов в облачной консоли, чтобы использовать ресурсы Google Cloud.

Прохождение этой кодовой лаборатории не должно стоить вам больше нескольких долларов, но может стоить больше, если вы решите использовать больше ресурсов или оставите их работающими (см. раздел «Очистка» в конце этого документа).

Новые пользователи Google Cloud Platform имеют право на бесплатную пробную версию стоимостью 300 долларов США .

Облачная оболочка Google

Хотя Google Cloud можно управлять удаленно с вашего ноутбука, в этой кодовой лаборатории мы будем использовать Google Cloud Shell , среду командной строки, работающую в облаке.

Активировать облачную оболочку Google

В консоли GCP щелкните значок Cloud Shell на верхней правой панели инструментов:

Затем нажмите «Запустить Cloud Shell»:

Подготовка и подключение к среде займет всего несколько минут:

Эта виртуальная машина загружена всеми необходимыми инструментами разработки. Он предлагает постоянный домашний каталог размером 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Многое, если не все, из вашей работы в этом лабораторном занятии можно выполнить просто с помощью браузера или вашего Google Chromebook.

После подключения к Cloud Shell вы должны увидеть, что вы уже прошли аутентификацию и что для проекта уже задан ваш PROJECT_ID .

Выполните следующую команду в Cloud Shell, чтобы подтвердить, что вы прошли аутентификацию:

gcloud auth list

Вывод команды

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

Вывод команды

[core]
project = <PROJECT_ID>

Если это не так, вы можете установить его с помощью этой команды:

gcloud config set project <PROJECT_ID>

Вывод команды

Updated property [core/project].

Перейдите на страницу тем Google Cloud Pub/Sub и включите API.

Щелкните Создать тему .

Введите exampleTopic в качестве названия темы и нажмите « Создать ».

После создания темы оставайтесь на странице Темы. Найдите только что созданную тему, нажмите на три вертикальные точки в конце строки и нажмите « Новая подписка ».

Введите exampleSubscription в текстовое поле имени подписки и нажмите « Создать ».

После запуска Cloud Shell вы можете использовать командную строку для создания двух новых приложений Spring Boot с помощью Spring Initializr:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d dependencies=web \
  -d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d baseDir=spring-integration-receiver | tar -xzvf -

Теперь давайте создадим наше приложение для отправки сообщений. Перейдите в каталог отправляющего приложения.

$ cd spring-integration-sender

Мы хотим, чтобы наше приложение писало сообщения на канал. После того, как сообщение находится в канале, оно будет получено адаптером исходящего канала, который преобразует его из общего сообщения Spring в сообщение Google Cloud Pub/Sub и публикует его в теме Google Cloud Pub/Sub.

Чтобы наше приложение могло писать в канал, мы можем использовать шлюз обмена сообщениями Spring Integration . Используя текстовый редактор из vim , emacs или nano , объявите интерфейс PubsubOutboundGateway внутри класса DemoApplication .

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

        ...

        @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
        public interface PubsubOutboundGateway {

                void sendToPubsub(String text);
        }
}

Теперь у нас есть механизм для отправки сообщений в канал, но куда попадают эти сообщения после того, как они находятся в канале?

Нам нужен адаптер исходящего канала, чтобы получать новые сообщения в канале и публиковать их в теме Google Cloud Pub/Sub.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        @ServiceActivator(inputChannel = "pubsubOutputChannel")
        public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
                return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        }
}

Аннотация @ServiceActivator заставляет этот MessageHandler применяться к любым новым сообщениям в inputChannel . В этом случае мы вызываем наш адаптер исходящего канала PubSubMessageHandler , чтобы опубликовать сообщение в теме exampleTopic Google Cloud Pub/Sub.

Теперь, когда адаптер канала установлен, мы можем автоматически подключить объект PubsubOutboundGateway и использовать его для записи сообщения в канал.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

...

        @Autowired
        private PubsubOutboundGateway messagingGateway;

        @PostMapping("/postMessage")
        public RedirectView postMessage(@RequestParam("message") String message) {
                this.messagingGateway.sendToPubsub(message);
                return new RedirectView("/");
        }
}

Благодаря аннотации @PostMapping у нас теперь есть конечная точка, которая прослушивает HTTP-запросы POST, но не без добавления аннотации @RestController к классу DemoApplication , чтобы пометить его как контроллер REST.

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

Чтобы приложение заработало, нам просто нужно добавить необходимые зависимости.

пом.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Запустите приложение отправителя.

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
  --format 'value(core.project)'`
$ ./mvnw spring-boot:run

Приложение прослушивает запросы POST, содержащие сообщение на порту 8080 и конечной точке /postMessage , но мы вернемся к этому позже.

Мы только что создали приложение, которое отправляет сообщения через Google Cloud Pub/Sub. Теперь мы создадим другое приложение, которое получает эти сообщения и обрабатывает их.

Нажмите + , чтобы открыть новый сеанс Cloud Shell.

Затем в новом сеансе Cloud Shell измените каталоги на каталог приложения-получателя:

$ cd spring-integration-receiver

В предыдущем приложении объявление шлюза обмена сообщениями создало для нас исходящий канал. Поскольку мы не используем шлюз обмена сообщениями для получения сообщений, нам нужно объявить собственный MessageChannel куда будут поступать входящие сообщения.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        public MessageChannel pubsubInputChannel() {
                return new DirectChannel();
        }
}

Нам понадобится адаптер входящего канала для получения сообщений из Google Cloud Pub/Sub и ретрансляции их в pubsubInputChannel .

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

@SpringBootApplication
public class DemoApplication {
        ...

        @Bean
        public PubSubInboundChannelAdapter messageChannelAdapter(
                        @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
                        PubSubTemplate pubSubTemplate) {
                PubSubInboundChannelAdapter adapter =
                                new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
                adapter.setOutputChannel(inputChannel);

                return adapter;
        }
}

Этот адаптер привязывается к pubsubInputChannel и прослушивает новые сообщения из подписки Google Cloud Pub/Sub exampleSubscription .

У нас есть канал, куда отправляются входящие сообщения, но что делать с этими сообщениями?

Давайте обработаем их с помощью @ServiceActivator , который срабатывает, когда новые сообщения поступают в pubsubInputChannel .

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

        ...

        private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

        @ServiceActivator(inputChannel = "pubsubInputChannel")
        public void messageReceiver(String payload) {
                LOGGER.info("Message arrived! Payload: " + payload);
        }
}

В этом случае мы просто зарегистрируем полезную нагрузку сообщения.

Нам нужно добавить необходимые зависимости.

пом.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Запустите приложение-приемник.

$ ./mvnw spring-boot:run

Теперь любые сообщения, которые вы отправляете в приложение-отправитель, будут регистрироваться в приложении-получателе. Чтобы проверить это, откройте новый сеанс Cloud Shell и отправьте HTTP-запрос POST в приложение-отправитель.

$ curl --data "message=Hello world!" localhost:8080/postMessage

Затем убедитесь, что приложение-получатель зарегистрировало отправленное вами сообщение!

INFO: Message arrived! Payload: Hello world!

Удалите подписку и тему, созданную в рамках этого упражнения.

$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic

Вы настраиваете два приложения Spring Boot, которые используют адаптеры Spring Integration Channel для Google Cloud Pub/Sub. Они обмениваются сообщениями между собой, даже не взаимодействуя с Google Cloud Pub/Sub API.

Вы узнали, как использовать адаптеры Spring Integration Channel для Google Cloud Pub/Sub!

Учить больше

Лицензия

Эта работа находится под лицензией Creative Commons Attribution 2.0 Generic License.