iis服务器助手广告广告
返回顶部
首页 > 资讯 > 前端开发 > html >web开发中怎样优雅地实现并发编排任务
  • 678
分享到

web开发中怎样优雅地实现并发编排任务

2024-04-02 19:04:59 678人浏览 独家记忆
摘要

今天就跟大家聊聊有关web开发中怎样优雅地实现并发编排任务,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。业务场景在做任

今天就跟大家聊聊有关web开发中怎样优雅地实现并发编排任务,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

业务场景

在做任务开发的时候,你们一定会碰到以下场景:

场景1:调用第三方接口的时候, 一个需求你需要调用不同的接口,做数据组装。

场景2:一个应用首页可能依托于很多服务。那就涉及到在加载页面时需要同时请求多个服务的接口。这一步往往是由后端统一调用组装数据再返回给前端,也就是所谓的  BFF(Backend For Frontend) 层。

针对以上两种场景,假设在没有强依赖关系下,选择串行调用,那么总耗时即:

time=s1+s2+....sn

按照当代秒入百万的有为青年,这么长时间早就把你祖宗十八代问候了一遍。

为了伟大的KPI,我们往往会选择并发地调用这些依赖接口。那么总耗时就是:

time=max(s1,s2,s3.....,sn)

当然开始堆业务的时候可以先串行化,等到上面的人着急的时候,亮出绝招。

这样,年底 PPT 就可以加上浓重的一笔流水账:为业务某个接口提高百分之XXX性能,间接产生XXX价值。

当然这一切的前提是,做老板不懂技术,做技术”懂”你。

言归正传,如果修改成并发调用,你可能会这么写,

package main  import (     "fmt"     "sync"     "time" )  func main() {     var wg sync.WaitGroup     wg.Add(2)      var userInfo *User     var productList []Product      Go func() {         defer wg.Done()         userInfo, _ = getUser()     }()      go func() {         defer wg.Done()         productList, _ = getProductList()     }()     wg.Wait()     fmt.Printf("用户信息:%+v\n", userInfo)     fmt.Printf("商品信息:%+v\n", productList) }     type User struct {     Name string     Age uint8 }  func getUser() (*User, error) {     time.Sleep(500 * time.Millisecond)     var u User     u.Name = "wuqinqiang"     u.Age = 18     return &u, nil }    type Product struct {     Title string     Price uint32 }  func getProductList() ([]Product, error) {     time.Sleep(400 * time.Millisecond)     var list []Product     list = append(list, Product{         Title: "SHib",         Price: 10,     })     return list, nil }

从实现上来说,需要多少服务,会开多少个 G,利用 sync.WaitGroup 的特性,

实现并发编排任务的效果。

好像,问题不大。

但是随着代号 996 业务场景的增加,你会发现,好多模块都有相似的功能,只是对应的业务场景不同而已。

那么我们能不能抽像出一套针对此业务场景的工具,而把具体业务实现交给业务方。

使用

本着不重复造轮子的原则,去搜了下开源项目,最终看上了 go-zero 里面的一个工具 mapReduce

可以自行 Google 这个名词。

使用很简单。我们通过它改造一下上面的代码:

package main  import (     "fmt"     "GitHub.com/tal-tech/go-zero/core/mr"     "time" )  func main() {     var userInfo *User     var productList []Product     _ = mr.Finish(func() (err error) {         userInfo, err = getUser()         return err     }, func() (err error) {         productList, err = getProductList()         return err     })     fmt.Printf("用户信息:%+v\n", userInfo)     fmt.Printf("商品信息:%+v\n", productList) } //打印 用户信息:&{Name:wuqinqiang Age:18} 商品信息:[{Title:SHib Price:10}]

是不是舒服多了。

但是这里还需要注意一点,假设你调用的其中一个服务错误,并且你 return err 对应的错误,那么其他调用的服务会被取消。

比如我们修改 getProductList 直接响应错误。

func getProductList() ([]Product, error) {     return nil, errors.New("test error") } //打印 // 用户信息:<nil> // 商品信息:[]

