Injecting RabbitMQ with Spring annotations: how to use the annotations

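This article does not cover the basics of AMQP and RabbitMQ. For those, please refer to the link (the URL is not preserved in this copy), which explains them in great detail.
This article uses a small demo to show how to send and receive RabbitMQ messages with the spring-rabbit library. The sending side is called the producer and the receiving side the consumer.
1. Add the RabbitMQ dependencies to pom.xml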
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring.version>3.1.1.RELEASE</spring.version>
    <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring.rabbit.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib</artifactId>
        <version>2.2.2</version>
    </dependency>
</dependencies>
The protobuf-java dependency above is used to serialize and deserialize the RabbitMQ messages.
2. The producer's XML configuration file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
    <context:property-placeholder location="classpath:rabbitmq.properties" />
    <!-- Register beans via annotations and make sure @Required and @Autowired properties are injected -->
    <context:component-scan base-package="com.tracy" />
    <!-- Create the Rabbit ConnectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" />
    <!-- Create the RabbitAdmin, which manages exchanges, queues and bindings -->
    <rabbit:admin id="containerAdmin" connection-factory="connectionFactory" />
    <!-- Use protobuf as the message format -->
    <bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean>
    <!-- Create the message-sending template auditTemplate -->
    <rabbit:template id="auditTemplate" connection-factory="connectionFactory" exchange="${rabbitmq.exchange}" routing-key="${rabbitmq.routingKey}" message-converter="protoMessageConverter" />
</beans>
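In the configuration above, the exchange attribute of <rabbit:template> sends messages to the exchange named ui_ex_test. The routing-key attribute sets the routing key audit; the exchange delivers the message to whichever queue is bound with that key (audit_queue in the consumer configuration below). The message-converter attribute selects protobuf as the data exchange format.
The connection details for the RabbitMQ server are kept in rabbitmq.properties, located at the root of src/main/resources. Its contents are: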
rabbitmq.host = 10.0.3.123
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.exchange = ui_ex_test
rabbitmq.routingKey = audit
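To make the ${...} placeholders in the XML less mysterious, here is a minimal plain-Java sketch of the substitution idea. It is not Spring's implementation: the class PlaceholderSketch and its methods load and resolve are hypothetical names introduced only for this illustration, and it supports nothing beyond simple ${key} lookups.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring: resolves ${key} references against a Properties object.
class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Parses properties text; java.util.Properties ignores the spaces around '='.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Replaces every ${key} in the template with its value; unknown keys fail fast.
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = load(String.join("\n",
                "rabbitmq.host = 10.0.3.123",
                "rabbitmq.exchange = ui_ex_test",
                "rabbitmq.routingKey = audit"));
        // prints "exchange=ui_ex_test, key=audit"
        System.out.println(resolve("exchange=${rabbitmq.exchange}, key=${rabbitmq.routingKey}", props));
    }
}
```

Failing on an unknown key is a design choice of this sketch: a typo in a property name then surfaces at startup instead of producing a connection to a literal "${rabbitmq.host}".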
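3. The proto format shared by the producer and the consumer
For an introduction to protobuf, see my previous blog post (the address is not preserved in this copy).
The contents of person_msg.proto are:
// package com.tracy.rabbitmq.<rest of the package name is truncated in the source>
option java_package = "com.tracy.rabbitmq.proto";
option java_outer_classname = "PersonMsgProtos";
message Person {
    // ID (required)
    required int32 id = 1;
    // Name (required)
    required string name = 2;
    // Email (optional)
    optional string email = 3;
    // Friends (collection)
    repeated string friends = 4;
}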
4. The producer's main function
// package com.tracy.<rest of the package name is truncated in the source>
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.tracy.rabbitmq.proto.PersonMsgProtos;

/**
 * Main function that sends a message
 * @author tracy_cui
 */
public class Sender {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-sender.xml");
        RabbitTemplate template = (RabbitTemplate) context.getBean("auditTemplate");
        // Build a Person according to the proto definition
        PersonMsgProtos.Person.Builder personBuilder = PersonMsgProtos.Person.newBuilder();
        personBuilder.setId(1);
        personBuilder.setName("tracy");
        personBuilder.setEmail("tracy_");
        personBuilder.addFriends("wang");
        personBuilder.addFriends("yang");
        PersonMsgProtos.Person person = personBuilder.build();
        // Hand the Java object to the message-converter bound to rabbit:template, which converts and sends it
        template.convertAndSend(person);
    }
}
5. The protobuf message converter
This converter is shared by the producer and the consumer: createMessage is called on the producer side, and convertProto2Object is called on the consumer side.
package com.tracy.rabbitmq.converter;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.google.protobuf.InvalidProtocolBufferException;
import com.tracy.rabbitmq.proto.PersonMsgProtos;

/**
 * Converts between ProtoBuf bytes and objects
 * @author tracy_cui
 */
public class ProtobufMessageConverter extends AbstractMessageConverter {
    /**
     * Converts an object to ProtoBuf bytes when sending a message
     */
    @Override
    public Message createMessage(Object object, MessageProperties messageProperties) {
        System.out.println("Converting the outgoing message");
        PersonMsgProtos.Person person = (PersonMsgProtos.Person) object;
        byte[] byteArray = person.toByteArray();
        Message message = new Message(byteArray, messageProperties);
        return message;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // The method body is missing in the source; returning null keeps the class compilable.
        // The listeners in this demo call convertProto2Object instead of this method.
        return null;
    }

    /**
     * Converts ProtoBuf bytes to an object when receiving a message
     */
    public Object convertProto2Object(Message message) throws InvalidProtocolBufferException {
        byte[] byteArray = message.getBody();
        PersonMsgProtos.Person parsePerson = PersonMsgProtos.Person.parseFrom(byteArray);
        return parsePerson;
    }
}
6. The consumer's XML configuration file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task-3.1.xsd">
    <context:property-placeholder location="classpath:rabbitmq.properties" />
    <!-- Register beans via annotations and make sure @Required and @Autowired properties are injected -->
    <context:component-scan base-package="com.tracy" />
    <!-- Create the Rabbit ConnectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" />
    <!-- Create the RabbitAdmin, which manages exchanges, queues and bindings -->
    <rabbit:admin id="containerAdmin" connection-factory="connectionFactory" />
    <!-- Declare the queue -->
    <rabbit:queue name="audit_queue" durable="false" exclusive="false" auto-delete="false" auto-declare="true"/>
    <!-- Declare a direct exchange -->
    <rabbit:direct-exchange name="${rabbitmq.exchange}" durable="false" auto-delete="false" auto-declare="true">
        <!-- Bind the exchange to the queue with the routing key -->
        <rabbit:bindings>
            <rabbit:binding queue="audit_queue" key="${rabbitmq.routingKey}"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- Declare two listeners -->
    <bean id="auditListenerOne" class="com.tracy.rabbitmq.listener.AuditListenerOne" />
    <bean id="auditListenerTwo" class="com.tracy.rabbitmq.listener.AuditListenerTwo" />
    <!-- Use protobuf as the message format -->
    <bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean>
    <!-- Attach both listeners to the declared queue -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" prefetch="1" message-converter="protoMessageConverter">
        <rabbit:listener ref="auditListenerOne" queue-names="audit_queue" />
        <rabbit:listener ref="auditListenerTwo" queue-names="audit_queue" />
    </rabbit:listener-container>
    <!-- Create a Spring thread pool so that received data is processed on multiple threads -->
    <task:executor id="MessageQueue-Executor" pool-size="2-5" queue-capacity="50" rejection-policy="CALLER_RUNS" keep-alive="2000"/>
    <task:annotation-driven executor="MessageQueue-Executor"/>
</beans>
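In the configuration above, the Rabbit connection factory is defined exactly as for the producer. The consumer configuration differs from the producer's in the following ways:
a. It defines a queue named audit_queue; the queue holds the messages routed from the exchange until a consumer takes them;
b. It declares the exchange and binds it to the queue with the routing key, so that messages the exchange receives with routing key audit are delivered to audit_queue;
c. It declares two listeners that listen for messages on audit_queue;
d. It uses a Spring thread pool to process the received data.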
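To see what the task:executor settings in the consumer configuration amount to, here is a rough plain-Java equivalent built with java.util.concurrent.ThreadPoolExecutor. It is a sketch, not what Spring creates internally; the class name ExecutorSketch and the method newMessageExecutor are made up for illustration, and it assumes that keep-alive is expressed in seconds, as in Spring's task namespace.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the pool configured by <task:executor> in the consumer XML.
class ExecutorSketch {
    // Mirrors pool-size="2-5", queue-capacity="50", rejection-policy="CALLER_RUNS", keep-alive="2000".
    static ThreadPoolExecutor newMessageExecutor() {
        return new ThreadPoolExecutor(
                2,                                   // core pool size
                5,                                   // maximum pool size
                2000, TimeUnit.SECONDS,              // idle time before extra threads are stopped (assumed seconds)
                new LinkedBlockingQueue<Runnable>(50),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = newMessageExecutor();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            executor.execute(() ->
                    System.out.println("message " + id + " handled on " + Thread.currentThread().getName()));
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

A ThreadPoolExecutor only grows beyond its core size once its queue is full, so with these settings the pool stays at 2 threads until 50 tasks are waiting. Only when all 5 threads are busy and the queue is full does CALLER_RUNS make the submitting thread run the task itself.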
7. The consumer's listener
package com.tracy.rabbitmq.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Listens for RabbitMQ messages
 * @author tracy_cui
 */
public class AuditListenerOne implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(AuditListenerOne.class);
    @Autowired
    private AuditListenerHandler auditListenerHandler;

    public AuditListenerOne() {
        logger.info("[****************] MessageQueue waiting for messages...");
    }

    public void onMessage(Message message) {
        try {
            // Delegate the actual processing to the handler
            auditListenerHandler.handleMessage(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
8. How the consumer processes the received data
package com.tracy.rabbitmq.listener;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.tracy.rabbitmq.converter.ProtobufMessageConverter;
import com.tracy.rabbitmq.proto.PersonMsgProtos;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * Message processing
 * @author tracy_cui
 */
@Component
public class AuditListenerHandler {
    private static final Logger logger = LoggerFactory.getLogger(AuditListenerHandler.class);
    @Autowired
    private ProtobufMessageConverter messageConverter;

    /**
     * Runs on the Spring thread pool
     */
    @Async
    public void handleMessage(Message message) throws Exception {
        logger.info("[****************] handleMessage thread : " + Thread.currentThread().getName());
        PersonMsgProtos.Person person = this.convertMessage(message);
        if (person == null) {
            // Conversion failed and was already logged; nothing to print
            return;
        }
        System.out.println("id : " + person.getId());
        System.out.println("name : " + person.getName());
        System.out.println("email : " + person.getEmail());
        List<String> friendLists = person.getFriendsList();
        for (String friend : friendLists) {
            System.out.println("friend :" + friend);
        }
    }

    /**
     * Converts ProtoBuf bytes to the entity
     */
    private PersonMsgProtos.Person convertMessage(Message message) {
        PersonMsgProtos.Person person = null;
        try {
            Object object = messageConverter.convertProto2Object(message);
            if (object instanceof PersonMsgProtos.Person) {
                person = (PersonMsgProtos.Person) object;
            } else {
                logger.warn("[****************] object is not an instance of PersonMsgProtos.Person");
            }
        } catch (InvalidProtocolBufferException e) {
            logger.warn("[****************] convert message error, InvalidProtocolBuffer");
            e.printStackTrace();
        }
        return person;
    }
}
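[Screenshot: data received by the consumer]
The complete code of this project is hosted in a git repository (the address is not preserved in this copy).
Reader comment: Thanks for sharing. I just have one question: why is the fromMessage method of ProtobufMessageConverter not implemented here? @Override public Object fromMessage(Message message) throws MessageConversionException { }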
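RabbitMQ is a message queue system implemented in Erlang that follows AMQP (Advanced Message Queuing Protocol). In the author's view its performance and scalability compare well with similar frameworks; the newcomer Kafka is also a good option, and comparisons of the strengths and weaknesses of the various MQ frameworks are easy to find online.
For reference, see this other article: http://blog.csdn.net/linsongbin1/article/details/
RabbitMQ has three main exchange types: direct, fanout and topic.
direct delivers one-to-one: a message goes to the queue whose binding key equals the routing key exactly.
fanout broadcasts: a message goes to every queue bound to the exchange, and the routing key is ignored.
topic delivers by pattern: a message goes to the queues whose binding pattern matches the routing key.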
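The difference between the three exchange types can be sketched with a small in-memory model. This is only an illustration of the routing rules, not the RabbitMQ client API: the class ExchangeSketch and its methods bind, route and topicMatches are hypothetical, each queue gets a single binding for simplicity, and the queue and key names are borrowed from the rabbitMq.xml file shown later in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model of exchange routing; it does not talk to a broker.
class ExchangeSketch {
    enum Type { DIRECT, FANOUT, TOPIC }

    private final Type type;
    // queue name -> binding key (ignored by fanout exchanges)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    ExchangeSketch(Type type) {
        this.type = type;
    }

    ExchangeSketch bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
        return this;
    }

    // Returns the queues that would receive a message published with the given routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            boolean matches;
            switch (type) {
                case DIRECT: matches = b.getValue().equals(routingKey); break;
                case FANOUT: matches = true; break;
                default:     matches = topicMatches(b.getValue(), routingKey); break;
            }
            if (matches) {
                targets.add(b.getKey());
            }
        }
        return targets;
    }

    // Topic rule: words are separated by '.', '*' matches exactly one word, '#' matches zero or more words.
    static boolean topicMatches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        ExchangeSketch direct = new ExchangeSketch(Type.DIRECT).bind("queueTest", "queueTestKey");
        ExchangeSketch fanout = new ExchangeSketch(Type.FANOUT).bind("fanoutQueue", "").bind("fanoutQueue2", "");
        ExchangeSketch topic = new ExchangeSketch(Type.TOPIC).bind("topicQueue", "info.*");
        System.out.println(direct.route("queueTestKey"));    // [queueTest]
        System.out.println(fanout.route("anything"));        // [fanoutQueue, fanoutQueue2]
        System.out.println(topic.route("info.debug"));       // [topicQueue]
        System.out.println(topic.route("info.debug.extra")); // []
    }
}
```

The last line shows why info.* and info.# are not interchangeable: * consumes exactly one word, so a three-word key is not delivered.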
Let's go straight to the code.
Maven build: pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xin</groupId>
    <artifactId>com.xin.rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- Spring version -->
        <spring.version>3.2.8.RELEASE</spring.version>
        <!-- Versions of the log4j logging packages -->
        <slf4j.version>1.6.6</slf4j.version>
        <log4j.version>1.2.12</log4j.version>
        <!-- JUnit version -->
        <junit.version>4.10</junit.version>
    </properties>
    <dependencies>
        <!-- Spring dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- Unit test dependency -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Logging packages -->
        <!-- log start -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- log end -->
        <!-- Spring unit test dependency -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- RabbitMQ dependency -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>javax.validation</groupId>
            <artifactId>validation-api</artifactId>
            <version>1.1.0.Final</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>5.0.1.Final</version>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <targetPath>${basedir}/target/classes</targetPath>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <targetPath>${basedir}/target/resources</targetPath>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>2.1.1</version>
                <configuration>
                    <warSourceExcludes>${warExcludes}</warSourceExcludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <testFailureIgnore>true</testFailureIgnore>
                </configuration>
                <inherited>true</inherited>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
log4j configuration file: log4j.properties
log4j.rootLogger=DEBUG,Console,Stdout
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
log4j.logger.java.sql.ResultSet=INFO
log4j.logger.org.apache=INFO
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG
log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender
log4j.appender.Stdout.File = E://logs/log.log
log4j.appender.Stdout.Append = true
log4j.appender.Stdout.Threshold = DEBUG
log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
Spring configuration file: application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    <import resource="classpath*:rabbitMq.xml" />
    <!-- Scan the given packages for classes carrying annotations such as @Controller, @Service or @Resource and register them as Spring beans -->
    <context:component-scan base-package="com.xin.consumer,com.xin.producer" />
    <!-- Enable annotation support -->
    <context:annotation-config />
    <!-- Enable annotation-driven configuration of @Configurable objects -->
    <context:spring-configured />
</beans>
RabbitMQ configuration file: rabbitMq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <!-- Configure the connection-factory with the parameters for connecting to the rabbit server -->
    <rabbit:connection-factory id="connectionFactory"
        username="guest" password="guest" host="localhost" port="5672" />

    <!-- direct: one-to-one delivery -->
    <!-- Define the rabbit template used to send and receive data -->
    <rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        exchange="exchangeTest" />
    <!-- With the admin below, the exchanges and queues declared here are created on the RabbitMQ server automatically -->
    <rabbit:admin connection-factory="connectionFactory" />
    <!-- Define the queue -->
    <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
    <!-- Define the direct exchange and bind queueTest to it -->
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- Message receiver -->
    <bean id="messageReceiver" class="com.xin.consumer.MessageConsumer"></bean>
    <!-- Queue listener (observer pattern): when a message arrives, the listener registered on that queue is notified -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="queueTest" ref="messageReceiver"/>
    </rabbit:listener-container>

    <!-- topic: delivery by pattern matching -->
    <rabbit:template id="topicTemplate"
        connection-factory="connectionFactory"
        exchange="topicExchange"/>
    <!-- Define the queue -->
    <rabbit:queue name="topicQueue" durable="true" auto-delete="false" exclusive="false" />
    <!-- Topic exchange: the pattern is matched against the routing key, i.e. the first argument of topicTemplate.convertAndSend("foo.bar", message); a key such as info.debug matches info.* -->
    <rabbit:topic-exchange name="topicExchange">
        <rabbit:bindings>
            <rabbit:binding queue="topicQueue" pattern="info.*" />
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="topic1" queues="topicQueue"/>
    </rabbit:listener-container>
    <bean id="topic1" class="com.xin.consumer.TopicConsumer"/>

    <!-- fanout: one message is delivered through several queues -->
    <rabbit:template id="fanoutTemplate"
        connection-factory="connectionFactory"
        exchange="fanoutExchange"/>
    <!-- Define the queues -->
    <rabbit:queue name="fanoutQueue" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="fanoutQueue2" durable="true" auto-delete="false" exclusive="false" />
    <!-- Fanout exchange -->
    <rabbit:fanout-exchange name="fanoutExchange">
        <rabbit:bindings>
            <rabbit:binding queue="fanoutQueue"></rabbit:binding>
            <rabbit:binding queue="fanoutQueue2"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="fanoutConsumer" queues="fanoutQueue"/>
        <rabbit:listener ref="fanoutConsumer2" method="foo" queues="fanoutQueue2"/>
    </rabbit:listener-container>
    <bean id="fanoutConsumer" class="com.xin.consumer.FanoutConsumer"/>
    <bean id="fanoutConsumer2" class="com.xin.consumer.FanoutConsumer2"/>
</beans>
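Delivery through the direct exchange:
Producer: MessageProducer
package com.xin.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

/**
 * Created by lhx on
 * @Description
 */
@Service
public class MessageProducer {
    private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
    // Injected by name, so this resolves to the rabbit:template with id amqpTemplate
    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Object message) {
        logger.info("to send message:{}", message);
        // queueTestKey is the routing key bound to queueTest on exchangeTest
        amqpTemplate.convertAndSend("queueTestKey", message);
    }
}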
Consumer: MessageConsumer
package com.xin.consumer;
import com.xin.producer.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Created by lhx on
 * @Description Receives and consumes messages
 */
public class MessageConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    public void onMessage(Message message) {
        logger.info("receive message:{}", message);
        System.out.println(new String(message.getBody()));
    }

    public static void main(final String... args) throws Exception {
        AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("application.xml");
        MessageProducer messageProducer = (MessageProducer) ctx.getBean("messageProducer");
        messageProducer.sendMessage("-------------hello,xin^^^^^^66666!");
        // Give the listener time to receive the message before shutting down
        Thread.sleep(1000);
        ctx.destroy();
    }
}
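Run the main function: the producer pushes a message to the broker, and the consumer receives the message and prints it. The sample output below was captured from the topic exchange run (TopicProducer, routing key info.debug):
10:29:27,659 [main] INFO [com.xin.producer.TopicProducer] - to send message:++++++++++++++++++hello,xin^^^^^^66666!hello,xin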
10:29:27,676 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'++++++++++++++++++hello,xin^^^^^^66666!hello,xin'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topicExchange, receivedRoutingKey=info.debug, deliveryTag=1, messageCount=0])
(Body:'++++++++++++++++++hello,xin^^^^^^66666!hello,xin'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topicExchange, receivedRoutingKey=info.debug, deliveryTag=1, messageCount=0])
++++++++++++++++++hello,xin^^^^^^66666!hello,xin
The other exchange types are implemented in a similar way; please refer to my git project: /888xin/rabbitmq
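The main method of MessageConsumer waits a fixed Thread.sleep(1000) before destroying the context, which can cut off a slow delivery or waste time on a fast one. One possible alternative, sketched here in plain Java without Spring, is to let the listener release a CountDownLatch when a message arrives. The names LatchSketch, RecordingListener and await are hypothetical, and a plain thread stands in for the listener container.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
    // Stand-in for a listener's onMessage callback: records the body and releases the latch.
    static final class RecordingListener {
        private final CountDownLatch received = new CountDownLatch(1);
        private volatile String lastBody;

        void onMessage(byte[] body) {
            lastBody = new String(body, StandardCharsets.UTF_8);
            received.countDown();
        }

        // Waits until a message arrives or the timeout elapses; returns the body, or null on timeout.
        String await(long timeout, TimeUnit unit) {
            try {
                return received.await(timeout, unit) ? lastBody : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        // A plain thread plays the role of the listener container delivering one message.
        new Thread(() -> listener.onMessage("hello".getBytes(StandardCharsets.UTF_8))).start();
        System.out.println(listener.await(1, TimeUnit.SECONDS));
    }
}
```

With this shape the caller returns as soon as the message has been handled, and the timeout only matters when nothing arrives.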