资讯详情

《C++ Concurrencyin Action》第4章--同步并发操作

前言

本章主要内容:

1、等待事件
2.期待等待一次性事件
3.在有限的时间内等待
4.使用同步操作简化代码

在上一章中,我们看到了各种在线程间保护共享数据的方法。当仅要保护数据,还要同步单独的线程。例如,在第一个线程完成之前,可能需要等待另一个线程完成。通常,线程会等待特定事件的发生或某些条件的实现(为true)。这可能需要定期检查任务完成标志,或者将类似的东西放入共享数据中,但这与理想情况相差甚远。在这种情况下,需要在线程中同步,C 标准库以条件变量的形式提供了一些同步操作的工具(condition variables)和期望(futures)。

本章将讨论如何使用条件变量等待事件,介绍期望,并简化同步操作。

4.1 等待事件或其他条件

假设你在旅行,在夜间运行的火车上。晚上,如何在正确的站点下车?一种方法是整晚都要醒着,然后注意到了哪一站。这样,你就不会错过你想到达的网站,但这会让你感到很累。另外,你可以看一下时间表,估计一下火车到达目的地的时间,然后在一个稍早的时间点上设置闹铃,然后你就可以安心的睡会了。这种方法听起来也很不错,也没有错过你想下车的车站,但是火车晚了,你就会过早醒来。当然,闹钟的电池也可能没电,导致你睡过站。理想的方法是,无论是早还是晚,只要有人或其他东西能在火车到达车站时唤醒你。

这与线程有什么关系?好吧,让我们联系一下。当一个线程等待另一个线程完成任务时,它会有很多选择。首先,它可以继续检查共享数据标志(用于保护工作的相互排斥),直到另一行完成工作。然而,这是一种浪费:线程消耗宝贵的执行时间,不断检查相应的标志,当相互排斥被等待线程定时,其他线程无法获得锁,因此线程将继续等待。由于上述方法限制了等待线程的资源,并在完成时阻碍了标识的设置。这种情况类似于整晚和列车司机聊天:司机必须慢慢开车,因为你分散了他的注意力,所以火车需要更长的时间才能到达车站。同样,等待的线程会等待更长的时间,这也会消耗系统资源。

第二种选择是等待线程检查间隙std::this_thread::sleep_for()定期间歇(详见4).3节):

