濮阳杆衣贸易有限公司

主頁 > 知識庫 > redis分布式鎖之可重入鎖的實現(xiàn)代碼

redis分布式鎖之可重入鎖的實現(xiàn)代碼

熱門標(biāo)簽:超呼電話機器人 宿遷便宜外呼系統(tǒng)平臺 十堰營銷電銷機器人哪家便宜 魔獸2青云地圖標(biāo)注 山東外呼銷售系統(tǒng)招商 日本中國地圖標(biāo)注 北京400電話辦理收費標(biāo)準(zhǔn) 鄭州人工智能電銷機器人系統(tǒng) 貴州電銷卡外呼系統(tǒng)

上篇redis實現(xiàn)的分布式鎖,有一個問題,它不可重入。

所謂不可重入鎖,即若當(dāng)前線程執(zhí)行某個方法已經(jīng)獲取了該鎖,那么在方法中嘗試再次獲取鎖時,就會獲取不到被阻塞。 同一個人拿一個鎖 ,只能拿一次不能同時拿2次。

1、什么是可重入鎖?它有什么作用?

可重入鎖,也叫做遞歸鎖,指的是在同一線程內(nèi),外層函數(shù)獲得鎖之后,內(nèi)層遞歸函數(shù)仍然可以獲取到該鎖。 說白了就是同一個線程再次進入同樣代碼時,可以再次拿到該鎖。 它的作用是:防止在同一線程中多次獲取鎖而導(dǎo)致死鎖發(fā)生。

2、那么java中誰實現(xiàn)了可重入鎖了?

在java的編程中synchronized 和 ReentrantLock都是可重入鎖。我們可以參考ReentrantLock的代碼

3、基于ReentrantLock的可重入鎖

ReentrantLock,是一個可重入且獨占式的鎖,是一種遞歸無阻塞的同步鎖。

3.1、看個ReentrantLock的例子

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class ReentrantLockDemo {
    //鎖
    private static ReentrantLock lock =  new ReentrantLock();
    public void doSomething(int n){
        try{
            //進入遞歸第一件事:加鎖
            lock.lock();
            log.info("--------lock()執(zhí)行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
            log.info("--------遞歸{}次--------",n);
            if(n=2){
                this.doSomething(++n);
            }else{
                return;
            }
        }finally {
            lock.unlock();
            log.info("--------unlock()執(zhí)行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
        }
    }

    public static void main(String[] args) {
        ReentrantLockDemo reentrantLockDemo=new ReentrantLockDemo();
        reentrantLockDemo.doSomething(1);
        log.info("執(zhí)行完doSomething方法 是否還持有鎖:{}",lock.isLocked());
    }

}

3.2、執(zhí)行結(jié)果

16:35:58.051 [main] INFO com.test.ReentrantLockDemo - --------lock()執(zhí)行后,getState()的值:1 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------遞歸1次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()執(zhí)行后,getState()的值:2 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------遞歸2次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()執(zhí)行后,getState()的值:3 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------遞歸3次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()執(zhí)行后,getState()的值:2 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()執(zhí)行后,getState()的值:1 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()執(zhí)行后,getState()的值:0 lock.isLocked():false
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - 執(zhí)行完doSomething方法 是否還持有鎖:false

3.3、 從上面栗子可以看出ReentrantLock是可重入鎖,那么他是如何實現(xiàn)的了,我們看下源碼就知道了

 final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //先判斷,c(state)是否等于0,如果等于0,說明沒有線程持有鎖
            if (c == 0) {
                //通過cas方法把state的值0替換成1,替換成功說明加鎖成功
                if (compareAndSetState(0, acquires)) {
                    //如果加鎖成功,設(shè)置持有鎖的線程是當(dāng)前線程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//判斷當(dāng)前持有鎖的線程是否是當(dāng)前線程
                //如果是當(dāng)前線程,則state值加acquires,代表了當(dāng)前線程加鎖了多少次
                int nextc = c + acquires;
                if (nextc  0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

ReentrantLock的加鎖流程是:
1,先判斷是否有線程持有鎖,沒有加鎖進行加鎖
2、如果加鎖成功,則設(shè)置持有鎖的線程是當(dāng)前線程
3、如果有線程持有了鎖,則再去判斷,是否是當(dāng)前線程持有了鎖
4、如果是當(dāng)前線程持有鎖,則加鎖數(shù)量(state)+1

/**
         * 釋放鎖
         * @param releases
         * @return
         */
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;//state-1 減加鎖次數(shù)
            //如果持有鎖的線程,不是當(dāng)前線程,拋出異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();

            boolean free = false;
            if (c == 0) {//如果c==0了說明當(dāng)前線程,已經(jīng)要釋放鎖了
                free = true;
                setExclusiveOwnerThread(null);//設(shè)置當(dāng)前持有鎖的線程為null
            }
            setState(c);//設(shè)置c的值
            return free;
        }

看ReentrantLock的解鎖代碼我們知道,每次釋放鎖的時候都對state減1,
當(dāng)c值等于0的時候,說明鎖重入次數(shù)也為0了,
最終設(shè)置當(dāng)前持有鎖的線程為null,state也設(shè)置為0,鎖就釋放了。

4、那么redis要怎么實現(xiàn)可重入的操作了?

看ReentrantLock的源碼我們知道,它是加鎖成功了,記錄了當(dāng)前持有鎖的線程,并通過一個int類型的數(shù)字,來記錄了加鎖次數(shù)。

我們知道ReentrantLock的實現(xiàn)原理了,那么redis只要下面兩個問題解決,就能實現(xiàn)重入鎖了:
1、怎么保存當(dāng)前持有的線程
2、加鎖次數(shù)(重入了多少次),怎么記錄維護

4.1、第一個問題:怎么保存當(dāng)前持有的線程

1.上一篇文章我們用的是redis 的set命令存的是string類型,他能保存當(dāng)前持有的線程嗎?
valus值我們可以保存當(dāng)前線程的id來解決。
2. 但是集群環(huán)境下我們線程id可能是重復(fù)了那怎么解決?
項目在啟動的生成一個全局進程id,使用進程id+線程id 那就是唯一的了

4.2、第二個問題:加鎖次數(shù)(重入了多少次),怎么記錄維護

他能記錄下來加鎖次數(shù)嗎?
如果valus值存的格式是:系進程id+線程id+加鎖次數(shù),那可以實現(xiàn)

存沒問題了,但是重入次數(shù)要怎么維護了, 它肯定要保證原子性的,能解決嗎?
好像用java代碼或者lua腳本都沒法解決,因為都是實現(xiàn)都需要兩步來維護這個重入次數(shù)的

  • 第一步:先獲取到valus值,把取到加鎖次數(shù)+1
  • 第二部:把新的值再設(shè)置進去
  • 在執(zhí)行第二步操作之前,如果這個key失效了(設(shè)置持有鎖超時了),如果還能再設(shè)置進去,就會有并發(fā)問題了

5、我們已經(jīng)知道SET是不支持重入鎖的,但我們需要重入鎖,怎么辦呢?

目前對于redis的重入鎖業(yè)界還是有很多解決方案的,最流行的就是采用Redisson。

6、什么是 Redisson?

Redisson是Redis官方推薦的Java版的Redis客戶端。 它基于Java實用工具包中常用接口,為使用者提供了一系列具有分布式特性的常用工具類。 它在網(wǎng)絡(luò)通信上是基于NIO的Netty框架,保證網(wǎng)絡(luò)通信的高性能。 在分布式鎖的功能上,它提供了一系列的分布式鎖;如:可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯(lián)鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等等。

Redisson github地址

7、Redisson的分布鎖如何使用

引入依賴包

dependency>
   groupId>org.redisson/groupId>
   artifactId>redisson/artifactId>
   version>3.15.5/version>
/dependency>  

代碼

import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;


@Slf4j
public class ReentrantLockDemo1 {
    //鎖
    public static RLock lock;

    static {
        //Redisson需要的配置
        Config config = new Config();
        String node = "127.0.0.1:6379";//redis地址
        node = node.startsWith("redis://") ? node : "redis://" + node;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(node)
                .setTimeout(3000)//超時時間
                .setConnectionPoolSize(10)
                .setConnectionMinimumIdleSize(10);
        //serverConfig.setPassword("123456");//設(shè)置redis密碼
        // 創(chuàng)建RedissonClient客戶端實例
        RedissonClient redissonClient = Redisson.create(config);
        //創(chuàng)建redisson的分布式鎖
        RLock rLock = redissonClient.getLock("666");
        lock = rLock;
    }
    public void doSomething(int n){
        try{
            //進入遞歸第一件事:加鎖
            lock.lock();
            log.info("--------lock()執(zhí)行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
            log.info("--------遞歸{}次--------",n);
            if(n=2){
                this.doSomething(++n);
            }else{
                return;
            }
        }finally {
            lock.unlock();
            log.info("--------unlock()執(zhí)行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
        }
    }
    public static void test(){
        log.info("--------------start---------------");
        ReentrantLockDemo1 reentrantLockDemo=new ReentrantLockDemo1();
        reentrantLockDemo.doSomething(1);
        log.info("執(zhí)行完doSomething方法 是否還持有鎖:{}",ReentrantLockDemo1.lock.isLocked());
        log.info("--------------end---------------");
    }
    public static void main(String[] args) {
        test();
    }
}

執(zhí)行結(jié)果

2021-05-23 22:49:01.322 INFO 69041 --- [nio-9090-exec-1] org.redisson.Version : Redisson 3.15.5
2021-05-23 22:49:01.363 INFO 69041 --- [sson-netty-5-22] o.r.c.pool.MasterConnectionPool : 10 connections initialized for /127.0.0.1:6379
2021-05-23 22:49:01.363 INFO 69041 --- [sson-netty-5-23] o.r.c.pool.MasterPubSubConnectionPool : 1 connections initialized for /127.0.0.1:6379
2021-05-23 22:49:01.367 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------------start---------------
2021-05-23 22:49:01.435 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------lock()執(zhí)行后,getState()的值:1 lock.isLocked():true
2021-05-23 22:49:01.436 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------遞歸1次--------
2021-05-23 22:49:01.442 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------lock()執(zhí)行后,getState()的值:2 lock.isLocked():true
2021-05-23 22:49:01.442 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------遞歸2次--------
2021-05-23 22:49:01.448 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------lock()執(zhí)行后,getState()的值:3 lock.isLocked():true
2021-05-23 22:49:01.448 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------遞歸3次--------
2021-05-23 22:49:01.456 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------unlock()執(zhí)行后,getState()的值:2 lock.isLocked():true
2021-05-23 22:49:01.461 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------unlock()執(zhí)行后,getState()的值:1 lock.isLocked():true
2021-05-23 22:49:01.465 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------unlock()執(zhí)行后,getState()的值:0 lock.isLocked():false
2021-05-23 22:49:01.467 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : 執(zhí)行完doSomething方法 是否還持有鎖:false
2021-05-23 22:49:01.467 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------------end---------------

看控制臺打印能清楚知道Redisson是支持可重入鎖了。

8、那么Redisson是如何實現(xiàn)的了?

我們跟一下lock.lock()的代碼,發(fā)現(xiàn)它最終調(diào)用的是org.redisson.RedissonLock#tryLockInnerAsync的方法,具體如下:

 T> RFutureT> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

8.1、上面的代碼,用到的redis命令先梳理一下

exists 查詢一個key是否存在

EXISTS key [key ...]
返回值
如下的整數(shù)結(jié)果
1 如果key存在
0 如果key不存在

hincrby :將hash中指定域的值增加給定的數(shù)字

pexpire:設(shè)置key的有效時間以毫秒為單位

hexists:判斷field是否存在于hash中

pttl:獲取key的有效毫秒數(shù)

8.2、看lua腳本傳入的參數(shù)我們知道:

  • KEYS[1] = key的值
  • ARGV[1]) = 持有鎖的時間
  • ARGV[2] = getLockName(threadId) 下面id就算系統(tǒng)在啟動的時候會全局生成的uuid 來作為當(dāng)前進程的id,加上線程id就是getLockName(threadId)了,可以理解為:進程ID+系統(tǒng)ID = ARGV[2]
protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }

8.3、代碼截圖

從截圖上可以看到,它是使用lua腳本來保證多個命令執(zhí)行的原子性,使用了hash來實現(xiàn)了分布式鎖
現(xiàn)在我們來看下lua腳本的加鎖流程

8.4、第一個if判斷

  • 204行:它是先判斷了當(dāng)前key是否存在,從EXISTS命令我們知道返回值是0說明key不存在,說明沒有加鎖
  • 205行:hincrby命令是對 ARGV[2] = 進程ID+系統(tǒng)ID 進行原子自增加1
  • 206行:是對整個hash設(shè)置過期期間

8.5、下面來看第二個if判斷

  • 209行:判斷field是否存在于hash中,如果存在返回1,返回1說明是當(dāng)前進程+當(dāng)前線程ID 之前已經(jīng)獲得到鎖了
  • 210行:hincrby命令是對 ARGV[2] = 進程ID+系統(tǒng)ID 進行原子自增加1,說明重入次數(shù)加1了
  • 211行:再對整個hash設(shè)置過期期間

8.6、下圖是redis可視化工具看到是如何在hash存儲的結(jié)構(gòu)

Redisson的整個加鎖流程跟ReentrantLock的加鎖邏輯基本相同

8.7、解鎖代碼位于 org.redisson.RedissonLock#unlockInnerAsync,如下:

 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

看這個解鎖的Lua腳本,流程跟Reentrantlock的解鎖邏輯也基本相同沒啥好說的了。

以上就是redis分布式鎖-可重入鎖的詳細(xì)內(nèi)容,更多關(guān)于redis分布式鎖的資料請關(guān)注腳本之家其它相關(guān)文章!

您可能感興趣的文章:
  • 詳解redis分布式鎖的這些坑
  • Java基于redis實現(xiàn)分布式鎖
  • 詳解Redis 分布式鎖遇到的序列化問題
  • php基于redis的分布式鎖實例詳解
  • Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法
  • redis分布式鎖的go-redis實現(xiàn)方法詳解
  • Redis分布式鎖的使用和實現(xiàn)原理詳解
  • redission分布式鎖防止重復(fù)初始化問題
  • Redis如何實現(xiàn)分布式鎖詳解

標(biāo)簽:吉安 楊凌 朝陽 江蘇 臺州 大慶 北京 果洛

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《redis分布式鎖之可重入鎖的實現(xiàn)代碼》,本文關(guān)鍵詞  redis,分布式,鎖之,可,重入,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《redis分布式鎖之可重入鎖的實現(xiàn)代碼》相關(guān)的同類信息!
  • 本頁收集關(guān)于redis分布式鎖之可重入鎖的實現(xiàn)代碼的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    临颍县| 南岸区| 乡城县| 泰兴市| 台湾省| 麻江县| 达孜县| 富蕴县| 宜昌市| 亳州市| 远安县| 青海省| 普定县| 新源县| 汽车| 花莲县| 图片| 滁州市| 武宁县| 农安县| 阿城市| 永年县| 商南县| 山东| 如皋市| 体育| 衡水市| 色达县| 余庆县| 昌邑市| 沙湾县| 揭阳市| 津南区| 龙南县| 永福县| 桦川县| 盐亭县| 高雄县| 图木舒克市| 天祝| 阿尔山市|