涉及概念
- 并發(fā)安全Map
- 分段鎖
- sync.Map
- CAS ( Compare And Swap )
- 雙檢查
分?jǐn)噫i
type SimpleCache struct {
mu sync.RWMutex
items map[interface{}]*simpleItem
}
在日常開(kāi)發(fā)中, 上述這種數(shù)據(jù)結(jié)構(gòu)肯定不少見(jiàn),因?yàn)間olang的原生map是非并發(fā)安全的,所以為了保證map的并發(fā)安全,最簡(jiǎn)單的方式就是給map加鎖。
之前使用過(guò)兩個(gè)本地內(nèi)存緩存的開(kāi)源庫(kù), gcache, cache2go,其中存儲(chǔ)緩存對(duì)象的結(jié)構(gòu)都是這樣,對(duì)于輕量級(jí)的緩存庫(kù),為了設(shè)計(jì)簡(jiǎn)潔(包含清理過(guò)期對(duì)象等 ) 再加上當(dāng)需要緩存大量數(shù)據(jù)時(shí)有redis,memcache等明星項(xiàng)目解決。 但是如果拋開(kāi)這些因素遇到真正數(shù)量巨大的數(shù)據(jù)量時(shí),直接對(duì)一個(gè)map加鎖,當(dāng)map中的值越來(lái)越多,訪問(wèn)map的請(qǐng)求越來(lái)越多,大家都競(jìng)爭(zhēng)這一把鎖顯得并發(fā)訪問(wèn)控制變重。 在go1.9引入sync.Map 之前,比較流行的做法就是使用分段鎖,顧名思義就是將鎖分段,將鎖的粒度變小,將存儲(chǔ)的對(duì)象分散到各個(gè)分片中,每個(gè)分片由一把鎖控制,這樣使得當(dāng)需要對(duì)在A分片上的數(shù)據(jù)進(jìn)行讀寫時(shí)不會(huì)影響B(tài)分片的讀寫。
![](/d/20211017/f6929122eb59e77894631885ac4b523b.gif)
分段鎖的實(shí)現(xiàn)
// Map 分片
type ConcurrentMap []*ConcurrentMapShared
// 每一個(gè)Map 是一個(gè)加鎖的并發(fā)安全Map
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // 各個(gè)分片Map各自的鎖
}
主流的分段鎖,即通過(guò)hash取模的方式找到當(dāng)前訪問(wèn)的key處于哪一個(gè)分片之上,再對(duì)該分片進(jìn)行加鎖之后再讀寫。分片定位時(shí),常用有BKDR, FNV32等hash算法得到key的hash值。
func New() ConcurrentMap {
// SHARD_COUNT 默認(rèn)32個(gè)分片
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i SHARD_COUNT; i++ {
m[i] = ConcurrentMapShared{
items: make(map[string]interface{}),
}
}
return m
}
在初始化好分片后, 對(duì)分片上的數(shù)據(jù)進(jìn)行讀寫時(shí)就需要用hash取模進(jìn)行分段定位來(lái)確認(rèn)即將要讀寫的分片。
獲取段定位
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}
// FNV hash
func fnv32(key string) uint32 {
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}
之后對(duì)于map的GET SET 就簡(jiǎn)單順利成章的完成
Set And Get
func (m ConcurrentMap) Set(key string, value interface{}) {
shard := m.GetShard(key) // 段定位找到分片
shard.Lock() // 分片上鎖
shard.items[key] = value // 分片操作
shard.Unlock() // 分片解鎖
}
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
shard := m.GetShard(key)
shard.RLock()
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
由此一個(gè)分段鎖Map就實(shí)現(xiàn)了, 但是比起普通的Map, 常用到的方法比如獲取所有key, 獲取所有Val 操作是要比原生Map復(fù)雜的,因?yàn)橐闅v每一個(gè)分片的每一個(gè)數(shù)據(jù), 好在golang的并發(fā)特性使得解決這類問(wèn)題變得非常簡(jiǎn)單
Keys
// 統(tǒng)計(jì)當(dāng)前分段map中item的個(gè)數(shù)
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i SHARD_COUNT; i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}
return count
}
// 獲取所有的key
func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)
// 每一個(gè)分片啟動(dòng)一個(gè)協(xié)程 遍歷key
go func() {
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
defer wg.Done()
shard.RLock()
// 每個(gè)分片中的key遍歷后都寫入統(tǒng)計(jì)用的channel
for key := range shard.items {
ch - key
}
shard.RUnlock()
}(shard)
}
wg.Wait()
close(ch)
}()
keys := make([]string, count)
// 統(tǒng)計(jì)各個(gè)協(xié)程并發(fā)讀取Map分片的key
for k := range ch {
keys = append(keys, k)
}
return keys
}
這里寫了一個(gè)benchMark來(lái)對(duì)該分段鎖Map和原生的Map加鎖方式進(jìn)行壓測(cè), 場(chǎng)景為將一萬(wàn)個(gè)不重復(fù)的鍵值對(duì)同時(shí)以100萬(wàn)次寫和100萬(wàn)次讀,分別進(jìn)行5次壓測(cè), 如下壓測(cè)代碼
func BenchmarkMapShared(b *testing.B) {
num := 10000
testCase := genNoRepetTestCase(num) // 10000個(gè)不重復(fù)的鍵值對(duì)
m := New()
for _, v := range testCase {
m.Set(v.Key, v.Val)
}
b.ResetTimer()
for i := 0; i 5; i++ {
b.Run(strconv.Itoa(i), func(b *testing.B) {
b.N = 1000000
wg := sync.WaitGroup{}
wg.Add(b.N * 2)
for i := 0; i b.N; i++ {
e := testCase[rand.Intn(num)]
go func(key string, val interface{}) {
m.Set(key, val)
wg.Done()
}(e.Key, e.Val)
go func(key string) {
_, _ = m.Get(key)
wg.Done()
}(e.Key)
}
wg.Wait()
})
}
}
原生Map加鎖壓測(cè)結(jié)果
![](/d/20211017/b2142f98a4d1741350325c5011db14b5.gif)
分段鎖壓測(cè)結(jié)果
![](/d/20211017/be54f8b7ff508496b14c2e924f44b67a.gif)
可以看出在將鎖的粒度細(xì)化后再面對(duì)大量需要控制并發(fā)安全的訪問(wèn)時(shí),分段鎖Map的耗時(shí)比原生Map加鎖要快3倍有余
Sync.Map
go1.9之后加入了支持并發(fā)安全的Map sync.Map, sync.Map 通過(guò)一份只使用原子操作的數(shù)據(jù)和一份冗余了只讀數(shù)據(jù)的加鎖數(shù)據(jù)實(shí)現(xiàn)一定程度上的讀寫分離,使得大多數(shù)讀操作和更新操作是原子操作,寫入新數(shù)據(jù)才加鎖的方式來(lái)提升性能。以下是 sync.Map源碼剖析, 結(jié)構(gòu)體中的注釋都會(huì)在具體實(shí)現(xiàn)代碼中提示相呼應(yīng)
type Map struct {
// 保護(hù)dirty的鎖
mu Mutex
// 只讀數(shù)據(jù)(修改采用原子操作)
read atomic.Value
// 包含只讀中所有數(shù)據(jù)(冗余),寫入新數(shù)據(jù)時(shí)也在dirty中操作
dirty map[interface{}]*entry
// 當(dāng)原子操作訪問(wèn)只讀read時(shí)找不到數(shù)據(jù)時(shí)會(huì)去dirty中尋找,此時(shí)misses+1,dirty及作為存儲(chǔ)新寫入的數(shù)據(jù),又冗余了只讀結(jié)構(gòu)中的數(shù)據(jù),所以當(dāng)misses > dirty 的長(zhǎng)度時(shí), 會(huì)將dirty升級(jí)為read,同時(shí)將老的dirty置nil
misses int
}
// Map struct 中的 read 就是readOnly 的指針
type readOnly struct {
// 基礎(chǔ)Map
m map[interface{}]*entry
// 用于表示當(dāng)前dirty中是否有read中不存在的數(shù)據(jù), 在寫入數(shù)據(jù)時(shí), 如果發(fā)現(xiàn)dirty中沒(méi)有新數(shù)據(jù)且dirty為nil時(shí),會(huì)將read中未被刪除的數(shù)據(jù)拷貝一份冗余到dirty中, 過(guò)程與Map struct中的 misses相呼應(yīng)
amended bool
}
// 數(shù)據(jù)項(xiàng)
type entry struct {
p unsafe.Pointer
}
// 用于標(biāo)記數(shù)據(jù)項(xiàng)已被刪除(主要保證數(shù)據(jù)冗余時(shí)的并發(fā)安全)
// 上述Map結(jié)構(gòu)中說(shuō)到有一個(gè)將read數(shù)據(jù)拷貝冗余至dirty的過(guò)程, 因?yàn)閯h除數(shù)據(jù)項(xiàng)是將*entry置nil, 為了避免冗余過(guò)程中因并發(fā)問(wèn)題導(dǎo)致*entry改變而影響到拷貝后的dirty正確性,所以sync.Map使用expunged來(lái)標(biāo)記entry是否被刪除
var expunged = unsafe.Pointer(new(interface{}))
在下面sync.Map具體實(shí)現(xiàn)中將會(huì)看到很多“雙檢查”代碼,因?yàn)橥ㄟ^(guò)原子操作獲取的值可能在進(jìn)行其他非原子操作過(guò)程中已改變,所以再非原子操作后需要使用之前原子操作獲取的值需要再次進(jìn)行原子操作獲取。
compareAndSwap 交換并比較, 用于在多線程編程中實(shí)現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時(shí)改寫某一數(shù)據(jù)時(shí)導(dǎo)致數(shù)據(jù)不一致問(wèn)題。
sync.Map Write
func (m *Map) Store(key, value interface{}) {
// 先不上鎖,而是從只讀數(shù)據(jù)中按key讀取, 如果已存在以compareAndSwap操作進(jìn)行覆蓋(update)
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok e.tryStore(value) {
return
}
m.mu.Lock()
// 雙檢查獲取read
read, _ = m.read.Load().(readOnly)
// 如果data在read中,更新entry
if e, ok := read.m[key]; ok {
// 如果原子操作讀到的數(shù)據(jù)是被標(biāo)記刪除的, 則視為新數(shù)據(jù)寫入dirty
if e.unexpungeLocked() {
m.dirty[key] = e
}
// 原子操作寫新數(shù)據(jù)
e.storeLocked(value)
} else if e, ok := m.dirty[key]; ok {
// 原子操作寫新數(shù)據(jù)
e.storeLocked(value)
} else {
// 新數(shù)據(jù)
// 當(dāng)dirty中沒(méi)有新數(shù)據(jù)時(shí),將read中數(shù)據(jù)冗余到dirty
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(e.p)
if p == expunged {
return false
}
}
}
// 在dirty中沒(méi)有比read多出的新數(shù)據(jù)時(shí)觸發(fā)冗余
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 檢查entry是否被刪除, 被刪除的數(shù)據(jù)不冗余
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(e.p)
for p == nil {
// 將被刪除(置nil)的數(shù)據(jù)以cas原子操作標(biāo)記為expunged(防止因并發(fā)情況下其他操作導(dǎo)致冗余進(jìn)dirty的數(shù)據(jù)不正確)
if atomic.CompareAndSwapPointer(e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(e.p)
}
return p == expunged
}
sync.Map Read
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 只讀數(shù)據(jù)中沒(méi)有,并且dirty有比read多的數(shù)據(jù),加鎖在dirty中找
if !ok read.amended {
m.mu.Lock()
// 雙檢查, 因?yàn)樯湘i之前的語(yǔ)句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok read.amended {
// 只讀中沒(méi)有讀取到的次數(shù)+1
e, ok = m.dirty[key]
// 檢查是否達(dá)到觸發(fā)dirty升級(jí)read的條件
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// atomic.Load 但被標(biāo)記為刪除的會(huì)返回nil
return e.load()
}
func (m *Map) missLocked() {
m.misses++
if m.misses len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
sync.Map DELETE
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 只讀中不存在需要到dirty中去刪除
if !ok read.amended {
m.mu.Lock()
// 雙檢查, 因?yàn)樯湘i之前的語(yǔ)句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(e.p, p, nil) {
return true
}
}
}
同樣以剛剛壓測(cè)原生加鎖Map和分段鎖的方式來(lái)壓測(cè)sync.Map
![](/d/20211017/a5875922424c7a6f2acbbc4461758b93.gif)
壓測(cè)平均下來(lái)sync.Map和分段鎖差別不大,但是比起分段鎖, sync.Map則將鎖的粒度更加的細(xì)小到對(duì)數(shù)據(jù)的狀態(tài)上,使得大多數(shù)據(jù)可以無(wú)鎖化操作, 同時(shí)比分段鎖擁有更好的拓展性,因?yàn)榉侄捂i使用前總是要定一個(gè)分片數(shù)量, 在做擴(kuò)容或者縮小時(shí)很麻煩, 但要達(dá)到sync.Map這種性能既好又能動(dòng)態(tài)擴(kuò)容的程度,代碼就相對(duì)復(fù)雜很多。
還有注意在使用sync.Map時(shí)切忌不要將其拷貝, go源碼中有對(duì)sync.Map注釋到” A Map must not be copied after first use.”因?yàn)楫?dāng)sync.Map被拷貝之后, Map類型的dirty還是那個(gè)map 但是read 和 鎖卻不是之前的read和鎖(都不在一個(gè)世界你拿什么保護(hù)我), 所以必然導(dǎo)致并發(fā)不安全(為了寫博我把sync.Map代碼復(fù)制出來(lái)一份把私有成員改成可外部訪問(wèn)的打印指針)
![](/d/20211017/d3f039d314acd77063f4774ee90dcc27.gif)
![](/d/20211017/4764c6e7a8a7f7897ed9d6d40c8ef32f.gif)
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
您可能感興趣的文章:- Golang 語(yǔ)言map底層實(shí)現(xiàn)原理解析
- golang映射Map的方法步驟
- Golang 使用map需要注意的幾個(gè)點(diǎn)
- golang中使用sync.Map的方法
- Golang Map實(shí)現(xiàn)賦值和擴(kuò)容的示例代碼
- golang中range在slice和map遍歷中的注意事項(xiàng)
- Golang自定義結(jié)構(gòu)體轉(zhuǎn)map的操作