bool flag; std::mutex m;  void wait_for_flag() { 
            std::unique_lock<std::mutex> lk(m);    while(!flag)    { 
              lk.unlock();  //** 1 **解锁互斥量      std::this_thread::sleep_for(std::chrono::milliseconds(100));  //** 2 **休眠100ms      lk.lock();   // **3 **再锁互斥量    } } 

在这个循环中,休眠前②,解锁函数对互斥量①,并且在休眠结束后锁定相互排斥,因此其他线程有机会获得锁并设置标识。

这种实现取得了很大的进步,因为当线程休眠时,线程不会浪费执行时间,但很难确定正确的休眠时间。休眠时间过短,就像没有休眠一样,会浪费执行时间;休眠时间过长,可能会让任务等待线程醒来。在高节奏的游戏中,休眠时间过长是很少见的,因为会直接影响程序的行为(fast-paced game)在实时应用中,它意味着丢帧或超过一个时间片。

第三种选择(也是优先选择)是使用C 等待事件发生工具等待事件发生。通过另一个线程触发等待事件的机制是最基本的唤醒方式(例如,当流水线上有额外的任务时),称为条件变量(condition variable)。从概念上讲,一个条件变量将与多个事件或其他条件有关,一个或多个线程将等待条件的实现。当某些线程终止时,为了唤醒等待线程的信息(允许等待线程继续执行)。

4.1.1 等待条件实现

C 标准库实现了两套条件变量:std::condition_variable和std::condition_variable_any。这两个实现都包括在内<condition_variable>在头文件的声明中。两者都需要相互排斥才能工作(相互排斥是为了同步);前者仅限于和谐std::mutex一起工作,后者可以和任何符合最低标准的互斥量一起工作,从而增加*_any*的后缀。因为std::condition_variable_any更加通用,这就可能从体积、性能,以及系统资源的使用方面产生额外的开销,所以std::condition_variable一般作为首选类型,当对灵活性有硬性要求时,我们会考虑std::condition_variable_any。

因此,如何使用std::condition_variable处理前面提到的情况——当需要处理数据时,如何唤醒休眠中的线程?以下列表显示了一种使用条件变量来唤醒的方法。

清单4.1 使用std::condition_variable等待数据

std::mutex mu;
std::queue<data_chunk> data_queue;  //** 1**
std::condition_variable data_cond;

void data_preparation_thread()
{ 
        
  while(more_data_to_prepare())
  { 
        
     data_chunkconst data=prepare_data();
     std::lock_guard<std::mutex> lk(mut);
     data_queue.push(data);  // **2**
     data_cond.notify_one();  //** 3**
  }
}

void data_processing_thread()
{ 
        
   while(true)
   { 
        
     std::unique_lock<std::mutex> lk(mut);  // **4**
     data_cond.wait(lk,[]{ 
        return !data_queue.empty();});  // **5**
     data_chunkdata=data_queue.front();
     data_queue.pop();
     lk.unlock();  //** 6**
     process(data);
     
     if(is_last_chunk(data))
        break;
  }
}

首先,你拥有一个用来在两个线程之间传递数据的队列①。当数据准备好时,使用std::lock_guard对队列上锁,将准备好的数据压入队列中②,之后线程会对队列中的数据上锁。然后调用std::condition_variable的notify_one()成员函数,对等待的线程(如果有等待线程)进行通知③。

在另外一侧,你有一个正在处理数据的线程,这个线程首先对互斥量上锁,但在这里std::unique_lock要比std::lock_guard④更加合适——且听我细细道来。线程之后会调用std::condition_variable的成员函数wait(),传递一个锁和一个lambda函数表达式(作为等待的条件⑤)。Lambda函数是C++11添加的新特性,它可以让一个匿名函数作为其他表达式的一部分,并且非常合适作为标准函数的谓词,例如wait()函数。在这个例子中,简单的lambda函数**[]{return !data_queue.empty();}**会去检查data_queue是否不为空,当data_queue不为空——那就意味着队列中已经准备好数据了。附录A的A.5节有Lambda函数更多的信息。

wait()会去检查这些条件(通过调用所提供的lambda函数),当条件满足(lambda函数返回true)时返回。如果条件不满足(lambda函数返回false),wait()函数将解锁互斥量,并且将这个线程(上段提到的处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时,处理数据的线程从睡眠状态中苏醒,重新获取互斥锁,并且对条件再次检查,在条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并且重新开始等待。这就是为什么用std::unique_lock而不使用std::lock_guard——等待中的线程必须在等待期间解锁互斥量,并在这这之后对互斥量再次上锁,而std::lock_guard没有这么灵活。如果互斥量在线程休眠期间保持锁住状态,准备数据的线程将无法锁住互斥量,也无法添加数据到队列中;同样的,等待线程也永远不会知道条件何时满足。

清单4.1使用了一个简单的lambda函数用于等待⑤,这个函数用于检查队列何时不为空,不过任意的函数和可调用对象都可以传入wait()。当你已经写好了一个函数去做检查条件(或许比清单中简单检查要复杂很多),那就可以直接将这个函数传入wait();不一定非要放在一个lambda表达式中。在调用wait()的过程中,一个条件变量可能会去检查给定条件若干次;然而,它总是在互斥量被锁定时这样做,当且仅当提供测试条件的函数返回true时,它就会立即返回。当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,这里不建议使用一个有副作用的函数做条件检查。当你这样做了,就必须做好多次产生副作用的心理准备。

解锁std::unique_lock的灵活性,不仅适用于对wait()的调用;它还可以用于有待处理但还未处理的数据⑥。处理数据可能是一个耗时的操作,并且如你在第3章见到的,你就知道持有锁的时间过长是一个多么糟糕的主意。

使用队列在多个线程中转移数据(如清单4.1)是很常见的。做得好的话,同步操作可以限制在队列本身,同步问题和条件竞争出现的概率也会降低。鉴于这些好处,现在从清单4.1中提取出一个通用线程安全的队列。

4.1.2 使用条件变量构建线程安全队列

当你正在设计一个通用队列时,花一些时间想想有哪些操作需要添加到队列实现中去,就如之前在3.2.3节看到的线程安全的栈。可以看一下C++标准库提供的实现,找找灵感;std::queue<>容器的接口展示如下:

清单4.2 std::queue接口

template <class T, class Container =std::deque<T> >
class queue 
{ 
        
public:
  explicit queue(const Container&);
  explicit queue(Container&& = Container());
  template <class Alloc> explicitqueue(const Alloc&);
  template<class Alloc> queue(const Container&, const Alloc&);
  template<class Alloc> queue(Container&&, const Alloc&);
  template<class Alloc> queue(queue&&, const Alloc&);
 
  void swap(queue& q);
  bool empty()const;
  size_type size() const;
  T& front();
  const T&front() const;
  T& back();
  const T&back() const;
  void push(const T& x);
  void push(T&& x);
  void pop();
  template<class... Args> void emplace(Args&&... args);
};

当你忽略构造、赋值以及交换操作时,你就剩下了三组操作:1. 对整个队列的状态进行查询(empty()和size());2.查询在队列中的各个元素(front()和back());3.修改队列的操作(push(), pop()和emplace())。这就和3.2.3中的栈一样了,因此你也会遇到在固有接口上的条件竞争。因此,你需要将front()和pop()合并成一个函数调用,就像之前在栈实现时合并top()和pop()一样。与清单4.1中的代码不同的是:当使用队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里我们提供pop()函数的两个变种:try_pop()和wait_and_pop()。try_pop() ,尝试从队列中弹出数据,总会直接返回(当有失败时),即使没有指可检索;wait_and_pop(),将会等待有值可检索的时候才返回。当你使用之前栈的方式来实现你的队列,你实现的队列接口就可能会是下面这样:

清单4.3 线程安全队列的接口

#include <memory> // 为了使用std::shared_ptr

template<typename T>
class threadsafe_queue
{ 
        
public:
    threadsafe_queue();
    threadsafe_queue(const threadsafe_queue&);
    threadsafe_queue& operator=(const threadsafe_queue&) =delete;  // 不允许简单的赋值
    void push(Tnew_value);
    booltry_pop(T& value);  // 1
    std::shared_ptr<T> try_pop(); // 2
    voidwait_and_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    bool empty()const;
};

就像之前对栈做的那样,在这里你将很多构造函数剪掉了,并且禁止了对队列的简单赋值。和之前一样,你也需要提供两个版本的try_pop()和wait_for_pop()。第一个重载的try_pop()①在引用变量中存储着检索值,所以它可以用来返回队列中值的状态;当检索到一个变量时,他将返回true,否则将返回false(详见A.2节)。第二个重载②就不能做这样了,因为它是用来直接返回检索值的。当没有值可检索时,这个函数可以返回NULL指针。

那么问题来了,如何将以上这些和清单4.1中的代码相关联呢?好吧,我们现在就来看看怎么去关联。你可以从之前的代码中提取push()和wait_and_pop(),如以下清单所示。

清单4.4 从清单4.1中提取push()和wait_and_pop()

#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{ 
        
private:
  std::mutexmut;
 std::queue<T> data_queue;
 std::condition_variable data_cond;

public:
  void push(Tnew_value)
  { 
        
     std::lock_guard<std::mutex> lk(mut);
     data_queue.push(new_value);
     data_cond.notify_one();
  }

  voidwait_and_pop(T& value)
  { 
        
     std::unique_lock<std::mutex> lk(mut);
     data_cond.wait(lk,[this]{ 
        return !data_queue.empty();});
     value=data_queue.front();
     data_queue.pop();
  }
};

threadsafe_queue<data_chunk> data_queue;  // **1**

void data_preparation_thread()
{ 
        
  while(more_data_to_prepare())
  { 
        
     data_chunkconst data=prepare_data();
     data_queue.push(data);  // **2**
  }
}

void data_processing_thread()
{ 
        
  while(true)
  { 
        
    data_chunkdata;
    data_queue.wait_and_pop(data);  //**3**
    process(data);
   
    if(is_last_chunk(data))
      break;
  }
}

线程队列的实例中包含有互斥量和条件变量,所以独立的变量就不需要了①,并且调用push()也不需要外部同步②。当然,wait_and_pop()还要兼顾条件变量的等待③。

另一个wait_and_pop()函数的重载写起来就很琐碎了,剩下的函数就像从清单3.5实现的栈中一个个的粘过来一样。最终的队列实现如下所示。

清单4.5 使用条件变量的线程安全队列(完整版)

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{ 
        
private:
  mutablestd::mutex mut;  // **1** 互斥量必须是可变的 
 std::queue<T> data_queue;
 std::condition_variable data_cond;

public:
 threadsafe_queue() { 
        }
 threadsafe_queue(threadsafe_queue const& other)
  { 
        
   std::lock_guard<std::mutex> lk(other.mut);
   data_queue=other.data_queue;
  }

  void push(Tnew_value)
  { 
        
   std::lock_guard<std::mutex> lk(mut);
   data_queue.push(new_value);
   data_cond.notify_one();
  }

  voidwait_and_pop(T& value)
  { 
        
   std::unique_lock<std::mutex> lk(mut);
   data_cond.wait(lk,[this]{ 
        return !data_queue.empty();});
   value=data_queue.front();
   data_queue.pop();
  }

 std::shared_ptr<T> wait_and_pop()
  { 
        
   std::unique_lock<std::mutex> lk(mut);
   data_cond.wait(lk,[this]{ 
        return !data_queue.empty();});
   std::shared_ptr<T>res(std::make_shared<T>(data_queue.front()));

   data_queue.pop();
    return res;
  }

  bool try_pop(T& value)
  { 
        
   std::lock_guard<std::mutex> lk(mut);
   if(data_queue.empty())
      return false;

   value=data_queue.front();
   data_queue.pop();
    return true;
  }

 std::shared_ptr<T> try_pop()
  { 
        
   std::lock_guard<std::mutex> lk(mut);
   if(data_queue.empty())
      returnstd::shared_ptr<T>();

   std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));

   data_queue.pop();

    return res;

  }

 

  bool empty()const
  { 
        
   std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

empty()是一个const成员函数,并且传入拷贝构造函数的other形参是一个const引用;因为其他线程可能有这个类型的非const引用对象,并调用变种成员函数,所以这里有必要对互斥量上锁。如果锁住互斥量是一个可变操作,那么这个互斥量对象就会标记为可变的①,之后他就可以在empty()和拷贝构造函数中上锁了。

条件变量在多个线程等待同一个事件时,也是很有用的。当线程用来分解工作负载,并且只有一个线程可以对通知做出反应,与清单4.1中使用的结构完全相同;运行多个数据实例——处理线程(processing thread)。当新的数据准备完成,调用notify_one()将会触发一个正在执行wait()的线程,去检查条件和wait()函数的返回状态(因为你仅是向data_queue添加一个数据项)。 这里不保证线程一定会被通知到,即使只有一个等待线程被通知时,所有处线程也有可能都在处理数据。

另一种可能是,很多线程等待同一事件,对于通知他们都需要做出回应。这会发生在共享数据正在初始化的时候,当处理线程可以使用同一数据时,就要等待数据被初始化(有不错的机制可用来应对;可见第3章,3.3.1节),或等待共享数据的更新,比如,定期重新初始化(periodicreinitialization)。在这些情况下,准备线程准备数据数据时,就会通过条件变量调用notify_all()成员函数,而非直接调用notify_one()函数。顾名思义,这就是全部线程在都去执行wait()(检查他们等待的条件是否满足)的原因。

当等待线程只等待一次,当条件为true时,它就不会再等待条件变量了,所以一个条件变量可能并非同步机制的最好选择。尤其是,条件在等待一组可用的数据块时。在这样的情况下,期望(future)就是一个适合的选择。

4.2 使用期望等待一次性事件

假设你乘飞机去国外度假。当你到达机场,并且办理完各种登机手续后,你还需要等待机场广播通知你登机,可能要等很多个小时。你可能会在候机室里面找一些事情来打发时间,比如:读书,上网,或者来一杯价格不菲的机场咖啡,不过从根本上来说你就在等待一件事情:机场广播能够登机的时间。给定的飞机班次再之后没有可参考性;当你在再次度假的时候,你可能会等待另一班飞机。

C++标准库模型将这种一次性事件称为“期望” (future)。当一个线程需要等待一个特定的一次性事件时,在某种程度上来说它就需要知道这个事件在未来的表现形式。之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);在检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,在等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望的状态会变为“就绪”(ready)。一个“期望”可能是数据相关的(比如,你的登机口编号),也可能不是。当事件发生时(并且期望状态为就绪),这个“期望”就不能被重置。

在C++标准库中,有两种“期望”,使用两种类型模板实现,声明在头文件中: 唯一期望(unique futures)(std::future<>)和共享期望(shared futures)(std::shared_future<>)。这是仿照std::unique_ptr和std::shared_ptr。std::future的实例只能与一个指定事件相关联,而std::shared_future的实例就能关联多个事件。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。这种数据关联与模板有关,比如std::unique_ptr和std::shared_ptr的模板参数就是相关联的数据类型。在与数据无关的地方,可以使用std::future与std::shared_future的特化模板。虽然,我希望用于线程间的通讯,但是“期望”对象本身并不提供同步访问。当多个线程需要访问一个独立“期望”对象时,他们必须使用互斥量或类似同步机制对访问进行保护,如在第3章提到的那样。不过,在你将要阅读到的4.2.5节中,多个线程会对一个std::shared_future<>实例的副本进行访问,而不需要期望同步,即使他们是同一个异步结果。

最基本的一次性事件,就是一个后台运行出的计算结果。在第2章中,你已经了解了std::thread 执行的任务不能有返回值,并且我能保证,这个问题将在使用“期望”后解决——现在就来看看是怎么解决的。

4.2.1 带返回值的后台任务

假设,你有一个需要长时间的运算,你需要其能计算出一个有效的值,但是你现在并不迫切需要这个值。可能你已经找到了生命、宇宙,以及万物的答案,就像道格拉斯·亚当斯[1]一样。你可以启动一个新线程来执行这个计算,但是这就意味着你必须关注如何传回计算的结果,因为std::thread并不提供直接接收返回值的机制。这里就需要std::async函数模板(也是在头文件中声明的)了。

当任务的结果你不着急要时,你可以使用std::async启动一个异步任务。与std::thread对象等待运行方式的不同,std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当你需要这个值时,你只需要调用这个对象的get()成员函数;并且直到“期望”状态为就绪的情况下,线程才会阻塞;之后,返回计算结果。下面清单中代码就是一个简单的例子。

清单4.6 使用std::future从异步任务中获取返回值

#include <future>
#include <iostream>

int find_the_answer_to_ltuae();
void do_other_stuff();

int main()
{ 
        
   std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
   do_other_stuff();
   std::cout<<"The answer is"<<the_answer.get()<<std::endl;
}

与std::thread 做的方式一样,std::async允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在std::ref中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。就如std::thread,当参数为右值(rvalues)时,拷贝操作将使用移动的方式转移原始数据。这就允许使用“只移动”类型作为函数对象和参数。来看一下下面的程序清单:

清单4.7 使用std::async向函数传递参数

#include <string>
#include <future> 

struct X
{ 
        
  voidfoo(int,std::string const&);
  std::stringbar(std::string const&);
};

X x;

autof1=std::async(&X::foo,&x,42,"hello");  // 调用p->foo(42, "hello"),p是指向x的指针
autof2=std::async(&X::bar,x,"goodbye");  // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本

struct Y
{ 
        
  double operator()(double);
};

Y y;

auto f3=std::async(Y(),3.141);  // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
auto f4=std::async(std::ref(y),2.718);  // 调用y(2.718)

X baz(X&);

std::async(baz,std::ref(x
        标签: sunx电涡流位移传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台