The objective of this tutorial is to develop a reactive server which consuming data from a rabbitmq message broker and publishing data to client via websocket.
- Prerequisites
- Architecture overview
- Setup environment
- Consume data from Rabbitmq
- Store data into InfluxDB
- Create Rest API with Swagger
- Publish data to client via Websocket
- Final Result
Prerequisites
You must be aware of these posts to understand the environment.
Architecture overview
Setup environment
Prerequisites
Clone the project
git clone git@github.com:jluccisano/reactive-server.git
Run
mv application.tpl.yml application.yml
-
Set your value into application.yml
-
Start server
mvn spring-boot:run -Drun.profiles=dev
Consume data from Rabbitmq
git checkout -b step1-consumeDataFromRabbitMQ origin/step1-consumeDataFromRabbitMQ
Configuration properties
@Data
@ConfigurationProperties("spring.rabbitmq")
public class RabbitMQProperties {
@NotEmpty
private String endpoint;
@NotEmpty
private String exchange;
@NotEmpty
private String queue;
@NotEmpty
private String gatewayId;
}
Setup configuration
@Configuration
@EnableRabbit
@EnableConfigurationProperties(RabbitMQProperties.class)
public class RabbitMQConfig {
@Autowired
RabbitMQProperties rabbitMQProperties;
@Bean
public Jackson2JsonMessageConverter jackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(URI.create(rabbitMQProperties.getEndpoint()));
return connectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2MessageConverter());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
@Bean
public ReceiveHandlerImpl receiveHandler() {
return new ReceiveHandlerImpl();
}
}
Create consumer
@Service
public class ReceiveHandlerImpl implements ReceiveHandler {
private static final Logger logger = LoggerFactory.getLogger(ReceiveHandlerImpl.class);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.queue}", durable = "true"),
exchange = @Exchange(value = "${spring.rabbitmq.exchange}", type = ExchangeTypes.HEADERS, durable = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "gatewayId", value = "${spring.rabbitmq.gatewayId}")
})
)
public void handleMessage(@Payload Message message, @Header("gatewayId") String gatewayId) {
logger.info(new String(message.toString()));
}
}
Result
2017-05-11 13:48:51.283 INFO 16434 --- [cTaskExecutor-1] com.jls.service.impl.ReceiveHandlerImpl : Message(expire=43869000, created=1494503330, data=DHT22Sensor{temperature='23.799999237060547', humidity=47.29999923706055})
Store data into InfluxDB
git checkout -b step2-store-data-into-influxDB origin/step2-store-data-into-influxDB
Maven dependencies
<dependency>
<groupId>com.github.miwurster</groupId>
<artifactId>spring-data-influxdb</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.5</version>
</dependency>
Configuration
@Configuration
@EnableConfigurationProperties(InfluxDBProperties.class)
public class InfluxDBConfig {
@Bean
public InfluxDBConnectionFactory influxDBConnectionFactory(final InfluxDBProperties properties) {
return new InfluxDBConnectionFactory(properties);
}
@Bean
public InfluxDBTemplate<Point> influxDBTemplate(final InfluxDBConnectionFactory connectionFactory) {
return new InfluxDBTemplate<>(connectionFactory, new PointConverter());
}
}
Service
@Service
public class InfluxDBServiceImpl implements InfluxDBService, InitializingBean {
@Autowired
InfluxDBTemplate<Point> influxDBTemplate;
public void write(Point point) {
influxDBTemplate.write(point);
}
public QueryResult query(String queryString) {
Query query = new Query(queryString, influxDBTemplate.getDatabase());
return influxDBTemplate.query(query);
}
@Override
public void afterPropertiesSet() throws Exception {
influxDBTemplate.createDatabase();
}
}
Store
@Service
public class ReceiveHandlerImpl implements ReceiveHandler {
@Autowired
InfluxDBService influxDBService;
private static final Logger logger = LoggerFactory.getLogger(ReceiveHandlerImpl.class);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.queue}", durable = "true"),
exchange = @Exchange(value = "${spring.rabbitmq.exchange}", type = ExchangeTypes.HEADERS, durable = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "gatewayId", value = "${spring.rabbitmq.gatewayId}")
})
)
public void handleMessage(@Payload Message message, @Header("gatewayId") String gatewayId) {
logger.info(new String(message.toString()));
Point point = Point.measurement("dht22")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("gatewayId",gatewayId)
.addField("temperature",message.getData().getTemperature())
.addField("humidity", message.getData().getHumidity())
.build();
influxDBService.write(point);
}
}
Result
bash-4.3# influx
Connected to http://localhost:8086 version 1.2.0
InfluxDB shell version: 1.2.0
> use influx_db_name
Using database influx_db_name
> SELECT * FROM dht22
1494503402044000000 your_gateway_id 47.20000076293945 23.899999618530273
Create Rest API with Swagger
git checkout -b origin/step3-create-rest-api-with-swagger step3-create-rest-api-with-swagger
Maven dependencies
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${springfox.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${springfox.version}</version>
</dependency>
Swagger Config
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket api() {
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.any())
.paths(PathSelectors.any())
.build();
}
}
@SpringBootApplication
@Import({WebConfig.class,
JacksonConfig.class,
InfluxDBConfig.class,
RabbitMQConfig.class,
SwaggerConfig.class})
public class ReactiveServerApplication extends WebMvcConfigurerAdapter {
public static void main(String[] args) {
SpringApplication.run(ReactiveServerApplication.class, args);
}
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/");
}
}
Controller
@RestController
@RequestMapping("/api/v1/sensor/dht22")
@Api(value = "dht22", description = "DHT22 Sensor data")
public class DHT22Controller {
@Autowired
InfluxDBService influxDBService;
private ListResource<Map> toListResource(QueryResult queryResult) {
ListResource<Map> listResource = new ListResource<>();
if(queryResult.getResults() != null && queryResult.getResults().size() > 0) {
QueryResult.Result result = queryResult.getResults().get(0);
if(result != null && result.getSeries() != null && result.getSeries().size() > 0) {
QueryResult.Series series = result.getSeries().get(0);
series.getValues().forEach(value -> {
Map<String,Object> mappedValues = new HashMap<>();
IntStream.range(0, value.size())
.forEach(index -> {
mappedValues.put(series.getColumns().get(index), value.get(index));
});
listResource.getItems().add(mappedValues);
});
}
}
return listResource;
}
@SneakyThrows
private DHT22Sensor toTransferObject(QueryResult queryResult) {
DHT22Sensor dht22Sensor = new DHT22Sensor();
QueryResult.Series series = queryResult.getResults().get(0).getSeries().get(0);
dht22Sensor.setTime(ZonedDateTime.parse((String) series.getValues().get(0).get(0)));
dht22Sensor.setGatewayId((String) series.getValues().get(0).get(1));
dht22Sensor.setHumidity((Double) series.getValues().get(0).get(2));
dht22Sensor.setTemperature((Double) series.getValues().get(0).get(3));
return dht22Sensor;
}
//http://localhost:8084//api/v1/sensor/dht22/fresh/raspberry_1
@RequestMapping(value = "/fresh/{gatewayId}", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "Get latest fresh data", notes = "Get latest fresh data")
public ResponseEntity<DHT22Sensor> getFreshSensorData(@PathVariable("gatewayId") String gatewayId) {
String getLastValueQuery =
new SelectBuilder()
.column("*")
.from("dht22")
.where("gatewayId = '" + gatewayId + "'")
.orderBy("time", false)
.toString();
getLastValueQuery = getLastValueQuery.concat(" LIMIT 1");
return new ResponseEntity<>(toTransferObject(influxDBService.query(getLastValueQuery)), HttpStatus.OK);
}
//http://localhost:8084/api/v1/sensor/dht22/continuous/raspberry_1?sample=1h&range=12h
@RequestMapping(value = "/continuous/{gatewayId}", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "Get Continuous Data", notes = "Get Continuous Data")
public ResponseEntity<ListResource<Map>> getContinuousData(@PathVariable("gatewayId") String gatewayId,
@RequestParam(value = "sample", required = true) String sample,
@RequestParam(value = "range", required = true) String range) {
String getContinuousQuey =
new SelectBuilder()
.column("*")
.from("cq_dht22_" + sample)
.where("gatewayId = '" + gatewayId + "' AND time >= now() - " + range)
.orderBy("time", false)
.toString();
return new ResponseEntity<>(toListResource(influxDBService.query(getContinuousQuey)), HttpStatus.OK);
}
}
Result
http://localhost:8084/swagger-ui.html
Publish data to client via Websocket
git checkout -b step4-publish-to-websocket origin/step4-publish-to-websocket
Maven dependency
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>4.3.3.RELEASE</version>
</dependency>
Configuration
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/queue/");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp").setAllowedOrigins("*").withSockJS();
}
}
Service
@Service
public class PublishServiceImpl implements PublishService {
@Autowired
SimpMessagingTemplate messagingTemplate;
public void publish(String destination, Object payload) {
messagingTemplate.convertAndSend(destination, payload.toString());
}
}
Forward
@Service
public class ReceiveHandlerImpl implements ReceiveHandler {
@Autowired
PublishServiceImpl publishService;
@Autowired
InfluxDBService influxDBService;
private static final Logger logger = LoggerFactory.getLogger(ReceiveHandlerImpl.class);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.queue}", durable = "true"),
exchange = @Exchange(value = "${spring.rabbitmq.exchange}", type = ExchangeTypes.HEADERS, durable = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "gatewayId", value = "${spring.rabbitmq.gatewayId}")
})
)
public void handleMessage(@Payload Message message, @Header("gatewayId") String gatewayId) {
logger.info(new String(message.toString()));
Point point = Point.measurement("dht22")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("gatewayId",gatewayId)
.addField("temperature",message.getData().getTemperature())
.addField("humidity", message.getData().getHumidity())
.build();
influxDBService.write(point);
publishService.publish("/queue/DHT22",message.toString());
}
}
Result
2017-05-12 09:11:03.826 INFO 1729 --- [cTaskExecutor-1] com.jls.service.impl.PublishServiceImpl : [>] Publish: Message(expire=60537000, created=1494573062, data=DHT22Sensor{temperature='23.200000762939453', humidity=52.5}) ;destination: /queue/DHT22
Final Result
version: '2'
services:
reactive-server:
image: "jluccisano/reactive-server:latest"
environment:
- PORT=8084
- RABBITMQ_ENDPOINT=amqp://rabbit_user:rabbit_password@rabbitmq:5672/myvhost
- RABBITMQ_EXCHANGE=your_exchange_name
- RABBITMQ_QUEUE=your_queue_name
- RABBITMQ_GATEWAYID=your_gateway_id
- INFLUXDB_URL=http://influxdb:8086
- INFLUXDB_USERNAME=influx_username
- INFLUXDB_PASSWORD=influx_password
- INFLUXDB_DATABASE=influx_db_name
- INFLUXDB_RETENTION_POLICY=influx_rp_name
ports:
- "8084:8084"
links:
- "rabbitmq:rabbitmq"
- "influxdb:influxdb"
depends_on:
- "rabbitmq"
- "influxdb"
rabbitmq:
image: "rabbitmq:3-management"
environment:
- RABBITMQ_DEFAULT_USER=rabbit_user
- RABBITMQ_DEFAULT_PASS=rabbit_password
- RABBITMQ_DEFAULT_VHOST=myvhost
ports:
- "5672:5672"
- "8092:15672"
influxdb:
image: "appcelerator/influxdb"
volumes:
- './resources/init_script.influxql:/etc/extra-config/influxdb/init_script.influxql:ro'
ports:
- "8083:8083"
- "8086:8086"
grafana:
image: 'grafana/grafana'
links:
- "influxdb:influxdb"
depends_on:
- "influxdb"
ports:
- '3600:3000'
device-mock:
build:
context: ./resources
dockerfile: stub.dockerfile
environment:
- RABBITMQ_ENDPOINT=amqp://rabbit_user:rabbit_password@rabbitmq:5672/myvhost
- RABBITMQ_EXCHANGE=your_exchange_name
- RABBITMQ_GATEWAYID=your_gateway_id
- PUBLISH_INTERVAL=60
docker-compose up -d