原文:Java Coding Problems
协议:CC BY-NC-SA 4.0
贡献者:飞龙
本文来自【ApacheCN Java 谷歌翻译用谷歌翻译。
本章包括涉及 Java 并发的 13 一个问题,涉及 Fork/Join 框架、CompletableFuture
、ReentrantLock
、ReentrantReadWriteLock
、StampedLock
、原子变量、任务取消、可中断方法、局部线程、死锁等。并发性是任何开发人员的必要主题之一,在工作面试中不容忽视。这就是为什么这一章和最后一章如此重要。阅读本章后,您将对并发性有相当大的了解,这是每一个 Java 开发人员都需要。
问题
使用以下问题来测试您的并发编程能力。在使用解决方案和下载示例程序之前,我强烈建议您尝试每个问题:
- :写一个程序,举例说明处理中断方法的最佳方法。
- :写一个依赖 Fork/Join 框架对列表元素求和的程序。写一个依赖 Fork/Join 为了计算给定位置的斐波那契数(例如,
F(12) = 144
)。另外,写一个程序来举例说明CountedCompleter
的用法。 - :编程,将 Fork/Join 框架应用于一组相互依存的任务,只需执行一次(例如任务 D 依赖于任务 C 和任务 B,但任务 C 依赖于任务 B 所以任务 B 只能执行一次,不能执行两次。
- :通过
CompletableFuture
写几个代码片段来说明异步代码。 - :写几个代码例子来解释组合
CompletableFuture
对象的不同解决方案。 - :写一个概念证明的例子
onSpinWait()
优化忙等待技术。 - :写一个概念证明,举例说明如何使用
volatile
变量保存过程的取消状态。 ThreadLocal
:写一个概念证明,举例说明ThreadLocal
的用法。- :采用多线程应用(
Runnable
编写一个从 1 到 1000000 整数计数程序。 - :编写程序,使用
ReentrantLock
将整数从 1 递增到 1000000。 - :通过
ReentrantReadWriteLock
编写模拟读写过程编排的程序。 - :通过
StampedLock
编写模拟读写过程编排的程序。 - :写一个程序,揭示和解决著名餐饮哲学家问题中可能出现的死锁(循环等待或致命拥抱)。
以下各节介绍上述问题的解决方案。记住,通常没有一个正确的方法来解决一个特定的问题。另外,请记住,这里显示的解释仅包括解决问题所需的最有趣和最重要的细节。下载示例解决方案以查看更多详细信息,并在这个页面中试用程序。
213 可中断方法
所谓的中断方法是指可以抛出InterruptedException
例如Thread.sleep()
、BlockingQueue.take()
、BlockingQueue.poll(long timeout, TimeUnit unit)
等等、或如果状态中断,试着尽快抛出这种方法InterruptedException
。
因为InterruptedException
这是一个异常的检查,所以我们必须捕获它和/或抛出它。换句话说,如果我们调用抛出的方法InterruptedException
如果是这样,我们必须准备好处理这种异常。如果我们能把它扔掉(把异常传播给调用器),那就不再是我们的工作了。打电话的人必须进一步处理。因此,当我们必须抓住它时,让我们专注于这个案例。当我们的代码在那里时Runnable
这种情况发生在内部运行时,因为它不能抛出异常。
让我们从一个简单的例子开始。试着通过poll(long timeout, TimeUnit unit)
从BlockingQueue
获得元素可写为:
try {
queue.poll(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) {
... logger.info(() -> "Thread is interrupted? " Thread.currentThread().isInterrupted()); }
尝试轮询队列中的元素可能会导致InterruptedException
。有一个 3000 毫秒的窗口可以中断线程。在中断的情况下(例如,Thread.interrupt()
),我们可能会认为调用catch
块中的Thread.currentThread().isInterrupted()
将返回true
。毕竟,我们处在一个InterruptedException catch
街区,所以相信这一点是有道理的。实际上,它会返回false
,答案在poll(long timeout, TimeUnit unit)
方法的源代码中,如下所示:
1: public E poll(long timeout, TimeUnit unit)
throws InterruptedException {
2: E e = xfer(null, false, TIMED, unit.toNanos(timeout));
3: if (e != null || !Thread.interrupted())
4: return e;
5: throw new InterruptedException();
6: }
更准确地说,答案在第 3 行。如果线程被中断,那么Thread.interrupted()
将返回true
,并将导致第 5 行(throw new InterruptedException()
。但是除了测试之外,如果当前线程被中断,Thread.interrupted()
清除线程的中断状态。请查看以下连续调用中断线程:
Thread.currentThread().isInterrupted(); // true
Thread.interrupted() // true
Thread.currentThread().isInterrupted(); // false
Thread.interrupted() // false
注意,Thread.currentThread().isInterrupted()
测试这个线程是否被中断,而不影响中断状态。
现在,让我们回到我们的案子。所以,我们知道线程在捕捉到InterruptedException
后就中断了,但是中断状态被Thread.interrupted()
清除了。这也意味着我们代码的调用者不会意识到中断。
我们有责任成为好公民,通过调用interrupt()
方法恢复中断。这样,我们代码的调用者就可以看到发出了中断,并相应地采取行动。正确的代码如下:
try {
queue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
...
Thread.currentThread().interrupt(); // restore interrupt
}
根据经验,在捕捉到InterruptedException
之后,不要忘记通过调用Thread.currentThread().interrupt()
来恢复中断。
让我们来解决一个突出显示忘记恢复中断的问题。假设一个Runnable
只要当前线程没有中断就可以运行(例如,while (!Thread.currentThread().isInterrupted()) { ... }
。
在每次迭代中,如果当前线程中断状态为false
,那么我们尝试从BlockingQueue
中获取一个元素。
实现代码如下:
Thread thread = new Thread(() -> {
// some dummy queue
TransferQueue<String> queue = new LinkedTransferQueue<>();
while (!Thread.currentThread().isInterrupted()) {
try {
logger.info(() -> "For 3 seconds the thread "
+ Thread.currentThread().getName()
+ " will try to poll an element from queue ...");
queue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
logger.severe(() -> "InterruptedException! The thread "
+ Thread.currentThread().getName() + " was interrupted!");
Thread.currentThread().interrupt();
}
}
logger.info(() -> "The execution was stopped!");
});
作为调用者(另一个线程),我们启动上面的线程,睡眠 1.5 秒,只是给这个线程时间进入poll()
方法,然后我们中断它。如下代码所示:
thread.start();
Thread.sleep(1500);
thread.interrupt();
这将导致InterruptedException
。
记录异常并恢复中断。
下一步,while
计算Thread.currentThread().isInterrupted()
到false
并退出。
因此,输出如下:
[18:02:43] [INFO] For 3 seconds the thread Thread-0
will try to poll an element from queue ...
[18:02:44] [SEVERE] InterruptedException!
The thread Thread-0 was interrupted!
[18:02:45] [INFO] The execution was stopped!
现在,让我们对恢复中断的行进行注释:
...
} catch (InterruptedException ex) {
logger.severe(() -> "InterruptedException! The thread "
+ Thread.currentThread().getName() + " was interrupted!");
// notice that the below line is commented
// Thread.currentThread().interrupt();
}
...
这一次,while
块将永远运行,因为它的保护条件总是被求值为true
。
代码不能作用于中断,因此输出如下:
[18:05:47] [INFO] For 3 seconds the thread Thread-0
will try to poll an element from queue ...
[18:05:48] [SEVERE] InterruptedException!
The thread Thread-0 was interrupted!
[18:05:48] [INFO] For 3 seconds the thread Thread-0
will try to poll an element from queue ...
...
根据经验,当我们可以接受中断(而不是恢复中断)时,唯一可以接受的情况是我们可以控制整个调用栈(例如,extend Thread
)。
否则,捕获的InterruptedException
也应该包含Thread.currentThread().interrupt()
。
214 Fork/Join 框架
我们已经在“工作线程池”一节中介绍了 Fork/Join 框架。
Fork/Join 框架主要用于处理一个大任务(通常,通过大,我们可以理解大量的数据)并递归地将其拆分为可以并行执行的小任务(子任务)。最后,在完成所有子任务后,它们的结果将合并(合并)为一个结果。
下图是 Fork/Join 流的可视化表示:
在 API 方面,可以通过java.util.concurrent.ForkJoinPool
创建叉/连接。
JDK8 之前,推荐的方法依赖于public static
变量,如下所示:
public static ForkJoinPool forkJoinPool = new ForkJoinPool();
从 JDK8 开始,我们可以按如下方式进行:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
这两种方法都避免了在单个 JVM 上有太多池线程这一令人不快的情况,这是由创建它们自己的池的并行操作造成的。
对于自定义的ForkJoinPool
,依赖于此类的构造器。JDK9 添加了迄今为止最全面的一个(详细信息见文档)。
AForkJoinPool
对象操作任务。ForkJoinPool
中执行的任务的基本类型为ForkJoinTask<V>
。更确切地说,执行以下任务:
RecursiveAction
对于void
任务RecursiveTask<V>
对于返回值的任务CountedCompleter<T>
对于需要记住挂起任务计数的任务
这三种类型的任务都有一个名为compute()
的抽象方法,在这个方法中任务的逻辑是成形的。
向ForkJoinPool
提交任务可以通过以下方式完成:
execute()
和submit()
invoke()
派生任务并等待结果invokeAll()
用于分叉一堆任务(例如,集合)fork()
用于安排在池中异步执行此任务,join()
用于在完成时返回计算结果
让我们从一个通过RecursiveTask
解决的问题开始。
通过RecursiveTask
计算总和
为了演示框架的分叉行为,我们假设我们有一个数字列表,并且我们要计算这些数字的总和。为此,我们使用createSubtasks()
方法递归地拆分(派生)这个列表,只要它大于指定的THRESHOLD
。每个任务都被添加到List<SumRecursiveTask>
中。最后通过invokeAll(Collection<T> tasks)
方式将该列表提交给ForkJoinPool
。这是使用以下代码完成的:
public class SumRecursiveTask extends RecursiveTask<Integer> {
private static final Logger logger
= Logger.getLogger(SumRecursiveTask.class.getName());
private static final int THRESHOLD = 10;
private final List<Integer> worklist;
public SumRecursiveTask(List<Integer> worklist) {
this.worklist = worklist;
}
@Override
protected Integer compute() {
if (worklist.size() <= THRESHOLD) {
return partialSum(worklist);
}
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
}
private List<SumRecursiveTask> createSubtasks() {
List<SumRecursiveTask> subtasks = new ArrayList<>();
int size = worklist.size();
List<Integer> worklistLeft
= worklist.subList(0, (size + 1) / 2);
List<Integer> worklistRight
= worklist.subList((size + 1) / 2, size);
subtasks.add(new SumRecursiveTask(worklistLeft));
subtasks.add(new SumRecursiveTask(worklistRight));
return subtasks;
}
private Integer partialSum(List<Integer> worklist) {
int sum = worklist.stream()
.mapToInt(e -> e)
.sum();
logger.info(() -> "Partial sum: " + worklist + " = "
+ sum + "\tThread: " + Thread.currentThread().getName());
return sum;
}
}
为了测试它,我们需要一个列表和ForkJoinPool
如下:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Random rnd = new Random();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 200; i++) {
list.add(1 + rnd.nextInt(10));
}
SumRecursiveTask sumRecursiveTask = new SumRecursiveTask(list);
Integer sumAll = forkJoinPool.invoke(sumRecursiveTask);
logger.info(() -> "Final sum: " + sumAll);
可能的输出如下:
...
[15:17:06] Partial sum: [1, 3, 6, 6, 2, 5, 9] = 32
ForkJoinPool.commonPool-worker-9
...
[15:17:06] Partial sum: [1, 9, 9, 8, 9, 5] = 41
ForkJoinPool.commonPool-worker-7
[15:17:06] Final sum: 1084
用递归运算计算斐波那契函数
斐波那契数通常表示为F(n)
,是一个遵循以下公式的序列:
F(0) = 0,
F(1) = 1,
...,
F(n) = F(n-1) + F(n-2), n > 1
斐波那契数的快照是:
0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, ...
通过RecursiveAction
实现斐波那契数可以如下完成:
public class FibonacciRecursiveAction extends RecursiveAction {
private static final Logger logger =
Logger.getLogger(FibonacciRecursiveAction.class.getName());
private static final long THRESHOLD = 5;
private long nr;
public FibonacciRecursiveAction(long nr) {
this.nr = nr;
}
@Override
protected void compute() {
final long n = nr;
if (n <= THRESHOLD) {
nr = fibonacci(n);
} else {
nr = ForkJoinTask.invokeAll(createSubtasks(n))
.stream()
.mapToLong(x -> x.fibonacciNumber())
.sum();
}
}
private List<FibonacciRecursiveAction> createSubtasks(long n) {
List<FibonacciRecursiveAction> subtasks = new ArrayList<>();
FibonacciRecursiveAction fibonacciMinusOne
= new FibonacciRecursiveAction(n - 1);
FibonacciRecursiveAction fibonacciMinusTwo
= new FibonacciRecursiveAction(n - 2);
subtasks.add(fibonacciMinusOne);
subtasks.add(fibonacciMinusTwo);
return subtasks;
}
private long fibonacci(long n) {
logger.info(() -> "Number: " + n
+ " Thread: " + Thread.currentThread().getName());
if (n <= 1) {
return n;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}
public long fibonacciNumber() {
return nr;
}
}
为了测试它,我们需要以下ForkJoinPool
对象:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
FibonacciRecursiveAction fibonacciRecursiveAction
= new FibonacciRecursiveAction(12);
forkJoinPool.invoke(fibonacciRecursiveAction);
logger.info(() -> "Fibonacci: "
+ fibonacciRecursiveAction.fibonacciNumber());
F(12)
的输出如下:
[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-3 [15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-13 [15:40: