Semaphore
Semaphore是一个控制访问多个共享资源的计数器。当计数器值大于0,代表还有可用资源,线程可以继续访问和使用资源;当计数器的值等于0,代表暂无可用资源,线程必须等待资源的释放。
一个典型的例子就是,有多台打印机,当新的打印任务来时,将检测是否还有可用的打印机,若有则使用,并将可用打印机数量减1;若无则等待;当使用完毕后,释放打印机,将可用打印机数量加1。
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public class SemaphoreTest { private final Semaphore semaphore = new Semaphore(4); public void print() { try { semaphore.acquire(); //do something } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } } |
- 同Lock一样,Semaphore也可开启公平机制,
Semaphore(int permits, boolean fair) 。 acquireUninterruptibly() ,与acquire() 的区别是:线程中断不会抛出异常tryAcquire() ,尝试获取semaphore。如果成功,返回true。如果不成功,返回false值,并不会被阻塞和等待semaphore的释放。
CountDownLatch
CountDownLatch,用于等待多个并发事件完成,相当于一个倒数计数器,初始化时传入一个整数参数,调用方法countDown()减少计数,当计数为0时,解除await()的休眠。
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public class CountDownLatchTest implements Runnable{ private final CountDownLatch controller = new CountDownLatch(4); public void assemble() { System.out.println("come one"); controller.countDown(); } @Override public void run() { try { System.out.println("wait " + controller.getCount()); controller.await(); System.out.println("get all"); } catch (InterruptedException e) { e.printStackTrace(); } } } |
CountDownLatch的计数器一旦初始化就无法更改。
CyclicBarrier
CyclicBarrier,用于在同一个点同步任务,与CountDownLatch类似,但CyclicBarrier可以在计数器等于0时执行初始化时传递的Runnable对象。
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class CyclicBarrierTest implements Runnable { private final CyclicBarrier barrier = new CyclicBarrier(4, new After()); @Override public void run() { try { System.out.println("wait"); barrier.await(); System.out.println("pass"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } class After implements Runnable { @Override public void run() { System.out.println("tear down"); } } |
CyclicBarrier的计数器可以重置,
Phaser
Phaser,用于阶段性并发任务的同步。CyclicBarrier可以在一个点同步任务,当需要在运行过程中多个点同步线程,则可以用Phaser来处理。
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
public class PhaserTest implements Runnable { private Phaser phaser; public PhaserTest(Phaser phaser) { this.phaser = phaser; } @Override public void run() { try { Long duration = (long)(Math.random()*10000); Thread.sleep(duration); System.out.println("1"); } catch (InterruptedException e) { e.printStackTrace(); } phaser.arriveAndAwaitAdvance(); try { Long duration = (long)(Math.random()*10000); Thread.sleep(duration); System.out.println("2"); } catch (InterruptedException e) { e.printStackTrace(); } phaser.arriveAndDeregister(); } } class Main { public static void main(String[] args) { Phaser phaser = new Phaser(2); PhaserTest t1 = new PhaserTest(phaser); PhaserTest t2 = new PhaserTest(phaser); Thread n1 = new Thread(t1); Thread n2 = new Thread(t2); n1.start(); n2.start(); try { n1.join(); n2.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end"); } } |
若没有Phaser来阶段性的同步,输出的结果可能是“1 1 2 2”、“1 2 1 2”。但加入Phaser来阶段性的同步后,输出结果一定是“1 1 2 2”。
Phaser提供每次phaser改变阶段都会执行的方法,onAdvance(),可以Override该方法来实现不同的业务逻辑。
使用示例
1 2 3 4 5 6 7 |
Phaser phaser = new Phaser(2){ @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("p:"+phase); return super.onAdvance(phase, registeredParties); } }; |
再次运行上述程序,结果则变为“1 1 p0 2 2 p1 end”。
Exchanger
Exchanger,用于并发任务间交换数据,但只适用于1对1的交换。
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class ExchangerTest implements Runnable { private final Exchanger<String> exchanger; private String value; public ExchangerTest(Exchanger<String> exchanger, String value) { this.exchanger = exchanger; this.value = value; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Before: " + value); try { value = exchanger.exchange(value); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " After: " + value); } public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<String>(); ExchangerTest e1 = new ExchangerTest(exchanger, "11111"); ExchangerTest e2 = new ExchangerTest(exchanger, "22222"); Thread t1 = new Thread(e1, "t1"); Thread t2 = new Thread(e2, "t2"); t1.start(); t2.start(); } } |
运行结果
t1 Before: 11111
t2 Before: 22222
t2 After: 11111
t1 After: 22222