原子性
一个或者多个操作在 CPU 执行的过程中不被中断的特性,称为原子性(atomicity).这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界不会看到他们只执行到一半的状态。
CPU执行一系列操作时不可能不发生中断,但如果我们在执行多个操作时,能让他们的中间状态对外不可见,那我们就可以宣称他们拥有了”不可分割”的原子性。
一条普通的赋值语句其实不是一个原子操作。例如,在32位机器上写int64类型的变量就会有中间状态,它会被拆成两次写操作(汇编的MOV指令)——写低 32 位和写高 32 位。对于结构体,出现并发问题的概率更高。
互斥锁跟原子子操作区别
互斥锁是用来保护一段逻辑,原子操作作用于对一个变量的更新保护。
mutex由操作系统的调度器实现,而atomic包中的原子操作则由底层硬件指令直接提供支持,这些指令在执行过程中是不允许中断的。因此原子操作可以在lock-free的情况下保证并发安全,并且它的性能也能做到随CPU个数的增多而线性扩展。
并发场景下的配置更新
Value.Store 和 Value.Load 是用来赋值和取值的,典型使用场景就是原子读写+copyonwrite。
场景:多个服务不定时的访问配置中心获取最新的配置,配置会周期性更新(只能是一个协程进行更新操作),同时会伴随着读请求。
package main
import (
"fmt"
"strconv"
"sync/atomic"
"time"
)
type config struct {
redis map[string]string
}
func (c *config) loadConfig(addr string, size string) {
c.redis["poolSize"] = size
c.redis["addr"] = addr
}
func NewConfig() *config {
return &config{
make(map[string]string),
}
}
func main() {
var con atomic.Value
con.Store(NewConfig())
go func() {
for i := 0; i < 10; i++ {
//每次更新,都创建新的结构体指针的原因是,防止读写进程同时操作同一个指针,利用了copy on write原理。
addr := "127.0.0." + strconv.Itoa(i)
size := strconv.Itoa(i)
conf:=con.Load().(*config)
newConf:=NewConfig()
for k,v:=range conf.redis{
newConf.redis[k]=v
}
newConf.loadConfig(addr, size)
con.Store(newConf)
time.Sleep(200 * time.Millisecond)
}
}()
go func() {
for {
c := con.Load().(*config)
fmt.Println(c.redis)
time.Sleep(20*time.Millisecond)
}
}()
time.Sleep(1 * time.Second)
}
相对于读写锁,少了一些锁的争抢,不过相对的,带来了一些,内存上的开销,适用于读多写少并且变量占用内存不是特别大的情况,如果用内存存储大量数据,这个并不适合,技术上主要是常见的写时复制(copy-on-write)。
配置更新的官方例子,如果不需要参考上一个版本的配置,可以不使用copy on write
,直接创建新的配置进行加载。
var config Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
// Reload config every 10 seconds
// and update config value with the new version.
for {
time.Sleep(10 * time.Second)
config.Store(loadConfig())
}
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// Handle request r using config c.
_, _ = r, c
}
}()
}
源码分析
在进行源码分析前思考一下go的指针类型
package main
import "fmt"
func double(x *int) {
*x += *x
x = nil
}
func main() {
var a = 3
double(&a)
fmt.Println(a) // 6
p := &a
double(p)
fmt.Println(a, p == nil) // 12 false
}
x=nil
并不会对数据a产生影响,x的内存地址从指向a转变成指向nil.如果对a的值进行修改,通过*x
方式。 go 语言中指针不能进行数学运算,不同类型间指针不能进行转换。
unsafe.Pointer
unsafe.Pointer的特别之处在于,它可以绕过 Go 语言类型系统的检查,与任意的指针类型互相转换,如果两种类型具有相同的内存结构(layout),我们可以将unsafe.Pointer当做桥梁,让这两种类型的指针相互转换,从而实现同一份内存拥有两种不同的解读方式。
[]byte和string其实内部的存储结构都是一样的,但 Go 语言的类型系统禁止他俩互换。如果借助unsafe.Pointer,我们就可以实现在零拷贝的情况下,将[]byte数组直接转换成string类型。
bytes := []byte{104, 101, 108, 108, 111}
p := unsafe.Pointer(&bytes) //强制转换成unsafe.Pointer,编译器不会报错
str := *(*string)(p) //然后强制转换成string类型的指针,再将这个指针的值当做string类型取出来
fmt.Println(str) //输出 "hello"
其包含四种核心操作:
任何类型的指针值都可以转换为 Pointer。
Pointer 可以转换为任何类型的指针值。
uintptr 可以转换为 Pointer。
Pointer 可以转换为 uintptr。
修改结构体变量
type A struct{
i string
j int64
}
func main(){
n := A{i: "a", j: 1}
nPointer := unsafe.Pointer(&n)
niPointer := (*string)(unsafe.Pointer(nPointer))
*niPointer = "a1"
njPointer := (*int64)(unsafe.Pointer(uintptr(nPointer) + unsafe.Offsetof(n.j)))
*njPointer = 2
fmt.Printf("n.i: %s, n.j: %d", n.i, n.j)
}
结果为a1 2
.
结构体的成员变量在内存存储上是一段连续的内存。结构体的初始地址就是第一个成员变量的内存地址。基于结构体的成员地址去计算偏移量。就能够得出其他成员变量的内存地址。
i 为第一个成员变量。因此不需要进行偏移量计算,直接取出指针后转换为 Pointer,再强制转换为字符串类型的指针值即可。
j 为第二个成员变量。需要进行偏移量计算,才可以对其内存地址进行修改.(uintptr 是 Go 的内置类型。返回无符号整数,可存储一个完整的地址。后续常用于指针运算)。
unsafe.Offsetof:func Offsetof(x ArbitraryType) uintptr
返回成员变量 x 在结构体当中的偏移量。更具体的讲,就是返回结构体初始位置到 x 之间的字节数。
Value.Store 方法
再 Store 内,只涉及到指针的原子操作,不涉及到数据拷贝,所以Value.Store() 的参数必须是个局部变量(或者说是一块全新的内存)。
func (v *Value) Store(x interface{}) {
if x == nil {
// 把panic当NullPointException来用?是不是有点重啊?
panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v)) // 通过unsafe.Pointer做指针转换
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// 尝试开始第一次存储
// 关闭抢占(preemption)以便其他goroutine能使用active spin wait来等待完成。
// 另外这样一来GC也不会意外的看到假类型
runtime_procPin()
// 通过acs操作来检查并设置tpy为特殊的标志位
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
// 如果失败,表示其他goroutine抢先
runtime_procUnpin()
continue
}
// 如果成功表示获得设置存储的权利,执行第一次存储
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// 第一次存储进行中,等。
// 因为我们在第一次存储前后禁用了抢占
// 我们可以使用active spin来等待
continue
}
// First store completed. Check type and overwrite data.
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}
value.Load()
// Value结构体提供原子装载和存储始任意类型的值
type Value struct {
v interface{}
}
// ifaceWords 结构体是实际存储我们的 Value 值的地方, 可以看到, 我们存储的实际是指向 Value 的 type 和 data 的指针.
// ifaceWords 是 interface{} 的内部表示
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v)) // 通过unsafe.Pointer做一次指针转换
typ := LoadPointer(&vp.typ) // 一定要用LoadPointer,以保证原子性,比如读到的不是cpu cache
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}
文档信息
- 本文作者:Beast
- 本文链接:https://beastpu.github.io/2021/08/30/go-07/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)