怎么在Java项目中利用rabbitMQ实现一个消息收发功能?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

成都创新互联公司专业为企业提供密山网站建设、密山做网站、密山网站设计、密山网站制作等企业网站建设、网页设计与制作、密山企业网站模板建站服务,十余年密山做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
本文是基于spring-rabbit中间件来实现消息的发送接受功能
see http://www.rabbitmq.com/tutorials/tutorial-one-java.html
see http://www.springsource.org/spring-amqp
Java编程通过操作rabbitMQ消息的收发实现代码如下:
com.rabbitmq amqp-client 2.8.2 org.springframework.amqp spring-amqp 1.1.1.RELEASE org.springframework.amqp spring-rabbit 1.1.1.RELEASE com.caucho hessian 4.0.7
首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
public class EventMessage implements Serializable{
private String queueName;
private String exchangeName;
private byte[] eventData;
public EventMessage(String queueName, String exchangeName, byte[] eventData) {
this.queueName = queueName;
this.exchangeName = exchangeName;
this.eventData = eventData;
}
public EventMessage() {
}
public String getQueueName() {
return queueName;
}
public String getExchangeName() {
return exchangeName;
}
public byte[] getEventData() {
return eventData;
}
@Override
public String toString() {
return "EopEventMessage [queueName=" + queueName + ", exchangeName="
+ exchangeName + ", eventData=" + Arrays.toString(eventData)
+ "]";
}
}为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂
public interface CodecFactory {
byte[] serialize(Object obj) throws IOException;
Object deSerialize(byte[] in) throws IOException;
}下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
public class HessionCodecFactory implements CodecFactory {
private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
@Override
public byte[] serialize(Object obj) throws IOException {
ByteArrayOutputStream baos = null;
HessianOutput output = null;
try {
baos = new ByteArrayOutputStream(1024);
output = new HessianOutput(baos);
output.startCall();
output.writeObject(obj);
output.completeCall();
} catch (final IOException ex) {
throw ex;
} finally {
if (output != null) {
try {
baos.close();
} catch (final IOException ex) {
this.logger.error("Failed to close stream.", ex);
}
}
}
return baos != null ? baos.toByteArray() : null;
}
@Override
public Object deSerialize(byte[] in) throws IOException {
Object obj = null;
ByteArrayInputStream bais = null;
HessianInput input = null;
try {
bais = new ByteArrayInputStream(in);
input = new HessianInput(bais);
input.startReply();
obj = input.readObject();
input.completeReply();
} catch (final IOException ex) {
throw ex;
} catch (final Throwable e) {
this.logger.error("Failed to decode object.", e);
} finally {
if (input != null) {
try {
bais.close();
} catch (final IOException ex) {
this.logger.error("Failed to close stream.", ex);
}
}
}
return obj;
}
}接下来就先实现发送功能,新增一个接口专门用来实现发送功能
public interface EventTemplate {
void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage
public class DefaultEventTemplate implements EventTemplate {
private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
private AmqpTemplate eventAmqpTemplate;
private CodecFactory defaultCodecFactory;
// private DefaultEventController eec;
// public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
// CodecFactory defaultCodecFactory, DefaultEventController eec) {
// this.eventAmqpTemplate = eopAmqpTemplate;
// this.defaultCodecFactory = defaultCodecFactory;
// this.eec = eec;
// }
public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
this.eventAmqpTemplate = eopAmqpTemplate;
this.defaultCodecFactory = defaultCodecFactory;
}
@Override
public void send(String queueName, String exchangeName, Object eventContent)
throws SendRefuseException {
this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
}
@Override
public void send(String queueName, String exchangeName, Object eventContent,
CodecFactory codecFactory) throws SendRefuseException {
if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
throw new SendRefuseException("queueName exchangeName can not be empty.");
}
// if (!eec.beBinded(exchangeName, queueName))
// eec.declareBinding(exchangeName, queueName);
byte[] eventContentBytes = null;
if (codecFactory == null) {
if (eventContent == null) {
logger.warn("Find eventContent is null,are you sure...");
} else {
throw new SendRefuseException(
"codecFactory must not be null ,unless eventContent is null");
}
} else {
try {
eventContentBytes = codecFactory.serialize(eventContent);
} catch (IOException e) {
throw new SendRefuseException(e);
}
}
// 构造成Message
EventMessage msg = new EventMessage(queueName, exchangeName,
eventContentBytes);
try {
eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
} catch (AmqpException e) {
logger.error("send event fail. Event Message : [" + eventContent + "]", e);
throw new SendRefuseException("send event fail", e);
}
}
}注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类
public interface EventProcesser {
public void process(Object e);
}为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
/** * MessageListenerAdapter的Pojo *消息处理适配器,主要功能:
*1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来
*2、执行消息的消费分发,调用相应的处理器来消费属于它的消息
* */ public class MessageAdapterHandler { private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class); private ConcurrentMapepwMap; public MessageAdapterHandler() { this.epwMap = new ConcurrentHashMap (); } public void handleMessage(EventMessage eem) { logger.debug("Receive an EventMessage: [" + eem + "]"); // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值 if (eem == null) { logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled."); return; } if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) { logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled."); return; } // 解码,并交给对应的EventHandle执行 EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName()); if (eepw == null) { logger.warn("Receive an EopEventMessage, but no processor can do it."); return; } try { eepw.process(eem.getEventData()); } catch (IOException e) { logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e); return; } } protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) { if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) { throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. "); } EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor); EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw); if (oldProcessorWrap != null) { logger.warn("The processor of this queue and exchange exists, and the new one can't be add"); } } protected Set getAllBinding() { Set keySet = epwMap.keySet(); return keySet; } protected static class EventProcessorWrap { private CodecFactory codecFactory; private EventProcesser eep; protected EventProcessorWrap(CodecFactory codecFactory, EventProcesser eep) { this.codecFactory = codecFactory; this.eep = eep; } public void process(byte[] eventData) throws IOException{ Object obj = codecFactory.deSerialize(eventData); eep.process(obj); } } }
这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息
public class MessageErrorHandler implements ErrorHandler{
private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
@Override
public void handleError(Throwable t) {
logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
}
}接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息
public class EventControlConfig {
private final static int DEFAULT_PORT = 5672;
private final static String DEFAULT_USERNAME = "guest";
private final static String DEFAULT_PASSWORD = "guest";
private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
private static final int PREFETCH_SIZE = 1;
private String serverHost ;
private int port = DEFAULT_PORT;
private String username = DEFAULT_USERNAME;
private String password = DEFAULT_PASSWORD;
private String virtualHost;
/**
* 和rabbitmq建立连接的超时时间
*/
private int connectionTimeout = 0;
/**
* 事件消息处理线程数,默认是 CPU核数 * 2
*/
private int eventMsgProcessNum;
/**
* 每次消费消息的预取值
*/
private int prefetchSize;
public EventControlConfig(String serverHost) {
this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());
}
public EventControlConfig(String serverHost, int port, String username,
String password, String virtualHost, int connectionTimeout,
int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
this.serverHost = serverHost;
this.port = port>0?port:DEFAULT_PORT;
this.username = username;
this.password = password;
this.virtualHost = virtualHost;
this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
}
public String getServerHost() {
return serverHost;
}
public int getPort() {
return port;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getVirtualHost() {
return virtualHost;
}
public int getConnectionTimeout() {
return connectionTimeout;
}
public int getEventMsgProcessNum() {
return eventMsgProcessNum;
}
public int getPrefetchSize() {
return prefetchSize;
}
}具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信
public interface EventController {
/**
* 控制器启动方法
*/
void start();
/**
* 获取发送模版
*/
EventTemplate getEopEventTemplate();
/**
* 绑定消费程序到对应的exchange和queue
*/
EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
/*in map, the key is queue name, but value is exchange name*/
EventController add(Map bindings, EventProcesser eventProcesser);
} 它的实现类如下:
/** * 和rabbitmq通信的控制器,主要负责: *1、和rabbitmq建立连接
*2、声明exChange和queue以及它们的绑定关系
*3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上
*4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存
* @author yangyong * */ public class DefaultEventController implements EventController { private CachingConnectionFactory rabbitConnectionFactory; private EventControlConfig config; private RabbitAdmin rabbitAdmin; private CodecFactory defaultCodecFactory = new HessionCodecFactory(); private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler(); private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定 //queue cache, key is exchangeName private Mapexchanges = new HashMap (); //queue cache, key is queueName private Map queues = new HashMap (); //bind relation of queue to exchange cache, value is exchangeName | queueName private Set binded = new HashSet (); private EventTemplate eventTemplate; // 给App使用的Event发送客户端 private AtomicBoolean isStarted = new AtomicBoolean(false); private static DefaultEventController defaultEventController; public synchronized static DefaultEventController getInstance(EventControlConfig config){ if(defaultEventController==null){ defaultEventController = new DefaultEventController(config); } return defaultEventController; } private DefaultEventController(EventControlConfig config){ if (config == null) { throw new IllegalArgumentException("Config can not be null."); } this.config = config; initRabbitConnectionFactory(); // 初始化AmqpAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory); // 初始化RabbitTemplate RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(serializerMessageConverter); eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this); } /** * 初始化rabbitmq连接 */ private void initRabbitConnectionFactory() { rabbitConnectionFactory = new CachingConnectionFactory(); rabbitConnectionFactory.setHost(config.getServerHost()); rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum()); rabbitConnectionFactory.setPort(config.getPort()); rabbitConnectionFactory.setUsername(config.getUsername()); rabbitConnectionFactory.setPassword(config.getPassword()); if (!StringUtils.isEmpty(config.getVirtualHost())) { rabbitConnectionFactory.setVirtualHost(config.getVirtualHost()); } } /** * 注销程序 */ public synchronized void destroy() throws Exception { if (!isStarted.get()) { return; } msgListenerContainer.stop(); eventTemplate = null; rabbitAdmin = null; rabbitConnectionFactory.destroy(); } @Override public void start() { if (isStarted.get()) { return; } Set mapping = msgAdapterHandler.getAllBinding(); for (String relation : mapping) { String[] relaArr = relation.split("\\|"); declareBinding(relaArr[1], relaArr[0]); } initMsgListenerAdapter(); isStarted.set(true); } /** * 初始化消息监听器容器 */ private void initMsgListenerAdapter(){ MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter); msgListenerContainer = new SimpleMessageListenerContainer(); msgListenerContainer.setConnectionFactory(rabbitConnectionFactory); msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); msgListenerContainer.setMessageListener(listener); msgListenerContainer.setErrorHandler(new MessageErrorHandler()); msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值 msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum()); msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数 msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()])); msgListenerContainer.start(); } @Override public EventTemplate getEopEventTemplate() { return eventTemplate; } @Override public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) { return add(queueName, exchangeName, eventProcesser, defaultCodecFactory); } public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) { msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory); if(isStarted.get()){ initMsgListenerAdapter(); } return this; } @Override public EventController add(Map bindings, EventProcesser eventProcesser) { return add(bindings, eventProcesser,defaultCodecFactory); } public EventController add(Map bindings, EventProcesser eventProcesser, CodecFactory codecFactory) { for(Map.Entry item: bindings.entrySet()) msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory); return this; } /** * exchange和queue是否已经绑定 */ protected boolean beBinded(String exchangeName, String queueName) { return binded.contains(exchangeName+"|"+queueName); } /** * 声明exchange和queue已经它们的绑定关系 */ protected synchronized void declareBinding(String exchangeName, String queueName) { String bindRelation = exchangeName+"|"+queueName; if (binded.contains(bindRelation)) return; boolean needBinding = false; DirectExchange directExchange = exchanges.get(exchangeName); if(directExchange == null) { directExchange = new DirectExchange(exchangeName, true, false, null); exchanges.put(exchangeName, directExchange); rabbitAdmin.declareExchange(directExchange);//声明exchange needBinding = true; } Queue queue = queues.get(queueName); if(queue == null) { queue = new Queue(queueName, true, false, false); queues.put(queueName, queue); rabbitAdmin.declareQueue(queue); //声明queue needBinding = true; } if(needBinding) { Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange rabbitAdmin.declareBinding(binding);//声明绑定关系 binded.add(bindRelation); } } }
搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO
@SuppressWarnings("serial")
public class People implements Serializable{
private int id;
private String name;
private boolean male;
private People spouse;
private List friends;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isMale() {
return male;
}
public void setMale(boolean male) {
this.male = male;
}
public People getSpouse() {
return spouse;
}
public void setSpouse(People spouse) {
this.spouse = spouse;
}
public List getFriends() {
return friends;
}
public void setFriends(List friends) {
this.friends = friends;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "People[id="+id+",name="+name+",male="+male+"]";
}
} 建立单元测试
public class RabbitMqTest{
private String defaultHost = "127.0.0.1";
private String defaultExchange = "EXCHANGE_DIRECT_TEST";
private String defaultQueue = "QUEUE_TEST";
private DefaultEventController controller;
private EventTemplate eventTemplate;
@Before
public void init() throws IOException{
EventControlConfig config = new EventControlConfig(defaultHost);
controller = DefaultEventController.getInstance(config);
eventTemplate = controller.getEopEventTemplate();
controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
controller.start();
}
@Test
public void sendString() throws SendRefuseException{
eventTemplate.send(defaultQueue, defaultExchange, "hello world");
}
@Test
public void sendObject() throws SendRefuseException{
eventTemplate.send(defaultQueue, defaultExchange, mockObj());
}
@Test
public void sendTemp() throws SendRefuseException, InterruptedException{
String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange
String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue
eventTemplate.send(tempQueue, tempExchange, mockObj());
//发送成功后此时不会接受到消息,还需要绑定对应的消费程序
controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
}
@After
public void end() throws InterruptedException{
Thread.sleep(2000);
}
private People mockObj(){
People jack = new People();
jack.setId(1);
jack.setName("JACK");
jack.setMale(true);
List friends = new ArrayList<>();
friends.add(jack);
People hanMeiMei = new People();
hanMeiMei.setId(1);
hanMeiMei.setName("韩梅梅");
hanMeiMei.setMale(false);
hanMeiMei.setFriends(friends);
People liLei = new People();
liLei.setId(2);
liLei.setName("李雷");
liLei.setMale(true);
liLei.setFriends(friends);
liLei.setSpouse(hanMeiMei);
hanMeiMei.setSpouse(liLei);
return hanMeiMei;
}
class ApiProcessEventProcessor implements EventProcesser{
@Override
public void process(Object e) {//消费程序这里只是打印信息
Assert.assertNotNull(e);
System.out.println(e);
if(e instanceof People){
People people = (People)e;
System.out.println(people.getSpouse());
System.out.println(people.getFriends());
}
}
}
} 看完上述内容,你们掌握怎么在Java项目中利用rabbitMQ实现一个消息收发功能的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
新闻名称:怎么在Java项目中利用rabbitMQ实现一个消息收发功能
文章转载:http://www.jxjierui.cn/article/ghpceg.html


咨询
建站咨询
