iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >go zero微服务性能优化极致秒杀实例分析
  • 614
分享到

go zero微服务性能优化极致秒杀实例分析

2023-07-02 17:07:34 614人浏览 安东尼
摘要

这篇“Go zero微服务性能优化极致秒杀实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“go z

这篇“Go zero微服务性能优化极致秒杀实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“go zero微服务性能优化极致秒杀实例分析”文章吧。

批量数据聚合

SeckillOrder这个方法中,每来一次秒杀抢购请求都往往kafka中发送一条消息。假如这个时候有一千万的用户同时来抢购,就算我们做了各种限流策略,一瞬间还是可能会有上百万的消息会发到Kafka,会产生大量的网络io和磁盘IO成本,大家都知道Kafka是基于日志的消息系统,写消息虽然大多情况下都是顺序IO,但当海量的消息同时写入的时候还是可能会扛不住。

那怎么解决这个问题呢?答案是做消息的聚合。之前发送一条消息就会产生一次网络IO和一次磁盘IO,我们做消息聚合后,比如聚合100条消息后再发送给Kafka,这个时候100条消息才会产生一次网络IO和磁盘IO,对整个Kafka的吞吐和性能是一个非常大的提升。其实这就是一种小包聚合的思想,或者叫Batch或者批量的思想。这种思想也随处可见,比如我们使用Mysql插入批量数据的时候,可以通过一条sql语句执行而不是循环的一条一条插入,还有Redis的Pipeline操作等等。

go zero微服务性能优化极致秒杀实例分析

那怎么来聚合呢,聚合策略是啥呢?聚合策略有两个维度分别是聚合消息条数和聚合时间,比如聚合消息达到100条我们就往Kafka发送一次,这个条数是可以配置的,那如果一直也达不到100条消息怎么办呢?通过聚合时间来兜底,这个聚合时间也是可以配置的,比如配置聚合时间为1秒钟,也就是无论目前聚合了多少条消息只要聚合时间达到1秒,那么就往Kafka发送一次数据。聚合条数和聚合时间是或的关系,也就是只要有一个条件满足就触发。

在这里我们提供一个批量聚合数据的工具Batcher,定义如下

type Batcher struct {  opts options  Do       func(ctx context.Context, val map[string][]interface{})  Sharding func(key string) int  chans    []chan *msg  wait     sync.WaitGroup}

Do方法:满足聚合条件后就会执行Do方法,其中val参数为聚合后的数据

Sharding方法:通过Key进行sharding,相同的key消息写入到同一个channel中,被同一个goroutine处理

在merge方法中有两个触发执行Do方法的条件,一是当聚合的数据条数大于等于设置的条数,二是当触发设置的定时器

代码实现比较简单,如下为具体实现:

type msg struct {  key string  val interface{}}type Batcher struct {  opts options  Do       func(ctx context.Context, val map[string][]interface{})  Sharding func(key string) int  chans    []chan *msg  wait     sync.WaitGroup}func New(opts ...Option) *Batcher {  b := &Batcher{}  for _, opt := range opts {    opt.apply(&b.opts)  }  b.opts.check()  b.chans = make([]chan *msg, b.opts.worker)  for i := 0; i < b.opts.worker; i++ {    b.chans[i] = make(chan *msg, b.opts.buffer)  }  return b}func (b *Batcher) Start() {  if b.Do == nil {    log.Fatal("Batcher: Do func is nil")  }  if b.Sharding == nil {    log.Fatal("Batcher: Sharding func is nil")  }  b.wait.Add(len(b.chans))  for i, ch := range b.chans {    go b.merge(i, ch)  }}func (b *Batcher) Add(key string, val interface{}) error {  ch, msg := b.add(key, val)  select {  case ch <- msg:  default:    return ErrFull  }  return nil}func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) {  sharding := b.Sharding(key) % b.opts.worker  ch := b.chans[sharding]  msg := &msg{key: key, val: val}  return ch, msg}func (b *Batcher) merge(idx int, ch <-chan *msg) {  defer b.wait.Done()  var (    msg        *msg    count      int    closed     bool    lastTicker = true    interval   = b.opts.interval    vals       = make(map[string][]interface{}, b.opts.size)  )  if idx > 0 {    interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker)))  }  ticker := time.NewTicker(interval)  for {    select {    case msg = <-ch:      if msg == nil {        closed = true        break      }      count++      vals[msg.key] = append(vals[msg.key], msg.val)      if count >= b.opts.size {        break      }      continue    case <-ticker.C:      if lastTicker {        ticker.Stop()        ticker = time.NewTicker(b.opts.interval)        lastTicker = false      }    }    if len(vals) > 0 {      ctx := context.Background()      b.Do(ctx, vals)      vals = make(map[string][]interface{}, b.opts.size)      count = 0    }    if closed {      ticker.Stop()      return    }  }}func (b *Batcher) Close() {  for _, ch := range b.chans {    ch <- nil  }  b.wait.Wait()}

使用的时候需要先创建一个Batcher,然后定义Batcher的Sharding方法和Do方法,在Sharding方法中通过ProductID把不同商品的聚合投递到不同的goroutine中处理,在Do方法中我们把聚合的数据一次性批量的发送到Kafka,定义如下:

b := batcher.New(  batcher.WithSize(batcherSize),  batcher.WithBuffer(batcherBuffer),  batcher.WithWorker(batcherWorker),  batcher.WithInterval(batcherInterval),)b.Sharding = func(key string) int {  pid, _ := strconv.ParseInt(key, 10, 64)  return int(pid) % batcherWorker}b.Do = func(ctx context.Context, val map[string][]interface{}) {  var msgs []*KafkaData  for _, vs := range val {    for _, v := range vs {      msgs = append(msgs, v.(*KafkaData))    }  }  kd, err := JSON.Marshal(msgs)  if err != nil {    logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err)  }  if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil {    logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err)  }}s.batcher = bs.batcher.Start()

SeckillOrder方法中不再是每来一次请求就往Kafka中投递一次消息,而是先通过batcher提供的Add方法添加到Batcher中等待满足聚合条件后再往Kafka中投递。

err = l.batcher.Add(strconv.FORMatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId})if err!= nil {    logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err)}

降低消息的消费延迟

通过批量消息处理的思想,我们提供了Batcher工具,提升了性能,但这主要是针对生产端而言的。当我们消费到批量的数据后,还是需要串行的一条条的处理数据,那有没有办法能加速消费从而降低消费消息的延迟呢?有两种方案分别是:

  • 增加消费者的数量

  • 在一个消费者中增加消息处理的并行度

因为在Kafka中,一个Topci可以配置多个Partition,数据会被平均或者按照生产者指定的方式写入到多个分区中,那么在消费的时候,Kafka约定一个分区只能被一个消费者消费,为什么要这么设计呢?我理解的是如果有多个Consumer同时消费一个分区的数据,那么在操作这个消费进度的时候就需要加,对性能影响比较大。所以说当消费者数量小于分区数量的时候,我们可以增加消费者的数量来增加消息处理能力,但当消费者数量大于分区的时候再继续增加消费者数量就没有意义了。

go zero微服务性能优化极致秒杀实例分析

不能增加Consumer的时候,可以在同一个Consumer中提升处理消息的并行度,即通过多个goroutine来并行的消费数据,我们一起来看看如何通过多个goroutine来消费消息。

在Service中定义msgsChan,msgsChan为Slice,Slice的长度表示有多少个goroutine并行的处理数据,初始化如下:

func NewService(c config.Config) *Service {  s := &Service{    c:          c,    Productrpc: product.NewProduct(zrpc.MustNewClient(c.ProductRPC)),    OrderRPC:   order.NeWorder(zrpc.MustNewClient(c.OrderRPC)),    msgsChan:   make([]chan *KafkaData, chanCount),  }  for i := 0; i < chanCount; i++ {    ch := make(chan *KafkaData, bufferCount)    s.msgsChan[i] = ch    s.waiter.Add(1)    go s.consume(ch)  }  return s}

