go 并发与并行

  • 并发(concurrency) 并发的关注点在于任务切分。举例来说,你是一个创业公司的CEO,开始只有你一个人,你一人分饰多角,一会做产品规划,一会写代码,一会见客户,虽然你不能见客户的同时写代码,但由于你切分了任务,分配了时间片,表现出来好像是多个任务一起在执行。

  • 并行(parallelism) 并行的关注点在于同时执行。还是上面的例子,你发现你自己太忙了,时间分配不过来,于是请了工程师,产品经理,市场总监,各司一职,这时候多个任务可以同时执行了。

GreenThread

  • 用户空间 首先是在用户空间,避免内核态和用户态的切换导致的成本。

  • 由语言或者框架层调度

  • 更小的栈空间允许创建大量实例(百万级别)

几个概念

  • Continuation 这个概念不熟悉 FP 编程的人可能不太熟悉,不过这里可以简单的顾名思义,可以理解为让我们的程序可以暂停,然后下次调用继续(contine)从上次暂停的地方开始的一种机制。相当于程序调用多了一种入口。

  • Coroutine 是 Continuation 的一种实现,一般表现为语言层面的组件或者类库。主要提供 yield,resume 机制。

  • Fiber 和 Coroutine 其实是一体两面的,主要是从系统层面描述,可以理解成 Coroutine 运行之后的东西就是 Fiber。

Goroutine

Goroutine 其实就是前面 GreenThread 系列解决方案的一种演进和实现。

  • 首先,它内置了 Coroutine 机制。因为要用户态的调度,必须有可以让代码片段可以暂停/继续的机制。

  • 其次,它内置了一个调度器,实现了 Coroutine 的多线程并行调度,同时通过对网络等库的封装,对用户屏蔽了调度细节。

  • 最后,提供了 Channel 机制,用于 Goroutine 之间通信,实现 CSP 并发模型(Communicating Sequential Processes)。因为 Go 的 Channel 是通过语法关键词提供的,对用户屏蔽了许多细节。其实 Go 的 Channel 和 Java 中的 SynchronousQueue 是一样的机制,如果有 buffer 其实就是 ArrayBlockQueue。

Goroutine 调度器

这个图一般讲 Goroutine 调度器的地方都会引用,想要仔细了解的可以看看原博客(小编:点击阅读原文获取)。这里只说明几点:

  1. M 代表系统线程,P 代表处理器(核),G 代表 Goroutine。Go 实现了 M : N 的调度,也就是说线程和 Goroutine 之间是多对多的关系。这点在许多GreenThread / Coroutine 的调度器并没有实现。比如 Java 1.1 版本之前的线程其实是 GreenThread(这个词就来源于 Java),但由于没实现多对多的调度,也就是没有真正实现并行,发挥不了多核的优势,所以后来改成基于系统内核的 Thread 实现了。

  2. 某个系统线程如果被阻塞,排列在该线程上的 Goroutine 会被迁移。当然还有其他机制,比如 M 空闲了,如果全局队列没有任务,可能会从其他 M 偷任务执行,相当于一种 rebalance 机制。这里不再细说,有需要看专门的分析文章。

  3. 具体的实现策略和我们前面分析的机制类似。系统启动时,会启动一个独立的后台线程(不在 Goroutine 的调度线程池里),启动 netpoll 的轮询。当有 Goroutine 发起网络请求时,网络库会将 fd(文件描述符)和 pollDesc(用于描述 netpoll 的结构体,包含因为读 / 写这个 fd 而阻塞的 Goroutine)关联起来,然后调用 runtime.gopark 方法,挂起当前的 Goroutine。当后台的 netpoll 轮询获取到 epoll(Linux 环境下)的 event,会将 event 中的 pollDesc 取出来,找到关联的阻塞 Goroutine,并进行恢复。

Goroutine 是银弹么?

Goroutine 很大程度上降低了并发的开发成本,是不是我们所有需要并发的地方直接 go func 就搞定了呢?

Go 通过 Goroutine 的调度解决了 CPU 利用率的问题。但遇到其他的瓶颈资源如何处理?比如带锁的共享资源,比如数据库连接等。互联网在线应用场景下,如果每个请求都扔到一个 Goroutine 里,当资源出现瓶颈的时候,会导致大量的 Goroutine 阻塞,最后用户请求超时。这时候就需要用 Goroutine 池来进行控流,同时问题又来了:池子里设置多少个 Goroutine 合适?

所以这个问题还是没有从更本上解决。

go没有严格的内置的logical processor数量限制,但是go的runtime默认限制了每个program最多使用10,000个线程,可以通过SetMaxThreads修改.下图展示了Concurrency和Parallelism的区别 

goroutine使用

go块

go的用法很简单,如下. 如果没有最外面的括号{}(),会显示go块必须是一个函数调用.没有()只是一个函数的声明,有了()是一个调用(没有参数的)

go func() {
  for _,n := range nums {
    out <- n
  }
  close(out)
}()

channel

channel默认上是阻塞的,也就是说,如果Channel满了,就阻塞写,如果Channel空了,就阻塞读。于是,我们就可以使用这种特性来同步我们的发送和接收端。

channel <-,发送一个新的值到通道中 <-channel,从通道中接收一个值,这个更像有两层含义,一个是会返回一个结果,当做赋值来用:msg := <-channel;另外一个含义是等待这个channel发送消息,所以还有一个等的含义在.所以如果你直接写fmt.Print(<-channel)本意只是想输出下这个chan传来的值,但是其实他还会阻塞住等着channel来发.

默认发送和接收操作是阻塞的,直到发送方和接收方都准备完毕。

    func main() {
        messages := make(chan string)
        go func() { messages <- "ping" }()
        msg := <-messages
        fmt.Println(msg)
    }

所以你要是这么写:是一辈子都不会执行到print的(会死锁)

    func main() {
        messages := make(chan string)
        messages <- "ping"
        msg := <-messages
        fmt.Println(msg)
    }

所以在一个go程中,发送messages <- "msg"channel的时候,要格外小心,不然一不留神就死锁了.(解决方法:1. 用带缓存的chan; 2. 使用带有default的select发送)

    select {
    case messages <- "msg":
        fmt.Println("sent message")
    default:
        fmt.Println("no message sent")
    }

range

用于channel的range是阻塞的.下面程序会显示deadloc,去掉注释就好了.

    queue := make(chan string, 2)
    //queue <- "one"
    //queue <- "two"
    //close(queue)
    for elem := range queue {
      fmt.Println(elem)
    }

通道缓冲

加了缓存之后,就像你向channel发送消息的时候(message <- "ping"),“ping”就已经发送出去了(到缓存).就像一个异步的队列?到时候,<-message直接从缓存中取值就好了(异步…)

但是你要这么写,利用通道缓冲,就可以.无缓冲的意味着只有在对应的接收(<-chan)通道准备好接收时,才允许发送(chan <-),可缓存通道允许在没有对应接收方的情况下,缓存限定数量的值。

    func main() {
      message := make(chan string,1)
      message <- "ping"
      msg := <-message
      fmt.Print(msg)
    }

要是多发一个messages <- "channel",fatal error: all goroutines are asleep - deadlock!,要是多接受一个fmt.Println(<-messages),会打印出buffered channel,然后报同样的error

    func main() {
        messages := make(chan string, 2)
        messages <- "buffered"
        messages <- "channel"
        fmt.Println(<-messages)
        fmt.Println(<-messages)
    }

通道同步

使用通道同步,如果你把 <- done 这行代码从程序中移除,程序甚至会在 worker还没开始运行时就结束了。

    func worker(done chan bool) {
        fmt.Print("working...")
        time.Sleep(time.Second) // working
        fmt.Println("done")
        done <- true
    }
    func main() {
        done := make(chan bool, 1)
        go worker(done)
        <-done //blocking 阻塞在这里,知道worker执行完毕
    }

发送方向

可以指定这个通道是不是只用来发送或者接收值。这个特性提升了程序的类型安全性。pong 函数允许通道(pings)来接收数据,另一通道(pongs)来发送数据。

    func ping(pings chan<- string, msg string) {
        pings <- msg
    }
    
    func pong(pings <-chan string, pongs chan<- string) {
        msg := <-pings
        pongs <- msg
    }
    
    func main() {
        pings := make(chan string, 1)
        pongs := make(chan string, 1)
        ping(pings, "passed message")
        pong(pings, pongs)
        fmt.Println(<-pongs)
    }

