前置资料

理解 RabbitMQ 工作原理


使用Go语言操作 RabbitMQ 客户端

MQ 客户端分为两种:推送者接收者

安装RabbitMQ服务端


持久化

持久化要注意3个部分

  1. 队列持久化 在声明队列配置,见CreateQueue()方法
  2. 消息持久化 在推送消息时配置,见PublishMessageQueue()方法
  3. 在容器中持久化
    3.1 改docker-compsoe的配置 hostname: eric_os
    3.2 路径映射 ./data:/var/lib/rabbitmq

connectmq.go 统一链接配置
package rabbitmq_queue

import (
    "fmt"
    amqp "github.com/rabbitmq/amqp091-go"
    "iris-server-mq/commons/tools"
    "os"
)

type RabbitMQ struct {
    conn *amqp.Connection // 连接成功后的MQ对象
    ch   *amqp.Channel    // 通道
}

var mqURI string

// NewRabbitMQ 初始化MQ
func NewRabbitMQ() *RabbitMQ {

    // 使用环境变量配置,方便容器化
    uri := fmt.Sprintf("amqp://%s:%s@%s:%s/%s",
        os.Getenv("MQ_USER"),
        os.Getenv("MQ_PASSWORD"),
        os.Getenv("MQ_HOST"),
        os.Getenv("MQ_PORT"),
        os.Getenv("MQ_VHOST"),
    )

    // 创建连接
    conn, err := amqp.Dial(uri)
    tools.FailOnError(err, "操作失败,未链接到RabbitMQ")
    return &RabbitMQ{
        conn: conn,
    }
}

// CreateChannel 建立通道
func (mq *RabbitMQ) CreateChannel() {
    // 建立通道
    ch, err := mq.conn.Channel()
    tools.FailOnError(err, "操作失败,不能开启一个通道")
    mq.ch = ch
}

// CloseRabbitMQ 关闭链接
func (mq *RabbitMQ) CloseRabbitMQ() {

    mq.ch.Close()
    mq.conn.Close()
}

// ----------------------------------------------------------

// CreateQueue 声明队列
func (mq *RabbitMQ) CreateQueue() {

    // 声明队列 (名称和类型需要与存在的队列保持一致)
    _, err := mq.ch.QueueDeclare(
        "message-queue",
        true,  // durable 开启队列持久化
        false, // auto-deleted
        false, // internal
        false, // no-wait
        nil,
    )
    tools.FailOnError(err, "")
}


producer.go 生产者
// 发布消息,生产者

package rabbitmq_queue

import (
    "fmt"
    amqp "github.com/rabbitmq/amqp091-go"
    "iris-server-mq/commons/tools"
)

// PublishMessageQueue 发布消息到消息队列
func (mq *RabbitMQ) PublishMessageQueue() {

    message := fmt.Sprint(tools.Now(), " --=> 您有新的【Queue】消息,请注意查收!")

    // 发布消息到指定的消息队列
    err := mq.ch.Publish(
        "",              // exchange
        "message-queue", // routing key (根据使用的交换机类型可选择的是否需要routing key),如果不选择交换机,该参数为消息队列名称
        false,           // mandatory
        false,           // immediate
        amqp.Publishing{
            DeliveryMode: 2, // 消息持久化
            ContentType:  "text/plain",
            Body:         []byte(message),
        },
    )
    tools.FailOnError(err, "")
}


consumer.go 消费者
// 订阅消息,消费者

package rabbitmq_queue

import (
    "iris-server-mq/commons/tools"
    "log"
    "time"
)

func (mq *RabbitMQ) ConsumeMessage() {
    // 创建消费者并消费指定消息队列中的消息
    msgs, err := mq.ch.Consume(
        "message-queue", // message-queue
        "",              // consumer
        false,           // autoAck 设置为非自动确认(可根据需求自己选择)
        false,           // exclusive
        false,           // no-local
        false,           // no-wait
        nil,             // args
    )
    tools.FailOnError(err, "")

    // 获取消息队列中的消息
    forever := make(chan bool)
    go func() {
        for d := range msgs {
            // 延迟6秒在进行消费
            time.Sleep(6e9)
            log.Printf("收到消息: %s", d.Body)
            // 手动回复ack
            d.Ack(false)
        }
    }()
    log.Printf(" [消费者] 正在等待消息... ")
    <-forever
}


main_test.go 测试类
package main

import (
    "github.com/kataras/iris/v12"
    "iris-server-mq/commons/mq/rabbitmq_queue"
    "iris-server-mq/commons/tools"
    "os"
)

func init() {
    // 数据库环境变量(构建时删除)
    os.Setenv("MQ_USER", "mao_siyu")
    os.Setenv("MQ_PASSWORD", "******")
    os.Setenv("MQ_HOST", "eric.rabbitmq.com")
    os.Setenv("MQ_PORT", "5672")
    os.Setenv("MQ_VHOST", "eric_vhost")
}

func main() {
    app := iris.Default()

    mq := rabbitmq_queue.NewRabbitMQ()
    mq.CreateChannel()
    mq.CreateQueue()
    // 向【Queue】推送消息
    go tools.SetInterval(2e9, func() {
        go mq.PublishMessageQueue()
    })
    // 从【Queue】中消费消息,内置定时器每6秒消费一次
    go mq.ConsumeMessage()

    // 启动服务
    app.Listen(":8080")
}




项目地址



分类: Go

毛巳煜

高级软件开发全栈架构师

工信部备案号:辽ICP备17016257号-2