spring-cloud 集成 RabbitMQ

pom.xml


    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.2.4.RELEASE
         
    
    top.devshare.mq
    rabbit
    0.0.1-SNAPSHOT
    rabbit
    Demo project for Spring Boot

    
        1.8
        Hoxton.SR1
    

    
        
            org.springframework.cloud
            spring-cloud-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        

        
            org.springframework.boot
            spring-boot-starter-web
            provided
        

        
        
            org.springframework.boot
            spring-boot-starter-amqp
            2.2.4.RELEASE
        


    

    
        
            
                org.springframework.cloud
                spring-cloud-dependencies
                ${spring-cloud.version}
                pom
                import
            
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    




application.yml
spring:
  application:
    name: rabbitmq-test
  # 以下都是MQ配置
  rabbitmq:
    host: 172.160.180.46
    port: 5672
    username: admin
    password: 123456
    virtual-host: /vhost_eric
    listener:
      simple:
        retry:
          enabled: true # 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
          max-attempts: 1 # 最大重试次数; 该数值必须大于0
          initial-interval: 5000 # 重试间隔时间(单位毫秒)
          max-interval: 1200000 # 重试最大时间间隔(单位毫秒)

创建 消费者(监听者) ConsumerTopicListener.java
package top.devshare.mq.rabbit;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Component
public class ConsumerTopicListener {

    /**
     * 小米消费者
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            // 提供一个消息队列名为:queue_xiaomi
            value = @Queue("queue_xiaomi"),
            // 将消息队列绑定到名为:topic_name_eric 的 Topic 上
            exchange = @Exchange(value = "topic_name_eric", type = ExchangeTypes.TOPIC),
            // 凡是 routingkey 以 "test" 开头的消息,都被路由到 queue_xiaomi 这个消息队列上
            key = "test.#"))
    @RabbitHandler
    public void xiaomiConsumer(String msg) {

        System.out.printf("|  小米有新的消息,请注意查收:|    %s\n", msg);
        System.out.println("+----------------------------------------------+");
    }

    /**
     * 华为消费者
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            // 提供一个消息队列名为:queue_huawei
            value = @Queue("queue_huawei"),
            // 将消息队列绑定到名为:topic_name_eric 的 Topic 上
            exchange = @Exchange(value = "topic_name_eric", type = ExchangeTypes.TOPIC),
            // routingkey 为 "test.huawei" 的消息,都被路由到 queue_huawei 这个消息队列上
            key = "test.huawei"))
    @RabbitHandler
    public void huaweiConsumer(String msg) {

        System.out.printf("|  华为有新的消息,请注意查收:|    %s\n", msg);
        System.out.println("+----------------------------------------------+");
    }

}

创建 生产者 ProducerController.java
package top.devshare.mq.rabbit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 生产者
 */
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public void sendMessage() {

        // 向Topic名称为 topic_name_eric 中的队列发送消息, routingKey为test.huawei
        rabbitTemplate.convertAndSend("topic_name_eric", "test.huawei", "Hello RabbitMQ");
    }

}

启动程序 RabbitApplication.java
package top.devshare.mq.rabbit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
public class RabbitApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);

        System.out.println("测试URL: http://localhost:8080/send");
        System.out.println("+----------------------------------------------+");
        System.out.println("|               持续等待接收消息                  |");
        System.out.println("+----------------------------------------------+");
    }

}


下载项目
分类: spring-boot

毛巳煜

高级软件开发全栈架构师

工信部备案号:辽ICP备17016257号-2