select

Go 的select 让你可以同时等待多个通道操作。(poll/epoll?) 注意select 要么写个死循环用超时,要不就定好次数.或者加上default让select变成非阻塞的

    go func() {
        time.Sleep(time.Second * 1)
        c1 <- "one"
    }()
    
    go func() {
        time.Sleep(time.Second * 2)
        c2 <- "two"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }

超时处理

其中time.After返回<-chan Time,直接向select发送消息

    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }

非阻塞通道操作

default,当监听的channel都没有准备好的时候,默认执行的.

    select {
    case msg := <-messages:
        fmt.Println("received message", msg)
    default:
        fmt.Println("no message received")
    }

可以使用 select 语句来检测 chan 是否已经满了

    ch := make (chan int, 1)
    ch <- 1
    select {
    case ch <- 2:
    default:
        fmt.Println("channel is full !")
    }

通道关闭

一个非空的通道也是可以关闭的,但是通道中剩下的值仍然可以被接收到

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)
    for elem := range queue {
        fmt.Println(elem)
    }

定时器

在未来某一刻执行一次时使用的

定时器表示在未来某一时刻的独立事件。你告诉定时器需要等待的时间,然后它将提供一个用于通知的通道。可以显示的关闭

    timer1 := time.NewTimer(time.Second * 2)
    <-timer1.C

<-timer1.C 直到这个定时器的通道 C 明确的发送了定时器失效的值(2s)之前,将一直阻塞。如果你只是要单纯的等待用time.Sleep,定时器是可以在它失效之前把它给取消的stop2 := timer2.Stop()

打点器

当你想要在固定的时间间隔重复执行,定时的执行,直到我们将它停止

    func main() {
        //打点器和定时器的机制有点相似:一个通道用来发送数据。这里我们在这个通道上使用内置的 range 来迭代值每隔500ms 发送一次的值。
        ticker := time.NewTicker(time.Millisecond * 500)
        go func() {
            for t := range ticker.C {
                fmt.Println("Tick at", t)
            }
        }()
    
        //打点器可以和定时器一样被停止。一旦一个打点停止了,将不能再从它的通道中接收到值。我们将在运行后 1600ms停止这个打点器。
        time.Sleep(time.Millisecond * 1600)
        ticker.Stop()
        fmt.Println("Ticker stopped")
    }

生成器

类似于提供了一个服务,不过只是适用于调用不是很频繁

    func rand_generator_2() chan int {
        out := make(chan int)
        go func() {
            for {
                out <- rand.Int()
            }
        }()
        return out
    }
    
    func main() {
        // 生成随机数作为一个服务
        rand_service_handler := rand_generator_2()
        fmt.Printf("%dn", <-rand_service_handler)
    }

多路复用

Apache使用处理每个连接都需要一个进程,所以其并发性能不是很好。而Nighx使用多路复用的技术,让一个进程处理多个连接,所以并发性能比较好。

多路复用技术可以用来整合多个通道。提升性能和操作的便捷。

其实就是整合了多个上面的生成器

    func rand_generator_3() chan int {
        rand_generator_1 := rand_generator_2()
        rand_generator_2 := rand_generator_2()
        out := make(chan int)
    
        go func() {
            for {
                //读取生成器1中的数据,整合
                out <- <-rand_generator_1
            }
        }()
        go func() {
            for {
                //读取生成器2中的数据,整合
                out <- <-rand_generator_2
            }
        }()
        return out
    }

Furture技术

可以在不准备好参数的情况下调用函数。函数调用和函数参数准备这两个过程可以完全解耦。可以在调用的时候不关心数据是否准备好,返回值是否计算好的问题。让程序中的组件在准备好数据的时候自动跑起来。 这个最后取得<-q.result也是可以放到execQuery上面的把

