Messaging with Spring Integration and Google Cloud Pub/Sub

Spring Integration provides you with a messaging mechanism to exchange Messages through MessageChannels. It uses channel adapters to communicate with external systems.

In this exercise, we will create two apps that communicate using the Spring Integration channel adapters provided by Spring Cloud GCP. These adapters make Spring Integration use Google Cloud Pub/Sub as the message exchange backend.

You will learn how to use Cloud Shell and the Cloud SDK gcloud command.

This tutorial uses the sample code from the Spring Boot Getting Started guide.

What you'll learn

  • How to exchange messages between apps with Google Cloud Pub/Sub using Spring Integration and Spring Cloud GCP

What you'll need

  • A Google Cloud Platform Project
  • A Browser, such Chrome or Firefox
  • Familiarity with standard Linux text editors such as Vim, EMACs or Nano

How will you use use this tutorial?

Read it through only Read it and complete the exercises

How would you rate your experience with building HTML/CSS web apps?

Novice Intermediate Proficient

How would you rate your experience with using Google Cloud Platform services?

Novice Intermediate Proficient

Self-paced environment setup

If you don't already have a Google Account (Gmail or Google Apps), you must create one. Sign-in to Google Cloud Platform console (console.cloud.google.com) and create a new project:

Screenshot from 2016-02-10 12:45:26.png

Remember the project ID, a unique name across all Google Cloud projects (the name above has already been taken and will not work for you, sorry!). It will be referred to later in this codelab as PROJECT_ID.

Next, you'll need to enable billing in the Cloud Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost you more than a few dollars, but it could be more if you decide to use more resources or if you leave them running (see "cleanup" section at the end of this document).

New users of Google Cloud Platform are eligible for a $300 free trial.

Google Cloud Shell

While Google Cloud can be operated remotely from your laptop, in this codelab we will be using Google Cloud Shell, a command line environment running in the Cloud.

Activate Google Cloud Shell

From the GCP Console click the Cloud Shell icon on the top right toolbar:

Then click "Start Cloud Shell":

It should only take a few moments to provision and connect to the environment:

This virtual machine is loaded with all the development tools you'll need. It offers a persistent 5GB home directory, and runs on the Google Cloud, greatly enhancing network performance and authentication. Much, if not all, of your work in this lab can be done with simply a browser or your Google Chromebook.

Once connected to Cloud Shell, you should see that you are already authenticated and that the project is already set to your PROJECT_ID.

Run the following command in Cloud Shell to confirm that you are authenticated:

gcloud auth list

Command output

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

Command output

[core]
project = <PROJECT_ID>

If it is not, you can set it with this command:

gcloud config set project <PROJECT_ID>

Command output

Updated property [core/project].

Navigate to the Google Cloud Pub/Sub topics page and enable the API.

Click Create Topic.

Type exampleTopic as the name of the topic and then click Create..

After the topic is created, remain in the Topics page. Look for the topic you just created, press the three vertical dots at the end of the line and click New Subscription.

Type exampleSubscription in the subscription name text box and click Create.

After Cloud Shell launches, you can use the command line to generate two new Spring Boot applications with Spring Initializr:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d dependencies=web \
  -d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.6.RELEASE \
  -d baseDir=spring-integration-receiver | tar -xzvf -

Now let's create our message sending app. Change to the directory of the sending app.

$ cd spring-integration-sender

We want our app to write messages to a channel. After a message is in the channel, it will get picked up by the outbound channel adapter, which converts it from a generic Spring message to a Google Cloud Pub/Sub message and publishes it to a Google Cloud Pub/Sub topic.

In order for our app to write to a channel, we can use a Spring Integration messaging gateway. Using a text editor from vim, emacs or nano, declare a PubsubOutboundGateway interface inside the DemoApplication class.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

        ...

        @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
        public interface PubsubOutboundGateway {

                void sendToPubsub(String text);
        }
}

We now have a mechanism to send messages to a channel, but where do those messages go after they are in the channel?

We need an outbound channel adapter to consume new messages in the channel and publish them to a Google Cloud Pub/Sub topic.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        @ServiceActivator(inputChannel = "pubsubOutputChannel")
        public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
                return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        }
}

The @ServiceActivator annotation causes this MessageHandler to be applied to any new messages in inputChannel. In this case, we are calling our outbound channel adapter, PubSubMessageHandler, to publish the message to Google Cloud Pub/Sub's exampleTopic topic.

With the channel adapter in place, we can now auto-wire a PubsubOutboundGateway object and use it to write a message to a channel.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

...

        @Autowired
        private PubsubOutboundGateway messagingGateway;

        @PostMapping("/postMessage")
        public RedirectView postMessage(@RequestParam("message") String message) {
                this.messagingGateway.sendToPubsub(message);
                return new RedirectView("/");
        }
}

Because of the @PostMapping annotation, we now have an endpoint that is listening to HTTP POST requests, but not without also adding a @RestController annotation to the DemoApplication class to mark it as a REST controller.

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

In order for the app to run, we just need to add the required dependencies.

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Run the sender app.

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
  --format 'value(core.project)'`
$ ./mvnw spring-boot:run

The app is listening to POST requests containing a message on port 8080 and endpoint /postMessage, but we will get to this later.

We just created an app that sends messages through Google Cloud Pub/Sub. Now, we will create another app that receives those messages and processes them.

Click + to open a new Cloud Shell session.

Then, in the new Cloud Shell session, change directories to the receiver app's directory:

$ cd spring-integration-receiver

In the previous app, the messaging gateway declaration created the outbound channel for us. Since we don't use a messaging gateway to receive messages, we need to declare our own MessageChannel where incoming messages will arrive.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

        ...

        @Bean
        public MessageChannel pubsubInputChannel() {
                return new DirectChannel();
        }
}

We will need the inbound channel adapter to receive messages from Google Cloud Pub/Sub and relay them to pubsubInputChannel.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

@SpringBootApplication
public class DemoApplication {
        ...

        @Bean
        public PubSubInboundChannelAdapter messageChannelAdapter(
                        @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
                        PubSubTemplate pubSubTemplate) {
                PubSubInboundChannelAdapter adapter =
                                new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
                adapter.setOutputChannel(inputChannel);

                return adapter;
        }
}

This adapter binds itself to the pubsubInputChannel and listens to new messages from the Google Cloud Pub/Sub exampleSubscription subscription.

We have a channel where incoming messages are posted to, but what to do with those messages?

Let's process them with a @ServiceActivator that is triggered when new messages arrive at pubsubInputChannel.

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

        ...

        private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

        @ServiceActivator(inputChannel = "pubsubInputChannel")
        public void messageReceiver(String payload) {
                LOGGER.info("Message arrived! Payload: " + payload);
        }
}

In this case, we'll just log the message payload.

We need to add the necessary dependencies.

pom.xml

<project>
  ...
  <!-- Add Spring Cloud GCP Dependency BOM -->
  <dependencyManagement>
        <dependencies>
                <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-gcp-dependencies</artifactId>
                        <version>1.0.0.RELEASE</version>
                        <type>pom</type>
                        <scope>import</scope>
                </dependency>
        </dependencies>
  </dependencyManagement>

  <dependencies>
        ...
        <!-- Add Pub/Sub Starter -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        </dependency>

        <!-- Add Spring Integration -->
        <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
        </dependency>

  </dependencies>

</project>

Run the receiver app.

$ ./mvnw spring-boot:run

Now any messages you send to the sender app will be logged on the receiver app. To test that, open a new Cloud Shell session and make a HTTP POST request to the sender app.

$ curl --data "message=Hello world!" localhost:8080/postMessage

Then, verify that the receiver app logged the message you sent!

INFO: Message arrived! Payload: Hello world!

Delete the subscription and topic created as part of this exercise.

$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic

You set up two Spring Boot apps that use the Spring Integration Channel Adapters for Google Cloud Pub/Sub. They exchange messages among themselves without ever interacting with the Google Cloud Pub/Sub API.

You learned how to use the Spring Integration Channel Adapters for Google Cloud Pub/Sub!

Learn More

License

This work is licensed under a Creative Commons Attribution 2.0 Generic License.