Mensajes con Spring Integration y Google Cloud Pub/Sub

Spring Integration te proporciona un mecanismo de mensajería para intercambiar Messages a través de MessageChannels. Usa adaptadores de canal para comunicarse con sistemas externos.

En este ejercicio, crearemos dos apps que se comuniquen con los adaptadores de canal de Spring Integration que proporciona Spring Cloud GCP. Estos adaptadores hacen que Spring Integration use Google Cloud Pub/Sub como backend de intercambio de mensajes.

Asimismo, aprenderá a utilizar Cloud Shell y el comando gcloud del SDK de Cloud.

En este instructivo, se utiliza el código de muestra de la guía de introducción a Spring Boot.

Qué aprenderás

  • Cómo intercambiar mensajes entre apps con Google Cloud Pub/Sub mediante Spring Integration y Spring Cloud GCP

Requisitos

  • Un proyecto de Google Cloud Platform
  • Un navegador, como Chrome o Firefox
  • Se recomienda estar familiarizado con editores de texto estándares de Linux, como Vim, Emacs o Nano.

¿Cómo usarás este instructivo?

Ler Leer y completar los ejercicios

¿Cómo calificarías tu experiencia con la compilación de aplicaciones web HTML/CSS?

Principiante Intermedio Avanzado

¿Cómo calificarías tu experiencia con los servicios de Google Cloud Platform?

Principiante Intermedio Avanzado

Configuración del entorno a su propio ritmo

Si aún no tienes una Cuenta de Google (Gmail o Google Apps), debes crear una. Accede a Google Cloud Platform Console (console.cloud.google.com) y crea un proyecto nuevo:

Captura de pantalla de 2016-02-10 12:45:26.png

Recuerde el ID de proyecto, un nombre único en todos los proyectos de Google Cloud (el nombre anterior ya se encuentra en uso y no lo podrá usar). Se mencionará más adelante en este codelab como PROJECT_ID.

A continuación, debes habilitar la facturación en Cloud Console para usar los recursos de Google Cloud.

Ejecutar este codelab debería costar solo unos pocos dólares, pero su costo podría aumentar si decides usar más recursos o si los dejas en ejecución (consulta la sección “Limpiar” al final de este documento).

Los usuarios nuevos de Google Cloud Platform son aptos para obtener una prueba gratuita de USD 300.

Google Cloud Shell

Si bien Google Cloud se puede operar de forma remota desde su laptop, en este codelab usaremos Google Cloud Shell, un entorno de línea de comandos que se ejecuta en la nube.

Activar Google Cloud Shell

En GCP Console, haga clic en el ícono de Cloud Shell en la barra de herramientas superior derecha:

Haga clic en "Start Cloud Shell":

El aprovisionamiento y la conexión al entorno debería llevar solo unos minutos:

Esta máquina virtual está cargada con todas las herramientas para desarrolladores que necesitará. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud, lo que permite mejorar considerablemente el rendimiento de la red y la autenticación. Gran parte de su trabajo, si no todo, se puede hacer simplemente con un navegador o su Google Chromebook.

Una vez conectado a Cloud Shell, debería ver que ya está autenticado y que el proyecto ya está configurado con su PROJECT_ID.

En Cloud Shell, ejecute el siguiente comando para confirmar que está autenticado:

gcloud auth list

Resultado del comando

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

Resultado del comando

[core]
project = <PROJECT_ID>

De lo contrario, puede configurarlo con este comando:

gcloud config set project <PROJECT_ID>

Resultado del comando

Updated property [core/project].

Navega a la página de temas de Google Cloud Pub/Sub y habilita la API.

Haz clic en Crear tema.

Escribe exampleTopic como el nombre del tema y, luego, haz clic en Create.

Después de crear el tema, permanezca en la página Temas. Busca el tema que acabas de crear, presiona los tres puntos verticales al final de la línea y haz clic en Nueva suscripción.

Escribe exampleSubscription en el cuadro de texto del nombre de la suscripción y haz clic en Crear.

Después del inicio de Cloud Shell, puede usar la línea de comandos para generar dos nuevas aplicaciones de Spring Boot con Spring Initializr:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d dependencies=web \
  -d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d baseDir=spring-integration-receiver | tar -xzvf -

Ahora, creemos nuestra app de envío de mensajes. Cambia al directorio de la app de envío.

$ cd spring-integration-sender

Queremos que nuestra app escriba mensajes en un canal. Una vez que un mensaje está en el canal, el adaptador de canal saliente lo recogerá, y lo convierte de un mensaje genérico de Spring en un mensaje de Google Cloud Pub/Sub y lo publica en un tema de Google Cloud Pub/Sub.

