Scale your application with Spring and RabbitMQ

15 Oct 2019

There are some projects that require a massive amount of time to run specific features: an e-commerce scenario or a system that needs to send an email when a payment process is confirmed by the payment provider. As a developer, we know that keeping users waiting isn’t an option. In the case of the payment, the application needs to send an email when the payment is complete. Creating a queue of tasks to be executed asynchronously is an excellent way to process a large quantity of data without impacting users, keeping them happy. The goal of this post is to talk about how to create these asynchronous calls with Java using Spring and RabbitMQ.

RabbitMQ is an open-source, message-broker software that translates a message from the formal messaging protocol of the sender to the official messaging protocol of the receiver. In other words, RabbitMQ is a producer-consumer implementation, where the producer processes the message, and the consumer is the client who runs the process. To show how RabbitMQ works, we’ll create a smooth sample to manage a car that has three statuses: when the vehicle is new, when that car has been sold, and when the car has been identified as junk. We’d like to store it in a relational database and have two tables: one to put the current car status and the second one to put the historical information about the car. So, for each new event, we’ll fire an event to RabbitMQ to be executed to a new client asynchronously.

Show me the code

The project demo will be a Maven project. So, the first step is to define project dependencies, e.g. Spring Boot, Spring Data, MySQL driver, and RabbitMQ client into the pom.xml file.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>sh.platform.start</groupId>
	<artifactId>spring-boot-jms</artifactId>
	<version>0.0.1</version>

	<properties>
    	<java.version>1.8</java.version>
	</properties>

	<parent>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-parent</artifactId>
    	<version>2.2.0.RELEASE</version>
	</parent>

	<dependencies>
    	<dependency>
        	<groupId>org.springframework.boot</groupId>
        	<artifactId>spring-boot-starter-web</artifactId>
    	</dependency>
    	<dependency>
        	<groupId>org.springframework.boot</groupId>
        	<artifactId>spring-boot-starter-data-jpa</artifactId>
    	</dependency>
    	<dependency>
        	<groupId>mysql</groupId>
        	<artifactId>mysql-connector-java</artifactId>
    	</dependency>
    	<dependency>
        	<groupId>org.springframework</groupId>
        	<artifactId>spring-jms</artifactId>
    	</dependency>
    	<dependency>
        	<groupId>org.springframework.boot</groupId>
        	<artifactId>spring-boot-starter-amqp</artifactId>
    	</dependency>
    	<dependency>
        	<groupId>org.messaginghub</groupId>
        	<artifactId>pooled-jms</artifactId>
    	</dependency>
    	<dependency>
        	<groupId>com.rabbitmq.jms</groupId>
        	<artifactId>rabbitmq-jms</artifactId>
        	<version>1.11.2</version>
    	</dependency>
    	<dependency>
        	<groupId>sh.platform</groupId>
        	<artifactId>config</artifactId>
        	<version>2.2.2</version>
    	</dependency>
	</dependencies>

	<build>
    	<finalName>spring-boot-jms</finalName>
    	<plugins>
        	<plugin>
            	<groupId>org.springframework.boot</groupId>
            	<artifactId>spring-boot-maven-plugin</artifactId>
        	</plugin>
    	</plugins>
	</build>
	<repositories>
    	<repository>
        	<id>oss.sonatype.org-snapshot</id>
        	<url>http://oss.sonatype.org/content/repositories/snapshots</url>
    	</repository>
	</repositories>
</project>

The next step is the configuration classes; these classes have the responsibility to provide both the data source to connect to the database and the connection factory for the client to use to create a connection with a JMS provider.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import sh.platform.config.Config;
import sh.platform.config.MySQL;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

	@Bean(name = "dataSource")
	public DataSource getDataSource() {
    	Config config = new Config();
    	MySQL database = config.getCredential("database", MySQL::new);
    	return database.get();
	}
}

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import sh.platform.config.Config;
import sh.platform.config.RabbitMQ;

import javax.jms.ConnectionFactory;

@Configuration
@EnableJms
public class JMSConfig {

	private ConnectionFactory getConnectionFactory() {
    	Config config = new Config();
    	final RabbitMQ rabbitMQ = config.getCredential("rabbitmq", RabbitMQ::new);
    	return rabbitMQ.get();
	}

	@Bean
	public MessageConverter jacksonJmsMessageConverter() {
    	MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    	converter.setTargetType(MessageType.TEXT);
    	converter.setTypeIdPropertyName("_type");
    	return converter;
	}

	@Bean
	public CachingConnectionFactory cachingConnectionFactory() {
    	ConnectionFactory connectionFactory = getConnectionFactory();
    	return new CachingConnectionFactory(connectionFactory);
	}

}

After the configurations are created, the next step is to define the entities. These entities are the core of the business and will represent the instances that we’ll create/write from the database and integrate into the queue. In this sample, there are two entities: the Car entity, where we have the current status of the car, and the entity that holds the status of the operation, the CarLog.

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.Objects;

@Entity
public class Car {

	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private Long id;

	@Column
	private String plate;

	@Column
	private String model;

	@Column
	private Integer age;

	@Column
	private String color;

	public Long getId() {
    	return id;
	}

	public String getModel() {
    	return model;
	}

	public Integer getAge() {
    	return age;
	}

	public String getColor() {
    	return color;
	}

	public String getPlate() {
    	return plate;
	}

	@Override
	public boolean equals(Object o) {
    	if (this == o) {
        	return true;
    	}
    	if (o == null || getClass() != o.getClass()) {
        	return false;
    	}
    	Car car = (Car) o;
    	return Objects.equals(id, car.id);
	}

	@Override
	public int hashCode() {
    	return Objects.hashCode(id);
	}

	@Override
	public String toString() {
    	return "Car{" +
            	"id=" + id +
            	", plate='" + plate + '\'' +
            	", model='" + model + '\'' +
            	", age=" + age +
            	", color='" + color + '\'' +
            	'}';
	}
}

public enum CarStatus {
	NEW, JUNK, SOLD;
}

import javax.persistence.*;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Objects;

@Entity
public class CarLog {

	private static final ZoneId UTC = ZoneId.of("UTC");

	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private Long id;

	@Column
	private String plate;

	@Column
	private String model;

	@Column
	private LocalDateTime date = LocalDateTime.now(UTC);

	@Column
	@Enumerated(value = EnumType.STRING)
	private CarStatus status;

	public Long getId() {
    	return id;
	}

	public String getPlate() {
    	return plate;
	}

	public String getModel() {
    	return model;
	}

	public CarStatus getStatus() {
    	return status;
	}

	public LocalDateTime getDate() {
    	return date;
	}

	public static CarLog newCar(Car car) {
    	return of(car, CarStatus.NEW);
	}

	public static CarLog junk(Car car) {
    	return of(car, CarStatus.JUNK);
	}

	public static CarLog sold(Car car) {
    	return of(car, CarStatus.SOLD);
	}

	private static CarLog of(Car car, CarStatus status) {
    	Objects.requireNonNull(car, "car is required");
    	CarLog log = new CarLog();
    	log.plate = car.getPlate();
    	log.model = car.getModel();
    	log.status = status;
    	return log;
	}
}

After that, once the Spring Data entities are defined, the next step is to create the repository’s interfaces. The goal of the Spring Data repository abstraction is to significantly reduce the amount of boilerplate code required to implement data access layers for various persistence stores.

import org.springframework.data.repository.PagingAndSortingRepository;

public interface CarRepository extends PagingAndSortingRepository<Car, Long> {
}


import org.springframework.data.repository.PagingAndSortingRepository;

import java.util.List;

public interface CarLogRepository extends PagingAndSortingRepository<CarLog, Long> {

	List<CarLog> findByPlate(String plate);

	List<CarLog> findByModel(String model);

	List<CarLog> findByStatus(CarStatus status);
}

In the MVC pattern, the controller is the layer between the model and the view, and that’s what we’ll create next, the controller classes. In the CarController layer there’s the JmsTemplate that makes it very simple to send messages to a JMS destination.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("cars")
public class CarController {

	@Autowired
	private CarRepository repository;

	@Autowired
	private JmsTemplate template;

	@PostMapping
	@ResponseStatus(code = HttpStatus.CREATED)
	public String save(@RequestBody Car car) {
    	repository.save(car);
    	template.convertAndSend("new", car);
    	return "Saved- " + car.getModel();
	}

	@GetMapping(value = "/{id}", produces = "application/json")
	public Car get(@PathVariable("id") long id) {
    	return repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
	}

	@GetMapping(produces = "application/json")
	public Iterable<Car> get() {
    	return repository.findAll();
	}


	@PutMapping(value = "/{id}", produces = "application/json")
	public Car update(@PathVariable("id") long id, @RequestBody Car car) {
    	repository.save(car);
    	return car;
	}

	@DeleteMapping(value = "junk/{id}", produces = "application/json")
	public Car junk(@PathVariable("id") long id) {
    	Car car = repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
    	repository.deleteById(id);
    	template.convertAndSend("junk", car);
    	return car;
	}

	@DeleteMapping(value = "sold/{id}", produces = "application/json")
	public Car sold(@PathVariable("id") long id) {
    	Car car = repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
    	repository.deleteById(id);
    	template.convertAndSend("sold", car);
    	return car;
	}
}


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("logs")
public class CarLogController {

	@Autowired
	private CarLogRepository repository;

	@GetMapping(produces = "application/json")
	public Iterable<CarLog> get() {
    	return repository.findAll();
	}

	@GetMapping(value = "{plate}", produces = "application/json")
	public Iterable<CarLog> getHistoric(@PathVariable("plate") String plate) {
    	return repository.findByPlate(plate);
	}

