go 并发编程 | 池化技术 pool | go 技术论坛-金年会app官方网
1.背景
在并发编程中,资源的分配和回收是一个很重要的问题,频繁的分配和回收,会造成大量的开销。如果你想使用 go 开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响。
所以,我们一般做性能优化时,会考虑池化技术,回收使用率高的对象,避免被垃圾回收,减少重复创建的开销。
go 语言的 sync.pool
正是一个可以帮助我们实现池化技术的工具。
2.使用
pool 是一组临时对象的集合,可以单独保存和检索。
pool 中存储的任何对象都可能在任何时候被自动移除。这是 pool 管理内存的一种方式,它可以根据当前的内存使用情况和池的大小来决定何时回收对象。并且移除操作不会有任何通知,也就是不能依赖 pool 来告知你对象何时被回收。
如果在对象被移除的时候,pool 是持有该对象的唯一引用,该对象可能会被 gc 回收。
pool 是线程安全且不可复制的。
sync.pool
只提供了两个方法:get、put,和 pool 结构体的一个字段 new,我们都来看看。
- new
new
的数据类型是 func() any
,在调用 get
方法时如果没有空闲元素可返回时就会调用 new
方法来创建新的元素,如果没有设置 new
字段,则会返回 nil。
- get
从 pool 中取走一个元素,并返回。
- put
将元素放回 pool 中,如果传入的是 nil,那么会被 pool 忽略。
我们来看 go 官方包 fmt 使用 sync.pool
的一个源码案例:
type pp struct {
buf buffer
......
}
var ppfree = sync.pool{
new: func() any { return new(pp) }, // 初始化时设置 new 字段
}
func newprinter() *pp {
p := ppfree.get().(*pp) // 从 pool 中获取 pp 元素
......
return p
}
func (p *pp) free() {
......
ppfree.put(p)
}
func fprintln(w io.writer, a ...any) (n int, err error) {
p := newprinter() // 从 pool 中取得一个 *pp 元素
p.doprintln(a)
n, err = w.write(p.buf)
p.free() // 使用结束,放回 pool 中
return
}
func println(a ...any) (n int, err error) {
return fprintln(os.stdout, a...)
}
3.源码解析
我们将基于 来解析。
3.1 数据结构
type pool struct {
nocopy nocopy
local unsafe.pointer // local fixed-size per-p pool, actual type is [p]poollocal
localsize uintptr // local 数组的长度
victim unsafe.pointer // local from previous cycle
victimsize uintptr // victim 数组的长度
new func() any
}
// local per-p pool appendix.
type poollocalinternal struct {
private any // can be used only by the respective p.
shared poolchain // local p can pushhead/pophead; any p can poptail.
}
type poollocal struct {
poollocalinternal
// prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.sizeof(poollocalinternal{})%128]byte
}
pool 结构体最主要的字段是 local
和 victim
,我们先来看 local
。它的实际类型是 [p]poollocal
,这里的 p
是指 gmp 模型中 p 的数量。每个 p 都有一个自己的 local 池,这样可以减少不同逻辑处理器之间的内存争用,提高并发性能。
victim
的数据类型和 local
一模一样,它存储的是上一轮使用周期结束时 local
中的数据。从 poolcleanup
函数我们可以分析出:
每次垃圾回收时,pool 会把 victim
中的对象移除,然后把 local
的数据给 victim
,victim
就像一个废纸篓,有需要的时候就会把里面的东西捡回来重新使用。
// poolcleanup 函数是在 gc开始时被调用的,此时处于 stw。
func poolcleanup() {
// drop victim caches from all pools.
for _, p := range oldpools {
p.victim = nil
p.victimsize = 0
}
// move primary cache to victim cache.
for _, p := range allpools {
p.victim = p.local
p.victimsize = p.localsize
p.local = nil
p.localsize = 0
}
// the pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldpools, allpools = allpools, nil
}
local
字段包含一个 poollocalinternal
,poollocalinternal
字段包含两个字段:
private
: 一个缓存对象,只能由对应的 p 获取,而一个 p 同一时刻只能运行一个 goroutine,所以不会有并发问题;shard
: 所有 p 均可访问,只有本地的 p 才能 pushhead/pophead,其他 p 可以 poptail,也就是只有本地 p 才能作为生产者 producer,其他 p 只能作为消费者 consumer,本质上是一个双向链表的实现。
3.2 get 方法
我们同样会删除数据竞争检测等非主要逻辑代码来进行分析。
func (p *pool) get() any {
// p.pin() 会将当前的 goroutine 固定在当前的 p 上并禁止抢占,它会返回 p 的 poollocal 池和 id;
// 这样做可以在查找期间直接拿到 p 的 poollocal 池,也确保了查找期间不会被其他 goroutine 中断。
l, pid := p.pin()
// 从 private 获取
x := l.private
l.private = nil
if x == nil {
// 从 shared 头部获取
x, _ = l.shared.pophead()
if x == nil {
// 从其他 shared 获取
x = p.getslow(pid)
}
}
runtime_procunpin()
// new 一个
if x == nil && p.new != nil {
x = p.new()
}
return x
}
代码比较简单,我们可以小结一下获取顺序:
- 从本地的
private
获取可用元素; - 从本地的
shared
头部获取可用元素; - 通过
getslow
方法去其他shared
“偷”一个; - 最后使用
new
函数创建一个。
我们再来分析一下 getslow
方法:
func (p *pool) getslow(pid int) any {
size := runtime_loadacquintptr(&p.localsize)
locals := p.local
// 尝试从其他 proc 偷取一个元素
for i := 0; i < int(size); i {
l := indexlocal(locals, (pidi1)%int(size))
if x, _ := l.shared.poptail(); x != nil {
return x
}
}
// 如果其他 proc 没有可用元素,那么尝试从当前的 victim 获取
size = atomic.loaduintptr(&p.victimsize)
// 在并发环境中,goroutineid 通常是连续分配的。如果 pid 大于等于 victimsize,说明当前 goroutineid 没有对应的 victim
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexlocal(locals, pid)
// 逻辑一样,先从 private 开始获取
if x := l.private; x != nil {
l.private = nil
return x
}
// 再从 shared 以及其他 proc 的 shared 获取
for i := 0; i < int(size); i {
l := indexlocal(locals, (pidi)%int(size))
if x, _ := l.shared.poptail(); x != nil {
return x
}
}
// 如果以上都没找到,那么标记改 victim 为空,之后的查找就可以快速跳过了
atomic.storeuintptr(&p.victimsize, 0)
return nil
}
我们也小结一下 getslow
的执行顺序:
- 从其他 proc 的
shared
获取; - 从当前
victim
的private
获取; - 循环从当前和其他 proc 的
victim
的shared
获取; - 如果最终没有获取到,则标记为空,下次直接跳过。
3.3 put 方法
put
方法就更简单了,如果 private
为空就放入 private
,否则插入 shared
头部。
func (p *pool) put(x any) {
// nil 值直接丢弃
if x == nil {
return
}
l, _ := p.pin()
if l.private == nil {
l.private = x
} else {
l.shared.pushhead(x)
}
runtime_procunpin()
}
4.警惕踩坑
4.1 连接池
前面我们说过,pool 中存储的任何对象可能会随时被gc清除。如果使用 sync.pool 作连接池,会无通知地在某个时候就把连接垃圾回收掉了,而我们的场景是需要长久保持这个连接,所以一般不建议使用 sync.pool 作连接池。(同理其他需要长期保持的对象也一样)
4.2 内存泄露
还是以 fmt
为例,在实际使用时可能会往 p.buf
填充大量数据,这会导致底层的 []byte
扩容,如果这时候再 put 回去,[]byte
的容量不变,并且可能不会被回收,就会一直占用很大的空间。这就是 sync.pool 的内存泄漏问题。
在 有具体的重现和讨论。
fmt
包的解决方法也很简单,在每次 put 时,增加容量大小检查,如果超过指定大小,就直接丢弃掉:
func (p *pp) free() {
if cap(p.buf) > 64<<10 {
return
}
......
}
4.3 内存浪费
如果在实际使用时需要的 buffer 比较小,而 pool 中存放的 buffer 全都比较大,就会造成一种内存浪费的现象。
要做到尽可能不浪费,一种思路是可以将 buffer 分组,不同容量的为一组,例如:
- 小于 1k byte 大小的元素为一组,占一个池子;
- 大于 1k byte 小于 2k byte 大小的元素为一组,占一个池子;
- 大于 2k byte 大小的元素为一组,占一个池子。
这样在使用时就可以按需到对应大小的池子中获取元素即可。官方库 就有类似实现:
var (
bufioreaderpool sync.pool
bufiowriter2kpool sync.pool
bufiowriter4kpool sync.pool
)
var copybufpool = sync.pool{
new: func() any {
b := make([]byte, 32*1024)
return &b
},
}
func bufiowriterpool(size int) *sync.pool {
switch size {
case 2 << 10:
return &bufiowriter2kpool
case 4 << 10:
return &bufiowriter4kpool
}
return nil
}
5.小结
本文我们介绍了 sync.pool,是项目常用的优化手段之一,希望对你有帮助。
本作品采用《cc 协议》,转载必须注明作者和本文链接
推荐文章: