50行代码实现一个go并发包


写在前面

这是一个只需要用50行代码(核心代码只有15行)实现的极其简单(原理简单、使用方法简单、功能简单)的go包mini_parallel_job,适合大部分并发任务,开箱即用。

代码

package mini_parallel_job

import (
    "fmt"
    "sync"
)

type JobType func()

type JobPool interface {
    AddJob(jobType JobType)
    Wait()
}

type jobPool struct {
    jobs []JobType
}

// 添加任务
func (j *jobPool) AddJob(job JobType) {
    j.jobs = append(j.jobs, job)
}

// 开始并且等待任务
func (j *jobPool) Wait() {
    var wg sync.WaitGroup
    wg.Add(len(j.jobs))
    for i := range j.jobs {
       jJob := j.jobs[i]
       go func() {
          defer func() {
             wg.Done()
             if err := recover(); err != nil {
                fmt.Printf("err:%+v", err)
             }
          }()

          jJob()
       }()
    }
    wg.Wait()
}

func NewJobPool() JobPool {
    return &jobPool{
       jobs: make([]JobType, 0),
    }
}

压测

package mini_parallel_job

import (
    "testing"
)

const (
    Count = 10
)

// 并行任务
func parallelJob() {
    jobPool := NewJobPool()
    for i := 0; i < Count; i++ {
       jobPool.AddJob(func() {
          _ = fib(10)
       })
    }
    jobPool.Wait()
}

// 串行任务
func serialJob() {
    for i := 0; i < Count; i++ {
       _ = fib(10)
    }
}

// 任务
func fib(n int) int {
    if n == 0 || n == 1 {
       return n
    }
    return fib(n-2) + fib(n-1)
}

// 性能测试
func BenchmarkSerialJob(b *testing.B) {
    for i := 0; i < b.N; i++ {
       serialJob()
    }
}

func BenchmarkParallelJob(b *testing.B) {
    for i := 0; i < b.N; i++ {
       parallelJob()
    }
}

/*
BenchmarkSerialJob-12             298855              3756 ns/op
BenchmarkParallelJob-12           117189              8710 ns/op
*/

example

package main

import (
    "fmt"
    mini_parallel_job "mini-parallel-job"
    "time"
)

const (
    JobCount = 10
)

func main() {
    // 串行执行
    begin1 := time.Now()
    for i := 0; i < JobCount; i++ {
       fib(40)
    }
    fmt.Println(time.Since(begin1))

    // 并行执行
    begin2 := time.Now()
    parallelJob := mini_parallel_job.NewJobPool()
    for i := 0; i < JobCount; i++ {
       parallelJob.AddJob(func() {
          fib(40)
       })
    }
    parallelJob.Wait()
    fmt.Println(time.Since(begin2))

    /*
       结果:
       7.335989407s
       1.112108503s
    */
}

// 任务
func fib(n int) int {
    if n == 0 || n == 1 {
       return n
    }
    return fib(n-2) + fib(n-1)
}

总结

这段代码仅仅实现使用go rountine实现并发,sync.WaitGroup实现等待。

在大多数场景中,只需要并发,并不关心并发量是多少,大多数程序员也是使用Wait函数那段代码实现的(至少作者在项目中看到都是这样的,并且有多处相同的代码,基于此场景封装了一下)。

如果要实现复杂一点的场景,比如控制最大并发量,可以稍微对上述代码做一些修改,Wait函数中加一个指定大小的chan来控制。或者参考作者另外一个对go并发的封装gopool,使用master-worker模式实现的并发控制。

参考

[1]mini_parallel_job

2]gopool


文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录