同步机制
任务的同步机制是任务之间为得到预期结果而进行的协调。
在并发应用程序中,有两种同步机制。
-
进程同步:
想要控制任务的执行顺序时,就可以使用这种同步。
例如,一个任务必须等待另一任务终止才开始执行。 -
数据同步:
当两个或多个任务访问同一内存对象时,可以使用这种同步。
在这种情况下,必须保护写入操作对该对象的访问权限。
如果不这样做,就会出现数据竞争条件,一个程序的最终结果在每次执行时都不同。
Java 并发 API 提供了多种机制,让你可以实现上述两种类型的同步。
synchronized
Java 语言提供的最基本的同步机制是 synchronized 关键字。
该关键字可应用于某个方法或者某个代码块。
对于第一种情况,一次只有一个线程可以执行该方法。
对于第二种情况,要指定一个对某个对象的引用。
在这种情况下,同时只能执行被某一对象保护的一个代码块。
其他同步机制
Java 也提供了其他一些同步机制。
-
Lock 接口及其实现类:
该机制允许你实现一个临界段,保证只有一个线程执行该代码块。 - Semaphore 类
实现了由 Edsger Dijkstra 提出的著名的信号量同步机制。 - CountDownLatch
允许你实现这样的场景:一个或多个线程等待其他线程结束。 - CyclicBarrier
允许你将不同的任务同步到某个共同的节点。 - Phaser 类
允许你分为多个阶段实现并发任务。 - Exchanger
允许你在两个线程之间实现一个数据交换点。 - CompletableFuture
是Java 8的新特性,它扩展了执行器任务的 Future 机制,以一种异步方式生成任务的结果。
可以指定任务在结果生成之后执行,这样就可以控制任务的执行顺序。
详细介绍
CommonTask 类
CommonTask 类将在随机的一段时间(0 到 10 秒)内将调用线程休眠。
Lock 接口
Lock 接口 基本实现类是 ReentrantLock 类。
Semaphore 类
信号量机制用于控制对一个或多个共享资源的访问。
该机制基于一个内部计数器以及两个名为 wait() 和 signal() 的方法。
当一个线程调用了 wait() 方法时:
-
如果内部计数器的值大于 0,那么信号量对内部计数器做递减操作,并且该线程获得对该共享资源的访问。
-
如果内部计数器的值为 0,那么线程将被阻塞,直到某个线程调用 singal() 方法为止。
当一个线程调用了 signal() 方法时,信号量将会检查是否有某些线程处于等待状态(它们已经调用了 wait() 方法)。
-
如果没有线程等待,它将对内部计数器做递增操作。
-
如果有线程在等待信号量,就获取这其中的一个线程,该线程的 wait() 方法结束返回并且访问共享资源。其他线程将继续等待,直到轮到自己为止。
在 Java 中,信号量在 Semaphore 类中实现。
wait() 方法被称作 acquire() ,而 signal() 方法被称作 release() 。
CountDownLatch 类
CountDownLatch类提供了一种等待一个或多个并发任务完成的机制。
它有一个内部计数器,必须使用要等待的任务数初始化。
然后, await() 方法休眠调用线程,直到内部计数器为 0,并且使用 countDown()方法对该内部计数器做递减操作。
CyclicBarrier 类
CyclicBarrier类允许将一些任务同步到某个共同点。
所有的任务都在该点等待,直到任务全部到达该点为止。
从内部来看,该类还管理了一个内部计数器,用于记录尚未到达该点的任务。
当一个任务到达指定点时,它要执行 await() 方法以等待其他任务。
当所有任务都到达时, CyclicBarrier 对象将它们唤醒,这样就能够继续执行。
当所有的参与方都到达后,该类允许执行另一个任务。
为了实现这一点,要在该对象的构造函数中指定一个 Runnable 对象。
CompletableFuture 类
与 Future 接口相同, CompletableFuture 也必须采用操作要返回的结果类型进行参数化。
和 Future 对象一样, CompletableFuture 类表示的是异步计算的结果,只不过 CompletableFuture 的结果可以由任意线程确立。
当计算正常结束时,该类采用 complete() 方法确定结果,而当计算出现异常时,则采用 completeExceptionally() 方法。
如果两个或者多个线程调用同一 CompletableFuture 的 complete() 方法或 completeExceptionally() 方法,那么只有第一个调用会起作用。
该类提供了大量方法,允许通过实现一个事件驱动的模型组织任务的执行顺序,一个任务只有在其之前的任务完成之后才会开始。
这其中包括如下方法。
-
thenApplyAsync() :
该方法接收 Function 接口的一个实现(可以表示为一个 lambda 表达式)作为参数。
该函数将在调用 CompletableFuture 完成后执行。
该方法将返回CompletableFuture 以获得 Fuction 的结果。 - thenComposeAsync() :
该方法和 thenApplyAsync() 方法相似,但是当供给函数也返回CompletableFuture 时很有用。 - thenAcceptAsync() :
该方法和前一个方法相似,只不过其参数是 Consumer 接口的一个实现(也可以描述为一个 lambda 表达式);在这种情况下,计算不会返回结果。 - thenRunAsync() :
该方法和前一个等价,只不过在这种情况下接收一个 Runnable 对象作为参数。 - thenCombineAsync() :
该方法接收两个参数。第一个参数为另一个 CompletableFuture 实例,另一个参数是 BiFunction 接口的一个实现(可描述为一个 lambda函数)。
该 BiFunction接口实现将在两个 CompletableFuture (当前调用的和参数中的)都完成后执行。
该方法将返回 CompletableFuture 以获取 BiFunction 的结果。 - runAfterBothAsync() :
该方法接收两个参数。第一个参数为另一个 CompletableFuture ,而第二个参数为 Runnable 接口的一个实现,它将在两个 CompletableFuture (当前调用的和参数中的)都完成后执行。 - runAfterEitherAsync() :
该方法与前一个方法等价,只不过当其中一个 CompletableFuture 对象完成之后才会执行 Runnable 任务。 - allOf() :
该方法接收 CompletableFuture 对象的一个变量列表作为参数。
它将返回一个CompletableFuture对象,而该对象将在所有的 CompletableFuture 对象都完成之后返回其结果。 - anyOf() :
该方法和前一个方法等价,只是返回的 CompletableFuture 对象会在其中一个 CompletableFuture 对象完成之后返回其结果。 - defaultExecutor() :
该方法用于返回并不接收 Executor 作为参数的那些异步操作的默认执行器。
通常,它将是 ForkJoinPool.commonPool() 方法的返回值。 - copy() :
该方法创建 CompletableFuture 对象的一个副本。
如果原来的 CompletableFuture 正常完成,则副本方法也将正常完成并返回相同的值。
如果原来的 CompletableFuture 异常完成,则副本方法也异常完成,并且抛出 CompletionException 异常。 - completeAsync() :
该方法接收一个 Supplier 对象作为参数(还可以选择 Executor )。
借助 Supplier 的结果完成 CompletableFuture 。 - orTimeout() :
该方法接收一段时延(一段时间和一个 TimeUnit )。
如果 CompletableFuture 在这段时间之后没有完成,那么抛出 TimeoutException 异常并异常完成。 - completeOnTimeout() :
该方法与上一个方法相似,只不过它在作为参数的值的范围内正常完成。 - delayedExecutor() :
该方法返回一个 Executor ,该执行器在执行指定时延之后执行某一任务。
如果想要获取 CompletableFuture 返回的结果,可以使用 get() 方法或者 join() 方法。
这两个方法都会阻塞调用线程,直到 CompletableFuture 完成之后返回其结果。
这两个方法之间的主要区别在于:
-
get() 方法抛出 ExecutionException (这是一个校验异常)
-
join() 方法抛出RuntimeException (这是一个未校验异常)