变量的线程安全atomic.value

2021/08/30 go探索发现 共 4838 字,约 14 分钟

原子性

一个或者多个操作在 CPU 执行的过程中不被中断的特性,称为原子性(atomicity).这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界不会看到他们只执行到一半的状态。

CPU执行一系列操作时不可能不发生中断,但如果我们在执行多个操作时,能让他们的中间状态对外不可见,那我们就可以宣称他们拥有了”不可分割”的原子性。

一条普通的赋值语句其实不是一个原子操作。例如,在32位机器上写int64类型的变量就会有中间状态,它会被拆成两次写操作(汇编的MOV指令)——写低 32 位和写高 32 位。对于结构体,出现并发问题的概率更高。

互斥锁跟原子子操作区别

互斥锁是用来保护一段逻辑,原子操作作用于对一个变量的更新保护。

mutex由操作系统的调度器实现,而atomic包中的原子操作则由底层硬件指令直接提供支持,这些指令在执行过程中是不允许中断的。因此原子操作可以在lock-free的情况下保证并发安全,并且它的性能也能做到随CPU个数的增多而线性扩展。

并发场景下的配置更新

Value.Store 和 Value.Load 是用来赋值和取值的,典型使用场景就是原子读写+copyonwrite。

场景:多个服务不定时的访问配置中心获取最新的配置,配置会周期性更新(只能是一个协程进行更新操作),同时会伴随着读请求。

package main

import (
        "fmt"
        "strconv"
        "sync/atomic"
        "time"
)

type config struct {
        redis map[string]string
}

func (c *config) loadConfig(addr string, size string) {
        c.redis["poolSize"] = size
        c.redis["addr"] = addr
}

func NewConfig() *config {
        return &config{
                make(map[string]string),
        }
}


func main() {
        var con atomic.Value
        con.Store(NewConfig())
        go func() {
                for i := 0; i < 10; i++ {
                        //每次更新,都创建新的结构体指针的原因是,防止读写进程同时操作同一个指针,利用了copy on write原理。
                        addr := "127.0.0." + strconv.Itoa(i)
                        size := strconv.Itoa(i)
                        conf:=con.Load().(*config)
                        newConf:=NewConfig()
                        for k,v:=range conf.redis{
                                newConf.redis[k]=v
                        }
                        newConf.loadConfig(addr, size)
                        con.Store(newConf)
                        time.Sleep(200 * time.Millisecond)
                }
        }()
        go func() {
                for {
                        c := con.Load().(*config)
                        fmt.Println(c.redis)
                        time.Sleep(20*time.Millisecond)
                }
        }()

        time.Sleep(1 * time.Second)
}

相对于读写锁,少了一些锁的争抢,不过相对的,带来了一些,内存上的开销,适用于读多写少并且变量占用内存不是特别大的情况,如果用内存存储大量数据,这个并不适合,技术上主要是常见的写时复制(copy-on-write)。

配置更新的官方例子,如果不需要参考上一个版本的配置,可以不使用copy on write,直接创建新的配置进行加载。

var config Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
        // Reload config every 10 seconds
        // and update config value with the new version.
        for {
                time.Sleep(10 * time.Second)
                config.Store(loadConfig())
        }
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
        go func() {
                for r := range requests() {
                        c := config.Load()
                        // Handle request r using config c.
                        _, _ = r, c
                }
        }()
}

源码分析

在进行源码分析前思考一下go的指针类型

package main

import "fmt"

func double(x *int) {
    *x += *x
    x = nil
}

func main() {
    var a = 3
    double(&a)
    fmt.Println(a) // 6

    p := &a
    double(p)
    fmt.Println(a, p == nil) // 12 false
}

x=nil并不会对数据a产生影响,x的内存地址从指向a转变成指向nil.如果对a的值进行修改,通过*x方式。 go 语言中指针不能进行数学运算,不同类型间指针不能进行转换。

unsafe.Pointer

unsafe.Pointer的特别之处在于,它可以绕过 Go 语言类型系统的检查,与任意的指针类型互相转换,如果两种类型具有相同的内存结构(layout),我们可以将unsafe.Pointer当做桥梁,让这两种类型的指针相互转换,从而实现同一份内存拥有两种不同的解读方式。

[]byte和string其实内部的存储结构都是一样的,但 Go 语言的类型系统禁止他俩互换。如果借助unsafe.Pointer,我们就可以实现在零拷贝的情况下,将[]byte数组直接转换成string类型。

bytes := []byte{104, 101, 108, 108, 111}