A fin de que nuestra app escriba en un canal, podemos usar una puerta de enlace de mensajería de Spring Integration. Con un editor de texto de vim, emacs o nano, declara una interfaz PubsubOutboundGateway dentro de la clase DemoApplication.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

        ...

        @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
        public interface PubsubOutboundGateway {

                void sendToPubsub(String text);
        }
}

Ahora tenemos un mecanismo para enviar mensajes a un canal, pero ¿a dónde van esos mensajes después de que están en el canal?

Necesitamos un adaptador de canal saliente para consumir los mensajes nuevos en el canal y publicarlos en un tema de Google Cloud Pub/Sub.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        @ServiceActivator(inputChannel = "pubsubOutputChannel")
        public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
                return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        }
}

La anotación @ServiceActivator hace que este MessageHandler se aplique a cualquier mensaje nuevo en inputChannel. En este caso, llamamos a nuestro adaptador de canal saliente PubSubMessageHandler para publicar el mensaje en el tema exampleTopic de Google Cloud Pub/Sub.

Una vez que se configura el adaptador del canal, podemos conectar automáticamente un objeto PubsubOutboundGateway y usarlo para escribir un mensaje en un canal.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

...

        @Autowired
        private PubsubOutboundGateway messagingGateway;

        @PostMapping("/postMessage")
        public RedirectView postMessage(@RequestParam("message") String message) {
                this.messagingGateway.sendToPubsub(message);
                return new RedirectView("/");
        }
}

Debido a la anotación @PostMapping, ahora tenemos un extremo que escucha las solicitudes HTTP POST, pero no sin agregar una anotación @RestController a la clase DemoApplication para marcarla como controlador de REST.

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

Para que la app se ejecute, solo debemos agregar las dependencias requeridas.

pom.xml;

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Ejecuta la app emisora.

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
  --format 'value(core.project)'`
$ ./mvnw spring-boot:run

La app está escuchando solicitudes POST que contienen un mensaje en el puerto 8080 y el extremo /postMessage, pero abordaremos esto más adelante.

Acabamos de crear una aplicación que envía mensajes a través de Google Cloud Pub/Sub. Ahora, crearemos otra app que reciba esos mensajes y los procese.

Haga clic en + para abrir una nueva sesión de Cloud Shell.

Luego, en la sesión nueva de Cloud Shell, cambia los directorios al directorio de la app receptora:

$ cd spring-integration-receiver

En la app anterior, la declaración de la puerta de enlace de mensajería creó el canal saliente por nosotros. Debido a que no usamos una puerta de enlace de mensajería para recibir mensajes, debemos declarar nuestro propio MessageChannel adonde llegarán los mensajes entrantes.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        public MessageChannel pubsubInputChannel() {
                return new DirectChannel();
        }
}

Necesitaremos el adaptador de canal entrante para recibir mensajes de Google Cloud Pub/Sub y retransmitirlos a pubsubInputChannel.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

@SpringBootApplication
public class DemoApplication {
        ...

        @Bean
        public PubSubInboundChannelAdapter messageChannelAdapter(
                        @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
                        PubSubTemplate pubSubTemplate) {
                PubSubInboundChannelAdapter adapter =
                                new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
                adapter.setOutputChannel(inputChannel);

                return adapter;
        }
}

Este adaptador se vincula a pubsubInputChannel y escucha mensajes nuevos de la suscripción exampleSubscription de Google Cloud Pub/Sub.

Tenemos un canal en el que se publican los mensajes entrantes, pero ¿qué hacer con esos mensajes?

Procesémoslos con un @ServiceActivator que se activa cuando llegan mensajes nuevos a pubsubInputChannel.

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

        ...

        private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

        @ServiceActivator(inputChannel = "pubsubInputChannel")
        public void messageReceiver(String payload) {
                LOGGER.info("Message arrived! Payload: " + payload);
        }
}

En este caso, solo registraremos la carga útil del mensaje.

Debemos agregar las dependencias necesarias.

pom.xml;

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Ejecuta la app receptora.

$ ./mvnw spring-boot:run

Ahora, todos los mensajes que envíes a la app emisora se registrarán en la app receptora. Para probar eso, abre una nueva sesión de Cloud Shell y realiza una solicitud HTTP POST a la app emisora.

$ curl --data "message=Hello world!" localhost:8080/postMessage

Luego, verifica que la app receptora haya registrado el mensaje que enviaste.

INFO: Message arrived! Payload: Hello world!

Borra la suscripción y el tema que creaste como parte de este ejercicio.

$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic

Configurarás dos apps de Spring Boot que usan los adaptadores de Spring Integration Channel para Google Cloud Pub/Sub. Intercambian mensajes entre sí sin interactuar con la API de Google Cloud Pub/Sub.

Aprendió a usar los adaptadores de canal de Spring Integration para Google Cloud Pub/Sub.

Más información

Licencia

Este trabajo cuenta con una licencia Atribución 2.0 Genérica de Creative Commons.