Spring RabbitMQ
在 Spring 项目中使用 RabbitMQ
1. 项目依赖(需要引入 spring-rabbit,即 org.springframework.amqp:spring-rabbit)
2. 配置文件
<!-- RabbitMQ connection factory; host, virtual host and credentials are read from property placeholders -->
<rabbit:connection-factory
    id="rabbitmqConnectionFactory"
    host="${rabbitmq.host}"
    virtual-host="${rabbitmq.vhost}"
    username="${rabbitmq.username}"
    password="${rabbitmq.password}" />
<!-- RabbitMQ admin, used to manage queues, exchanges and bindings -->
<rabbit:admin connection-factory="rabbitmqConnectionFactory"/>
<!-- Queue declarations: both are durable; the actual queue names come from property placeholders -->
<rabbit:queue id="queue-bean-1" durable="true" name="${rabbitmq.queuename1}"/>
<rabbit:queue id="queue-bean-2" durable="true" name="${rabbitmq.queuename2}"/>
<!-- Exchange declaration. A direct exchange is used here; the type must match the
     exchange type on the RabbitMQ server. Each queue is bound with its own routing key. -->
<rabbit:direct-exchange name="${rabbitmq.exchange.name}">
    <rabbit:bindings>
        <rabbit:binding queue="queue-bean-1" key="${rabbitmq.route.key1}" />
        <rabbit:binding queue="queue-bean-2" key="${rabbitmq.route.key2}" />
    </rabbit:bindings>
</rabbit:direct-exchange>
<!-- Message consumer (listener) beans, one per queue -->
<bean id="rabbitmqListener1" class="com.zmannotes.spring.mq.listener.RabbitMQListener1"/>
<bean id="rabbitmqListener2" class="com.zmannotes.spring.mq.listener.RabbitMQListener2"/>
<!-- 声明消费者容器,并将queue和listener关联,此处指定自动返回ack -->
<rabbit:listener-container
connection-factory="rabbitmqConnectionFactory" acknowledge="auto">
<rabbit:listener queues="queue-bean-1" ref="rabbitmqListener1"
method="onMessage"/>
<rabbit:listener queues="queue-bean-2" ref="rabbitmqListener2"
method="onMessage"/>
</rabbit:listener-container>
<!-- Message producer template.
     The default exchange and routing key reuse the same placeholders as the
     direct exchange and the first binding declared in this file, so a send
     without explicit arguments targets an exchange/key that this configuration
     actually declares (previously they were unrelated hard-coded literals).
     NOTE(review): rabbitmq.route.key1 was chosen as the default key; switch to
     rabbitmq.route.key2 if queue 2 should be the default destination.
     reply-timeout is given as 2000 (milliseconds, per the Spring AMQP namespace). -->
<rabbit:template id="rabbitmqTemplate"
    connection-factory="rabbitmqConnectionFactory"
    reply-timeout="2000"
    exchange="${rabbitmq.exchange.name}"
    routing-key="${rabbitmq.route.key1}" />
3. 实现XXXListener(类名需与配置文件中 bean 的 class 属性一致,例如 RabbitMQListener1、RabbitMQListener2)
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
 * Message listener that prints the body of every received message to standard output.
 */
public class RabbitMQListener implements ChannelAwareMessageListener {

    /**
     * Decodes the message body as text and prints it.
     *
     * @param message the delivered message; its body bytes are decoded as UTF-8
     * @param channel the channel the message was received on (not used here)
     * @throws Exception declared by the listener signature; nothing is thrown explicitly here
     */
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode with an explicit charset: new String(byte[]) falls back to the
        // platform default, which can differ between producer and consumer hosts.
        // NOTE(review): assumes producers encode text as UTF-8 — confirm.
        String text = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(text);
    }
}
4. 发送消息(通过上面配置的 rabbitmqTemplate 调用以下 send 方法)
// Sends a message without naming an exchange or routing key; presumably the
// template's configured defaults are used — confirm against the AmqpTemplate docs.
void send(Message message) throws AmqpException;
// Sends a message with an explicit routing key; the exchange is not passed,
// so presumably the template's default exchange applies — confirm.
void send(String routingKey, Message message) throws AmqpException;
// Sends a message with both the exchange and the routing key given explicitly.
void send(String exchange, String routingKey, Message message) throws AmqpException;