基于redis的分布式锁:redis加锁需要用到命令setnt(如果key不存在,那么设置对应的值),对于锁,需要考虑两个问题:网络问题或宕机导致解锁失败和加锁不成功时如何等待(sleep或是一直访问肯定不行)。对于第一个问题,可以采用redis的expire命令,来设置key的过期时间。对于第二个问题,可以利用redis的发布与订阅来实现阻塞等待。一个锁对应一个channel,如果加锁失败,那么订阅该chennel,阻塞监听该chennel知道收到消息为止,此时说明解锁了,可以进一步尝试加锁。在解锁时,需要发布一条消息,来唤醒所有等待在channel上的客户端。
对于可重入锁,也就是一个获得锁的客户端,可以再一次的加锁,此时可以在setnt的时候,设置key的value为客户端的id+进程id+加锁次数。如果在setnt失败后,那么先获得key对应的value,判断锁的持有者是否为本进程,是的话,可以直接将key的value对应的次数加1,利用set进行设置,再一次加锁。在解锁的时候,如果锁的次数大于1,那么直接减去1就好,如果锁的次数为1的话,那么此时需要真正的解锁,直接把del key,并发布一条消息,通知等待的客户端。可以参考redissonlock来学习具体的实现机制:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//加锁的实现函数,成功返回null,不成功返回锁的存活时间,-1表示锁永不超时
private Long tryLockInner() {
//保存锁的状态: 客户端UUID+线程ID来唯一标识某一JVM实例的某一线程
final LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter(); //用来保存重入次数, 实现可重入功能, 初始情况是1
//Redisson封装了交互的细节, 具体的逻辑为execute方法逻辑.
return connectionManager.write(getName(), new SyncOperation<LockValue, Long>() {
@Override
public Long execute(RedisConnection<Object, LockValue> connection) {
//如果key:haogrgr不存在, 就set并返回true, 否则返回false
Boolean res = connection.setnx(getName(), currentLock);
//如果设置失败, 那么表示有锁竞争了, 于是获取当前锁的状态, 如果拥有者是当前线程, 就累加重入次数并set新值
if (!res) {
//通过watch命令配合multi来实现简单的事务功能,这里需要监看key,有可能key因为超时被删除了。
connection.watch(getName());
LockValue lock = (LockValue) connection.get(getName());
//LockValue的equals实现为比较客户id和threadid是否一样
if (lock != null && lock.equals(currentLock)) {
lock.incCounter(); //如果当前线程已经获取过锁, 则累加加锁次数, 并set更新
connection.multi();
connection.set(getName(), lock);
if (connection.exec().size() == 1) {
return null; //set成功,
}
}
connection.unwatch();
//走到这里, 说明上面set的时候, 其他客户端在 watch之后->set之前 有其他客户端修改了key值
//则获取key的过期时间, 如果是永不过期, 则返回-1, 具体处理后面说明
Long ttl = connection.pttl(getName());
return ttl;
}
return null;
}
});
}
//加锁操作
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
Long ttl;
if (leaseTime != -1) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner(); //lock()方法调用会走的逻辑
}
// lock acquired
//加锁成功(新获取锁, 重入情况) tryLockInner会返回null, 失败会返回key超时时间, 或者-1(key未设置超时时间)
if (ttl == null) {
return; //加锁成功, 返回
}
//subscribe这个方法代码有点多, Redisson通过netty来和redis通讯, 然后subscribe返回的是一个Future类型,
//Future的awaitUninterruptibly()调用会阻塞, 然后Redisson通过Redis的pubsub来监听unlock的topic(getChannelName())
//例如, 5中所看到的命令 "PUBLISH" "redisson__lock__channel__{haogrgr}" "0"
//当解锁时, 会向名为 getChannelName() 的topic来发送解锁消息("0")
//而这里 subscribe() 中监听这个topic, 在订阅成功时就会唤醒阻塞在awaitUninterruptibly()的方法.
//所以线程在这里只会阻塞很短的时间(订阅成功即唤醒, 并不代表已经解锁)
subscribe().awaitUninterruptibly();
try {
while (true) { //循环, 不断重试lock
if (leaseTime != -1) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner(); //不多说了
}
// lock acquired
if (ttl == null) {
break;
}
// 这里才是真正的等待解锁消息, 收到解锁消息, 就唤醒, 然后尝试获取锁, 成功返回, 失败则阻塞在acquire().
// 收到订阅成功消息, 则唤醒阻塞上面的subscribe().awaitUninterruptibly();
// 收到解锁消息, 则唤醒阻塞在下面的entry.getLatch().acquire();
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (ttl >= 0) {
//等待订阅的消息,ttl时间后超时,表示锁被自动删除
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//锁没有设置超时情况,需要一直等到有消息为止。
entry.getLatch().acquire();
}
}
} finally {
unsubscribe(); //加锁成功或异常,解除订阅
}
}
下面是解锁的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81public void unlock() {
connectionManager.write(getName(), new SyncOperation<Object, Void>() {
@Override
public Void execute(RedisConnection<Object, Object> connection) {
LockValue lock = (LockValue) connection.get(getName());
if (lock != null) {
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
if (lock.equals(currentLock)) {
if (lock.getCounter() > 1) {
lock.decCounter();
connection.set(getName(), lock);
} else {
unlock(connection);
}
} else {
throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
} else {
// could be deleted
}
return null;
}
});
}
private void unlock(RedisConnection<Object, Object> connection) {
int counter = 0;
//尝试5次解锁,不成功则抛出异常
while (counter < 5) {
connection.multi();
connection.del(getName());
connection.publish(getChannelName(), unlockMessage);
List<Object> res = connection.exec();
if (res.size() == 2) {
return;
}
counter++;
}
throw new IllegalStateException("Can't unlock lock after 5 attempts. Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}