从Kafka中消费到数据后,把数据投递到Channel中,注意投递消息的时候按照商品的id做Sharding,这能保证在同一个Consumer中对同一个商品的处理是串行的,串行的数据处理不会导致并发带来的数据竞争问题

func (s *Service) Consume(_ string, value string) error {  logx.Infof("Consume value: %s\n", value)  var data []*KafkaData  if err := json.Unmarshal([]byte(value), &data); err != nil {    return err  }  for _, d := range data {    s.msgsChan[d.Pid%chanCount] <- d  }  return nil}

我们定义了chanCount个goroutine同时处理数据,每个channel的长度定义为bufferCount,并行处理数据的方法为consume,如下:

func (s *Service) consume(ch chan *KafkaData) {  defer s.waiter.Done()  for {    m, ok := <-ch    if !ok {      log.Fatal("seckill rMQ exit")    }    fmt.Printf("consume msg: %+v\n", m)    p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: m.Pid})    if err != nil {      logx.Errorf("s.ProductRPC.Product pid: %d error: %v", m.Pid, err)      return    }    if p.Stock <= 0 {      logx.Errorf("stock is zero pid: %d", m.Pid)      return    }    _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})    if err != nil {      logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)      return    }    _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})    if err != nil {      logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)    }  }}

怎么保证不会超卖

当秒杀活动开始后,大量用户点击商品详情页上的秒杀按钮,会产生大量的并发请求查询库存,一旦某个请求查询到有库存,紧接着系统就会进行库存的扣减。然后,系统生成实际的订单,并进行后续的处理。如果请求查不到库存,就会返回,用户通常会继续点击秒杀按钮,继续查询库存。简单来说,这个阶段的操作就是三个:检查库存,库存扣减、和订单处理。因为每个秒杀请求都会查询库存,而请求只有查到库存有余量后,后续的库存扣减和订单处理才会被执行,所以,这个阶段中最大的并发压力都在库存检查操作上。

为了支撑大量高并发的库存检查请求,我们需要使用Redis单独保存库存量。那么,库存扣减和订单处理是否都可以交给Mysql来处理呢?其实,订单的处理是可以在数据库中执行的,但库存扣减操作不能交给mysql直接处理。因为到了实际的订单处理环节,请求的压力已经不大了,数据库完全可以支撑这些订单处理请求。那为什么库存扣减不能直接在数据库中执行呢?这是因为,一旦请求查到有库存,就意味着该请求获得购买资格,紧接着就会进行下单操作,同时库存量会减一,这个时候如果直接操作数据库来扣减库存可能就会导致超卖问题。

直接操作数据库扣减库存为什么会导致超卖呢?由于数据库的处理速度较慢,不能及时更新库存余量,这就会导致大量的查询库存的请求读取到旧的库存值,并进行下单,此时就会出现下单数量大于实际的库存量,导致超卖。所以,就需要直接在Redis中进行库存扣减,具体的操作是,当库存检查完后,一旦库存有余量,我们就立即在Redis中扣减库存,同时,为了避免请求查询到旧的库存值,库存检查和库存扣减这两个操作需要保证原子性。

我们使用Redis的Hash来存储库存,total为总库存,seckill为已秒杀的数量,为了保证查询库存和减库存的原子性,我们使用lua脚本进行原子操作,让秒杀量小于库存的时候返回1,表示秒杀成功,否则返回0,表示秒杀失败,代码如下:

const (  luaCheckAndUpdateScript = `local counts = redis.call("HMGET", KEYS[1], "total", "seckill")local total = tonumber(counts[1])local seckill = tonumber(counts[2])if seckill + 1 <= total then  redis.call("HINCRBY", KEYS[1], "seckill", 1)  return 1endreturn 0`)func (l *CheckAndUpdateStockLogic) CheckAndUpdateStock(in *product.CheckAndUpdateStockRequest) (*product.CheckAndUpdateStockResponse, error) {  val, err := l.svcCtx.BizRedis.EvalCtx(l.ctx, luaCheckAndUpdateScript, []string{stockKey(in.ProductId)})  if err != nil {    return nil, err  }  if val.(int64) == 0 {    return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("insufficient stock: %d", in.ProductId))  }  return &product.CheckAndUpdateStockResponse{}, nil}func stockKey(pid int64) string {  return fmt.Sprintf("stock:%d", pid)}

