Commit 9d3b252e by feidy

add rocketmq and rocketmq_test

parent 92e3da8e
......@@ -3,6 +3,7 @@ module gits.shuoren.com/go/utils
go 1.17
require (
github.com/apache/rocketmq-client-go/v2 v2.1.0
github.com/go-playground/assert/v2 v2.0.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/qiniu/qmgo v1.0.6
......@@ -13,14 +14,24 @@ require (
require (
github.com/BurntSushi/toml v1.0.0 // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/mock v1.3.1 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/json-iterator/go v1.1.9 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.4.1 // indirect
github.com/tidwall/gjson v1.14.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.0.2 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
......@@ -32,4 +43,5 @@ require (
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
golang.org/x/text v0.3.5 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
stathat.com/c/consistent v1.0.0 // indirect
)
/*
* @Author: zhoufei
* @Date: 2022-04-22 15:00:32
* @LastEditors: zhoufei
* @LastEditTime: 2022-04-24 18:02:52
* @FilePath: /utils/queue/mq.go
* @Description: 消息队列interface
*
* Copyright (c) 2022 by zhoufei, All Rights Reserved.
*/
package queue
type Producer interface {
SendMsg(topic string, msg []byte) (qMsg *QueueMsg, err error)
SendStringMsg(topic string, msg string) (qMsg *QueueMsg, err error)
ShutDown() error
}
type Consumer interface {
RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMsg)) (err error)
ShutDown() error
}
type QueueMsg struct {
Topic string `json:"topic"`
Body []byte `json:"body"`
Info map[string]interface{} `json:"info"`
}
/*
* @Author: zhoufei
* @Date: 2022-04-22 14:58:25
* @LastEditors: zhoufei
* @LastEditTime: 2022-04-24 17:46:43
* @FilePath: /utils/queue/rocketmq.go
* @Description:rocketmq 工具类
*
* Copyright (c) 2022 by zhoufei, All Rights Reserved.
*/
package queue
import (
"context"
"errors"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
type RocketMq struct {
endPoints []string
producer rocketmq.Producer
consumer rocketmq.PushConsumer
}
type rocketOptions struct {
AccessKey string
SecretKey string
}
type RocketOption func(*rocketOptions)
func WithRocketAcl(username string, password string) RocketOption {
return func(o *rocketOptions) {
if username == "" || password == "" {
return
}
o.AccessKey = username
o.SecretKey = password
}
}
/**
* @description: 发送消息
* @param {string} topic
* @param {[]byte} msg
* @return {*}
*/
func (r *RocketMq) SendMsg(topic string, msg []byte) (qMsg *QueueMsg, err error) {
if r.producer == nil {
return nil, errors.New("no producer available")
}
resp, err := r.producer.SendSync(context.Background(), primitive.NewMessage(topic, msg))
if err != nil {
return
}
if resp.Status != primitive.SendOK {
return nil, fmt.Errorf("error sending message,status:%v", resp.Status)
}
qmsg := &QueueMsg{
Body: []byte(msg),
Topic: topic,
Info: map[string]interface{}{"msgId": resp.MsgID},
}
return qmsg, err
}
/**
* @description: 发送字符串消息
* @param {string} topic
* @param {string} msg
* @return {*}
*/
func (r *RocketMq) SendStringMsg(topic string, msg string) (qMsg *QueueMsg, err error) {
return r.SendMsg(topic, []byte(msg))
}
/**
* @description: 注册消费者回调
* @param {*QueueMsg} qMsg
* @return {*}
*/
func (r *RocketMq) RegisterConsumer(topic string, recevieFunc func(qMsg *QueueMsg)) (err error) {
if r.consumer == nil {
return errors.New("no consumer available")
}
err = r.consumer.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
go recevieFunc(&QueueMsg{
Body: msg.Body,
Topic: topic,
Info: map[string]interface{}{"msgId": msg.MsgId},
})
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
return err
}
err = r.consumer.Start()
if err != nil {
r.consumer.Unsubscribe(topic)
return err
}
return nil
}
func (r *RocketMq) ShutDown() error {
var err error
if r.producer != nil {
err = r.producer.Shutdown()
}
if r.consumer != nil {
err = r.consumer.Shutdown()
}
return err
}
/**
* @description: 创建rocketmq生产者
* @param {[]string} endPoints nameserver
* @param {string} groupName
* @param {int} retry
* @return {*}
*/
func NewRocketMqProducer(endPoints []string, groupName string, retry int, options ...RocketOption) (p Producer, err error) {
if retry <= 0 {
retry = 0
}
opts := []producer.Option{producer.WithGroupName(groupName),
producer.WithNsResolver(primitive.NewPassthroughResolver(endPoints)),
producer.WithRetry(retry)}
ropts := &rocketOptions{}
for _, v := range options {
v(ropts)
}
if ropts.AccessKey != "" && ropts.SecretKey != "" {
opts = append(opts, producer.WithCredentials(primitive.Credentials{
AccessKey: ropts.AccessKey,
SecretKey: ropts.SecretKey,
}))
}
client, err := rocketmq.NewProducer(opts...)
if err != nil {
return nil, err
}
r := &RocketMq{
endPoints: endPoints,
producer: client,
}
err = r.producer.Start()
if err != nil {
return nil, err
}
return r, nil
}
/**
* @description: 创建rocketmq消费者
* @param {[]string} endPoints nameserver
* @param {string} groupName
* @return {*}
*/
func NewRocketMqConsumer(endPoints []string, groupName string, options ...RocketOption) (c Consumer, err error) {
opts := []consumer.Option{
consumer.WithNsResolver(primitive.NewPassthroughResolver(endPoints)),
consumer.WithGroupName(groupName),
consumer.WithConsumerModel(consumer.Clustering)}
ropts := &rocketOptions{}
for _, v := range options {
v(ropts)
}
if ropts.AccessKey != "" && ropts.SecretKey != "" {
opts = append(opts, consumer.WithCredentials(primitive.Credentials{
AccessKey: ropts.AccessKey,
SecretKey: ropts.SecretKey,
}))
}
client, err := rocketmq.NewPushConsumer(opts...)
if err != nil {
return nil, err
}
r := &RocketMq{
endPoints: endPoints,
consumer: client,
}
return r, nil
}
/*
* @Author: zhoufei
* @Date: 2022-04-24 14:19:58
* @LastEditors: zhoufei
* @LastEditTime: 2022-04-24 17:53:46
* @FilePath: /utils/queue/rocketmq_test.go
* @Description: rocketmq 测试
*
* Copyright (c) 2022 by zhoufei, All Rights Reserved.
*/
package queue
import (
"fmt"
"testing"
"time"
"github.com/go-playground/assert/v2"
)
func getProducer() (p Producer, err error) {
return NewRocketMqProducer([]string{"http://39.101.151.217:9876"}, "test", 2)
}
func TestCreateRocketMqProducer(t *testing.T) {
procucer, err := getProducer()
if err != nil {
t.Fatal(err)
}
if p, ok := procucer.(*RocketMq); ok {
p.producer.Shutdown()
}
}
func TestRocketMqSendMsg(t *testing.T) {
procucer, err := getProducer()
if err != nil {
t.Fatal(err)
}
msg, err := procucer.SendMsg("test", []byte("test"))
if err != nil {
t.Fatal(err)
}
assert.Equal(t, msg.Topic, "test")
time.Sleep(10 * time.Second)
err = procucer.ShutDown()
if err != nil {
t.Fatal(err)
}
}
func TestRocketMqConsumer(t *testing.T) {
client, err := NewRocketMqConsumer([]string{"http://39.101.151.217:9876"}, "test")
if err != nil {
t.Fatal(err)
}
err = client.RegisterConsumer("test", func(qMsg *QueueMsg) {
fmt.Printf("info:%v\n", qMsg.Info)
})
if err != nil {
t.Fatal(err)
}
procucer, err := getProducer()
if err != nil {
t.Fatal(err)
}
_, err = procucer.SendMsg("test", []byte("test"))
if err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Second)
procucer.ShutDown()
client.ShutDown()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment