ReentrantLock(可重入锁)
-
ReentrantLock与sync的区别
- ReentrantLock的底层是CAS,sync是锁升级的
- trylock
- lockInterruptibly
- 公平和非公平
-
ReentrantLock与sync的相同点
- 均是可重入的
- ReentrantLock可以替代sync完成相同的功能
需要注意的是,必须要必须要必须要手动释放锁,使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock(); //synchronized(this)
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
可以根据tryLock的返回值来判定是否锁定
也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
void m2() {
boolean locked = false;
try {
locked = lock.tryLock(5, TimeUnit.SECONDS);
System.out.println("m2 ..." + locked);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (locked) lock.unlock();
}
}
使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应, 在一个线程等待锁的过程中,可以被打断
lock.lockInterruptibly();
t2.interrupt(); //打断线程2的等待
ReentrantLock还可以指定为公平锁
private static ReentrantLock lock = new ReentrantLock(true);
- 公平锁与非公平锁
1、若在释放锁的时候总是没有新的兔子来打扰,则非公平锁等于公平锁;
2、若释放锁的时候,正好一个兔子来喝水,而此时位于队列头的兔子还没有被唤醒(因为线程上下文切换是需要不少开销的),此时后来的兔子则优先获得锁,成功打破公平,成为非公平锁;
其实对于非公平锁,只要线程进入了等待队列,队列里面依然是FIFO的原则,跟公平锁的顺序是一样的。因为公平锁与非公平锁的release()部分代码是共用AQS的代码。
CountDownLatch(闭锁)
下面是一个测试将100个线程全部运行完的小程序,分别用CountDownLatch和join实现
public class T06_TestCountDownLatch {
public static void main(String[] args) {
usingJoin();
usingCountDownLatch();
}
private static void usingCountDownLatch() {
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int j = 0; j < 10000; j++) result += j;
latch.countDown();
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end latch");
}
private static void usingJoin() {
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int j = 0; j < 10000; j++) result += j;
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("end join");
}
}
CyclicBarrier()
public static void main(String[] args) {
//CyclicBarrier barrier = new CyclicBarrier(20);
//构造方法,下例中满20人即可执行第二个参数指定的动作
CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
/*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
@Override
public void run() {
System.out.println("满人,发车");
}
});*/
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
barrier.await();//上车,满人之后才可以进行下面的程序
System.out.println(1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
Phaser
它与CountDownLatch非常相似,允许我们协调线程的执行。与CountDownLatch相比,它具有一些额外的功能。
Phaser是在线程动态数需要继续执行之前等待的屏障。在CountDownLatch中,该数字无法动态配置,需要在创建实例时提供。
public class T08_TestPhaser {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();
static void milliSleep(int milli) {
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
phaser.bulkRegister(5);
for (int i = 0; i < 5; i++) {
final int nameIndex = i;
new Thread(() -> {
Person p = new Person("person " + nameIndex);
p.arrive();
phaser.arriveAndAwaitAdvance();
p.eat();
phaser.arriveAndAwaitAdvance();
p.leave();
phaser.arriveAndAwaitAdvance();
}).start();
}
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐了!");
return false;
case 1:
System.out.println("所有人吃完了!");
return false;
case 2:
System.out.println("所有人离开了!");
System.out.println("婚礼结束!");
return true;
default:
return true;
}
}
}
static class Person {
String name;
public Person(String name) {
this.name = name;
}
public void arrive() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 到达现场!\n", name);
}
public void eat() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 吃完!\n", name);
}
public void leave() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 离开!\n", name);
}
}
}
ReadWriteLock
readLock之间不影响,writeLock须等待readLock,readLock也须等待writeLock
- 只允许一个线程写入(其他线程既不能写入也不能读取);
- 没有写入时,多个线程允许同时读(提高性能)。
读 | 写 | |
---|---|---|
读 | 允许 | 不允许 |
写 | 不允许 | 不允许 |
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over!");
//模拟写操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
// Runnable readR = ()-> read(lock);
// Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable readR = () -> read(readLock);
Runnable writeR = () -> write(writeLock, new Random().nextInt());
for (int i = 0; i < 18; i++) new Thread(readR).start();
for (int i = 0; i < 2; i++) new Thread(writeR).start();
}
}
StampedLock
前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。
如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock。
StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
public class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。
可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。
StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。
Semaphore
Semaphore(信号灯)用于限制可以访问某些资源的线程数目,acquire()用于请求执行的灯,若没有信号灯则不能执行;release()用来释放信号灯。
public class T11_TestSemaphore {
public static void main(String[] args) {
//Semaphore s = new Semaphore(2);
Semaphore s = new Semaphore(2, true);
//允许一个线程同时执行
//Semaphore s = new Semaphore(1);
new Thread(() -> {
try {
s.acquire();
System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}).start();
new Thread(() -> {
try {
s.acquire();
System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
s.release();
}
}).start();
}
}
Exchanger
Exchanger类可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。
public class T12_TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(() -> {
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(() -> {
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
}
LockSupport
LockSupport定义了一组以park开头的方法来阻塞当前线程,unpark来唤醒被阻塞的线程。
public class T13_TestLockSupport {
public static void main(String[] args) {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 5) {
LockSupport.park();//阻塞
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after 8 senconds!");
LockSupport.unpark(t);//解除阻塞
}
}
Comments | 0 条评论