go连接rocketmq


1.安装rocketmq

前提:需要安装java

docker安装rocketmq

(1)检查有哪些可用的rocketmq镜像

docker search rocketmq

(2)拉取镜像,用第一个镜像

docker pull rocketmqinc/rocketmq

(3)启动navesrv服务

docker run -d -p 9876:9876 -v /tmp/mq/data/namesrv/logs:/root/logs -v /tmp/mq/data/namesrv/store:/root/store --name rmqnamesrv -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms1024m -Xmx1024m" rocketmqinc/rocketmq sh mqnamesrv 

需要在本地创建 /tmp/mq/ 目录

(4)启动rocketmq服务

先配置一下,创建一个配置文件/tmp/mq/conf/docker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#换成网络IP
namesrvAddr = 127.0.0.1:9876
#换成网络IP
brokerIP1 = 127.0.0.1
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
autoCreateSubscriptionGroup=true

启动docker

docker run -d  -p 10911:10911 -p 10909:10909   -v /tmp/mq/data/broker/logs:/root/logs -v /tmp/mq/rocketmq/data/broker/store:/root/store  -v /tmp/mq/conf/broker.conf:/opt/rocketmq/conf/broker.conf  --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms1024m -Xmx1024m"  rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf

(5)安装控制台

docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 9993:8080 -t styletang/rocketmq-console-ng

至此,在docker内安装了namesrv,rocketmq以及rocketmq-console

访问:localhost:8080即可看到rocketmq内topic和消息

2.go 注册主题

topic := "newOne"
//clusterName := "DefaultCluster"
nameSrvAddr := []string{"127.0.0.1:9876"}
brokerAddr := "127.0.0.1:10911"

testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)))
if err != nil {
	fmt.Println(err.Error())
}

//create topic
err = testAdmin.CreateTopic(
	context.Background(),
	admin.WithTopicCreate(topic),
	admin.WithBrokerAddrCreate(brokerAddr),
)
if err != nil {
	fmt.Println("Create topic error:", err.Error())
}

源代码:https://github.com/apache/rocketmq-client-go/blob/master/examples/admin/topic/main.go

3.go 生产消息

p, _ := rocketmq.NewProducer(
	producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
	producer.WithRetry(2),
)
err := p.Start()

msg := &primitive.Message{
		Topic: topic,
		Body:  []byte("Hello RocketMQ Go Client! "),
	}

res, err := p.SendSync(context.Background(), msg)

源代码:https://github.com/apache/rocketmq-client-go/blob/master/examples/producer/simple/main.go

4.go 消费消息

c, _ := rocketmq.NewPushConsumer(
	consumer.WithGroupName("testGroup"),
	consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
	msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	for i := range msgs {
		fmt.Printf("subscribe callback: %v \n", msgs[i])
	}

	return consumer.ConsumeSuccess, nil
})
if err != nil {
	fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()

源代码:https://github.com/apache/rocketmq-client-go/blob/master/examples/consumer/simple/main.go

5.参考

【1】MacOS查看和设定JAVA_HOME

【2】Docker安装RocketMQ

【3】找不到topic,route info 异常问题缘由及解决方案

【4】https://github.com/apache/rocketmq-dashboard


文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录