p := unsafe.Pointer(&bytes) //强制转换成unsafe.Pointer,编译器不会报错
str := *(*string)(p) //然后强制转换成string类型的指针,再将这个指针的值当做string类型取出来
fmt.Println(str) //输出 "hello"

其包含四种核心操作:

  • 任何类型的指针值都可以转换为 Pointer。

  • Pointer 可以转换为任何类型的指针值。

  • uintptr 可以转换为 Pointer。

  • Pointer 可以转换为 uintptr。

修改结构体变量

type A struct{
 i string
 j int64
}

func main(){
 n := A{i: "a", j: 1}
 nPointer := unsafe.Pointer(&n)

 niPointer := (*string)(unsafe.Pointer(nPointer))
 *niPointer = "a1"

 njPointer := (*int64)(unsafe.Pointer(uintptr(nPointer) + unsafe.Offsetof(n.j)))
 *njPointer = 2

 fmt.Printf("n.i: %s, n.j: %d", n.i, n.j)
}

结果为a1 2.

结构体的成员变量在内存存储上是一段连续的内存。结构体的初始地址就是第一个成员变量的内存地址。基于结构体的成员地址去计算偏移量。就能够得出其他成员变量的内存地址。

i 为第一个成员变量。因此不需要进行偏移量计算,直接取出指针后转换为 Pointer,再强制转换为字符串类型的指针值即可。

j 为第二个成员变量。需要进行偏移量计算,才可以对其内存地址进行修改.(uintptr 是 Go 的内置类型。返回无符号整数,可存储一个完整的地址。后续常用于指针运算)。

unsafe.Offsetof:func Offsetof(x ArbitraryType) uintptr返回成员变量 x 在结构体当中的偏移量。更具体的讲,就是返回结构体初始位置到 x 之间的字节数。

Value.Store 方法

再 Store 内,只涉及到指针的原子操作,不涉及到数据拷贝,所以Value.Store() 的参数必须是个局部变量(或者说是一块全新的内存)。

func (v *Value) Store(x interface{}) {
    if x == nil {
        // 把panic当NullPointException来用?是不是有点重啊?
        panic("sync/atomic: store of nil value into Value")
    }
    vp := (*ifaceWords)(unsafe.Pointer(v)) // 通过unsafe.Pointer做指针转换
    xp := (*ifaceWords)(unsafe.Pointer(&x))
    for {
        typ := LoadPointer(&vp.typ)
        if typ == nil {
            // 尝试开始第一次存储
            // 关闭抢占(preemption)以便其他goroutine能使用active spin wait来等待完成。
            // 另外这样一来GC也不会意外的看到假类型
            runtime_procPin()
            // 通过acs操作来检查并设置tpy为特殊的标志位
            if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
                // 如果失败,表示其他goroutine抢先
                runtime_procUnpin()
                continue
            }
            // 如果成功表示获得设置存储的权利,执行第一次存储
            StorePointer(&vp.data, xp.data)
            StorePointer(&vp.typ, xp.typ)
            runtime_procUnpin()
            return
        }
        if uintptr(typ) == ^uintptr(0) {
            // 第一次存储进行中,等。
            // 因为我们在第一次存储前后禁用了抢占
            // 我们可以使用active spin来等待
            continue
        }
        // First store completed. Check type and overwrite data.
        if typ != xp.typ {
            panic("sync/atomic: store of inconsistently typed value into Value")
        }
        StorePointer(&vp.data, xp.data)
        return
    }
}

value.Load()

// Value结构体提供原子装载和存储始任意类型的值
type Value struct {
    v interface{}
}

// ifaceWords 结构体是实际存储我们的 Value 值的地方, 可以看到, 我们存储的实际是指向 Value 的 type 和 data 的指针.
// ifaceWords 是 interface{} 的内部表示
type ifaceWords struct {
    typ  unsafe.Pointer
    data unsafe.Pointer
}

func (v *Value) Load() (x interface{}) {
    vp := (*ifaceWords)(unsafe.Pointer(v))  // 通过unsafe.Pointer做一次指针转换
    typ := LoadPointer(&vp.typ) // 一定要用LoadPointer,以保证原子性,比如读到的不是cpu cache
    if typ == nil || uintptr(typ) == ^uintptr(0) {
        // First store not yet completed.
        return nil
    }
    data := LoadPointer(&vp.data)
    xp := (*ifaceWords)(unsafe.Pointer(&x))
    xp.typ = typ
    xp.data = data
    return
}

文档信息

Search

    Table of Contents