那么最终打印的时候连用户信息都会为空,因为出现一个服务错误,用户服务请求被取消了。

一般情况下,在请求服务错误的时候我们会有保底操作,一个服务错误不能影响其他请求的结果。

所以在使用的时候具体处理取决于业务场景。

源码

既然用了,那么就追下源码吧。

func Finish(fns ...func() error) error {     if len(fns) == 0 {         return nil     }      return MapReduceVoid(func(source chan<- interface{}) {         for _, fn := range fns {             source <- fn         }     }, func(item interface{}, writer Writer, cancel func(error)) {         fn := item.(func() error)         if err := fn(); err != nil {             cancel(err)         }     }, func(pipe <-chan interface{}, cancel func(error)) {         drain(pipe)     }, WithWorkers(len(fns))) }
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {         reducer(input, cancel)         drain(input)         // We need to write a placeholder to let MapReduce to continue on reducer done,         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.         writer.Write(lang.Placeholder)     }, opts...)     return err }

对于 MapReduceVoid函数,主要查看三个闭包参数。

  • 第一个 GenerateFunc 用于生产数据。

  • MapperFunc 读取生产出的数据,进行处理。

  • VoidReducerFunc 这里表示不对 mapper 后的数据做聚合返回。所以这个闭包在此操作几乎0作用。

func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {     source := buildSource(generate)      return MapReduceWithSource(source, mapper, reducer, opts...) }  func buildSource(generate GenerateFunc) chan interface{} {     source := make(chan interface{})// 创建无缓冲通道     threading.GoSafe(func() {         defer close(source)         generate(source) //开始生产数据     })      return source //返回无缓冲通道 }

buildSource函数中,返回一个无缓冲的通道。并开启一个 G 运行  generate(source),往无缓冲通道塞数据。这个generate(source) 不就是一开始 Finish 传递的第一个闭包参数。

return MapReduceVoid(func(source chan<- interface{}) {     // 就这个         for _, fn := range fns {             source <- fn         }     })

然后查看 MapReduceWithSource 函数,

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,     opts ...Option) (interface{}, error) {     options := buildOptions(opts...)     //任务执行结束通知信号     output := make(chan interface{})     //将mapper处理完的数据写入collector     collector := make(chan interface{}, options.workers)     // 取消操作信号     done := syncx.NewDoneChan()     writer := newGuardedWriter(output, done.Done())     var closeOnce sync.Once     var retErr errorx.AtomicError     finish := func() {         closeOnce.Do(func() {             done.Close()             close(output)         })     }     cancel := once(func(err error) {         if err != nil {             retErr.Set(err)         } else {             retErr.Set(ErrCancelWithNil)         }          drain(source)         finish()     })      go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         drain(collector)     }()     // 真正从生成器通道取数据执行Mapper     go executeMappers(func(item interface{}, w Writer) {         mapper(item, w, cancel)     }, source, collector, done.Done(), options.workers)      value, ok := <-output     if err := retErr.Load(); err != nil {         return nil, err     } else if ok {         return value, nil     } else {         return nil, ErrReduceNoOutput     } }

这段代码挺长的,我们说下核心的点。这里使用一个G 调用 executeMappers 方法。

go executeMappers(func(item interface{}, w Writer) {         mapper(item, w, cancel)     }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},     done <-chan lang.PlaceholderType, workers int) {     var wg sync.WaitGroup     defer func() {         // 等待所有任务全部执行完毕         wg.Wait()         // 关闭通道         close(collector)     }()    //根据指定数量创建 worker池     pool := make(chan lang.PlaceholderType, workers)      writer := newGuardedWriter(collector, done)     for {         select {         case <-done:             return         case pool <- lang.Placeholder:             // 从buildSource() 返回的无缓冲通道取数据             item, ok := <-input              // 当通道关闭,结束             if !ok {                 <-pool                 return             }              wg.Add(1)             // better to safely run caller defined method             threading.GoSafe(func() {                 defer func() {                     wg.Done()                     <-pool                 }()                 //真正运行闭包函数的地方                // func(item interface{}, w Writer) {                // mapper(item, w, cancel)                // }                 mapper(item, writer)             })         }     } }

具体的逻辑已备注,代码很容易懂。

一旦 executeMappers 函数返回,关闭 collector 通道,那么执行 reducer 不再阻塞。

go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         //这里         drain(collector)     }()

这里的 reducer(collector, writer, cancel) 其实就是从 MapReduceVoid 传递的第三个闭包函数。

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {         reducer(input, cancel)         //这里         drain(input)         // We need to write a placeholder to let MapReduce to continue on reducer done,         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.         writer.Write(lang.Placeholder)     }, opts...)     return err }

然后这个闭包函数又执行了 reducer(input, cancel),这里的 reducer 就是我们一开始解释过的 VoidReducerFunc,从  Finish() 而来。

web开发中怎样优雅地实现并发编排任务

等等,看到上面三个地方的 drain(input)了吗?

// drain drains the channel. func drain(channel <-chan interface{}) {     // drain the channel     for range channel {     } }

其实就是一个排空 channel 的操作,但是三个地方都对同一个 channel做同样的操作,也是让我费解。

还有更重要的一点。

go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         drain(collector)     }()

上面的代码,假如执行 reducer,writer 写入引发 panic,那么drain(collector) 将没有机会执行。

不过作者已经修复了这个问题,直接把 drain(collector) 放入到 defer。

web开发中怎样优雅地实现并发编排任务

具体 issues[1]。

到这里,关于 Finish 的源码也就结束了。感兴趣的可以看看其他源码。

很喜欢 go-zero 里的一些工具,但是工具往往并不独立,依赖于其他文件包,导致明明只想使用其中一个工具却需要安装整个包。

所以最终的结果就是扒源码,创建无依赖库工具集,遵循 MIT 即可。

看完上述内容,你们对WEB开发中怎样优雅地实现并发编排任务有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注编程网html频道,感谢大家的支持。

--结束END--

本文标题: web开发中怎样优雅地实现并发编排任务

本文链接: https://www.lsjlt.com/news/81552.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • web开发中怎样优雅地实现并发编排任务
    今天就跟大家聊聊有关web开发中怎样优雅地实现并发编排任务,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。业务场景在做任...
    99+
    2024-04-02
  • web开发中如何实现归并排序
    小编给大家分享一下web开发中如何实现归并排序,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!归并排序归并排序(Merge sort)是建立在归并操作上的一种有效的...
    99+
    2023-06-19
  • web开发中如何实现堆排序
    这篇文章主要为大家展示了“web开发中如何实现堆排序”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“web开发中如何实现堆排序”这篇文章吧。预备知识:堆结构堆是具有以下性质的完全二叉树:每个结点的...
    99+
    2023-06-19
  • PHP 并发处理中如何优雅地实现日志重定向?
    随着互联网的快速发展,Web 应用程序的规模越来越大,处理的数据量也越来越大。在这种情况下,为了提高应用程序的性能和响应速度,我们需要采用并发处理技术来处理大量的请求。 然而,并发处理也带来了一些问题,例如:多个并发请求可能会同时写入日志...
    99+
    2023-06-30
    并发 日志 重定向
  • web开发中如何实现插入排序
    这篇文章将为大家详细讲解有关web开发中如何实现插入排序,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。插入排序插入排序的代码实现虽然没有冒泡排序和选择排序那么简单粗暴,但它的原理应该是最容易理解的了,因为...
    99+
    2023-06-19
  • web开发中如何实现希尔排序
    小编给大家分享一下web开发中如何实现希尔排序,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!希尔排序希尔排序,也称递减增量排序算法,是插入排序的一种更高效的改进版...
    99+
    2023-06-19
  • web开发中如何实现快速排序
    小编给大家分享一下web开发中如何实现快速排序,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!快速排序快速排序是由东尼·霍尔所发展的一种排序算法。在平均状况下,排序...
    99+
    2023-06-19
  • web开发中如何实现选择排序
    这篇文章主要为大家展示了“web开发中如何实现选择排序”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“web开发中如何实现选择排序”这篇文章吧。 选择排序选择排序是一种简单直观的排序算法...
    99+
    2023-06-19
  • Go语言中的并发编程:如何优雅地处理Bash对象?
    随着计算机硬件性能的不断提升,我们需要的软件系统也越来越复杂,对并发编程的需求也越来越强烈。Go语言作为一门天生支持并发编程的语言,其在处理Bash对象上的表现也是十分优雅的。 Bash对象是指在Unix/Linux系统中,每个进程都有一...
    99+
    2023-06-27
    并发 bash 对象
  • web开发中如何实现示冒泡排序
    这篇文章给大家分享的是有关web开发中如何实现示冒泡排序的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。冒泡排序(Bubble Sort)也是一种简单直观的排序算法。它重复地走访过要排序的数列,一次比较两个元素,如...
    99+
    2023-06-19
  • node.js中怎么实现web开发
    这篇文章给大家介绍node.js中怎么实现web开发,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1.express框架安装1)在node命令行模式下输入以下命令npm in...
    99+
    2024-04-02
  • 如何在Go语言中优雅地使用chan通道进行并发编程
    在Go语言中,chan通道是用来在多个goroutine之间进行数据传递和通信的重要工具。通过使用chan通道,可以很容易地实现并发编程,提高程序的性能和效率。本文将详细介绍如何在Go...
    99+
    2024-03-13
    go语言 并发 chan通道
  • web开发中怎么实现水印
    小编给大家分享一下web开发中怎么实现水印,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧! 日常工作中...
    99+
    2024-04-02
  • 如何在Django中优雅地使用Python函数和IDE实现快速开发?
    Django是一个基于Python的Web框架,已经成为了开发Web应用程序的首选工具之一。它的优点在于它提供了快速开发Web应用程序所需的一切。在Django中,使用Python函数可以帮助我们更快地开发Web应用程序。本文将探讨如何在D...
    99+
    2023-10-13
    函数 django ide
  • Python 并发编程:实现高效率的任务处理。
    Python 并发编程:实现高效率的任务处理 随着计算机硬件性能的提高,我们可以同时执行更多的任务,从而提高应用程序的性能。Python 并发编程是一种利用多线程、多进程、协程等技术实现高效率任务处理的方法。本文将介绍 Python 并发编...
    99+
    2023-08-09
    并发 apache django
  • Golang并发编程:利用Go WaitGroup实现任务队列
    Go语言的WaitGroup是用来等待一组goroutine执行完毕的工具,可以用来实现任务队列。下面是一个使用WaitGroup实...
    99+
    2023-10-08
    Golang
  • web开发中如何实现条件注释样式
    小编给大家分享一下web开发中如何实现条件注释样式,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!作为前端工程师的我们来说,IE对我们来说一定是不陌生的。在编写代码...
    99+
    2023-06-08
  • PHP并发编程中,如何实现分布式实时任务调度?
    PHP作为一种脚本语言,在Web开发中发挥了重要的作用。在开发过程中,我们经常需要处理一些实时的任务,比如定时任务、爬虫任务、消息通知等等。这些任务需要在分布式系统中进行调度,确保任务的准确执行。在本文中,我们将介绍如何使用PHP实现分布式...
    99+
    2023-11-01
    并发 分布式 实时
  • Golang并发编程:使用Go WaitGroup实现任务调度器
    任务调度器是一种常见的并发编程模式,它用于同时执行多个任务并等待所有任务完成后再继续执行其他操作。在Golang中,可以使用sync...
    99+
    2023-10-20
    Golang
  • web开发中的分布式事务是怎样的
    web开发中的分布式事务是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。事务(Transaction):一般是指要做的或...
    99+
    2024-04-02
软考高级职称资格查询
推荐阅读
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作