对应的seckill-rmq代码修改如下:

func (s *Service) consume(ch chan *KafkaData) {  defer s.waiter.Done()  for {    m, ok := <-ch    if !ok {      log.Fatal("seckill rmq exit")    }    fmt.Printf("consume msg: %+v\n", m)    _, err := s.ProductRPC.CheckAndUpdateStock(context.Background(), &product.CheckAndUpdateStockRequest{ProductId: m.Pid})    if err != nil {      logx.Errorf("s.ProductRPC.CheckAndUpdateStock pid: %d error: %v", m.Pid, err)      return    }    _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})    if err != nil {      logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)      return    }    _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})    if err != nil {      logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)    }  }}

到这里,我们已经了解了如何使用原子性的Lua脚本来实现库存的检查和扣减。其实要想保证库存检查和扣减的原子性,还有另外一种方法,那就是使用分布式锁。

为了简化分布式锁、分布式选举、分布式事务的实现,etcd社区提供了一个名为concurrency的包来帮助我们更简单、正确的使用分布式锁。它的实现非常简单,主要流程如下:

  • 首先通过concurrency.NewSession方法创建Session,本质上是创建了一个TTL为10的Lease

  • 得到Session对象后,通过concurrency.NewMutex创建一个mutex对象,包括了Lease、key prefix等信息

  • 然后听过mutex对象的Lock方法尝试获取锁

  • 最后通过mutex对象的Unlock方法释放锁

cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})if err != nil {   log.Fatal(err)}defer cli.Close()session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))if err != nil {   log.Fatal(err)}defer session.Close()mux := concurrency.NewMutex(session, "lock")if err := mux.Lock(context.Background()); err != nil {   log.Fatal(err)}if err := mux.Unlock(context.Background()); err != nil {   log.Fatal(err)}

以上就是关于“go zero微服务性能优化极致秒杀实例分析”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网精选频道。

--结束END--

本文标题: go zero微服务性能优化极致秒杀实例分析