Furture技术可以和各个其他技术组合起来用。可以通过多路复用技术,监听多个结果Channel,当有结果后,自动返回。也可以和生成器组合使用,生成器不断生产数据,Furture技术逐个处理数据。Furture技术自身还可以首尾相连,形成一个并发的pipe filter。这个pipe filter可以用于读写数据流,操作数据流。

    type query struct {
        sql chan string
        result chan string
    }
    
    func execQuery(q query) {
        go func() {
            sql := <-q.sql
            q.result <- "get " + sql
        }()
    
    }
    
    func main() {
        q := query{make(chan string, 1), make(chan string, 1)}
        execQuery(q)
    
        //准备参数
        q.sql <- "select * from table"
        fmt.Println(<-q.result)
    }

Chain Filter技术

程序创建了10个Filter,每个分别过滤一个素数,所以可以输出前10个素数。

    func Generate(ch chan<- int) {
        for i := 2; ; i++ {
            ch <- i 
        }
    }
    
    func Filter(in <-chan int, out chan<- int, prime int) {
        for {
            i := <-in // Receive value from 'in'.
            if i%prime != 0 {
                out <- i // Send 'i' to 'out'.
            }
        }
    }
    
    // The prime sieve: Daisy-chain Filter processes.
    func main() {
        ch := make(chan int) // Create a new channel.
        go Generate(ch)      // Launch Generate goroutine.
        for i := 0; i < 10; i++ {
            prime := <-ch
            print(prime, "n")
            ch1 := make(chan int)
            go Filter(ch, ch1, prime)
            ch = ch1
        }
    }

共享变量

有些时候使用共享变量可以让代码更加简洁

    type sharded_var struct {
        reader chan int
        writer chan int
    }
    
    func sharded_var_whachdog(v sharded_var) {//共享变量维护协程
        go func() {
            var value int = 0
            for { //监听读写通道,完成服务
                select {
                case value = <-v.writer:
                case v.reader <- value:
                }
            }
        }()
    }
    
    func main() {
        v := sharded_var{make(chan int), make(chan int)} //初始化,并开始维护协程
        sharded_var_whachdog(v)
    
        fmt.Println(<-v.reader)
        v.writer <- 1
        fmt.Println(<-v.reader)
    }

Concurrency patterns

下面介绍了一些常用的并发模式.

Runner

当你的程序会运行在后台,可以是cron job或者是Iron.io这样的worker-based云环境.这个程序就可以监控和中断你的程序,如果你的程序运行的太久了.

定义了三个channel来通知任务状态.

  • interrupt:接收系统的终止信号(比如ctrl-c),接收到之后系统就优雅的退出
  • complete:指示任务完成状态或者返回错误
  • timeout:当超时了之后,系统就优雅的退出

tasks是一个函数类型的slice,你可以往里面存放签名为func funcName(id int){}的函数,作为你的任务.task(id)就是在执行任务了(当然只是用来模拟任务,可以定义一个任务接口来存放任务,此处是为了简便). 注意tasks里面的任务是串行执行的,这些任务的执行发生在一个单独的goroutine中.

New方法里的interrupt channel buffer设置为1,也就是说当用户重复ctrl+c的时候,程序也只会收到一个信号,其他的信号会被丢弃.

在run()方法中,在开始执行任务前(task(id)),会前检查执行流程有没有被中断(if r.gotInterrupt() {}),这里用了一个带default语句的select.一旦收到中断的事件,程序就不再接受任何其他事件了(signal.Stop(r.interrupt)).

