1、技術(shù)方案
1.1、redis的基本命令
1)SETNX命令(SET if Not eXists)
語法:SETNX key value
功能:當且僅當 key 不存在,將 key 的值設(shè)為 value ,并返回1;若給定的 key 已經(jīng)存在,則 SETNX 不做任何動作,并返回0。
2)expire命令
語法:expire KEY seconds
功能:設(shè)置key的過期時間。如果key已過期,將會被自動刪除。
3)DEL命令
語法:DEL key [KEY …]
功能:刪除給定的一個或多個 key ,不存在的 key 會被忽略。
1.2、實現(xiàn)同步鎖原理
1)加鎖:“鎖”就是一個存儲在redis里的key-value對,key是把一組投資操作用字符串來形成唯一標識,value其實并不重要,因為只要這個唯一的key-value存在,就表示這個操作已經(jīng)上鎖。
2)解鎖:既然key-value對存在就表示上鎖,那么釋放鎖就自然是在redis里刪除key-value對。
3)阻塞、非阻塞:阻塞式的實現(xiàn),若線程發(fā)現(xiàn)已經(jīng)上鎖,會在特定時間內(nèi)輪詢鎖。非阻塞式的實現(xiàn),若發(fā)現(xiàn)線程已經(jīng)上鎖,則直接返回。
4)處理異常情況:假設(shè)當投資操作調(diào)用其他平臺接口出現(xiàn)等待時,自然沒有釋放鎖,這種情況下加入鎖超時機制,用redis的expire命令為key設(shè)置超時時長,過了超時時間redis就會將這個key自動刪除,即強制釋放鎖
(此步驟需在JAVA內(nèi)部設(shè)置同樣的超時機制,內(nèi)部超時時長應(yīng)小于或等于redis超時時長)。
1.3、處理流程圖
![](/d/20211018/40fdace4f6ad32eb9b284126bf6146cb.gif)
2、代碼實現(xiàn)
2.1、同步鎖工具類
package com.mic.synchrolock.util;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.mic.constants.Constants;
import com.mic.constants.InvestType;
/**
* 分布式同步鎖工具類
* @author Administrator
*
*/
public class SynchrolockUtil {
private final Log logger = LogFactory.getLog(getClass());
@Autowired
private RedisClientTemplate redisClientTemplate;
public final String RETRYTYPE_WAIT = "1"; //加鎖方法當對象已加鎖時,設(shè)置為等待并輪詢
public final String RETRYTYPE_NOWAIT = "0"; //加鎖方法當對象已加鎖時,設(shè)置為直接返回
private String requestTimeOutName = ""; //投資同步鎖請求超時時間
private String retryIntervalName = ""; //投資同步鎖輪詢間隔
private String keyTimeoutName = ""; //緩存中key的失效時間
private String investProductSn = ""; //產(chǎn)品Sn
private String uuid; //對象唯一標識
private Long startTime = System.currentTimeMillis(); //首次調(diào)用時間
public Long getStartTime() {
return startTime;
}
ListString> keyList = new ArrayListString>(); //緩存key的保存集合
public ListString> getKeyList() {
return keyList;
}
public void setKeyList(ListString> keyList) {
this.keyList = keyList;
}
@PostConstruct
public void init() {
uuid = UUID.randomUUID().toString();
}
@PreDestroy
public void destroy() {
this.unlock();
}
/**
* 根據(jù)傳入key值,判斷緩存中是否存在該key
* 存在-已上鎖:判斷retryType,輪詢超時,或直接返回,返回ture
* 不存在-未上鎖:將該放入緩存,返回false
* @param key
* @param retryType 當遇到上鎖情況時 1:輪詢;0:直接返回
* @return
*/
public boolean islocked(String key,String retryType){
boolean flag = true;
logger.info("====投資同步鎖設(shè)置輪詢間隔、請求超時時長、緩存key失效時長====");
//投資同步鎖輪詢間隔 毫秒
Long retryInterval = Long.parseLong(Constants.getProperty(retryIntervalName));
//投資同步鎖請求超時時間 毫秒
Long requestTimeOut = Long.parseLong(Constants.getProperty(requestTimeOutName));
//緩存中key的失效時間 秒
Integer keyTimeout = Integer.parseInt(Constants.getProperty(keyTimeoutName));
//調(diào)用緩存獲取當前產(chǎn)品鎖
logger.info("====當前產(chǎn)品key為:"+key+"====");
if(isLockedInRedis(key,keyTimeout)){
if("1".equals(retryType)){
//采用輪詢方式等待
while (true) {
logger.info("====產(chǎn)品已被占用,開始輪詢====");
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e) {
logger.error("線程睡眠異常:"+e.getMessage(), e);
return flag;
}
logger.info("====判斷請求是否超時====");
Long currentTime = System.currentTimeMillis(); //當前調(diào)用時間
long Interval = currentTime - startTime;
if (Interval > requestTimeOut) {
logger.info("====請求超時====");
return flag;
}
if(!isLockedInRedis(key,keyTimeout)){
logger.info("====輪詢結(jié)束,添加同步鎖====");
flag = false;
keyList.add(key);
break;
}
}
}else{
//不等待,直接返回
logger.info("====產(chǎn)品已被占用,直接返回====");
return flag;
}
}else{
logger.info("====產(chǎn)品未被占用,添加同步鎖====");
flag = false;
keyList.add(key);
}
return flag;
}
/**
* 在緩存中查詢key是否存在
* 若存在則返回true;
* 若不存在則將key放入緩存,設(shè)置過期時間,返回false
* @param key
* @param keyTimeout key超時時間單位是秒
* @return
*/
boolean isLockedInRedis(String key,int keyTimeout){
logger.info("====在緩存中查詢key是否存在====");
boolean isExist = false;
//與redis交互,查詢對象是否上鎖
Long result = this.redisClientTemplate.setnx(key, uuid);
logger.info("====上鎖 result = "+result+"====");
if(null != result 1 == Integer.parseInt(result.toString())){
logger.info("====設(shè)置緩存失效時長 = "+keyTimeout+"秒====");
this.redisClientTemplate.expire(key, keyTimeout);
logger.info("====上鎖成功====");
isExist = false;
}else{
logger.info("====上鎖失敗====");
isExist = true;
}
return isExist;
}
/**
* 根據(jù)傳入key,對該產(chǎn)品進行解鎖
* @param key
* @return
*/
public void unlock(){
//與redis交互,對產(chǎn)品解鎖
if(keyList.size()>0){
for(String key : this.keyList){
String value = this.redisClientTemplate.get(key);
if(null != value !"".equals(value)){
if(uuid.equals(value)){
logger.info("====解鎖key:"+key+" value="+value+"====");
this.redisClientTemplate.del(key);
}else{
logger.info("====待解鎖集合中key:"+key+" value="+value+"與uuid不匹配====");
}
}else{
logger.info("====待解鎖集合中key="+key+"的value為空====");
}
}
}else{
logger.info("====待解鎖集合為空====");
}
}
}
2.2、業(yè)務(wù)調(diào)用模擬樣例
//獲取同步鎖工具類
SynchrolockUtil synchrolockUtil = SpringUtils.getBean("synchrolockUtil");
//獲取需上鎖資源的KEY
String key = "abc";
//查詢是否上鎖,上鎖輪詢,未上鎖加鎖
boolean isLocked = synchrolockUtil.islocked(key,synchrolockUtil.RETRYTYPE_WAIT);
//判斷上鎖結(jié)果
if(isLocked){
logger.error("同步鎖請求超時并返回 key ="+key);
}else{
logger.info("====同步鎖加鎖陳功====");
}
try {
//執(zhí)行業(yè)務(wù)處理
} catch (Exception e) {
logger.error("業(yè)務(wù)異常:"+e.getMessage(), e);
}finally{
//解鎖
synchrolockUtil.unlock();
}
2.3、如果業(yè)務(wù)處理內(nèi)部,還有嵌套加鎖需求,只需將對象傳入方法內(nèi)部,加鎖成功后將key值追加到集合中即可
ps:實際實現(xiàn)中還需要jedis工具類,需額外添加調(diào)用
補充:使用redis鎖還是出現(xiàn)同步問題
一種可能是,2臺機器同時訪問,一臺訪問,還沒有把鎖設(shè)置過去的時候,另一臺也查不到就會出現(xiàn)這個問題。
解決方法
這我跟寫代碼的方式有關(guān)。先查,如果不存在就set,這種方式有極微小的可能存在時間差,導致鎖set了2次。
推薦使用setIfAbsent 這樣在redis set的時候是單線程的。不會存在重復的問題。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。
您可能感興趣的文章:- Redis的主從同步解析
- 簡單注解實現(xiàn)集群同步鎖(spring+redis+注解)
- SpringBoot集成redis實現(xiàn)分布式鎖的示例代碼
- 基于redis setIfAbsent的使用說明
- Redis實現(xiàn)分布式Session管理的機制詳解
- kubernetes環(huán)境部署單節(jié)點redis數(shù)據(jù)庫的方法