目錄
- 控制資源訪(fǎng)問(wèn)
- 判斷是否有另一個(gè)線(xiàn)程請(qǐng)求鎖
- with lock
- 同步線(xiàn)程
- 有限資源的并發(fā)訪(fǎng)問(wèn)
- 隱藏資源
控制資源訪(fǎng)問(wèn)
前文提到threading庫(kù)在多線(xiàn)程時(shí),對(duì)同一資源的訪(fǎng)問(wèn)容易導(dǎo)致破壞與丟失數(shù)據(jù)。為了保證安全的訪(fǎng)問(wèn)一個(gè)資源對(duì)象,我們需要?jiǎng)?chuàng)建鎖。
示例如下:
import threading
import time
class AddThread():
def __init__(self, start=0):
self.lock = threading.Lock()
self.value = start
def increment(self):
print("Wait Lock")
self.lock.acquire()
try:
print("Acquire Lock")
self.value += 1
print(self.value)
finally:
self.lock.release()
def worker(a):
time.sleep(1)
a.increment()
addThread = AddThread()
for i in range(3):
t = threading.Thread(target=worker, args=(addThread,))
t.start()
運(yùn)行之后,效果如下:
![](/d/20211017/10b1b67d2484aaeba7afb413608551db.gif)
acquire()會(huì)通過(guò)鎖進(jìn)行阻塞其他線(xiàn)程執(zhí)行中間段,release()釋放鎖,可以看到,基本都是獲得鎖之后才執(zhí)行。避免了多個(gè)線(xiàn)程同時(shí)改變其資源對(duì)象,不會(huì)造成混亂。
判斷是否有另一個(gè)線(xiàn)程請(qǐng)求鎖
要確定是否有另一個(gè)線(xiàn)程請(qǐng)求鎖而不影響當(dāng)前的線(xiàn)程,可以設(shè)置acquire()的參數(shù)blocking=False。
示例如下:
import threading
import time
def worker2(lock):
print("worker2 Wait Lock")
while True:
lock.acquire()
try:
print("Holding")
time.sleep(0.5)
finally:
print("not Holding")
lock.release()
time.sleep(0.5)
def worker1(lock):
print("worker1 Wait Lock")
num_acquire = 0
value = 0
while num_acquire 3:
time.sleep(0.5)
have_it = lock.acquire(blocking=False)
try:
value += 1
print(value)
print("Acquire Lock")
if have_it:
num_acquire += 1
finally:
print("release Lock")
if have_it:
lock.release()
lock = threading.Lock()
word2Thread = threading.Thread(
target=worker2,
name='work2',
args=(lock,)
)
word2Thread.start()
word1Thread = threading.Thread(
target=worker1,
name='work1',
args=(lock,)
)
word1Thread.start()
運(yùn)行之后,效果如下:
![](/d/20211017/d6027260dd7be39a083d2ab110a42212.gif)
這里,我們需要迭代很多次,work1才能獲取3次鎖。但是嘗試了很8次。
with lock
前文,我們通過(guò)lock.acquire()與lock.release()實(shí)現(xiàn)了鎖的獲取與釋放,但其實(shí)我們Python還給我們提供了一個(gè)更簡(jiǎn)單的語(yǔ)法,通過(guò)with lock來(lái)獲取與釋放鎖。
示例如下:
import threading
import time
class AddThread():
def __init__(self, start=0):
self.lock = threading.Lock()
self.value = start
def increment(self):
print("Wait Lock")
with self.lock:
print("lock acquire")
self.value += 1
print(self.value)
print("lock release")
def worker(a):
time.sleep(1)
a.increment()
addThread = AddThread()
for i in range(3):
t = threading.Thread(target=worker, args=(addThread,))
t.start()
這里,我們只是將最上面的例子改變了一下。效果如下:
![](/d/20211017/af8a7f7a8db524e411153f02833441b5.gif)
需要注意的是,正常的Lock對(duì)象不能請(qǐng)求多次,即使是由同一個(gè)線(xiàn)程請(qǐng)求也不例外。如果同一個(gè)調(diào)用鏈中的多個(gè)函數(shù)訪(fǎng)問(wèn)一個(gè)鎖,則會(huì)發(fā)生意外。如果期望在同一個(gè)線(xiàn)程的不同代碼需要重新獲得鎖,那么這種情況下使用RLock。
同步線(xiàn)程
Condition
在實(shí)際的操作中,我們還可以使用Condition對(duì)象來(lái)同步線(xiàn)程。由于Condition使用了一個(gè)Lock,所以它可以綁定到一個(gè)共享資源,允許多個(gè)線(xiàn)程等待資源的更新。
示例如下:
import threading
import time
def consumer(cond):
print("waitCon")
with cond:
cond.wait()
print('獲取更新的資源')
def producer(cond):
print("worker")
with cond:
print('更新資源')
cond.notifyAll()
cond = threading.Condition()
t1 = threading.Thread(name='t1', target=consumer, args=(cond,))
t2 = threading.Thread(name='t2', target=consumer, args=(cond,))
t3 = threading.Thread(name='t3', target=producer, args=(cond,))
t1.start()
time.sleep(0.2)
t2.start()
time.sleep(0.2)
t3.start()
運(yùn)行之后,效果如下:
![](/d/20211017/a0e97ceeffbb6f6040821fa29b4a4867.gif)
這里,我們通過(guò)producer線(xiàn)程處理完成之后調(diào)用notifyAll(),consumer等線(xiàn)程等到了它的更新,可以類(lèi)比為觀(guān)察者模式。這里是,當(dāng)一個(gè)線(xiàn)程用完資源之后時(shí),則會(huì)自動(dòng)通知依賴(lài)它的所有線(xiàn)程。
屏障(barrier)
屏障是另一種線(xiàn)程的同步機(jī)制。barrier會(huì)建立一個(gè)控制點(diǎn),所有參與的線(xiàn)程會(huì)在這里阻塞,直到所有這些參與方都到達(dá)這一點(diǎn)。采用這種方法,線(xiàn)程可以單獨(dú)啟動(dòng)然后暫停,直到所有線(xiàn)程都準(zhǔn)備好了才可以繼續(xù)。
示例如下:
import threading
import time
def worker(barrier):
print(threading.current_thread().getName(), "worker")
worker_id = barrier.wait()
print(threading.current_thread().getName(), worker_id)
threads = []
barrier = threading.Barrier(3)
for i in range(3):
threads.append(
threading.Thread(
name="t" + str(i),
target=worker,
args=(barrier,)
)
)
for t in threads:
print(t.name, 'starting')
t.start()
time.sleep(0.1)
for t in threads:
t.join()
運(yùn)行之后,效果如下:
![](/d/20211017/508164dcd0192ca4916a3cfc777c9527.gif)
從控制臺(tái)的輸出會(huì)發(fā)發(fā)現(xiàn),barrier.wait()會(huì)阻塞線(xiàn)程,直到所有線(xiàn)程被創(chuàng)建后,才同時(shí)釋放越過(guò)這個(gè)控制點(diǎn)繼續(xù)執(zhí)行。wait()的返回值指示了釋放的參與線(xiàn)程數(shù),可以用來(lái)限制一些線(xiàn)程做清理資源等動(dòng)作。
當(dāng)然屏障Barrier還有一個(gè)abort()方法,該方法可以使所有等待線(xiàn)程接收一個(gè)BroKenBarrierError。如果線(xiàn)程在wait()上被阻塞而停止處理,會(huì)產(chǎn)生這個(gè)異常,通過(guò)except可以完成清理工作。
有限資源的并發(fā)訪(fǎng)問(wèn)
除了多線(xiàn)程可能訪(fǎng)問(wèn)同一個(gè)資源之外,有時(shí)候?yàn)榱诵阅?,我們也?huì)限制多線(xiàn)程訪(fǎng)問(wèn)同一個(gè)資源的數(shù)量。例如,線(xiàn)程池支持同時(shí)連接,但數(shù)據(jù)可能是固定的,或者一個(gè)網(wǎng)絡(luò)APP提供的并發(fā)下載數(shù)支持固定數(shù)目。這些連接就可以使用Semaphore來(lái)管理。
示例如下:
import threading
import time
class WorkerThread(threading.Thread):
def __init__(self):
super(WorkerThread, self).__init__()
self.lock = threading.Lock()
self.value = 0
def increment(self):
with self.lock:
self.value += 1
print(self.value)
def worker(s, pool):
with s:
print(threading.current_thread().getName())
pool.increment()
time.sleep(1)
pool.increment()
pool = WorkerThread()
s = threading.Semaphore(2)
for i in range(5):
t = threading.Thread(
name="t" + str(i),
target=worker,
args=(s, pool,)
)
t.start()
運(yùn)行之后,效果如下:
![](/d/20211017/cc3d9c839ded8513f2182da9b397e77c.gif)
從圖片雖然能看所有輸出,但無(wú)法看到其停頓的事件。讀者自己運(yùn)行會(huì)發(fā)現(xiàn),每次頂多只有兩個(gè)線(xiàn)程在工作,是因?yàn)槲覀冊(cè)O(shè)置了threading.Semaphore(2)。
隱藏資源
在實(shí)際的項(xiàng)目中,有些資源需要鎖定以便于多個(gè)線(xiàn)程使用,而另外一些資源則需要保護(hù),以使它們對(duì)并非使這些資源的所有者的線(xiàn)程隱藏。
local()函數(shù)會(huì)創(chuàng)建一個(gè)對(duì)象,它能夠隱藏值,使其在不同的線(xiàn)程中無(wú)法被看到。示例如下:
import threading
import random
def show_data(data):
try:
result = data.value
except AttributeError:
print(threading.current_thread().getName(), "No value")
else:
print(threading.current_thread().getName(), "value=", result)
def worker(data):
show_data(data)
data.value = random.randint(1, 100)
show_data(data)
local_data = threading.local()
show_data(local_data)
local_data.value = 1000
show_data(local_data)
for i in range(2):
t = threading.Thread(
name="t" + str(i),
target=worker,
args=(local_data,)
)
t.start()
運(yùn)行之后,效果如下:
![](/d/20211017/d7e1146b5ebfc9dfebe0f4bf162cccb0.gif)
這里local_data.value對(duì)所有線(xiàn)程都不可見(jiàn),除非在某個(gè)線(xiàn)程中設(shè)置了這個(gè)屬性,這個(gè)線(xiàn)程才能看到它。
到此這篇關(guān)于Python中threading庫(kù)實(shí)現(xiàn)線(xiàn)程鎖與釋放鎖的文章就介紹到這了,更多相關(guān)Python 線(xiàn)程鎖與釋放鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- Python threading.local代碼實(shí)例及原理解析
- python語(yǔ)言線(xiàn)程標(biāo)準(zhǔn)庫(kù)threading.local解讀總結(jié)
- python threading模塊的使用指南
- python中threading和queue庫(kù)實(shí)現(xiàn)多線(xiàn)程編程
- Python threading Local()函數(shù)用法案例詳解