Commit 551bae76 by feidy

QueueMsg add “tag” field

parent 0a9cacaf
......@@ -2,7 +2,7 @@
* @Author: zhoufei
* @Date: 2022-04-22 15:00:32
* @LastEditors: zhoufei
* @LastEditTime: 2022-04-26 10:42:17
* @LastEditTime: 2022-05-06 14:34:13
* @FilePath: /utils/queue/mq.go
* @Description: 消息队列interface
*
......@@ -24,6 +24,7 @@ type QueueMsg struct {
Topic string `json:"topic"`
Body []byte `json:"body"`
Info map[string]interface{} `json:"info"`
Tag string `json:"tag"`
}
type msgOptions struct {
......
......@@ -2,7 +2,7 @@
* @Author: zhoufei
* @Date: 2022-04-22 14:58:25
* @LastEditors: zhoufei
* @LastEditTime: 2022-04-26 10:40:57
* @LastEditTime: 2022-05-06 14:36:10
* @FilePath: /utils/queue/rocketmq.go
* @Description:rocketmq 工具类
* 注意:rocketmq-client-go/v2 在1.18下会闪退,需要手动更改json-iterator/go 到 v1.1.12 (2022-4-25)
......@@ -62,8 +62,11 @@ func (r *RocketMq) SendMsg(topic string, msg []byte, opts ...MsgOption) (qMsg *Q
opt(msgOptions)
}
var tag string
if msgOptions.Tag != "" {
rMsg.WithTag(msgOptions.Tag)
tag = msgOptions.Tag
}
if msgOptions.UserProperties != nil {
......@@ -82,6 +85,7 @@ func (r *RocketMq) SendMsg(topic string, msg []byte, opts ...MsgOption) (qMsg *Q
Body: []byte(msg),
Topic: topic,
Info: map[string]interface{}{"msgId": resp.MsgID},
Tag: tag,
}
return qmsg, err
}
......@@ -110,11 +114,12 @@ func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMs
for _, opt := range opts {
opt(msgOptions)
}
var tag string
selector := consumer.MessageSelector{}
if msgOptions.Tag != "" {
selector.Type = consumer.TAG
selector.Expression = msgOptions.Tag
tag = msgOptions.Tag
} else if msgOptions.Sql92 != "" {
selector.Type = consumer.SQL92
selector.Expression = msgOptions.Sql92
......@@ -126,6 +131,7 @@ func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMs
Body: msg.Body,
Topic: topic,
Info: map[string]interface{}{"msgId": msg.MsgId},
Tag: tag,
})
}
return consumer.ConsumeSuccess, nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment