在集群下,經(jīng)常會因?yàn)橥瑫r(shí)處理發(fā)生資源爭搶和并發(fā)問題,但是我們都知道同步鎖 synchronized 、 cas 、 ReentrankLock 這些鎖的作用范圍都是 JVM ,說白了在集群下沒啥用。這時(shí)我們就需要能在多臺 JVM 之間決定執(zhí)行順序的鎖了,現(xiàn)在分布式鎖主要有 redis 、 Zookeeper 實(shí)現(xiàn)的,還有數(shù)據(jù)庫的方式,不過性能太差,也就是需要一個(gè)第三方的監(jiān)管。
背景
最近在做一個(gè)消費(fèi) Kafka 消息的時(shí)候發(fā)現(xiàn),由于線上的消費(fèi)者過多,經(jīng)常會遇到,多個(gè)機(jī)器同時(shí)處理一個(gè)主鍵類型的數(shù)據(jù)的情況發(fā)生,如果最后是執(zhí)行更新操作的話,也就是一個(gè)更新順序的問題,但是如果恰好都需要插入數(shù)據(jù)的時(shí)候,會出現(xiàn)主鍵重復(fù)的問題。這是生產(chǎn)上不被允許的(因?yàn)楣居挟惓1O(jiān)管的機(jī)制,扣分啥的),這是就需要個(gè)分布式鎖了,斟酌后用了 Redis 的實(shí)現(xiàn)方式(因?yàn)榫W(wǎng)上例子多)
分析
redis 實(shí)現(xiàn)的分布式鎖,實(shí)現(xiàn)原理是 set 方法,因?yàn)槎鄠€(gè)線程同時(shí)請求的時(shí)候,只有一個(gè)線程可以成功并返回結(jié)果,還可以設(shè)置有效期,來避免死鎖的發(fā)生,一切都是這么的完美,不過有個(gè)問題,在 set 的時(shí)候,會直接返回結(jié)果,成功或者失敗,不具有阻塞效果,需要我們自己對失敗的線程進(jìn)程處理,有兩種方式
- 丟棄
- 等待重試 由于我們的系統(tǒng)需要這些數(shù)據(jù),那么只能重新嘗試獲取。這里使用 redis 的 List 類型實(shí)現(xiàn)等待序列的作用
代碼
直接上代碼 其實(shí)直接redis的工具類就可以解決了
package com.test
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.List;
/**
* @desc redis隊(duì)列實(shí)現(xiàn)方式
* @anthor
* @date
**/
public class RedisUcUitl {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
private RedisUcUitl() {
}
/**
* logger
**/
/**
* 存儲redis隊(duì)列順序存儲 在隊(duì)列首部存入
*
* @param key 字節(jié)類型
* @param value 字節(jié)類型
*/
public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {
return jedis.lpush(key, value);
}
/**
* 移除列表中最后一個(gè)元素 并將改元素添加入另一個(gè)列表中 ,當(dāng)列表為空時(shí) 將阻塞連接 直到等待超時(shí)
*
* @param srckey
* @param dstkey
* @param timeout 0 表示永不超時(shí)
* @return
*/
public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {
return jedis.brpoplpush(srckey, dstkey, timeout);
}
/**
* 返回制定的key,起始位置的redis數(shù)據(jù)
* @param redisKey
* @param start
* @param end -1 表示到最后
* @return
*/
public static Listbyte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
return jedis.lrange(redisKey, start, end);
}
/**
* 刪除key
* @param redisKey
*/
public static void delete(Jedis jedis, final byte[] redisKey) {
return jedis.del(redisKey);
}
/**
* 嘗試加鎖
* @param lockKey key名稱
* @param requestId 身份標(biāo)識
* @param expireTime 過期時(shí)間
* @return
*/
public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
return LOCK_SUCCESS.equals(result);
}
/**
* 釋放鎖
* @param lockKey key名稱
* @param requestId 身份標(biāo)識
* @return
*/
public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return RELEASE_SUCCESS.equals(result);
}
}
業(yè)務(wù)邏輯主要代碼如下
1.先消耗隊(duì)列中的
while(true){
// 消費(fèi)隊(duì)列
try{
// 被放入redis隊(duì)列的數(shù)據(jù) 序列化后的
byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
if(bytes == null || bytes.isEmpty()){
// 隊(duì)列中沒數(shù)據(jù)時(shí)退出
break;
}
// 反序列化對象
MapString, Object> singleMap = (MapString, Object>) ObjectSerialUtil.bytesToObject(bytes);
// 塞入唯一的值 防止被其他線程誤解鎖
String requestId = UUID.randomUUID().toString();
boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
if(lockGetFlag){
// 成功獲取鎖 進(jìn)行業(yè)務(wù)處理
//TODO
// 處理完畢釋放鎖
boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);
}else{
// 未能獲得鎖放入等待隊(duì)列
RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
}
}catch(Exception e){
break;
}
}
2.處理最新接到的數(shù)據(jù)
同樣是走嘗試獲取鎖,獲取不到放入隊(duì)列的流程
一般序列化用 fastJson 之列的就可以了,這里用的是 JDK 自帶的,工具類如下
public class ObjectSerialUtil {
private ObjectSerialUtil() {
// 工具類
}
/**
* 將Object對象序列化為byte[]
*
* @param obj 對象
* @return byte數(shù)組
* @throws Exception
*/
public static byte[] objectToBytes(Object obj) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
byte[] bytes = bos.toByteArray();
bos.close();
oos.close();
return bytes;
}
/**
* 將bytes數(shù)組還原為對象
*
* @param bytes
* @return
* @throws Exception
*/
public static Object bytesToObject(byte[] bytes) {
try {
ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bin);
return ois.readObject();
} catch (Exception e) {
throw new BaseException("反序列化出錯(cuò)!", e);
}
}
}
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
您可能感興趣的文章:- Redis分布式鎖的正確實(shí)現(xiàn)方法總結(jié)
- Redis分布式鎖的實(shí)現(xiàn)方式(redis面試題)
- SpringBoot使用Redisson實(shí)現(xiàn)分布式鎖(秒殺系統(tǒng))
- SpringBoot集成Redisson實(shí)現(xiàn)分布式鎖的方法示例
- Java Redis分布式鎖的正確實(shí)現(xiàn)方式詳解
- redis分布式鎖的問題與解決方法
- 淺談Redis分布式鎖的正確實(shí)現(xiàn)方式
- 單機(jī)redis分布式鎖實(shí)現(xiàn)原理解析