在Start()方法中,在go块中执行run()方法,任何当前的goroutine会阻塞在select这边,直到收到run()返回的complete channel或者超时返回.

    // Runner runs a set of tasks within a given timeout and can be shut down on an operating system interrupt.
    type Runner struct {
        // interrupt channel reports a signal from the operating system.
        interrupt chan os.Signal
    
        // complete channel reports that processing is done.
        complete chan error
    
        // timeout reports that time has run out.
        timeout <-chan time.Time
    
        // tasks holds a set of functions that are executed
        // synchronously in index order.
        tasks []func(int)
    }
    
    // ErrTimeout is returned when a value is received on the timeout channel.
    var ErrTimeout = errors.New("received timeout")
    
    // ErrInterrupt is returned when an event from the OS is received.
    var ErrInterrupt = errors.New("received interrupt")
    
    // New returns a new ready-to-use Runner.
    func New(d time.Duration) *Runner {
        return &Runner{
            interrupt: make(chan os.Signal, 1),
            complete:  make(chan error),
            timeout:   time.After(d),
        }
    }
    
    // Add attaches tasks to the Runner. A task is a function that takes an int ID. ...表示可以传入多个参数
    func (r *Runner) Add(tasks ...func(int)) { 
        r.tasks = append(r.tasks, tasks...)
    }
    
    // Start runs all tasks and monitors channel events.
    func (r *Runner) Start() error {
        // We want to receive all interrupt based signals.
        signal.Notify(r.interrupt, os.Interrupt)
    
        // Run the different tasks on a different goroutine.
        go func() {
            r.complete <- r.run()
        }()
    
        select {
        // Signaled when processing is done.
        case err := <-r.complete:
            return err
    
        // Signaled when we run out of time.
        case <-r.timeout:
            return ErrTimeout
        }
    }
    
    // run executes each registered task.
    func (r *Runner) run() error {
        for id, task := range r.tasks {
            // Check for an interrupt signal from the OS.
            if r.gotInterrupt() {
                return ErrInterrupt
            }
    
            // Execute the registered task.
            task(id)
        }
    
        return nil
    }
    
    // gotInterrupt verifies if the interrupt signal has been issued.
    func (r *Runner) gotInterrupt() bool {
        select {
        // Signaled when an interrupt event is sent.
        case <-r.interrupt:
            // Stop receiving any further signals.
            signal.Stop(r.interrupt)
            return true
    
        // Continue running as normal.
        default:
            return false
        }
    }

main方法

    const timeout = 3 * time.Second
    
    // main is the entry point for the program.
    func main() {
        log.Println("Starting work.")
    
        // Create a new timer value for this run.
        r := runner.New(timeout)
    
        // Add the tasks to be run.
        r.Add(createTask(), createTask(), createTask())
    
        // Run the tasks and handle the result.
        if err := r.Start(); err != nil {
            switch err {
            case runner.ErrTimeout:
                log.Println("Terminating due to timeout.")
                os.Exit(1)
            case runner.ErrInterrupt:
                log.Println("Terminating due to interrupt.")
                os.Exit(2)
            }
        }
    
        log.Println("Process ended.")
    }
    
    // createTask returns an example task that sleeps for the specified
    // number of seconds based on the id.
    func createTask() func(int) {
        return func(id int) {
            log.Printf("Processor - Task #%d.", id)
            time.Sleep(time.Duration(id) * time.Second)
        }
    }

Pooling

当你有一些特定的资源要共享,比如数据库连接或者内存buffers,这个模式就非常有用

goroutine要用一个资源,就去pool中去拿,用完了就还回去.

例子中的资源是只要实现了io.Closer接口即可.

  • m用来保证多goroutine下对Poll的操作都是value-safe的.
  • resources将会是一个buffered channel,会包含将要分享的资源.
  • factory的作用是创建一个新的资源,当poll有需要的时候.
  • closed用来指示pool有无被关闭

New函数接受一个用来创建新资源的函数对象(fn func() (io.Closer, error),返回一个资源)还有一个size参数.

Acquire函数先从pool中取资源,要是取不到用factory新建一个

    func (p *Pool) Acquire() (io.Closer, error) {
        select {
        // Check for a free resource.
        case r, _ := <-p.resources:
            return r, nil
    
        // Provide a new resource since there are none available.
        default:
            return p.factory()
        }
    }

Release函数:如果pool已经关闭,就直接return.否则就向resource这个buffered channel里发送要释放的资源.default语句是如果resource已经满了,就关闭这个pool.

Close函数:当程序运行完关闭pool的时候,应该调用Close函数,这个函数首先关闭resource这个buffered channel,然后再把buffered channel中的任务关闭(io.Closer).注意这个加锁.

    // Pool manages a set of resources that can be shared safely by multiple goroutines.
    // The resource being managed must implement  the io.Closer interface.
    type Pool struct {
        m         sync.Mutex
        resources chan io.Closer
        factory   func() (io.Closer, error)
        closed    bool
    }
    
    // ErrPoolClosed is returned when an Acquire returns on a closed pool.
    var ErrPoolClosed = errors.New("Pool has been closed.")
    
    // New creates a pool that manages resources. A pool requires a
    // function that can allocate a new resource and the size of the pool.
    func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
        if size <= 0 {
            return nil, errors.New("Size value too small.")
        }
    
        return &Pool{
            factory:   fn,
            resources: make(chan io.Closer, size),
        }, nil
    }
    
    // Acquire retrieves a resource    from the pool.
    func (p *Pool) Acquire() (io.Closer, error) {
        select {
        // Check for a free resource.
        case r, ok := <-p.resources:
            log.Println("Acquire:", "Shared Resource")
            if !ok {
                return nil, ErrPoolClosed
            }
            return r, nil
    
        // Provide a new resource since there are none available.
        default:
            log.Println("Acquire:", "New Resource")
            return p.factory()
        }
    }
    
    // Release places a new resource onto the pool.
    func (p *Pool) Release(r io.Closer) {
        // Secure this operation with the Close operation.
        p.m.Lock()
        defer p.m.Unlock()
    
        // If the pool is closed, discard the resource.
        if p.closed {
            r.Close()
            return
        }
    
        select {
        // Attempt to place the new resource on the queue.
        case p.resources <- r:
            log.Println("Release:", "In Queue")
    
        // If the queue is already at cap we close the resource.
        default:
            log.Println("Release:", "Closing")
            r.Close()
        }
    }
    
    // Close will shutdown the pool and close all existing resources.
    func (p *Pool) Close() {
        // Secure this operation with the Release operation.
        p.m.Lock()
        defer p.m.Unlock()
    
        // If the pool is already close, don't do anything.
        if p.closed {
            return
        }
    
        // Set the pool as closed.
        p.closed = true
    
        // Close the channel before we drain the channel of its
        // resources. If we don't do this, we will have a deadlock.
        close(p.resources)
    
        // Close the resources
        for r := range p.resources {
            r.Close()
        }
    }

main

    const (
        maxGoroutines   = 25 // the number of routines to use.
        pooledResources = 2  // number of resources in the pool
    )
    
    // dbConnection simulates a resource to share.
    type dbConnection struct {
        ID int32
    }
    
    // Close implements the io.Closer interface so dbConnection can be managed by the pool. Close performs any resource release management.
    func (dbConn *dbConnection) Close() error {
        log.Println("Close: Connection", dbConn.ID)
        return nil
    }
    
    // idCounter provides support for giving each connection a unique id.
    var idCounter int32
    
    // createConnection is a factory method that will be called by the pool when a new connection is needed.
    func createConnection() (io.Closer, error) {
        id := atomic.AddInt32(&idCounter, 1)
        log.Println("Create: New Connection", id)
    
        return &dbConnection{id}, nil
    }
    
    // main is the entry point for all Go programs.
    func main() {
        var wg sync.WaitGroup
        wg.Add(maxGoroutines)
    
        // Create the pool to manage our connections.
        p, err := pool.New(createConnection, pooledResources)
        if err != nil {
            log.Println(err)
        }
    
        // Perform queries using connections from the pool.
        for query := 0; query < maxGoroutines; query++ {
            // Each goroutine needs its own copy of the query value else they will all be sharing the same query variable.
            go func(q int) {
                performQueries(q, p)
                wg.Done()
            }(query)
        }
    
        // Wait for the goroutines to finish.
        wg.Wait()
    
        // Close the pool.
        log.Println("Shutdown Program.")
        p.Close()
    }
    
    // performQueries tests the resource pool of connections.
    func performQueries(query int, p *pool.Pool) {
        // Acquire a connection from the pool.
        conn, err := p.Acquire()
        if err != nil {
            log.Println(err)
            return
        }
    
        // Release the connection back to the pool.
        defer p.Release(conn)
    
        // Wait to simulate a query response.
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        log.Printf("Query: QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
    }

Work

New函数开启了固定个数(maxGoroutines)个goroutine,注意这边work是一个unbuffered channel.这个for range会阻塞直到channel中有值可以取.要是work这个channel被关闭了,这个for range就结束,然后调用wg.Done

Run函数提交任务到pool中去w.work <- w.注意这个work是一个unbuffered channel,所以得等一个goroutine把它取走,否则会阻塞住.这是我们需要保证的,因为我们想要调用者保证这个任务被提交之后立即开始运行

    type Worker interface {
        Task()
    }
    
    // Pool provides a pool of goroutines that can execute any Worker
    // tasks that are submitted.
    type Pool struct {
        work chan Worker
        wg   sync.WaitGroup
    }
    
    // New creates a new work pool.
    func New(maxGoroutines int) *Pool {
        p := Pool{
            work: make(chan Worker),
        }
    
        p.wg.Add(maxGoroutines)
        for i := 0; i < maxGoroutines; i++ {
            go func() {
                for w := range p.work {
                    w.Task()
                }
                p.wg.Done()
            }()
        }
    
        return &p
    }
    
    // Run submits work to the pool.
    func (p *Pool) Run(w Worker) {
        p.work <- w
    }
    
    // Shutdown waits for all the goroutines to shutdown.
    func (p *Pool) Shutdown() {
        close(p.work)
        p.wg.Wait()
    }

main

    // names provides a set of names to display.
    var names = []string{
        "steve",
        "bob",
        "mary",
        "therese",
        "jason",
    }
    
    // namePrinter provides special support for printing names.
    type namePrinter struct {
        name string
    }
    
    // Task implements the Worker interface.
    func (m *namePrinter) Task() {
        log.Println(m.name)
        time.Sleep(time.Second)
    }
    
    // main is the entry point for all Go programs.
    func main() {
        // Create a work pool with 2 goroutines.
        p := work.New(2)
    
        var wg sync.WaitGroup
        wg.Add(100 * len(names))
    
        for i := 0; i < 100; i++ {
            // Iterate over the slice of names.
            for _, name := range names {
                // Create a namePrinter and provide the
                // specific name.
                np := namePrinter{
                    name: name,
                }
    
                go func() {
                    // Submit the task to be worked on. When RunTask
                    // returns we know it is being handled.
                    p.Run(&np)
                    wg.Done()
                }()
            }
        }
    
        wg.Wait()
    
        // Shutdown the work pool and wait for all existing work
        // to be completed.
        p.Shutdown()
    }

另一种worker的写法

创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。把工作发送到工作队列中去JobQueue <- work

    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )
    
    // Job represents the job to be run
    type Job struct {
        Payload Payload
    }
    
    // A buffered channel that we can send work requests on.
    var JobQueue chan Job
    
    // Worker represents the worker that executes the job
    type Worker struct {
        WorkerPool  chan chan Job
        JobChannel  chan Job
        quit        chan bool
    }
    
    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool,
            JobChannel: make(chan Job),
            quit:       make(chan bool)}
    }
    
    // Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it
    func (w Worker) Start() {
        go func() {
            for {
                // register the current worker into the worker queue.
                w.WorkerPool <- w.JobChannel
    
                select {
                case job := <-w.JobChannel:
                    // we have received a work request.
                    if err := job.Payload.UploadToS3(); err != nil {
                        log.Errorf("Error uploading to S3: %s", err.Error())
                    }
    
                case <-w.quit:
                    // we have received a signal to stop
                    return
                }
            }
        }()
    }
    
    // Stop signals the worker to stop listening for work requests.
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }

我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。dispatcher.Run()(这个类似资源池)

    type Dispatcher struct {
        // A pool of workers channels that are registered with the dispatcher
        WorkerPool chan chan Job
    }
    
    func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{WorkerPool: pool}
    }
    
    func (d *Dispatcher) Run() {
        // starting n number of workers
        for i := 0; i < d.maxWorkers; i++ {
            worker := NewWorker(d.pool)
            worker.Start()
        }
    
        go d.dispatch()
    }
    
    func (d *Dispatcher) dispatch() {
        for {
            select {
            case job := <-JobQueue:
                // a job request has been received
                go func(job Job) {
                    // try to obtain a worker job channel that is available.
                    // this will block until a worker is idle
                    jobChannel := <-d.WorkerPool
    
                    // dispatch the job to the worker job channel
                    jobChannel <- job
                }(job)
            }
        }
    }