Commit a2afd2c5 by feidy

添加tag,userproperties,sql92的可配置项

parent 1ca94e0d
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
* @Author: zhoufei * @Author: zhoufei
* @Date: 2022-04-22 15:00:32 * @Date: 2022-04-22 15:00:32
* @LastEditors: zhoufei * @LastEditors: zhoufei
* @LastEditTime: 2022-04-24 18:02:52 * @LastEditTime: 2022-04-26 10:42:17
* @FilePath: /utils/queue/mq.go * @FilePath: /utils/queue/mq.go
* @Description: 消息队列interface * @Description: 消息队列interface
* *
...@@ -11,12 +11,12 @@ ...@@ -11,12 +11,12 @@
package queue package queue
type Producer interface { type Producer interface {
SendMsg(topic string, msg []byte) (qMsg *QueueMsg, err error) SendMsg(topic string, msg []byte, opts ...MsgOption) (qMsg *QueueMsg, err error)
SendStringMsg(topic string, msg string) (qMsg *QueueMsg, err error) SendStringMsg(topic string, msg string, opts ...MsgOption) (qMsg *QueueMsg, err error)
ShutDown() error ShutDown() error
} }
type Consumer interface { type Consumer interface {
RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMsg)) (err error) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMsg), opts ...MsgOption) (err error)
ShutDown() error ShutDown() error
} }
...@@ -25,3 +25,50 @@ type QueueMsg struct { ...@@ -25,3 +25,50 @@ type QueueMsg struct {
Body []byte `json:"body"` Body []byte `json:"body"`
Info map[string]interface{} `json:"info"` Info map[string]interface{} `json:"info"`
} }
type msgOptions struct {
//such as "tag1 || tag2 || tag3"
Tag string
// just for rocketmq
// for sql92,example {"a":"20","b":"true"}
UserProperties map[string]string
// just for rocketmq
// example (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
Sql92 string
}
type MsgOption func(*msgOptions)
/**
* @description:
* @param {string} tag
* @return {*}
*/
func WithMsgTag(tag string) MsgOption {
return func(mo *msgOptions) {
if tag == "" {
return
}
mo.Tag = tag
}
}
func WithSql92(sql string) MsgOption {
return func(mo *msgOptions) {
if sql == "" {
return
}
mo.Sql92 = sql
}
}
func WithUserProperties(p map[string]string) MsgOption {
return func(mo *msgOptions) {
if p == nil {
return
}
mo.UserProperties = p
}
}
...@@ -2,10 +2,10 @@ ...@@ -2,10 +2,10 @@
* @Author: zhoufei * @Author: zhoufei
* @Date: 2022-04-22 14:58:25 * @Date: 2022-04-22 14:58:25
* @LastEditors: zhoufei * @LastEditors: zhoufei
* @LastEditTime: 2022-04-24 18:48:25 * @LastEditTime: 2022-04-26 10:40:57
* @FilePath: /utils/queue/rocketmq.go * @FilePath: /utils/queue/rocketmq.go
* @Description:rocketmq 工具类 * @Description:rocketmq 工具类
* * 注意:rocketmq-client-go/v2 在1.18下会闪退,需要手动更改json-iterator/go 到 v1.1.12 (2022-4-25)
* Copyright (c) 2022 by zhoufei, All Rights Reserved. * Copyright (c) 2022 by zhoufei, All Rights Reserved.
*/ */
package queue package queue
...@@ -34,13 +34,13 @@ type rocketOptions struct { ...@@ -34,13 +34,13 @@ type rocketOptions struct {
type RocketOption func(*rocketOptions) type RocketOption func(*rocketOptions)
func WithRocketAcl(username string, password string) RocketOption { func WithRocketAcl(access string, secret string) RocketOption {
return func(o *rocketOptions) { return func(o *rocketOptions) {
if username == "" || password == "" { if access == "" || secret == "" {
return return
} }
o.AccessKey = username o.AccessKey = access
o.SecretKey = password o.SecretKey = secret
} }
} }
...@@ -50,12 +50,27 @@ func WithRocketAcl(username string, password string) RocketOption { ...@@ -50,12 +50,27 @@ func WithRocketAcl(username string, password string) RocketOption {
* @param {[]byte} msg * @param {[]byte} msg
* @return {*} * @return {*}
*/ */
func (r *RocketMq) SendMsg(topic string, msg []byte) (qMsg *QueueMsg, err error) { func (r *RocketMq) SendMsg(topic string, msg []byte, opts ...MsgOption) (qMsg *QueueMsg, err error) {
if r.producer == nil { if r.producer == nil {
return nil, errors.New("no producer available") return nil, errors.New("no producer available")
} }
resp, err := r.producer.SendSync(context.Background(), primitive.NewMessage(topic, msg)) rMsg := primitive.NewMessage(topic, msg)
msgOptions := &msgOptions{}
for _, opt := range opts {
opt(msgOptions)
}
if msgOptions.Tag != "" {
rMsg.WithTag(msgOptions.Tag)
}
if msgOptions.UserProperties != nil {
rMsg.WithProperties(msgOptions.UserProperties)
}
resp, err := r.producer.SendSync(context.Background(), rMsg)
if err != nil { if err != nil {
return return
...@@ -77,8 +92,8 @@ func (r *RocketMq) SendMsg(topic string, msg []byte) (qMsg *QueueMsg, err error) ...@@ -77,8 +92,8 @@ func (r *RocketMq) SendMsg(topic string, msg []byte) (qMsg *QueueMsg, err error)
* @param {string} msg * @param {string} msg
* @return {*} * @return {*}
*/ */
func (r *RocketMq) SendStringMsg(topic string, msg string) (qMsg *QueueMsg, err error) { func (r *RocketMq) SendStringMsg(topic string, msg string, opts ...MsgOption) (qMsg *QueueMsg, err error) {
return r.SendMsg(topic, []byte(msg)) return r.SendMsg(topic, []byte(msg), opts...)
} }
/** /**
...@@ -86,12 +101,26 @@ func (r *RocketMq) SendStringMsg(topic string, msg string) (qMsg *QueueMsg, err ...@@ -86,12 +101,26 @@ func (r *RocketMq) SendStringMsg(topic string, msg string) (qMsg *QueueMsg, err
* @param {*QueueMsg} qMsg * @param {*QueueMsg} qMsg
* @return {*} * @return {*}
*/ */
func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMsg)) (err error) { func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMsg), opts ...MsgOption) (err error) {
if r.consumer == nil { if r.consumer == nil {
return errors.New("no consumer available") return errors.New("no consumer available")
} }
err = r.consumer.Subscribe(topic, consumer.MessageSelector{}, func(_ context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { msgOptions := &msgOptions{}
for _, opt := range opts {
opt(msgOptions)
}
selector := consumer.MessageSelector{}
if msgOptions.Tag != "" {
selector.Type = consumer.TAG
selector.Expression = msgOptions.Tag
} else if msgOptions.Sql92 != "" {
selector.Type = consumer.SQL92
selector.Expression = msgOptions.Sql92
}
err = r.consumer.Subscribe(topic, selector, func(_ context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs { for _, msg := range msgs {
go recevieFunc(&QueueMsg{ go recevieFunc(&QueueMsg{
Body: msg.Body, Body: msg.Body,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment