资讯详情

Java 编程问题:十一、并发-深入探索

原文:Java Coding Problems

协议:CC BY-NC-SA 4.0

贡献者:飞龙

本文来自【ApacheCN Java 谷歌翻译用谷歌翻译。

本章包括涉及 Java 并发的 13 一个问题,涉及 Fork/Join 框架、CompletableFutureReentrantLockReentrantReadWriteLockStampedLock、原子变量、任务取消、可中断方法、局部线程、死等。并发性是任何开发人员的必要主题之一,在工作面试中不容忽视。这就是为什么这一章和最后一章如此重要。阅读本章后,您将对并发性有相当大的了解,这是每一个 Java 开发人员都需要。

问题

使用以下问题来测试您的并发编程能力。在使用解决方案和下载示例程序之前,我强烈建议您尝试每个问题:

  1. :写一个程序,举例说明处理中断方法的最佳方法。
  2. :写一个依赖 Fork/Join 框架对列表元素求和的程序。写一个依赖 Fork/Join 为了计算给定位置的斐波那契数(例如,F(12) = 144)。另外,写一个程序来举例说明CountedCompleter的用法。
  3. :编程,将 Fork/Join 框架应用于一组相互依存的任务,只需执行一次(例如任务 D 依赖于任务 C任务 B,但任务 C 依赖于任务 B 所以任务 B 只能执行一次,不能执行两次。
  4. :通过CompletableFuture写几个代码片段来说明异步代码。
  5. :写几个代码例子来解释组合CompletableFuture对象的不同解决方案。
  6. :写一个概念证明的例子onSpinWait()优化忙等待技术。
  7. :写一个概念证明,举例说明如何使用volatile变量保存过程的取消状态。
  8. ThreadLocal:写一个概念证明,举例说明ThreadLocal的用法。
  9. :采用多线程应用(Runnable编写一个从 1 到 1000000 整数计数程序。
  10. :编写程序,使用ReentrantLock将整数从 1 递增到 1000000。
  11. :通过ReentrantReadWriteLock编写模拟读写过程编排的程序。
  12. :通过StampedLock编写模拟读写过程编排的程序。
  13. :写一个程序,揭示和解决著名餐饮哲学家问题中可能出现的死锁(循环等待致命拥抱)。

以下各节介绍上述问题的解决方案。记住,通常没有一个正确的方法来解决一个特定的问题。另外,请记住,这里显示的解释仅包括解决问题所需的最有趣和最重要的细节。下载示例解决方案以查看更多详细信息,并在这个页面中试用程序。

213 可中断方法

所谓的中断方法是指可以抛出InterruptedException例如Thread.sleep()BlockingQueue.take()BlockingQueue.poll(long timeout, TimeUnit unit)等等如果状态中断,试着尽快抛出这种方法InterruptedException

因为InterruptedException这是一个异常的检查,所以我们必须捕获它和/或抛出它。换句话说,如果我们调用抛出的方法InterruptedException如果是这样,我们必须准备好处理这种异常。如果我们能把它扔掉(把异常传播给调用器),那就不再是我们的工作了。打电话的人必须进一步处理。因此,当我们必须抓住它时,让我们专注于这个案例。当我们的代码在那里时Runnable这种情况发生在内部运行时,因为它不能抛出异常。

让我们从一个简单的例子开始。试着通过poll(long timeout, TimeUnit unit)BlockingQueue获得元素可写为:

try { 
           queue.poll(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { 
           ...   logger.info(() -> "Thread is interrupted? "       Thread.currentThread().isInterrupted()); }

尝试轮询队列中的元素可能会导致InterruptedException。有一个 3000 毫秒的窗口可以中断线程。在中断的情况下(例如,Thread.interrupt()),我们可能会认为调用catch块中的Thread.currentThread().isInterrupted()将返回true。毕竟,我们处在一个InterruptedException catch街区,所以相信这一点是有道理的。实际上,它会返回false,答案在poll(long timeout, TimeUnit unit)方法的源代码中,如下所示:

1: public E poll(long timeout, TimeUnit unit) 
       throws InterruptedException { 
        
2:   E e = xfer(null, false, TIMED, unit.toNanos(timeout));
3:   if (e != null || !Thread.interrupted())
4:     return e;
5:   throw new InterruptedException();
6: }

更准确地说,答案在第 3 行。如果线程被中断,那么Thread.interrupted()将返回true,并将导致第 5 行(throw new InterruptedException()。但是除了测试之外,如果当前线程被中断,Thread.interrupted()清除线程的中断状态。请查看以下连续调用中断线程:

Thread.currentThread().isInterrupted(); // true
Thread.interrupted() // true
Thread.currentThread().isInterrupted(); // false
Thread.interrupted() // false

注意,Thread.currentThread().isInterrupted()测试这个线程是否被中断,而不影响中断状态。

现在,让我们回到我们的案子。所以,我们知道线程在捕捉到InterruptedException后就中断了,但是中断状态被Thread.interrupted()清除了。这也意味着我们代码的调用者不会意识到中断。

我们有责任成为好公民,通过调用interrupt()方法恢复中断。这样,我们代码的调用者就可以看到发出了中断,并相应地采取行动。正确的代码如下:

try { 
        
  queue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) { 
        
  ...
  Thread.currentThread().interrupt(); // restore interrupt
}

根据经验,在捕捉到InterruptedException之后,不要忘记通过调用Thread.currentThread().interrupt()来恢复中断。

让我们来解决一个突出显示忘记恢复中断的问题。假设一个Runnable只要当前线程没有中断就可以运行(例如,while (!Thread.currentThread().isInterrupted()) { ... }

在每次迭代中,如果当前线程中断状态为false,那么我们尝试从BlockingQueue中获取一个元素。

实现代码如下:

Thread thread = new Thread(() -> { 
        

  // some dummy queue
  TransferQueue<String> queue = new LinkedTransferQueue<>();

  while (!Thread.currentThread().isInterrupted()) { 
        
    try { 
        
      logger.info(() -> "For 3 seconds the thread " 
        + Thread.currentThread().getName() 
        + " will try to poll an element from queue ...");

      queue.poll(3000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) { 
        
      logger.severe(() -> "InterruptedException! The thread "
        + Thread.currentThread().getName() + " was interrupted!");
      Thread.currentThread().interrupt();
    }
  }

  logger.info(() -> "The execution was stopped!");
});

作为调用者(另一个线程),我们启动上面的线程,睡眠 1.5 秒,只是给这个线程时间进入poll()方法,然后我们中断它。如下代码所示:

thread.start();
Thread.sleep(1500);
thread.interrupt();

这将导致InterruptedException

记录异常并恢复中断。

下一步,while计算Thread.currentThread().isInterrupted()false并退出。

因此,输出如下:

[18:02:43] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...

[18:02:44] [SEVERE] InterruptedException!
                    The thread Thread-0 was interrupted!

[18:02:45] [INFO] The execution was stopped!

现在,让我们对恢复中断的行进行注释:

...
} catch (InterruptedException ex) { 
        
  logger.severe(() -> "InterruptedException! The thread " 
    + Thread.currentThread().getName() + " was interrupted!");

  // notice that the below line is commented
  // Thread.currentThread().interrupt();
}
...

这一次,while块将永远运行,因为它的保护条件总是被求值为true

代码不能作用于中断,因此输出如下:

[18:05:47] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...

[18:05:48] [SEVERE] InterruptedException!
                    The thread Thread-0 was interrupted!

[18:05:48] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...
...

根据经验,当我们可以接受中断(而不是恢复中断)时,唯一可以接受的情况是我们可以控制整个调用栈(例如,extend Thread)。

否则,捕获的InterruptedException也应该包含Thread.currentThread().interrupt()

214 Fork/Join 框架

我们已经在“工作线程池”一节中介绍了 Fork/Join 框架。

Fork/Join 框架主要用于处理一个大任务(通常,通过大,我们可以理解大量的数据)并递归地将其拆分为可以并行执行的小任务(子任务)。最后,在完成所有子任务后,它们的结果将合并(合并)为一个结果。

下图是 Fork/Join 流的可视化表示:

在 API 方面,可以通过java.util.concurrent.ForkJoinPool创建叉/连接。

JDK8 之前,推荐的方法依赖于public static变量,如下所示:

public static ForkJoinPool forkJoinPool = new ForkJoinPool();

从 JDK8 开始,我们可以按如下方式进行:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

这两种方法都避免了在单个 JVM 上有太多池线程这一令人不快的情况,这是由创建它们自己的池的并行操作造成的。

对于自定义ForkJoinPool,依赖于此类的构造器。JDK9 添加了迄今为止最全面的一个(详细信息见文档)。

AForkJoinPool对象操作任务。ForkJoinPool中执行的任务的基本类型为ForkJoinTask<V>。更确切地说,执行以下任务:

  • RecursiveAction对于void任务
  • RecursiveTask<V>对于返回值的任务
  • CountedCompleter<T>对于需要记住挂起任务计数的任务

这三种类型的任务都有一个名为compute()的抽象方法,在这个方法中任务的逻辑是成形的。

ForkJoinPool提交任务可以通过以下方式完成:

  • execute()submit()
  • invoke()派生任务并等待结果
  • invokeAll()用于分叉一堆任务(例如,集合)
  • fork()用于安排在池中异步执行此任务,join()用于在完成时返回计算结果

让我们从一个通过RecursiveTask解决的问题开始。

通过RecursiveTask计算总和

为了演示框架的分叉行为,我们假设我们有一个数字列表,并且我们要计算这些数字的总和。为此,我们使用createSubtasks()方法递归地拆分(派生)这个列表,只要它大于指定的THRESHOLD。每个任务都被添加到List<SumRecursiveTask>中。最后通过invokeAll​(Collection<T> tasks)方式将该列表提交给ForkJoinPool。这是使用以下代码完成的:

public class SumRecursiveTask extends RecursiveTask<Integer> { 
        

  private static final Logger logger 
    = Logger.getLogger(SumRecursiveTask.class.getName());
  private static final int THRESHOLD = 10;

  private final List<Integer> worklist;

  public SumRecursiveTask(List<Integer> worklist) { 
        
    this.worklist = worklist;
  }

  @Override
  protected Integer compute() { 
        
    if (worklist.size() <= THRESHOLD) { 
        
      return partialSum(worklist);
    }

    return ForkJoinTask.invokeAll(createSubtasks())
      .stream()
      .mapToInt(ForkJoinTask::join)
      .sum();
  }

  private List<SumRecursiveTask> createSubtasks() { 
        

    List<SumRecursiveTask> subtasks = new ArrayList<>();
    int size = worklist.size();

    List<Integer> worklistLeft 
      = worklist.subList(0, (size + 1) / 2);
    List<Integer> worklistRight 
      = worklist.subList((size + 1) / 2, size);

    subtasks.add(new SumRecursiveTask(worklistLeft));
    subtasks.add(new SumRecursiveTask(worklistRight));

    return subtasks;
  }

  private Integer partialSum(List<Integer> worklist) { 
        

    int sum = worklist.stream()
      .mapToInt(e -> e)
      .sum();

    logger.info(() -> "Partial sum: " + worklist + " = "
      + sum + "\tThread: " + Thread.currentThread().getName());

    return sum;
  }
}

为了测试它,我们需要一个列表和ForkJoinPool如下:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

Random rnd = new Random();
List<Integer> list = new ArrayList<>();

for (int i = 0; i < 200; i++) { 
        
  list.add(1 + rnd.nextInt(10));
}

SumRecursiveTask sumRecursiveTask = new SumRecursiveTask(list);
Integer sumAll = forkJoinPool.invoke(sumRecursiveTask);

logger.info(() -> "Final sum: " + sumAll);

可能的输出如下:

...
[15:17:06] Partial sum: [1, 3, 6, 6, 2, 5, 9] = 32
ForkJoinPool.commonPool-worker-9
...
[15:17:06] Partial sum: [1, 9, 9, 8, 9, 5] = 41
ForkJoinPool.commonPool-worker-7
[15:17:06] Final sum: 1084

用递归运算计算斐波那契函数

斐波那契数通常表示为F(n),是一个遵循以下公式的序列:

F(0) = 0, 
F(1) = 1, 
..., 
F(n) = F(n-1) + F(n-2), n > 1

斐波那契数的快照是:

0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, ...

通过RecursiveAction实现斐波那契数可以如下完成:

public class FibonacciRecursiveAction extends RecursiveAction { 
        

  private static final Logger logger =
    Logger.getLogger(FibonacciRecursiveAction.class.getName());
  private static final long THRESHOLD = 5;

  private long nr;

  public FibonacciRecursiveAction(long nr) { 
        
    this.nr = nr;
  }

  @Override
  protected void compute() { 
        

    final long n = nr;

    if (n <= THRESHOLD) { 
        
      nr = fibonacci(n);
    } else { 
        
      nr = ForkJoinTask.invokeAll(createSubtasks(n))
        .stream()
        .mapToLong(x -> x.fibonacciNumber())
        .sum();
    }
  }

  private List<FibonacciRecursiveAction> createSubtasks(long n) { 
        

    List<FibonacciRecursiveAction> subtasks = new ArrayList<>();

    FibonacciRecursiveAction fibonacciMinusOne
      = new FibonacciRecursiveAction(n - 1);
    FibonacciRecursiveAction fibonacciMinusTwo
      = new FibonacciRecursiveAction(n - 2);

    subtasks.add(fibonacciMinusOne);
    subtasks.add(fibonacciMinusTwo);

    return subtasks;
  }

  private long fibonacci(long n) { 
        
    logger.info(() -> "Number: " + n 
      + " Thread: " + Thread.currentThread().getName());

    if (n <= 1) { 
        
      return n;
    }

    return fibonacci(n - 1) + fibonacci(n - 2);
  }

  public long fibonacciNumber() { 
        
    return nr;
  }
}

为了测试它,我们需要以下ForkJoinPool对象:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

FibonacciRecursiveAction fibonacciRecursiveAction
  = new FibonacciRecursiveAction(12);
forkJoinPool.invoke(fibonacciRecursiveAction);

logger.info(() -> "Fibonacci: "
  + fibonacciRecursiveAction.fibonacciNumber());

F(12)的输出如下:

[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-3
[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-13
[15:40: 

标签: th矩形电连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台