本文链接: https://www.lsjlt.com/news/343524.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • go zero微服务性能优化极致秒杀实例分析
    这篇“go zero微服务性能优化极致秒杀实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“go z...
    99+
    2023-07-02
  • go zero微服务实战性能优化极致秒杀
    目录引言批量数据聚合降低消息的消费延迟怎么保证不会超卖结束语引言 上一篇文章中引入了消息队列对秒杀流量做削峰的处理,我们使用的是Kafka,看起来似乎工作的不错,但其实还是有很多隐患...
    99+
    2024-04-02
  • go zero微服务处理方法实例分析
    这篇文章主要介绍“go zero微服务处理方法实例分析”,在日常操作中,相信很多人在go zero微服务处理方法实例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”go zer...
    99+
    2023-07-02
  • php性能优化实例分析
    这篇文章主要介绍了php性能优化实例分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇php性能优化实例分析文章都会有所收获,下面我们一起来看看吧。前言如何提高程序运行速度,减轻服务器压力是服务端开发必须面对的...
    99+
    2023-07-02
  • 玩转服务器性能优化,打造极致用户体验
    硬件优化 增加内存 (RAM):内存是服务器运行中的程序和数据存储的地方。增加内存可以减少磁盘读写,提高服务器的响应速度。 选择高效的 CPU:选择具有高主频和大量内核的 CPU,可以提高服务器的多任务处理能力。 使用 SSD 硬盘:S...
    99+
    2024-03-01
    服务器性能优化 服务器配置 数据库优化 编码优化 缓存优化
  • YOLOv5性能优化与部署实例分析
    本篇内容介绍了“YOLOv5性能优化与部署实例分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!模型压缩为了使YOLOv5车牌识别系统在资源...
    99+
    2023-07-05
  • Java Web服务性能优化的实践分析
    本篇文章为大家展示了Java Web服务性能优化的实践分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。简介: 如何提升 Java Web 服务性能,主要介绍了三种方法:一是采用 Web ...
    99+
    2023-06-17
  • 大数据交叉报表性能优化实例分析
    这篇文章主要为大家分析了大数据交叉报表性能优化实例分析的相关知识点,内容详细易懂,操作细节合理,具有一定参考价值。如果感兴趣的话,不妨跟着跟随小编一起来看看,下面跟着小编一起深入学习“大数据交叉报表性能优化实例分析”的知识吧。软硬件环境OS...
    99+
    2023-06-04
  • 从Context源码实现谈React性能优化的示例分析
    这篇文章给大家分享的是有关从Context源码实现谈React性能优化的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。组件render的时机Context的实现与组件的r...
    99+
    2024-04-02
  • 阿里云测试服务器并发性能优化与案例分析
    随着互联网技术的发展,云计算已经成为企业IT基础设施的重要组成部分。在互联网应用中,服务器的并发性能是关键性能指标之一,特别是在测试服务器环境中,服务器的并发性能直接影响到测试效率和测试结果的准确性。本文将重点介绍阿里云测试服务器的并发性能...
    99+
    2023-11-21
    阿里 案例分析 性能
  • 阿里云服务器性能分析与优化策略
    阿里云服务器作为云计算领域的重要组成部分,其性能稳定性直接影响着云计算应用的用户体验。本文将从阿里云服务器的性能分析入手,探讨阿里云服务器看快不看就慢的原因,并提出相应的优化策略。 一、阿里云服务器性能分析服务器配置:服务器的配置直接影响着...
    99+
    2023-11-10
    阿里 性能 策略
  • 探讨网站性能优化设计的最佳实践和案例分析
    网站性能优化设计的最佳实践与案例分析 随着网络技术的迅猛发展,越来越多的企业和个人都拥有了自己的网站。然而,随之而来的是网页加载速度变慢、响应时间变长等问题,给用户的体验产生了负面影响。因此,对于网站性能的优化设计成为了刻不容缓...
    99+
    2024-02-02
    性能优化 最佳实践 案例分析 前端优化 异步加载
  • 基于Nuxt.js项目服务端性能优化与错误检测的示例分析
    小编给大家分享一下基于Nuxt.js项目服务端性能优化与错误检测的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!nuxt...
    99+
    2024-04-02
  • 服务器日志的秘密:通过分析解锁性能优化
    服务器日志是记录服务器活动和错误的宝贵数据源。通过分析这些日志,系统管理员和开发人员可以深入了解服务器的性能、安全性和可用性。 分析日志以识别问题 日志文件可以帮助识别导致服务器性能下降的问题。例如: 高资源利用率:日志可以显示服务器是...
    99+
    2024-04-02
  • 阿里架构师细谈:微服务+分布式+性能优化+JVM调优+团队开发
    ”文末有资料放送一:分布式架构高并发,高可用,海量数据,没有分布式的架构知识肯定是玩不转的所以分布式的知识需要掌握:分布式架构思维大型互联网架构演进过程架构师应具备的分布式知识主流分布式架构设计详解架构开发基础多线程开发高性能NIO框架架构...
    99+
    2023-06-02
  • 深入分析:PHP 服务器优化秘籍,性能飙升不是梦!
    1. 基础优化: 使用 PHP 加速器: 如 Zend Opcache 或 APC,可显著提高 PHP 脚本的执行速度。 优化 PHP 配置: 例如,调整内存限制、启用 OPcache 等,可优化 PHP 的运行性能。 减少数据库查询...
    99+
    2024-02-12
    PHP 服务器优化 性能提升 最佳实践 代码优化 缓存 配置优化
  • 剖析PHP服务器优化的奥秘:10个实用技巧助力性能提升
    代码优化: 作为一名PHP开发者,您需要掌握高效的代码编写技巧,以减少不必要的资源消耗。尽量使用简洁、高效的代码,并避免冗长的循环和复杂的运算。同时,合理使用变量、函数和类,并采用面向对象编程范式,可以显著提高代码的可读性和可维护性...
    99+
    2024-02-13
    PHP服务器优化 性能提升 网站速度 代码优化 缓存 配置优化 负载均衡 数据库优化 安全优化
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作