写在前面
这是一个只需要用50行代码(核心代码只有15行)实现的极其简单(原理简单、使用方法简单、功能简单)的go包mini_parallel_job,适合大部分并发任务,开箱即用。
代码
package mini_parallel_job
import (
"fmt"
"sync"
)
type JobType func()
type JobPool interface {
AddJob(jobType JobType)
Wait()
}
type jobPool struct {
jobs []JobType
}
// 添加任务
func (j *jobPool) AddJob(job JobType) {
j.jobs = append(j.jobs, job)
}
// 开始并且等待任务
func (j *jobPool) Wait() {
var wg sync.WaitGroup
wg.Add(len(j.jobs))
for i := range j.jobs {
jJob := j.jobs[i]
go func() {
defer func() {
wg.Done()
if err := recover(); err != nil {
fmt.Printf("err:%+v", err)
}
}()
jJob()
}()
}
wg.Wait()
}
func NewJobPool() JobPool {
return &jobPool{
jobs: make([]JobType, 0),
}
}
压测
package mini_parallel_job
import (
"testing"
)
const (
Count = 10
)
// 并行任务
func parallelJob() {
jobPool := NewJobPool()
for i := 0; i < Count; i++ {
jobPool.AddJob(func() {
_ = fib(10)
})
}
jobPool.Wait()
}
// 串行任务
func serialJob() {
for i := 0; i < Count; i++ {
_ = fib(10)
}
}
// 任务
func fib(n int) int {
if n == 0 || n == 1 {
return n
}
return fib(n-2) + fib(n-1)
}
// 性能测试
func BenchmarkSerialJob(b *testing.B) {
for i := 0; i < b.N; i++ {
serialJob()
}
}
func BenchmarkParallelJob(b *testing.B) {
for i := 0; i < b.N; i++ {
parallelJob()
}
}
/*
BenchmarkSerialJob-12 298855 3756 ns/op
BenchmarkParallelJob-12 117189 8710 ns/op
*/
example
package main
import (
"fmt"
mini_parallel_job "mini-parallel-job"
"time"
)
const (
JobCount = 10
)
func main() {
// 串行执行
begin1 := time.Now()
for i := 0; i < JobCount; i++ {
fib(40)
}
fmt.Println(time.Since(begin1))
// 并行执行
begin2 := time.Now()
parallelJob := mini_parallel_job.NewJobPool()
for i := 0; i < JobCount; i++ {
parallelJob.AddJob(func() {
fib(40)
})
}
parallelJob.Wait()
fmt.Println(time.Since(begin2))
/*
结果:
7.335989407s
1.112108503s
*/
}
// 任务
func fib(n int) int {
if n == 0 || n == 1 {
return n
}
return fib(n-2) + fib(n-1)
}
总结
这段代码仅仅实现使用go rountine
实现并发,sync.WaitGroup
实现等待。
在大多数场景中,只需要并发,并不关心并发量是多少,大多数程序员也是使用Wait
函数那段代码实现的(至少作者在项目中看到都是这样的,并且有多处相同的代码,基于此场景封装了一下)。
如果要实现复杂一点的场景,比如控制最大并发量,可以稍微对上述代码做一些修改,Wait
函数中加一个指定大小的chan
来控制。或者参考作者另外一个对go并发的封装gopool,使用master-worker模式实现的并发控制。