方法 | 描述 |
---|---|
threading.Lock() | 返回一個同步鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認(rèn)鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.locaked() | 判斷該鎖對象是否處于上鎖狀態(tài),返回一個布爾值 |
同步鎖一次只能放行一個線程,一個被加鎖的線程在運行時不會將執(zhí)行權(quán)交出去,只有當(dāng)該線程被解鎖時才會將執(zhí)行權(quán)通過系統(tǒng)調(diào)度交由其他線程。
如下所示,使用同步鎖解決最上面的問題:
import threading num = 0 def add(): lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() def sub(): lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
這樣這個代碼就完全變成了串行的狀態(tài),對于這種計算密集型I/O業(yè)務(wù)來說,還不如直接使用串行化單線程執(zhí)行來得快,所以這個例子僅作為一個示例,不能概述鎖真正的用途。
對于同步鎖來說,一次acquire()必須對應(yīng)一次release(),不能出現(xiàn)連續(xù)重復(fù)使用多次acquire()后再重復(fù)使用多次release()的操作,這樣會引起死鎖造成程序的阻塞,完全不動了,如下所示:
import threading num = 0 def add(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 # 不執(zhí)行 global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 # 不執(zhí)行 global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num)
由于threading.Lock()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進(jìn)行上下文管理形式的加鎖解鎖操作:
import threading num = 0 def add(): with lock: # 自動加鎖 global num for i in range(10_000_000): num += 1 # 自動解鎖 def sub(): with lock: # 自動加鎖 global num for i in range(10_000_000): num -= 1 # 自動解鎖 if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
遞歸鎖是同步鎖的一個升級版本,在同步鎖的基礎(chǔ)上可以做到連續(xù)重復(fù)使用多次acquire()后再重復(fù)使用多次release()的操作,但是一定要注意加鎖次數(shù)和解鎖次數(shù)必須一致,否則也將引發(fā)死鎖現(xiàn)象。
下面是threading模塊與遞歸鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.RLock() | 返回一個遞歸鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認(rèn)鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.locaked() | 判斷該鎖對象是否處于上鎖狀態(tài),返回一個布爾值 |
以下是遞歸鎖的簡單使用,下面這段操作如果使用同步鎖則會發(fā)生死鎖現(xiàn)象,但是遞歸鎖不會:
import threading num = 0 def add(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
由于threading.RLock()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進(jìn)行上下文管理形式的加鎖解鎖操作:
import threading num = 0 def add(): with lock: # 自動加鎖 global num for i in range(10_000_000): num += 1 # 自動解鎖 def sub(): with lock: # 自動加鎖 global num for i in range(10_000_000): num -= 1 # 自動解鎖 if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
條件鎖是在遞歸鎖的基礎(chǔ)上增加了能夠暫停線程運行的功能。并且我們可以使用wait()與notify()來控制線程執(zhí)行的個數(shù)。
注意:條件鎖可以自由設(shè)定一次放行幾個線程。
下面是threading模塊與條件鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.Condition() | 返回一個條件鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認(rèn)鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.wait(timeout=None) | 將當(dāng)前線程設(shè)置為“等待”狀態(tài),只有該線程接到“通知”或者超時時間到期之后才會繼續(xù)運行,在“等待”狀態(tài)下的線程將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.wait_for(predicate, timeout=None) | 將當(dāng)前線程設(shè)置為“等待”狀態(tài),只有該線程的predicate返回一個True或者超時時間到期之后才會繼續(xù)運行,在“等待”狀態(tài)下的線程將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行。注意:predicate參數(shù)應(yīng)當(dāng)傳入一個可調(diào)用對象,且返回結(jié)果為bool類型 |
lockObject.notify(n=1) | 通知一個當(dāng)前狀態(tài)為“等待”的線程繼續(xù)運行,也可以通過參數(shù)n通知多個 |
lockObject.notify_all() | 通知所有當(dāng)前狀態(tài)為“等待”的線程繼續(xù)運行 |
下面這個案例會啟動10個子線程,并且會立即將10個子線程設(shè)置為等待狀態(tài)。
然后我們可以發(fā)送一個或者多個通知,來恢復(fù)被等待的子線程繼續(xù)運行:
import threading currentRunThreadNumber = 0 maxSubThreadNumber = 10 def task(): global currentRunThreadNumber thName = threading.currentThread().name condLock.acquire() # 上鎖 print("start and wait run thread : %s" % thName) condLock.wait() # 暫停線程運行、等待喚醒 currentRunThreadNumber += 1 print("carry on run thread : %s" % thName) condLock.release() # 解鎖 if __name__ == "__main__": condLock = threading.Condition() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() while currentRunThreadNumber maxSubThreadNumber: notifyNumber = int( input("Please enter the number of threads that need to be notified to run:")) condLock.acquire() condLock.notify(notifyNumber) # 放行 condLock.release() print("main thread run end") # 先啟動10個子線程,然后這些子線程會全部變?yōu)榈却隣顟B(tài) # start and wait run thread : Thread-1 # start and wait run thread : Thread-2 # start and wait run thread : Thread-3 # start and wait run thread : Thread-4 # start and wait run thread : Thread-5 # start and wait run thread : Thread-6 # start and wait run thread : Thread-7 # start and wait run thread : Thread-8 # start and wait run thread : Thread-9 # start and wait run thread : Thread-10 # 批量發(fā)送通知,放行特定數(shù)量的子線程繼續(xù)運行 # Please enter the number of threads that need to be notified to run:5 # 放行5個 # carry on run thread : Thread-4 # carry on run thread : Thread-3 # carry on run thread : Thread-1 # carry on run thread : Thread-2 # carry on run thread : Thread-5 # Please enter the number of threads that need to be notified to run:5 # 放行5個 # carry on run thread : Thread-8 # carry on run thread : Thread-10 # carry on run thread : Thread-6 # carry on run thread : Thread-9 # carry on run thread : Thread-7 # Please enter the number of threads that need to be notified to run:1 # main thread run end
由于threading.Condition()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進(jìn)行上下文管理形式的加鎖解鎖操作:
import threading currentRunThreadNumber = 0 maxSubThreadNumber = 10 def task(): global currentRunThreadNumber thName = threading.currentThread().name with condLock: print("start and wait run thread : %s" % thName) condLock.wait() # 暫停線程運行、等待喚醒 currentRunThreadNumber += 1 print("carry on run thread : %s" % thName) if __name__ == "__main__": condLock = threading.Condition() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() while currentRunThreadNumber maxSubThreadNumber: notifyNumber = int( input("Please enter the number of threads that need to be notified to run:")) with condLock: condLock.notify(notifyNumber) # 放行 print("main thread run end")
事件鎖是基于條件鎖來做的,它與條件鎖的區(qū)別在于一次只能放行全部,不能放行任意個數(shù)量的子線程繼續(xù)運行。
我們可以將事件鎖看為紅綠燈,當(dāng)紅燈時所有子線程都暫停運行,并進(jìn)入“等待”狀態(tài),當(dāng)綠燈時所有子線程都恢復(fù)“運行”。
下面是threading模塊與事件鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.Event() | 返回一個事件鎖對象 |
lockObject.clear() | 將事件鎖設(shè)為紅燈狀態(tài),即所有線程暫停運行 |
lockObject.is_set() | 用來判斷當(dāng)前事件鎖狀態(tài),紅燈為False,綠燈為True |
lockObject.set() | 將事件鎖設(shè)為綠燈狀態(tài),即所有線程恢復(fù)運行 |
lockObject.wait(timeout=None) | 將當(dāng)前線程設(shè)置為“等待”狀態(tài),只有該線程接到“綠燈通知”或者超時時間到期之后才會繼續(xù)運行,在“等待”狀態(tài)下的線程將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
事件鎖不能利用with語句來進(jìn)行使用,只能按照常規(guī)方式。
如下所示,我們來模擬線程和紅綠燈的操作,紅燈停,綠燈行:
import threading maxSubThreadNumber = 3 def task(): thName = threading.currentThread().name print("start and wait run thread : %s" % thName) eventLock.wait() # 暫停運行,等待綠燈 print("green light, %s carry on run" % thName) print("red light, %s stop run" % thName) eventLock.wait() # 暫停運行,等待綠燈 print("green light, %s carry on run" % thName) print("sub thread %s run end" % thName) if __name__ == "__main__": eventLock = threading.Event() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() eventLock.set() # 設(shè)置為綠燈 eventLock.clear() # 設(shè)置為紅燈 eventLock.set() # 設(shè)置為綠燈 # start and wait run thread : Thread-1 # start and wait run thread : Thread-2 # start and wait run thread : Thread-3 # green light, Thread-1 carry on run # red light, Thread-1 stop run # green light, Thread-1 carry on run # sub thread Thread-1 run end # green light, Thread-3 carry on run # red light, Thread-3 stop run # green light, Thread-3 carry on run # sub thread Thread-3 run end # green light, Thread-2 carry on run # red light, Thread-2 stop run # green light, Thread-2 carry on run # sub thread Thread-2 run end
基本介紹
信號量鎖也是根據(jù)條件鎖來做的,它與條件鎖和事件鎖的區(qū)別如下:
下面是threading模塊與信號量鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.Semaphore() | 返回一個信號量鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認(rèn)鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
以下是使用示例,你可以將它當(dāng)做一段限寬的路段,每次只能放行相同數(shù)量的線程:
import threading import time maxSubThreadNumber = 6 def task(): thName = threading.currentThread().name semaLock.acquire() print("run sub thread %s" % thName) time.sleep(3) semaLock.release() if __name__ == "__main__": # 每次只能放行2個 semaLock = threading.Semaphore(2) for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() # run sub thread Thread-1 # run sub thread Thread-2 # run sub thread Thread-3 # run sub thread Thread-4 # run sub thread Thread-6 # run sub thread Thread-5
由于threading.Semaphore()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進(jìn)行上下文管理形式的加鎖解鎖操作:
import threading import time maxSubThreadNumber = 6 def task(): thName = threading.currentThread().name with semaLock: print("run sub thread %s" % thName) time.sleep(3) if __name__ == "__main__": semaLock = threading.Semaphore(2) for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start()
上面5種鎖可以說都是基于同步鎖來做的,這些你都可以從源碼中找到答案。
首先來看RLock遞歸鎖,遞歸鎖的實現(xiàn)非常簡單,它的內(nèi)部會維護(hù)著一個計數(shù)器,當(dāng)計數(shù)器不為0的時候該線程不能被I/O操作和時間輪詢機(jī)制切換。但是當(dāng)計數(shù)器為0的時候便不會如此了:
def __init__(self): self._block = _allocate_lock() self._owner = None self._count = 0 # 計數(shù)器
而Condition條件鎖的內(nèi)部其實是有兩把鎖的,一把底層鎖(同步鎖)一把高級鎖(遞歸鎖)。
低層鎖的解鎖方式有兩種,使用wait()方法會暫時解開底層鎖同時加上一把高級鎖,只有當(dāng)接收到別的線程里的notfiy()后才會解開高級鎖和重新上鎖低層鎖,也就是說條件鎖底層是根據(jù)同步鎖和遞歸鎖的不斷切換來進(jìn)行實現(xiàn)的:
def __init__(self, lock=None): if lock is None: lock = RLock() # 可以看到條件鎖的內(nèi)部是基于遞歸鎖,而遞歸鎖又是基于同步鎖來做的 self._lock = lock self.acquire = lock.acquire self.release = lock.release try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque()
Event事件鎖內(nèi)部是基于條件鎖來做的:
class Event: def __init__(self): self._cond = Condition(Lock()) # 實例化出了一個條件鎖。 self._flag = False def _reset_internal_locks(self): # private! called by Thread._reset_internal_locks by _after_fork() self._cond.__init__(Lock()) def is_set(self): """Return true if and only if the internal flag is true.""" return self._flag isSet = is_set
Semaphore信號量鎖內(nèi)部也是基于條件鎖來做的:
class Semaphore: def __init__(self, value=1): if value 0: raise ValueError("semaphore initial value must be >= 0") self._cond = Condition(Lock()) # 可以看到,這里是實例化出了一個條件鎖 self._value = value
需求:一個空列表,兩個線程輪番往里面加值(一個加偶數(shù),一個加奇數(shù)),最終讓該列表中的值為 1 - 100 ,且是有序排列的。
import threading lst = [] def even(): """加偶數(shù)""" with condLock: for i in range(2, 101, 2): # 判斷當(dāng)前列表的長度處于2是否能處盡 # 如果能處盡則代表需要添加奇數(shù) # 否則就添加偶數(shù) if len(lst) % 2 != 0: # 添偶數(shù) lst.append(i) # 先添加值 condLock.notify() # 告訴另一個線程,你可以加奇數(shù)了,但是這里不會立即交出執(zhí)行權(quán) condLock.wait() # 交出執(zhí)行權(quán),并等待另一個線程通知加偶數(shù) else: # 添奇數(shù) condLock.wait() # 交出執(zhí)行權(quán),等待另一個線程通知加偶數(shù) lst.append(i) condLock.notify() condLock.notify() def odd(): """加奇數(shù)""" with condLock: for i in range(1, 101, 2): if len(lst) % 2 == 0: lst.append(i) condLock.notify() condLock.wait() condLock.notify() if __name__ == "__main__": condLock = threading.Condition() addEvenTask = threading.Thread(target=even) addOddTask = threading.Thread(target=odd) addEvenTask.start() addOddTask.start() addEvenTask.join() addOddTask.join() print(lst)
有2個任務(wù)線程來扮演李白和杜甫,如何讓他們一人一句進(jìn)行對答?文本如下:
杜甫:老李啊,來喝酒!
李白:老杜啊,不喝了我喝不下了!
杜甫:老李啊,再來一壺?
杜甫:...老李?
李白:呼呼呼...睡著了..
代碼如下:
import threading def libai(): event.wait() print("李白:老杜啊,不喝了我喝不下了!") event.set() event.clear() event.wait() print("李白:呼呼呼...睡著了..") def dufu(): print("杜甫:老李啊,來喝酒!") event.set() event.clear() event.wait() print("杜甫:老李啊,再來一壺?") print("杜甫:...老李?") event.set() if __name__ == '__main__': event = threading.Event() t1 = threading.Thread(target=libai) t2 = threading.Thread(target=dufu) t1.start() t2.start() t1.join() t2.join()
到此這篇關(guān)于Python3中最常用的5種線程鎖的文章就介紹到這了,更多相關(guān)Python3常用線程鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
標(biāo)簽:宜昌 潮州 珠海 西寧 上饒 盤錦 湖北 佳木斯
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Python3中最常用的5種線程鎖實例總結(jié)》,本文關(guān)鍵詞 Python3,中最,常用的,常,用的,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。