建立RabbitMQ & SpringBoot集成環境
介紹
RabbitMQ是實現了進階訊息佇列協定(AMQP)的開源訊息代理軟體(亦稱訊息導向中介層)。RabbitMQ伺服器是用Erlang語言編寫的,Erlang是一個使用垃圾回收的虛擬機運行的併發編程語言,類似Java,而群集和故障轉移是構建在開放電信平台框架上的。所有主要的程式語言均有與代理介面通訊的客戶端函式庫。(維基百科)
實作
使用Docker建立RabbitMQ 服務,並透過Springboot AutoConfiguration特性,快速建立一個Web Application來與RabbitMQ進行send and receive測試。
首先執行docker 指令,下載RabbitMQ映像檔
docker pull rabbitmq:management
啟動服務 default port 5672 , admin port 15672
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
服務啟動後即可登入RabbitMQ管理介面,預設帳密:guest , guest
建立simple springboot web project .並import amqp dependency
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>
application.properties參數設定中加入RabbitMQ 連線參數
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.activemq.user=guest
spring.activemq.password=guest
建立RabbitmqConfig.java,自訂部分設定,創建Queue:tpu.queue,並且加入@EnableRabbit 標籤來讓@RabbitListener 標籤生效。
package com.tpisoftware;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitmqConfig {
/**
* 將自定義的消息類序列化成json格式,再轉成byte構造 Message,在接收消息時,會將接收到的 Message 再反序列化成自定義的類。
* @param objectMapper
* @return
*/
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
/**
* create Rabbit Queue
* @return
*/
@Bean
public Queue tpuQueue() {
return new Queue("tpu.queue");
}
}
建立一個收發訊息的封裝物件User.java
package com.tpisoftware;
import java.io.Serializable;
public class User implements Serializable{
private String name;
private Integer age;
public User() {
}
public User(String name, Integer age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
建立SendMessageController.java,讓使用者可以透過http GET method傳送name & age並用User封裝後送至Message Queue中。
package com.tpisoftware;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/put_message")
public String putMessage(@RequestParam(name="name") String name, @RequestParam(name="age") Integer age) {
rabbitTemplate.convertAndSend("tpu.queue", new User(name,age));
return "this is quick demo for Spring Boot!";
}
}
建立ReceiveMessageListener.java,印出receive message表示監聽指定Queue中的資料並進行消費。
package com.tpisoftware;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveMessageListener {
/**
* 監聽Queue中是否有資料,若有資料則進行消費。
* @param user
*/
@RabbitListener(queues={"tpu.queue"})
public void receive(User user) {
System.out.println("receive message from queue:"+user.toString());
}
}
程式完成後,啟動 SpringBootApplication
在啟動log中可以看到是否成功與RabbitMQ Service建立連線。
測試
透過curl 發送請求給Server.
curl -X GET 'http://localhost:8080/put_message?name=Tian&age=28'
curl -X GET 'http://localhost:8080/put_message?name=Lin&age=18'
curl -X GET 'http://localhost:8080/put_message?name=Peter&age=51'
curl -X GET 'http://localhost:8080/put_message?name=John&age=33'
ReceiveMessageListener 接收到tpu.queue發布的訊息,並印在Server Console中。
透過RabbitMQ managementUI 可以觀察佇列內訊息數量的變化。
結語
RabbitMQ非常容易的實現於SpringBoot上,而RabbitMQ在與訊息佇列主流 Kafka使用上的選擇,
如果你想要一個簡單、傳統的發布/訂閱消息代理,選用RabbitMQ即可,
RabbitMQ還有靈活的路由預測、優先級隊列選項優點。
如果需要高流量及大數據的需求,相較下Kafka會是比較好的選擇。
參考資料
維基百科:https://zh.wikipedia.org/wiki/RabbitMQ
Medium : https://medium.com/better-programming/kafka-vs-rabbitmq-why-use-kafka-8401b2863b8b
kknews : https://kknews.cc/code/lvxagng.html