1.安装rocketmq
前提:需要安装java
docker安装rocketmq
(1)检查有哪些可用的rocketmq镜像
docker search rocketmq
(2)拉取镜像,用第一个镜像
docker pull rocketmqinc/rocketmq
(3)启动namesrv服务
docker run -d -p 9876:9876 -v /tmp/mq/data/namesrv/logs:/root/logs -v /tmp/mq/data/namesrv/store:/root/store --name rmqnamesrv -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms1024m -Xmx1024m" rocketmqinc/rocketmq sh mqnamesrv
需要在本地创建 /tmp/mq/ 目录
(4)启动rocketmq服务
先配置一下,创建一个配置文件/tmp/mq/conf/broker.conf(文件名需与下面docker run命令中挂载的路径一致)
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#换成网络IP
namesrvAddr = 127.0.0.1:9876
#换成网络IP
brokerIP1 = 127.0.0.1
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
启动broker容器
docker run -d -p 10911:10911 -p 10909:10909 -v /tmp/mq/data/broker/logs:/root/logs -v /tmp/mq/rocketmq/data/broker/store:/root/store -v /tmp/mq/conf/broker.conf:/opt/rocketmq/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms1024m -Xmx1024m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf
(5)安装控制台
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 9993:8080 -t styletang/rocketmq-console-ng
至此,在docker内安装了namesrv,rocketmq以及rocketmq-console
访问:localhost:9993(上面的命令把容器的8080端口映射到了本机的9993端口)即可看到rocketmq内topic和消息
2.go 注册主题
topic := "newOne"
//clusterName := "DefaultCluster"
nameSrvAddr := []string{"127.0.0.1:9876"}
brokerAddr := "127.0.0.1:10911"
testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)))
if err != nil {
fmt.Println(err.Error())
}
//create topic
err = testAdmin.CreateTopic(
context.Background(),
admin.WithTopicCreate(topic),
admin.WithBrokerAddrCreate(brokerAddr),
)
if err != nil {
fmt.Println("Create topic error:", err.Error())
}
源代码:https://github.com/apache/rocketmq-client-go/blob/master/examples/admin/topic/main.go
3.go 生产消息
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! "),
}
res, err := p.SendSync(context.Background(), msg)
源代码:https://github.com/apache/rocketmq-client-go/blob/master/examples/producer/simple/main.go
4.go 消费消息
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
源代码:https://github.com/apache/rocketmq-client-go/blob/master/examples/consumer/simple/main.go
5.参考
【1】MacOS查看和设定JAVA_HOME
【2】Docker安装RocketMQ
【3】找不到topic,route info 异常问题缘由及解决方案
【4】https://github.com/apache/rocketmq-dashboard