	@GetMapping(value = "models/{model}", produces = "application/json")
	public Iterable<CarLog> get(@PathVariable("model") String model) {
    	return repository.findByModel(model);
	}

	@GetMapping(value = "status/{status}", produces = "application/json")
	public Iterable<CarLog> get(@PathVariable("status") CarStatus status) {
    	return repository.findByStatus(status);
	}
}

In the CarLogController layer, we only see a GET verb, which means it is a read-only controller. But how will the information get into the database? In the CarController layer, the client sends messages to the RabbitMQ queue.

Next, it’s time to talk about the class that will read this queue. The class CarEventReceiver has several methods for the JmsListener annotations, with the attribute that represents the queue for that method; it will listen and wait for a message to read and process. If you take a second look at how the JmsTemplate has been used at the class CarController layer, the first parameter is a String that provides the queue name for where the information should be sent as the second parameter. Spring JMS connects the consumer and producer easily, with a template that allows for production and annotation, making the information more consumable.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class CarEventReceiver {


	@Autowired
	private CarLogRepository repository;

	@JmsListener(destination = "new")
	public void newCar(Car car) {
    	CarLog log = CarLog.newCar(car);
    	repository.save(log);
	}

	@JmsListener(destination = "junk")
	public void junk(Car car) {
    	CarLog log = CarLog.junk(car);
    	repository.save(log);
	}

	@JmsListener(destination = "sold")
	public void sold(Car car) {
    	CarLog log = CarLog.sold(car);
    	repository.save(log);
	}
}

Platform.sh structure

The Java application is ready to go! The next step is to set the Platform.sh files required to manage and deploy the application. In our first Java post, we took a deep dive into each detail of these three files:

  • One router (.platform/routes.yaml). Platform.sh allows you to define the routes.
  • Zero or more service containers (.platform/services.yaml). Platform.sh allows you to completely define and configure the topology and services you want to use on your project.
  • One or more application containers (.platform.app.yaml). You control your application and the way it will be built and deployed on Platform.sh via a single configuration file.

The file that will change in this post is the service file, allowing you to define a database, search engine, cache, and so on. In this project, we’ll set MariaDB and RabbitMQ.

db:
  type: mariadb:10.4
  disk: 512
queuerabbit:
  type: rabbitmq:3.7
  disk: 512

In the application file, we’ll change the relationship to allow our application to access the services. To point out, this access is an essential feature from a security perspective. So in a microservices scenario, we can make sure that the finance applications access the financial services and so on.

# This file describes an application. You can have multiple applications
# in the same project.
#
# See https://docs.platform.sh/user_guide/reference/platform-app-yaml.html

# The name of this app. Must be unique within a project.
name: app

# The runtime the application uses.
type: "java:8"

disk: 1024

# The hooks executed at various points in the lifecycle of the application.
hooks:
  build: mvn clean install

# The relationships of the application with services or other applications.
#
# The left-hand side is the name of the relationship as it will be exposed
# to the application in the PLATFORM_RELATIONSHIPS variable. The right-hand
# side is in the form `<service name>:<endpoint name>`.
relationships:
  database: "db:mysql"
  rabbitmq: "queuerabbit:rabbitmq"

# The configuration of app when it is exposed to the web.
web:
  commands:
    	start:  java -jar -Xmx512m target/spring-boot-jms.jar --server.port=$PORT

The application is now ready, so it’s time to move it to the cloud with Platform.sh using the following steps:

  • Create a new free trial account.
  • Sign up with a new username and password, or login using a current GitHub, Bitbucket, or Google account. If you use a third-party login, you’ll be able to set a password for your Platform.sh account later.
  • Select the region of the world where your site should live.
  • Select the blank template.

After this wizard, Platform.sh will provision the whole infrastructure to yo and provide your project a remote Git repository. The Platform.sh Git-driven infrastructure means it will automatically manage everything your application needs to push it to the master remote repository. After you set up your SSH keys, you’ll only need to write your code—including a few YAML files that specify your desired infrastructure—then commit it to Git and push.

git remote add platform <platform.sh@gitrepository>
git commit -m "Initial project"
git push -u platform master

Code pushed will create both the Java application, the services instances, and, when it’s done, will return an IP address to the service. Let’s test the application.

curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":1,"plate":"AB-0001-AB","model":"Vogel","age":2012,"color":"green"}'
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":2,"plate":"AB-0003-AB","model":"Renault","age":2018,"color":"red"}'
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":3,"plate":"AB-0006-AB","model":"Peugeot","age":2019"color":"black"}'
curl -X GET -i 'https://<host_address>/logs'

In this post, we learned about how to optimize asynchronous communication across the system with RabbitMQ and Spring. This strategy will allow your application to be more scalable and will prevent the user from waiting too long for an answer for a Queue/Topic consumer. An architecture that has any asynchronous communication allows, for example, a second application to read and process the information from the broker or to have more than one consumer, if the system requires it.