消息队列之rocketmq | go 技术论坛-金年会app官方网
文章介绍
本文来简单介绍一下消息队列 ,这里将什么是mq, 介绍rocketmq的安装,rocketmq的基本概念,消息类型,并使用go做各类消息的收发
什么是mq
1.什么是mq
消息队列是一种“先进先出”的数据结构
2.应用场景
其应用场景主要包含以下3个方面
- 应用解耦
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
- 流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。
处于经济考量目的:
业务系统正常时段的qps如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰
- 数据分发
通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
mq的优点和缺点
优点:解耦、削峰、数据分发
缺点包含以下几点:
- 系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦mq宕机,就会对业务造成影响。
如何保证mq的高可用? - 系统复杂度提高
mq的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过mq进行异步调用。
如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性? - 一致性问题
a系统处理完业务,通过mq给b、c、d三个系统发消息数据,如果b系统、c系统处理成功,d系统处理失败。
如何保证消息数据处理的一致性?
rocketmq的安装
使用docker安装
rocketmq的基本概念
- producer:消息的发送者;例如:发信人
- consumer:消息接收者;例如:收信人
- broker:暂存和传输消息;例如:邮局、中转站
- nameserver:管理broker;例如:各个邮局的管理机构
- topic:区分消息的种类;一个发送者可以发送消息给一个或者多个topic;一个消息的接收者可以订阅一个或者多个topic消息
- message queue:相当于是topic的分区;用于并行发送和接收消息
消息类型
go实战
需要拉取
go get github.com/apache/rocketmq-client-go/v2
go get github.com/apache/rocketmq-client-go/v2/primitive
go get github.com/apache/rocketmq-client-go/v2/producer
这里我以实战的角度来介绍rocketmq的消息类型:
1. 普通消息
只是消息的收发,发送成功后接收者就直接可以收到消息
package main
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func main() {
//初始化生产者
q, err := rocketmq.newproducer(producer.withnameserver([]string{"101.1.12.202:9876"}))
if err != nil {
panic("生成q生产者失败")
}
if err := q.start(); err != nil {
panic("启动q生产者失败")
}
msg := []byte("您好呀, 我是ice_moss")
mq := primitive.newmessage("msg_test_hello", msg) //msg_test_hello是为topic
res, err := q.sendsync(context.background(), mq)
if err != nil {
fmt.printf("发送失败%s", err)
}
fmt.println("消息发送成功")
fmt.println(res.string())
err = q.shutdown()
if err != nil {
panic("shutdown fail err")
}
}
这里需要注意的是如果我们需要在一个进程中启动多个rocketmq.newproducer()
就必须将他的第二个参数配置上:producer.withgroupname("sendmsg")
q, err := rocketmq.newproducer(producer.withnameserver([]string{"101.1.12.202:9876"}), producer.withgroupname("sendmsg"))
不然就会报:生产者组已经被创建
原因:我们没有不设置withgroupname
在调用时,会自动为我们创建一个默认名称的withgroupname
,当第二次rocketmq.newproducer
仍然是默认名,这时整个groupname
就冲突了
好了已经将”普通消息”发送到队列中了,现在我们来接收
2. 消费消息
注意:两端的topic必须保持一直
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ := rocketmq.newpushconsumer(
//接收者组
consumer.withgroupname("msg_test"),
consumer.withnsresolver(primitive.newpassthroughresolver([]string{"101.1.12.202:9876"})),
)
//订阅消息
err := c.subscribe("msg_test_hello", consumer.messageselector{}, func(ctx context.context,
msgs ...*primitive.messageext) (consumer.consumeresult, error) {
for i := range msgs {
fmt.printf("subscribe callback: %v \n", msgs[i])
}
return consumer.consumesuccess, nil
})
if err != nil {
fmt.println(err.error())
}
// note: start after subscribe
err = c.start()
if err != nil {
fmt.println(err.error())
os.exit(-1)
}
//程序运行2分钟
time.sleep(time.second * 120)
err = c.shutdown()
if err != nil {
fmt.printf("shutdown consumer error: %s", err.error())
}
}
输出:
subscribe callback: [message=[topic=msg_test_hello, body=您好呀, 我是ice_moss, flag=0, properties=map[consume_start_time:1668255347270 max_offset:2 min_offset:0 uniq_k251664a6e000000003cf040100001], transactionid=], msgid=0a0251664a6e000000003cf040100001, offsetmsgid=010eb4ca00002a9f000000000004bc14,queueid=1, storesize=174, queueoffset=0, sysflag=0, borntimestamp=1668254378888, bornhost=112.21.20.248:43010, storetimestamp=1668254379066, storehost=101.1.12.202:10911, commitlogoffset=310292, bodycrc=1573027761, reconsumetimes=0, preparedtransactionoffset=0]
3. 延时消息
延时消息,指我们将我们需要发送的发送消息延迟多少时间后接收方才能收到,其中一个应用场景就是分布式电商系统的下单——>支付, 例如:12306金年会app官方网官网买车票,当我们购买一张车票后,后台会做车票库存扣减,但是如果我们只下单,不支付这就很要命,该买票的人买不到票,该卖出去的票没有卖出去;其实仔细一点就会发现,12306购买下单后,在规定时间没有完成支付,就会取消相应的订单, 然后做库存归还。
现在来看一下延迟消息怎么发送:
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
//sendmessage 生成消息,延迟消息
func sendmessage(q rocketmq.producer) {
if err := q.start(); err != nil {
panic("启动q生产者失败")
}
msg := primitive.newmessage("msg_test_hello", []byte("这是一个延迟消息"))
//延迟时间级别
//messagedelaylevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.withdelaytimelevel(3) //10s
res, err := q.sendsync(context.background(), msg)
if err != nil {
fmt.printf("发送失败%s", err)
}
err = q.shutdown()
if err != nil {
fmt.printf("shutdown consumer error: %s", err.error())
}
fmt.println(res.string())
}
func main() {
//初始化生产者
q, err := rocketmq.newproducer(producer.withnameserver([]string{"101.1.12.202:9876"}))
if err != nil {
panic("生成q生产者失败")
}
sendmessage(q)
}
10秒后:
subscribe callback: [message=[topic=msg_test_hello, body=这是一个延迟消息, flag=0, properties=map[consume_start_time:1668256662984 delay:3 max_offset:5 min_offset:0 reareal_topic:msg_test_hello uniq_key:0a0251664be9000000003d12f2e00001]………
4.事务消息
什么是事务
事务是指是程序中一系列严密的逻辑操作,而且所有操作必须全部成功完成,否则在每个操作中所作的所有更改都会被撤消。可以通俗理解为:就是把多件事情当做一件事情来处理,好比大家同在一条船上,要活一起活,要完一起完
事物的四个特性(acid)
● 原子性(atomicity):操作这些指令时,要么全部执行成功,要么全部不执行。只要其中一个指令执行失败,所有的指令都执行失败,数据进行回滚,回到执行指令前的数据状态。
eg:拿转账来说,假设用户a和用户b两者的钱加起来一共是20000,那么不管a和b之间如何转账,转几次账,事务结束后两个用户的钱相加起来应该还得是20000,这就是事务的一致性。
● 一致性(consistency):事务的执行使数据从一个状态转换为另一个状态,但是对于整个数据的完整性保持稳定。
● 隔离性(isolation):隔离性是当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离,可以使用锁机制来实现隔离,其实就是将并发场景下对数据操作的部分对并发请求进行串行化。
● 持久性(durability):当事务正确完成后,它对于数据的改变是永久性的。
eg: 例如我们在使用jdbc操作数据库时,在提交事务方法后,提示用户事务操作完成,当我们程序执行完成直到看到提示后,就可以认定事务以及正确提交,即使这时候数据库出现了问题,也必须要将我们的事务完全执行完成,否则就会造成我们看到提示事务处理完毕,但是数据库因为故障而没有执行事务的重大错误。
mq的事务消息
这里的事务消息实现接口:
type transactionlistener interface {
// when send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
executelocaltransaction(*message) localtransactionstate
// when no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
checklocaltransaction(*messageext) localtransactionstate
}
我们的业务代码需要放在executelocaltransaction(*message) localtransactionstate
方法中执行,对应返回相应的状态
const (
commitmessagestate localtransactionstate = iota 1 //返回状态:事务执行成功发现消息
rollbackmessagestate //返回状态:进行事务回查
unknowstate //仍然会回查
)
我们回查机制业务需要在checklocaltransaction(*messageext) localtransactionstate
方法中完成
下面我们来实现该接口(创建订单场景下):
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"google.golang.org/grpc/codes"
)
//order 模拟订单
type order struct {
ordersrvid string
userid int32
goodsid int32
totalprice float32
post string
address string
mobile string
}
//orderlister 接口实现者,事务可以将一下配置\信息写入该结构体中
type orderlister struct {
code codes.code //返回状态码
ctx context.context //上下文数据
id int32 //订单id
}
//executelocaltransaction when send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
func (o *orderlister) executelocaltransaction(msg *primitive.message) primitive.localtransactionstate {
//执行本地业务逻辑
fmt.println("开始执行本地逻辑")
time.sleep(time.second * 3)
orderinfo := order{}
err := json.unmarshal(msg.body, &orderinfo)
if err != nil {
o.code = codes.unavailable
log.fatal("解析失败:", err)
//调用回查逻辑
return primitive.rollbackmessagestate
}
fmt.println("订单信息:", orderinfo)
fmt.println("本地逻辑执行成功")
//commitmessagestate 提交信息至mq
//commitmessagestate/rollbackmessagestate都不会回查
return primitive.commitmessagestate
}
//checklocaltransaction when no response to prepare(half) message. broker will send check message to check the transaction status, and this method will be invoked to get local transaction status.
func (o *orderlister) checklocaltransaction(*primitive.messageext) primitive.localtransactionstate {
//回查
fmt.println("事务未通过,开始回查")
return primitive.rollbackmessagestate
}
func (o *order) createorder(q rocketmq.transactionproducer) {
order, err := json.marshal(o)
if err != nil {
panic("marshal fail")
}
msg := primitive.newmessage("msg_test_order", order)
res, err := q.sendmessageintransaction(context.background(), msg)
if err != nil {
fmt.printf("发送失败%s", err)
} else {
fmt.println("发送成功", res.string())
}
time.sleep(time.hour)
err = q.shutdown()
if err != nil {
panic("shutdown fail err")
}
}
func main() {
//初始化事务对象
orderlister := &orderlister{}
q, err := rocketmq.newtransactionproducer(orderlister,
producer.withnameserver([]string{"101.1.12.202:9876"}), producer.withgroupname("msg_test"))
if err != nil {
panic("生成q生产者失败")
}
if err = q.start(); err != nil {
panic("启动q生产者失败")
}
orderinfo := &order{
ordersrvid: "343435",
userid: 21,
goodsid: 214,
totalprice: 150.5,
post: "请尽快发货",
address: "无锡市",
mobile: "18389202834",
}
orderinfo.createorder(q)
}
执行输出:
开始执行本地逻辑
订单信息: {343435 21 214 150.5 请尽快发货 无锡市 18389202834}
本地逻辑执行成功
发送成功 sendresult [sendstatus=0, msgids=0a0266db4e24000000003da94f100001, offsetmsgid=010eb4ca00002a9f000000000004c28f, queueoffset=364, messagequeue=messagequeue [tomsg_test_order, brokername=broker-a, queueid=1]]
接收者接收到:
subscribe callback: [message=[topic=msg_test_order, body={"ordersrvid":"343435","userid":21,"goodsid":214,"totalprice":150.5,"post":"请尽快发货","address":"无锡市","mobile":"18389202834"}, flag=0, properties=map[consume_start_time:1668266524665 max_offset:1 min_offset:0 pgroup:msg_test real_qid:1 real_topic:msg_test_order tran_msg:true uniq_key:0a0266db4e24000000003da94f100001], transactionid=0a0266db4e24000000003da94f100001], msgid=0a0266db4e24000000003da94f100001…………
本作品采用《cc 协议》,转载必须注明作者和本文链接