Commit 94cc8089 by feidy

调整rocketmq适配阿里云

	modified:   queue/rocketmq.go
	modified:   queue/rocketmq_test.go
parent 27f58e88
......@@ -2,7 +2,7 @@
* @Author: zhoufei
* @Date: 2022-04-22 14:58:25
* @LastEditors: zhoufei
* @LastEditTime: 2022-05-31 16:53:30
* @LastEditTime: 2022-07-18 14:54:24
* @FilePath: /utils/queue/rocketmq.go
* @Description:rocketmq 工具类
* 注意:rocketmq-client-go/v2 在1.18下会闪退,需要手动更改json-iterator/go 到 v1.1.12 (2022-4-25)
......@@ -29,8 +29,10 @@ type RocketMq struct {
}
type rocketOptions struct {
AccessKey string
SecretKey string
AccessKey string
SecretKey string
InstanceName string
Namespace string
}
type RocketOption func(*rocketOptions)
......@@ -45,6 +47,18 @@ func WithRocketAcl(access string, secret string) RocketOption {
}
}
func WithRocketInstanceName(name string) RocketOption {
return func(o *rocketOptions) {
o.InstanceName = name
}
}
func WithRocketNamespace(namespace string) RocketOption {
return func(o *rocketOptions) {
o.Namespace = namespace
}
}
func init() {
rlog.SetLogLevel("error")
}
......@@ -122,6 +136,7 @@ func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMs
var tag string
selector := consumer.MessageSelector{}
if msgOptions.Tag != "" {
fmt.Println("TAG:" + msgOptions.Tag)
selector.Type = consumer.TAG
selector.Expression = msgOptions.Tag
tag = msgOptions.Tag
......@@ -131,6 +146,7 @@ func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMs
}
err = r.consumer.Subscribe(topic, selector, func(_ context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println("topic:" + topic)
for _, msg := range msgs {
go recevieFunc(&QueueMsg{
Body: msg.Body,
......@@ -148,6 +164,7 @@ func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMs
err = r.consumer.Start()
if err != nil {
fmt.Println(err.Error())
err = r.consumer.Unsubscribe(topic)
return err
}
......@@ -175,25 +192,43 @@ func (r *RocketMq) ShutDown() error {
func NewRocketMqProducer(endPoints []string, groupName string, retry int, options ...RocketOption) (p Producer, err error) {
if retry <= 0 {
retry = 0
retry = 3
}
opts := []producer.Option{producer.WithGroupName(groupName),
producer.WithNsResolver(primitive.NewPassthroughResolver(endPoints)),
producer.WithRetry(retry)}
opts := []producer.Option{
producer.WithNameServer(endPoints),
producer.WithRetry(retry),
}
ropts := &rocketOptions{}
for _, v := range options {
v(ropts)
}
if ropts.Namespace != "" {
opts = append(opts, producer.WithNamespace(ropts.Namespace))
}
if ropts.AccessKey != "" && ropts.SecretKey != "" {
opts = append(opts, producer.WithCredentials(primitive.Credentials{
credentials := primitive.Credentials{
AccessKey: ropts.AccessKey,
SecretKey: ropts.SecretKey,
}))
}
opts = append(opts, producer.WithCredentials(credentials))
// opts = append(opts, producer.WithTrace(&primitive.TraceConfig{
// GroupName: groupName,
// Access: primitive.Cloud,
// NamesrvAddrs: endPoints,
// Credentials: credentials,
// }))
}
// if ropts.InstanceName != "" {
// opts = append(opts, producer.WithInstanceName(ropts.InstanceName))
// }
client, err := rocketmq.NewProducer(opts...)
if err != nil {
......@@ -220,7 +255,7 @@ func NewRocketMqProducer(endPoints []string, groupName string, retry int, option
*/
func NewRocketMqConsumer(endPoints []string, groupName string, options ...RocketOption) (c Consumer, err error) {
opts := []consumer.Option{
consumer.WithNsResolver(primitive.NewPassthroughResolver(endPoints)),
consumer.WithNameServer(endPoints),
consumer.WithGroupName(groupName),
consumer.WithConsumerModel(consumer.Clustering)}
......@@ -229,16 +264,32 @@ func NewRocketMqConsumer(endPoints []string, groupName string, options ...Rocket
v(ropts)
}
if ropts.Namespace != "" {
opts = append(opts, consumer.WithNamespace(ropts.Namespace))
}
if ropts.AccessKey != "" && ropts.SecretKey != "" {
opts = append(opts, consumer.WithCredentials(primitive.Credentials{
credentials := primitive.Credentials{
AccessKey: ropts.AccessKey,
SecretKey: ropts.SecretKey,
}
opts = append(opts, consumer.WithCredentials(credentials))
opts = append(opts, consumer.WithTrace(&primitive.TraceConfig{
GroupName: groupName,
Access: primitive.Cloud,
NamesrvAddrs: endPoints,
Credentials: credentials,
}))
}
opts = append(opts, consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset))
client, err := rocketmq.NewPushConsumer(opts...)
if err != nil {
return nil, err
}
......
......@@ -2,7 +2,7 @@
* @Author: zhoufei
* @Date: 2022-04-24 14:19:58
* @LastEditors: zhoufei
* @LastEditTime: 2022-05-05 14:32:59
* @LastEditTime: 2022-07-12 12:05:13
* @FilePath: /utils/queue/rocketmq_test.go
* @Description: rocketmq 测试
*
......@@ -115,14 +115,14 @@ func TestRocketMqConsumerWithTag(t *testing.T) {
}
func TestAliRocket(t *testing.T) {
p, err := NewRocketMqProducer([]string{"http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80"}, "test", 2,
WithRocketAcl("xxxxx", "xxxxxxxx"))
p, err := NewRocketMqProducer([]string{"http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80"}, "GID_SDC_DATA", 2,
WithRocketAcl("LTAI9JMKXLqYS07o", "PDKXVLPj3DrFWD8uM91dyh05qR7n12"))
if err != nil {
t.Fatal(err)
}
data := `{"a":"b"}`
msg, err := p.SendStringMsg("xxxx", data)
msg, err := p.SendStringMsg("sdc_data", data)
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment