go 并发编程 | 池化技术 pool | go 技术论坛-金年会app官方网

1.背景

在并发编程中,资源的分配和回收是一个很重要的问题,频繁的分配和回收,会造成大量的开销。如果你想使用 go 开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响。

所以,我们一般做性能优化时,会考虑池化技术,回收使用率高的对象,避免被垃圾回收,减少重复创建的开销。

go 语言的 sync.pool 正是一个可以帮助我们实现池化技术的工具。

2.使用

pool 是一组临时对象的集合,可以单独保存和检索。

pool 中存储的任何对象都可能在任何时候被自动移除。这是 pool 管理内存的一种方式,它可以根据当前的内存使用情况和池的大小来决定何时回收对象。并且移除操作不会有任何通知,也就是不能依赖 pool 来告知你对象何时被回收。

如果在对象被移除的时候,pool 是持有该对象的唯一引用,该对象可能会被 gc 回收。

pool 是线程安全且不可复制的。

sync.pool 只提供了两个方法:get、put,和 pool 结构体的一个字段 new,我们都来看看。

  1. new

new 的数据类型是 func() any,在调用 get 方法时如果没有空闲元素可返回时就会调用 new 方法来创建新的元素,如果没有设置 new 字段,则会返回 nil。

  1. get

从 pool 中取走一个元素,并返回。

  1. put

将元素放回 pool 中,如果传入的是 nil,那么会被 pool 忽略。

我们来看 go 官方包 fmt 使用 sync.pool 的一个源码案例:

type pp struct {
    buf buffer
    ......
}
var ppfree = sync.pool{
    new: func() any { return new(pp) },     // 初始化时设置 new 字段
}
func newprinter() *pp {
    p := ppfree.get().(*pp) // 从 pool 中获取 pp 元素
    ......
    return p
}
func (p *pp) free() {
    ......
    ppfree.put(p)
}
func fprintln(w io.writer, a ...any) (n int, err error) {
    p := newprinter()    // 从 pool 中取得一个 *pp 元素
    p.doprintln(a)
    n, err = w.write(p.buf)
    p.free()    // 使用结束,放回 pool 中
    return
}
func println(a ...any) (n int, err error) {
    return fprintln(os.stdout, a...)
}

3.源码解析

我们将基于 来解析。

3.1 数据结构

type pool struct {
    nocopy nocopy
    local     unsafe.pointer // local fixed-size per-p pool, actual type is [p]poollocal
    localsize uintptr        // local 数组的长度
    victim     unsafe.pointer // local from previous cycle
    victimsize uintptr        // victim 数组的长度
    new func() any
}
// local per-p pool appendix.
type poollocalinternal struct {
    private any       // can be used only by the respective p.
    shared  poolchain // local p can pushhead/pophead; any p can poptail.
}
type poollocal struct {
    poollocalinternal
    // prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.sizeof(poollocalinternal{})%128]byte
}

pool 结构体最主要的字段是 localvictim,我们先来看 local。它的实际类型是 [p]poollocal,这里的 p 是指 gmp 模型中 p 的数量。每个 p 都有一个自己的 local 池,这样可以减少不同逻辑处理器之间的内存争用,提高并发性能。

victim 的数据类型和 local 一模一样,它存储的是上一轮使用周期结束时 local 中的数据。从 poolcleanup 函数我们可以分析出:

每次垃圾回收时,pool 会把 victim 中的对象移除,然后把 local 的数据给 victimvictim 就像一个废纸篓,有需要的时候就会把里面的东西捡回来重新使用。

// poolcleanup 函数是在 gc开始时被调用的,此时处于 stw。
func poolcleanup() {
    // drop victim caches from all pools.
    for _, p := range oldpools {
        p.victim = nil
        p.victimsize = 0
    }
    // move primary cache to victim cache.
    for _, p := range allpools {
        p.victim = p.local
        p.victimsize = p.localsize
        p.local = nil
        p.localsize = 0
    }
    // the pools with non-empty primary caches now have non-empty
    // victim caches and no pools have primary caches.
    oldpools, allpools = allpools, nil
}

local 字段包含一个 poollocalinternalpoollocalinternal 字段包含两个字段:

  • private: 一个缓存对象,只能由对应的 p 获取,而一个 p 同一时刻只能运行一个 goroutine,所以不会有并发问题;
  • shard: 所有 p 均可访问,只有本地的 p 才能 pushhead/pophead,其他 p 可以 poptail,也就是只有本地 p 才能作为生产者 producer,其他 p 只能作为消费者 consumer,本质上是一个双向链表的实现。

3.2 get 方法

我们同样会删除数据竞争检测等非主要逻辑代码来进行分析。

func (p *pool) get() any {
    // p.pin() 会将当前的 goroutine 固定在当前的 p 上并禁止抢占,它会返回 p 的 poollocal 池和 id;
    // 这样做可以在查找期间直接拿到 p 的 poollocal 池,也确保了查找期间不会被其他 goroutine 中断。
    l, pid := p.pin()
    // 从 private 获取
    x := l.private
    l.private = nil
    if x == nil {
        // 从 shared 头部获取
        x, _ = l.shared.pophead()
        if x == nil {
            // 从其他 shared 获取
            x = p.getslow(pid)
        }
    }
    runtime_procunpin()
    // new 一个
    if x == nil && p.new != nil {
        x = p.new()
    }
    return x
}

代码比较简单,我们可以小结一下获取顺序:

  1. 从本地的 private 获取可用元素;
  2. 从本地的 shared 头部获取可用元素;
  3. 通过 getslow 方法去其他 shared “偷”一个;
  4. 最后使用 new 函数创建一个。

我们再来分析一下 getslow 方法:

func (p *pool) getslow(pid int) any {
    size := runtime_loadacquintptr(&p.localsize) 
    locals := p.local                            
    // 尝试从其他 proc 偷取一个元素
    for i := 0; i < int(size); i {
        l := indexlocal(locals, (pidi1)%int(size))
        if x, _ := l.shared.poptail(); x != nil {
            return x
        }
    }
    // 如果其他 proc 没有可用元素,那么尝试从当前的 victim 获取
    size = atomic.loaduintptr(&p.victimsize)
    // 在并发环境中,goroutineid 通常是连续分配的。如果 pid 大于等于 victimsize,说明当前 goroutineid 没有对应的 victim
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexlocal(locals, pid)
    // 逻辑一样,先从 private 开始获取
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    // 再从 shared 以及其他 proc 的 shared 获取
    for i := 0; i < int(size); i {
        l := indexlocal(locals, (pidi)%int(size))
        if x, _ := l.shared.poptail(); x != nil {
            return x
        }
    }
    // 如果以上都没找到,那么标记改 victim 为空,之后的查找就可以快速跳过了
    atomic.storeuintptr(&p.victimsize, 0)
    return nil
}

我们也小结一下 getslow 的执行顺序:

  1. 从其他 proc 的 shared 获取;
  2. 从当前 victimprivate 获取;
  3. 循环从当前和其他 proc 的 victimshared 获取;
  4. 如果最终没有获取到,则标记为空,下次直接跳过。

3.3 put 方法

put 方法就更简单了,如果 private 为空就放入 private,否则插入 shared 头部。

func (p *pool) put(x any) {
    // nil 值直接丢弃
    if x == nil {
        return
    }
    l, _ := p.pin()
    if l.private == nil {
        l.private = x
    } else {
        l.shared.pushhead(x)
    }
    runtime_procunpin()
}

4.警惕踩坑

4.1 连接池

前面我们说过,pool 中存储的任何对象可能会随时被gc清除。如果使用 sync.pool 作连接池,会无通知地在某个时候就把连接垃圾回收掉了,而我们的场景是需要长久保持这个连接,所以一般不建议使用 sync.pool 作连接池。(同理其他需要长期保持的对象也一样)

4.2 内存泄露

还是以 fmt 为例,在实际使用时可能会往 p.buf 填充大量数据,这会导致底层的 []byte 扩容,如果这时候再 put 回去,[]byte 的容量不变,并且可能不会被回收,就会一直占用很大的空间。这就是 sync.pool 的内存泄漏问题。

在 有具体的重现和讨论。

fmt 包的解决方法也很简单,在每次 put 时,增加容量大小检查,如果超过指定大小,就直接丢弃掉:

func (p *pp) free() {
    if cap(p.buf) > 64<<10 {
        return
    }
    ......
}

4.3 内存浪费

如果在实际使用时需要的 buffer 比较小,而 pool 中存放的 buffer 全都比较大,就会造成一种内存浪费的现象。

要做到尽可能不浪费,一种思路是可以将 buffer 分组,不同容量的为一组,例如:

  1. 小于 1k byte 大小的元素为一组,占一个池子;
  2. 大于 1k byte 小于 2k byte 大小的元素为一组,占一个池子;
  3. 大于 2k byte 大小的元素为一组,占一个池子。

这样在使用时就可以按需到对应大小的池子中获取元素即可。官方库 就有类似实现:

var (
    bufioreaderpool   sync.pool
    bufiowriter2kpool sync.pool
    bufiowriter4kpool sync.pool
)
var copybufpool = sync.pool{
    new: func() any {
        b := make([]byte, 32*1024)
        return &b
    },
}
func bufiowriterpool(size int) *sync.pool {
    switch size {
    case 2 << 10:
        return &bufiowriter2kpool
    case 4 << 10:
        return &bufiowriter4kpool
    }
    return nil
}

5.小结

本文我们介绍了 sync.pool,是项目常用的优化手段之一,希望对你有帮助。

本作品采用《cc 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
15
粉丝
4
喜欢
13
收藏
23
排名:1361
访问:2923
博客标签
社区赞助商
网站地图