Skip to content

Latest commit

 

History

History
1172 lines (824 loc) · 46 KB

File metadata and controls

1172 lines (824 loc) · 46 KB

七、多线程和同步

嵌入式平台跨越了计算能力的广阔版图。 有只有几千字节内存的微控制器;有功能强大的片上系统(SoCS);有能同时运行多个应用的多核 CPU。

随着更多的计算资源可供嵌入式开发人员使用,以及他们可以在其上构建更复杂的应用,多线程支持变得非常重要。 开发人员需要知道如何将他们的应用并行化,以有效地利用所有 CPU 核心。 我们将学习如何编写能够高效、安全地利用所有可用 CPU 核心的应用。

在本章中,我们将介绍以下主题:

  • 探索 C++ 中的线程支持
  • 探索数据同步
  • 使用条件变量
  • 使用原子变量
  • 使用 C++ 内存模型
  • 探索无锁同步
  • 在共享内存中使用原子变量
  • 探索异步功能和未来

这些方法可以用作构建您自己的高效多线程和多处理同步代码的示例。

探索 C++ 中的线程支持

在 C++ 11 之前,线程完全不属于 C++ 语言的范围。 开发人员可以使用特定于平台的库,例如 pthread 或 Win32应用编程接口(API)。 由于每个库都有自己的行为,将应用移植到另一个平台需要大量的开发和测试工作。

C++ 11 引入了线程作为 C++ 标准的一部分,并定义了一组类来在其标准库中创建多线程应用。

在本食谱中,我们将学习如何使用 C++ 在单个应用中生成多个并发线程。

怎么做……

在本食谱中,我们将学习如何创建两个并发运行的工作线程。

  1. 在您的~/test工作目录中,创建一个名为threads的子目录。
  2. 使用您喜欢的文本编辑器在threads子目录中创建threads.cpp文件。 将代码片段复制到threads.cpp文件中:
#include <chrono>
#include <iostream>
#include <thread>

void worker(int index) {
  for (int i = 0; i < 10; i++) {
    std::cout << "Worker " << index << " begins" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    std::cout << "Worker " << index << " ends" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

int main() {
  std::thread worker1(worker, 1);
  std::thread worker2(worker, 2);
  worker1.join();
  worker2.join();
  std::cout << "Done" << std::endl;
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(threads)
add_executable(threads threads.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
target_link_libraries(threads pthread)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

在此应用中,我们定义了一个名为worker的函数。 为了使代码简单,它没有做太多有用的工作,只打印Worker X开始和Worker X结束 10 次,消息之间有 50 毫秒的延迟。

main函数中,我们创建了两个工作线程worker1worker2

 std::thread worker1(worker, 1);
 std::thread worker2(worker, 2);

我们将两个参数传递给线程构造函数:

  • 在线程中运行的函数。
  • 函数的参数。 由于我们将先前定义的worker函数作为线程函数传递,因此参数应该与其类型匹配-在我们的示例中,它是int

这样,我们定义了两个执行相同工作但索引不同的工作线程-12

创建线程后,它们立即开始运行;不需要调用任何其他方法来启动它们。 它们完全并发执行,正如我们从程序输出中看到的那样:

我们的工作线程的输出是混合的,有时甚至是乱码的,比如Worker Worker 1 ends2 ends。 之所以会发生这种情况,是因为终端的输出也在并发工作。

由于辅助线程是独立执行的,因此在创建辅助线程之后,主线程不会执行任何操作。 但是,如果主线程的执行到达main函数的末尾,则程序终止。 为了避免这种情况,我们为每个工作线程添加了对join方法的调用。 此方法会一直阻塞,直到线程终止。 这样,我们只在两个工作线程完成工作后才退出主程序。

探索数据同步

数据同步是任何处理多个执行线程的应用的重要方面。 不同的线程通常需要访问相同的变量或内存区域。 由两个或多个独立线程同时写入同一内存可能会导致数据损坏。 即使在另一个线程更新变量的同时读取变量也是危险的,因为在读取时它只能部分更新。

为了避免这些问题,并发线程可以使用所谓的同步原语,即使对共享内存的访问具有确定性和可预测性的 API。

与线程支持的情况类似,C++ 语言在 C++ 11 标准之前没有提供任何同步原语。 从 C++ 11 开始,许多同步原语作为标准的一部分被添加到 C++ 标准库中。

在本食谱中,我们将学习如何使用互斥锁和锁保护来同步对变量的访问。

怎么做……

在前面的配方中,我们了解了如何完全并发运行两个工作线程,并注意到这可能会导致终端的输出出错。 我们将修改前面配方中的代码,使用互斥锁和锁保护添加同步,看看有什么不同。

  1. 在您的~/test工作目录中,创建一个名为mutex的子目录。
  2. 使用您喜欢的文本编辑器在mutex子目录中创建mutex.cpp文件。 将代码片段复制到mutex.cpp文件中:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex m;

void worker(int index) {
  for (int i = 0; i < 10; i++) {
    {
 std::lock_guard<std::mutex> g(m);
 std::cout << "Worker " << index << " begins" << std::endl;
 std::this_thread::sleep_for(std::chrono::milliseconds(50));
 std::cout << "Worker " << index << " ends" << std::endl;
 }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

int main() {
  std::thread worker1(worker, 1);
  std::thread worker2(worker, 2);
  worker1.join();
  worker2.join();
  std::cout << "Done" << std::endl;
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(mutex)
add_executable(mutex mutex.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
target_link_libraries(mutex pthread)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

在构建并运行应用之后,我们可以看到它的输出类似于线程应用的输出。 然而,也有明显的不同之处:

首先,输出没有乱码。 其次,我们可以看到一个清晰的顺序--没有工人被另一个工人打断,每个开始后面跟着相应的结束。 区别在于突出显示的源代码片段。 我们创建一个全局mutex m

std::mutex m;

然后,我们使用lock_guard来保护代码的关键部分,它从打印Worker X begins的行开始,到打印Worker X ends的行结束。

lock_guard是互斥锁之上的包装器,它使用RAII(Resource Acquisition 是 Initialization)技术在定义锁对象时自动锁定构造函数中的相应互斥锁,并在到达其作用域结束后在析构函数中解锁它。 这就是为什么我们添加额外的花括号来定义临界区的范围:

    {
      std::lock_guard<std::mutex> g(m);
      std::cout << "Worker " << index << " begins" << std::endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
      std::cout << "Worker " << index << " ends" << std::endl;
    }

虽然可以通过调用互斥锁的 lock 和 unlock 方法来显式锁定和解锁互斥锁,但不建议这样做。 忘记解锁锁定的互斥会导致多线程同步问题,这些问题很难检测,也很难调试。 RAII 方法自动解锁互斥锁,使代码更安全、更易于阅读和理解。

还有更多的..。

线程同步的正确实现需要非常注意细节和透彻的分析。 多线程应用中的一个非常常见的问题是死锁。 这是一种线程被阻塞的情况,因为它正在等待另一个线程,而另一个线程又因为它在等待第一线程而被阻塞。 结果,两个线程被无限阻塞。

如果同步需要两个或多个互斥锁,则会发生死锁。 C++ 17 引入了std::SCOPED_LOCK,可从https://en.cppreference.com/w/cpp/thread/scoped_lock获得,它是多个互斥锁的 RAII 包装器,有助于避免死锁。

使用条件变量

我们了解了如何从两个或多个线程同步对同一变量的同时访问。 线程访问变量的特定顺序并不重要;我们只防止同时读取和写入变量。

线程等待另一个线程开始处理数据是一种常见的场景。 在这种情况下,当数据可用时,第一线程应该通知第二个线程。 它可以使用 C++ 从 C++ 11 标准开始支持的条件变量来完成。

在本食谱中,一旦数据可用,我们将学习如何使用条件变量在单独的线程中激活数据处理。

怎么做……

我们将实现一个具有两个工作线程的应用,类似于我们在探索数据同步食谱中创建的应用。

  1. 在您的~/test工作目录中,创建一个名为condvar的子目录。

  2. 使用您喜欢的文本编辑器在condvar子目录中创建condv.cpp文件。

  3. 现在,我们在condvar.cpp中放置所需的标头并定义全局变量:

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex m;
std::condition_variable cv;
std::vector<int> result;
int next = 0;
  1. 定义全局变量后,我们添加worker函数,该函数类似于前面配方中的worker函数:
void worker(int index) {
  for (int i = 0; i < 10; i++) {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [=]{return next == index; });
    std::cout << "worker " << index << "\n";
    result.push_back(index);
    next = next + 1;
    if (next > 2) { next = 1; };
    cv.notify_all();
  }
}
  1. 最后,我们定义入口点-main函数:
int main() {
  std::thread worker1(worker, 1);
  std::thread worker2(worker, 2);
  {
    std::lock_guard<std::mutex> l(m);
    next = 1;
  }
  std::cout << "Start\n";
  cv.notify_all();
  worker1.join();
  worker2.join();
  for (int e : result) {
    std::cout << e << ' ';
  }
  std::cout << std::endl;
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
cmake_minimum_required(VERSION 3.5.1)
project(condvar)
add_executable(condvar condvar.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
target_link_libraries(condvar pthread)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

与我们在探索数据同步配方中创建的应用类似,我们创建了两个工作线程worker1worker2,它们使用相同的worker函数线程,只是index参数不同。

除了将消息打印到控制台之外,工作线程还更新全局矢量结果。 每个 Worker 只需将其索引添加到其循环中的result变量中,如以下命令所示:

std::vector<int> result;

我们希望每个 Worker 只在轮流时将其索引添加到结果中-worker 1,然后是worker 2,然后是worker 1,依此类推。 没有同步是不可能做到这一点的;然而,使用互斥锁的简单同步是不够的。 它可以保证两个并发线程不会同时访问代码的同一临界区,但不能保证顺序。 在worker 2锁定互斥之前,worker 1可能会再次锁定互斥。

为了解决排序问题,我们定义了一个cv条件变量和一个next整数变量:

std::condition_variable cv;
int next = 0;

next变量包含工作者的索引。 它使用0进行初始化,并在main函数中设置为特定的工作者索引。 由于此变量是从多个线程访问的,因此我们在锁保护的保护下执行此操作:

  {
    std::lock_guard<std::mutex> l(m);
    next = 1;
  }

尽管工作线程在创建后开始执行,但它们都会立即在条件变量上被阻塞,等待next变量的值与其索引匹配。 条件变量需要std::unique_lock才能等待。 我们在调用wait方法之前创建它:

std::unique_lock<std::mutex> l(m);
cv.wait(l, [=]{return next == index; });

虽然在main函数中将条件变量cv设置为1,但这是不够的。 我们需要显式通知等待条件变量的线程。 我们使用notify_all方法完成此操作:

cv.notify_all();

这将唤醒所有等待的线程,并将它们的索引与next变量进行比较。 匹配的线程解除阻塞,所有其他线程再次进入休眠状态。

活动线程向控制台写入一条消息并更新result变量。 然后,它更新next变量以选择下一个要激活的线程。 我们递增索引,直到其达到最大值,然后将其重置为1

next = next + 1;
if (next > 2) { next = 1; };

main函数中的代码类似,在确定next线程的索引后,我们需要调用notify_all来唤醒所有线程,并让它们决定该轮到谁工作:

cv.notify_all();

当工作线程工作时,main函数等待它们完成:

 worker1.join();
 worker2.join();

当所有工作线程完成时,将打印result变量的值:

  for (int e : result) {
    std::cout << e << ' ';
  }

在构建并运行我们的程序之后,我们将获得以下输出:

正如我们所看到的,所有线程都是按预期顺序激活的。

还有更多的..。

在这个配方中,我们只使用了条件变量对象提供的几个方法。 除了简单的wait函数外,还有等待特定时间或等待到指定时间点的函数。 在https://en.cppreference.com/w/cpp/thread/condition_variable参考页面了解有关C++ 条件变量类的更多信息。

使用原子变量

原子变量之所以这样命名,是因为它们不能部分读取或写入。 例如,比较Pointint数据类型:

struct Point {
  int x, y;
};

Point p{0, 0};
int b = 0;

p = {10, 10};
b = 10;

在本例中,修改p变量相当于两个赋值:

p.x = 10;
p.y = 10;

这意味着读取p变量的任何并发线程都可能获得部分修改的数据,如x=10y=0,这可能会导致难以检测和难以重现的错误计算。 这就是对此类数据类型的访问应该同步的原因。

那么b变量呢? 可以部分修改吗? 答案是:是的,取决于平台。 但是,C++ 提供了一组数据类型和模板,以确保变量作为一个整体以原子方式一次全部更改。

在本食谱中,我们将学习如何使用原子变量进行多线程同步。 由于原子变量不能部分修改,因此不需要使用互斥锁或其他昂贵的同步原语。

怎么做……

我们将创建一个应用,该应用派生两个工作线程来并发更新数据数组。 我们将使用原子变量而不是互斥锁来确保并发更新是安全的。

  1. 在您的~/test工作目录中,创建一个名为atomic的子目录。

  2. 使用您喜欢的文本编辑器在atomic子目录中创建一个atomic.cpp文件。

  3. 现在,我们放置所需的标头,并在atomic.cpp中定义全局变量:

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<size_t> shared_index{0};
std::vector<int> data;
  1. 定义全局变量后,我们添加worker函数。 它类似于前面配方中的worker函数,但除了index之外,它还有一个额外的参数-timeout
void worker(int index, int timeout) {
  while(true) {
  size_t worker_index = shared_index.fetch_add(1);
  if (worker_index >= data.size()) {
      break;
  }
  std::cout << "Worker " << index << " handles "
              << worker_index << std::endl;
  data[worker_index] = data[worker_index] * 2;
    std::this_thread::sleep_for(std::chrono::milliseconds(timeout));
  }
  }
  1. 最后,我们定义入口点-main函数:
int main() {
  for (int i = 0; i < 10; i++) {
    data.emplace_back(i);
  }
  std::thread worker1(worker, 1, 50);
  std::thread worker2(worker, 2, 20);
  worker1.join();
  worker2.join();
  std::cout << "Result: ";
  for (auto& v : data) {
    std::cout << v << ' ';
  }
  std::cout << std::endl;
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(atomic)
add_executable(atomic atomic.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
target_link_libraries(atomic pthread)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

我们正在创建一个使用多个工作线程更新数组所有元素的应用。 对于昂贵的更新操作,此方法可以在多核平台上带来显著的性能提升。

困难在于在多个工作线程之间共享工作,因为每个工作线程处理数据元素可能需要不同的时间。

我们使用shared_index原子变量来存储尚未被任何工作线程声明的下一个元素的索引。 此变量以及要处理的数组被声明为全局变量:

std::atomic<size_t> shared_index{0};
std::vector<int> data;

我们的worker函数类似于早期配方中的worker函数,但有重要区别。 首先,它有一个额外的参数timeout。 这用于模拟处理每个元素所需时间的差异。

其次,我们的工作线程不是固定的迭代次数,而是在循环中运行,直到shared_index变量达到最大值。 这表示所有元素都已处理,工作进程可以终止。

在每次迭代中,一个 Worker 读取shared_index的值。 如果有要处理的元素,它会将shared_index变量的值存储在局部worker_index变量中,同时递增shared_index变量。

虽然可以按照与常规变量相同的方式使用原子变量-首先获取其当前值,然后递增变量-但这可能会导致争用情况。 两个辅助线程几乎可以同时读取该变量。 在本例中,它们都获得相同的值,然后开始处理相同的元素,相互干扰。 这就是为什么我们使用一种特殊的方法fetch_add,该方法递增变量并将其在递增前的值作为一个不可中断的操作返回:

size_t worker_index = shared_index.fetch_add(1);

如果worker_index变量达到数组的大小,则意味着所有元素都已处理,并且 Worker 可以终止:

if (worker_index >= data.size()) {
      break;
}

如果worker_index变量有效,则工作器使用它通过此索引更新数组元素的值。 在我们的示例中,我们只需将其乘以2

data[worker_index] = data[worker_index] * 2;

为了模拟昂贵的数据操作,我们使用自定义延迟。 延迟的持续时间由timeout参数确定:

std::this_thread::sleep_for(std::chrono::milliseconds(timeout));

main函数中,我们将要处理的元素添加到数据向量中。 我们使用循环用 0 到 9 之间的数字填充向量:

for (int i = 0; i < 10; i++) {
    data.emplace_back(i);
}

在初始数据集准备好之后,我们创建两个工作线程,提供indextimeout参数。 使用工作线程的不同超时来模拟不同的性能:

 std::thread worker1(worker, 1, 50);
 std::thread worker2(worker, 2, 20);

然后,我们等待两个工作线程完成它们的作业,并将结果打印到控制台。 当我们构建和运行我们的应用时,我们会得到以下输出:

正如我们所看到的,Worker 2Worker 1处理了更多的元素,因为它的超时时间是 20 毫秒,而Worker 1是 50 毫秒。 此外,所有元素都按照预期进行了处理,没有遗漏和重复。

还有更多的..。

我们学习了如何使用整数原子变量。 虽然这种类型的原子变量是最常用的,但 C++ 也允许定义其他类型的原子变量,包括非整数类型,只要它们是普通的可复制、复制可构造和复制可赋值的。

除了我们在示例中使用的fetch_add方法之外,原子变量还有其他类似的方法,可以帮助开发人员在单个操作中查询值和修改变量。 考虑使用这些方法来避免争用条件或使用互斥锁进行代价高昂的同步。

在 C++ 20 中,原子变量接收waitnotify_allnotify_one方法,类似于条件变量的方法。 它们允许通过使用更高效、更轻量级的原子变量来实现以前需要条件变量的逻辑。

有关原子变量的更多信息,请参见https://en.cppreference.com/w/cpp/atomic/atomic

使用 C++ 内存模型

从 C++ 11 标准开始,C++ 将用于线程和同步的 API 和原语定义为该语言的一部分。 具有多个处理器核心的系统中的存储器同步是复杂的,因为现代处理器可以通过重新排序指令来优化代码执行。 即使在使用原子变量时,也不能保证以所需的顺序修改或访问数据,因为编译器可以更改顺序。

为了避免歧义,C++ 11 引入了内存模型,定义了并发访问内存区域的行为。 作为内存模型的一部分,C++ 定义了std::memory_order枚举,它向编译器提供有关预期访问模型的提示。 这有助于编译器以不干扰预期代码行为的方式优化代码。

在本食谱中,我们将学习如何使用最简单的std::memory_order枚举形式来实现共享计数器变量。

怎么做……

我们正在实现一个应用,该应用具有一个共享计数器,该计数器由两个并发工作线程递增。

  1. 在您的~/test工作目录中,创建一个名为memorder的子目录。
  2. 使用您喜欢的文本编辑器在atomic子目录中创建memorder.cpp文件。
  3. 现在,我们在memorder.cpp中放置所需的标头并定义全局变量:
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<bool> running{true};
std::atomic<int> counter{0};
  1. 定义全局变量后,我们添加worker函数。 该函数仅递增计数器,然后在特定的时间间隔内休眠:
void worker() {
 while(running) {
 counter.fetch_add(1, std::memory_order_relaxed);
 }
 }
  1. 然后,我们定义我们的main函数:
int main() {
  std::thread worker1(worker);
  std::thread worker2(worker);
  std::this_thread::sleep_for(std::chrono::seconds(1));
  running = false;
  worker1.join();
  worker2.join();
  std::cout << "Counter: " << counter << std::endl;
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(memorder)
add_executable(memorder memorder.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
target_link_libraries(memorder pthread)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

在我们的应用中,我们将创建两个工作线程,它们将递增共享计数器,并让它们运行特定的时间量。

作为第一步,我们定义两个全局原子变量runningcounter

std::atomic<bool> running{true};
std::atomic<int> counter{0};

running变量是一个二进制标志。 当它设置为true时,工作线程应该保持运行。 在它更改为false之后,工作线程应该终止。

counter变量是我们的共享计数器。 工作线程将同时递增它。 我们使用在使用原子变量配方的中已经使用的fetch_add方法。 它用于自动递增变量。 在此配方中,我们将一个额外的参数std::memory_order_relaxed传递给此方法:

counter.fetch_add(1, std::memory_order_relaxed);

这一论点是一个提示。 虽然原子性和修改的一致性很重要,并且对于计数器的实现应该得到保证,但是并发存储器访问之间的顺序并不那么重要。 std::memory_order_relaxed为原子变量定义了这种内存访问。 将其传递给fetch_add方法允许我们针对特定的目标平台对其进行微调,以避免可能影响性能的不必要的同步延迟。

main函数中,我们创建了两个工作线程:

std::thread worker1(worker);
std::thread worker2(worker);

然后,主线程暂停 1 秒。 暂停后,主线程将running变量的值设置为false,表示工作线程应该终止:

running = false;

在工作线程终止后,我们打印计数器的值:

结果计数器值由传递给worker函数的超时间隔确定。 在我们的示例中,更改fetch_add方法中的内存顺序类型不会导致结果值发生明显变化。 但是,它可以提高使用原子变量的高并发应用的性能,因为编译器可以在不破坏应用逻辑的情况下对并发线程中的操作进行重新排序。 这种优化高度依赖于开发人员的意图,在没有开发人员提示的情况下无法自动推断。

还有更多的..。

C++ 内存模型和内存排序类型是复杂的主题,需要深入了解现代 CPU 如何访问内存并优化其代码执行。 C++ 内存模型参考https://en.cppreference.com/w/cpp/language/memory_model提供了大量信息,是学习优化多线程应用的高级技术的良好起点。

探索无锁同步

在前面的配方中,我们了解了如何使用互斥锁和锁来同步多线程对共享数据的访问。 如果多个线程尝试运行受锁保护的代码的临界区,则一次只有一个线程可以执行此操作。 所有其他线程都必须等待,直到该线程离开临界区。

但是,在某些情况下,可以在没有互斥锁和显式锁的情况下同步对共享数据的访问。 其想法是使用数据的本地副本进行修改,然后在单个、不可中断且不可分割的操作中更新共享副本。

这种类型的同步取决于硬件。 目标处理器应提供某种形式的比较和交换(CAS)指令。 这将检查内存位置中的值是否与给定值匹配,并仅在匹配时才用新的给定值替换它。 因为它是单处理器指令,所以不能被上下文切换中断。 这使得它成为更复杂的原子操作的基本构建块。

在本食谱中,我们将学习如何检查原子变量是无锁的,还是使用互斥锁或其他锁定操作实现的。 我们还将基于 C++ 11 的原子比较交换函数系列的示例(可从https://en.cppreference.com/w/cpp/atomic/atomic_compare_exchange获得),实现自定义堆栈的无锁推送操作。

怎么做……

我们正在实现一个简单的Stack类,它提供一个名为Push的构造函数和函数。

  1. 在您的~/test工作目录中,创建一个名为lockfree的子目录。
  2. 使用您喜欢的文本编辑器在lockfree子目录中创建lockfree.cpp文件。
  3. 现在,我们放入所需的标头,并在lockfree.cpp文件中定义Node帮助器数据类型:
#include <atomic>
#include <iostream>

struct Node {
  int data;
  Node* next;
};
  1. 接下来,我们定义一个简单的Stack类。 这使用Node数据类型来组织数据存储:
class Stack {
  std::atomic<Node*> head;

  public:
    Stack() {
    std::cout << "Stack is " <<
    (head.is_lock_free() ? "" : "not ")
    << "lock-free" << std::endl;
    }

   void Push(int data) {
      Node* new_node = new Node{data, nullptr};
      new_node->next = head.load();
      while(!std::atomic_compare_exchange_weak(
                &head,
                &new_node->next,
                new_node));
    }
    };
  1. 最后,我们定义了一个简单的main函数,该函数创建Stack的一个实例并将一个元素推入其中:
int main() {
  Stack s;
  s.Push(1);
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(lockfree)
add_executable(lockfree lockfree.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
target_link_libraries(lockfree pthread)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

我们创建了一个简单的应用,它实现了一个简单的整数值堆栈。 我们将堆栈的元素存储在动态内存中,对于每个元素,我们应该能够确定它后面的元素。

为此,我们定义了一个具有两个数据字段的Node帮助器结构。 data字段存储元素的实际值,而next字段是指向堆栈中下一个元素的指针:

int data;
Node* next;

然后,我们定义了Stack类。 通常,堆栈包含两个操作:

  • Push:将元素放在堆栈顶部
  • Pull:从堆栈顶部获取元素

为了跟踪堆栈的顶部,我们创建了一个top变量,该变量保存指向Node对象的指针。 它将是我们堆栈的首位:

std::atomic<Node*> head;

我们还定义了一个简单的构造函数,用于初始化top变量的值并检查它是否无锁。 在 C++ 中,原子变量可以使用原子一致性、可用性和分区容错(CAP)操作或使用常规互斥锁来实现。 这取决于目标 CPU:

(head.is_lock_free() ? "" : "not ")

在我们的应用中,我们只实现了Push方法,以演示如何以无锁的方式完成它。

Push方法接受放在堆栈顶部的值。 为此,我们创建Node对象的新实例:

 Node* new_node = new Node{data, nullptr};

因为我们将元素放在堆栈的顶部,所以指向新创建的实例的指针应该分配给top变量,而top变量的旧值应该分配给新的Node对象的next指针。

但是,直接这样做并不是线程安全的。 两个或多个线程可以同时修改top变量,从而导致数据损坏。 我们需要某种数据同步。 我们可以使用锁和互斥锁来做到这一点,但是也可以用一种无锁的方式来做到这一点。

这就是为什么我们最初只更新下一个指针的原因。 因为我们的新Node对象还不是堆栈的一部分,所以我们可以在没有同步的情况下完成它,因为其他线程无法访问它:

new_node->next = head.load();

现在,我们需要将其添加为堆栈的新top变量。 我们使用std::atomic_compare_exchange_weak函数上的循环来完成此操作:

      while(!std::atomic_compare_exchange_weak(
                &head,
                &new_node->next,
                new_node));

此函数将top变量的值与存储在新元素的next指针中的值进行比较。 如果它们匹配,则用指向新节点的指针替换top变量的值,并返回true。 否则,它将top变量的值写入新元素的next指针并返回false。 由于我们在下一步更新了next指针以匹配top变量,因此只有在调用std::atomic_compare_exchange_weak函数之前另一个线程修改了它,才会发生这种情况。 最终,该函数将返回true,表示top头已使用指向我们的元素的指针进行了更新。

函数的作用是:创建一个 Stack 实例,并将一个元素推入其中。 在输出中,我们可以看到底层实现是否无锁:

对于我们的目标来说,实现是无锁的。

还有更多的..。

无锁同步是一个极其复杂的主题。 无锁数据结构和算法的开发需要大量的工作。 即使是使用无锁操作的简单Push逻辑的实现也不容易理解。 要对代码进行适当的分析和调试,还需要付出更大的努力。 通常,它会导致难以注意和难以实现的微妙问题。

虽然无锁算法的实现可以提高应用的性能,但请考虑使用现有的无锁数据结构库之一,而不是自己编写。 例如,Boost.Lockfree提供了一个可供您使用的无锁数据类型集合。

在共享内存中使用原子变量

我们了解了如何在多线程应用中使用原子变量实现两个或更多线程的同步。 但是,原子变量也可以用于同步作为单独进程运行的独立应用。

我们已经知道如何使用共享内存在两个应用之间交换数据。 现在,我们可以结合这两种技术-共享内存和原子变量-来实现两个独立应用的数据交换和同步。

怎么做……

在本配方中,我们将修改在内存管理中创建的应用,用于在使用共享内存区域的两个处理器之间交换数据。

  1. 在您的~/test工作目录中,创建一个名为shmatomic的子目录。
  2. 使用您喜欢的文本编辑器在shmatomic子目录中创建shmatomic.cpp文件。
  3. 我们重用在shmem应用中创建的共享内存数据结构。 将公共标头和常量放入shmatomic.cpp文件:
#include <atomic>
#include <iostream>
#include <chrono>
#include <thread>

#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>

const char* kSharedMemPath = "/sample_point";
  1. 接下来,开始定义模板化的SharedMem类:
template<class T>
class SharedMem {
  int fd;
  T* ptr;
  const char* name;

  public:
  1. 该类将有一个构造函数、一个析构函数和一个 getter 方法。 让我们添加构造函数:
    SharedMem(const char* name, bool owner=false) {
      fd = shm_open(name, O_RDWR | O_CREAT, 0600);
      if (fd == -1) {
        throw std::runtime_error("Failed to open a shared
        memory region");
      }
      if (ftruncate(fd, sizeof(T)) < 0) {
        close(fd);
        throw std::runtime_error("Failed to set size of a shared
        memory region");
      };
      ptr = (T*)mmap(nullptr, sizeof(T), PROT_READ | PROT_WRITE, 
      MAP_SHARED, fd, 0);
      if (!ptr) {
        close(fd);
        throw std::runtime_error("Failed to mmap a shared memory
        region");
      }
      this->name = owner ? name : nullptr;
      }
  1. 简单的析构函数和 getter 如下:
~SharedMem() {
munmap(ptr, sizeof(T));
close(fd);
if (name) {
std::cout << "Remove shared mem instance " << name << std::endl;
shm_unlink(name);
}
}

T& get() const {
return *ptr;
}
};
  1. 现在,我们定义将用于数据交换和同步的数据类型:
struct Payload {
std::atomic_bool data_ready;
std::atomic_bool data_processed;
int index;
};
  1. 接下来,我们定义一个将生成数据的函数:
void producer() {
  SharedMem<Payload> writer(kSharedMemPath);
  Payload& pw = writer.get();
if (!pw.data_ready.is_lock_free()) {
throw std::runtime_error("Flag is not lock-free");
  }
for (int i = 0; i < 10; i++) {
pw.data_processed.store(false);
pw.index = i;
    pw.data_ready.store(true);
while(!pw.data_processed.load());
}
}
  1. 紧跟其后的是使用数据的函数:
void consumer() {
SharedMem<Payload> point_reader(kSharedMemPath, true);
Payload& pr = point_reader.get();
if (!pr.data_ready.is_lock_free()) {
throw std::runtime_error("Flag is not lock-free");
}
for (int i = 0; i < 10; i++) {
 while(!pr.data_ready.load());
    pr.data_ready.store(false);
std::cout << "Processing data chunk " << pr.index << std::endl;
    pr.data_processed.store(true);
}
}
  1. 最后,我们添加main函数,该函数将所有内容联系在一起:
int main() {

if (fork()) {
    consumer();
} else {
    producer();
}
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(shmatomic)
add_executable(shmatomic shmatomic.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
target_link_libraries(shmatomic pthread rt)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

在我们的应用中,我们重用了在内存管理中介绍的模板化SharedMem类。 此类用于在共享内存区域中存储特定类型的元素。 让我们快速回顾一下它是如何工作的。

SharedMem类是可移植操作系统接口(POSIX)共享内存 API 之上的包装器。 它定义了三个私有数据字段来保存系统特定的处理程序和指针,并公开了一个由两个函数组成的公共接口:

  • 接受共享区域名称和所有权标志的构造函数
  • 返回对存储在共享内存中的对象的引用的get方法

该类还定义了一个析构函数,该函数执行正确关闭共享对象所需的所有操作。 因此,SharedMem类可以通过 C++ RAII 习惯用法用于安全的资源管理。

SharedMem类是模板化的类。 它由我们希望存储在共享内存中的数据类型参数化。 为此,我们定义了一个名为Payload的结构:

struct Payload {
  std::atomic_bool data_ready;
  std::atomic_bool data_processed;
  int index;
};

它有一个我们将用作数据交换字段的index整数变量,以及用于数据同步的两个原子布尔标志data_readydata_processed

我们还定义了两个函数producerconsumer,它们将在单独的进程中工作,并使用共享内存区域在彼此之间交换数据。

producer函数正在生成数据区块。 首先,它创建SharedMem类的一个实例,该实例由Payload数据类型参数化。 它将共享内存区的路径传递给SharedMem构造函数:

SharedMem<Payload> writer(kSharedMemPath);

创建共享内存实例后,它将获取对存储在其中的有效负载数据的引用,并检查我们在Payload数据类型中定义的任何原子标志是否无锁:

if (!pw.data_ready.is_lock_free()) {
    throw std::runtime_error("Flag is not lock-free");
}

该函数在一个循环中产生 10 个数据块。 块的索引被放入有效载荷的index字段:

pw.index = i;

但是,除了将数据放入共享内存之外,我们还需要同步对该数据的访问。 这是我们使用原子旗的时候。

对于每个迭代,在更新index字段之前,我们将重置data_processed标志。 在索引更新之后,我们设置data ready标志,这是告诉消费者新的数据块已准备好的指示器,并等待消费者处理数据。 我们循环直到data_processed标志变为true,然后转到下一个迭代:

pw.data_ready.store(true);
while(!pw.data_processed.load());

consumer函数的工作方式与此类似。 因为它在单独的进程中工作,所以它通过使用相同的路径创建SharedMem类的实例来打开相同的共享内存区域。 我们还使consumer函数成为共享内存实例的所有者。 这意味着在其SharedMem实例被销毁后,它负责删除共享内存区:

SharedMem<Payload> point_reader(kSharedMemPath, true);

producer函数类似,consumer函数检查原子标志是否无锁,并进入数据消耗循环。

对于每一次迭代,它都会在一个紧密的循环中等待,直到数据准备就绪:

while(!pr.data_ready.load());

producer函数将data_ready标志设置为true后,consumer函数可以安全地读取和处理数据。 在我们的实现中,它只将index字段打印到控制台。 数据处理后,consumer功能通过将data_processed标志设置为true来表示这一点:

pr.data_processed.store(true);

这将触发producer函数端的下一次数据生产迭代:

因此,我们可以看到经过处理的数据块的确定性输出,没有遗漏或重复;这在数据访问不同步的情况下很常见。

探索异步功能和未来

在多线程应用中处理数据同步是困难的、容易出错的,并且需要开发人员编写大量代码来正确调整数据交换和数据通知。 为了简化开发,C++ 11 引入了一种标准 API,用于以类似于常规同步函数调用的方式编写异步代码,并隐藏了大量同步复杂性。

在本食谱中,我们将学习如何使用异步函数调用和期货在多线程中运行我们的代码,而实际上不需要额外的工作,从而实现数据同步。

怎么做……

我们将实现一个简单的应用,该应用在单独的线程中调用一个长时间运行的函数并等待其结果。 当函数运行时,应用可以继续处理其他计算。

  1. 在您的~/test工作目录中,创建一个名为async的子目录。
  2. 使用您喜欢的文本编辑器在async子目录中创建一个async.cpp文件。
  3. 将我们的应用代码放到async.cpp文件中,从公共头文件和我们的长时间运行的函数开始:
#include <chrono>
#include <future>
#include <iostream>

int calculate (int x) {
  auto start = std::chrono::system_clock::now();
  std::cout << "Start calculation\n";
  std::this_thread::sleep_for(std::chrono::seconds(1));
  auto delta = std::chrono::system_clock::now() - start;
  auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta);
  std::cout << "Done in " << ms.count() << " ms\n";
  return x*x;
}
  1. 接下来,添加test函数,该函数调用长时间运行的函数:
void test(int value, int worktime) {
  std::cout << "Request result of calculations for " << value << std::endl;
  std::future<int> fut = std::async (calculate, value);
  std::cout << "Keep working for " << worktime << " ms" << std::endl;
  std::this_thread::sleep_for(std::chrono::milliseconds(worktime));
  auto start = std::chrono::system_clock::now();
  std::cout << "Waiting for result" << std::endl;
  int result = fut.get();
  auto delta = std::chrono::system_clock::now() - start;
  auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta);

  std::cout << "Result is " << result
            << ", waited for " << ms.count() << " ms"
            << std::endl << std::endl;
}
  1. 最后,添加一个main极简函数:
int main ()
{
  test(5, 400);
  test(8, 1200);
  return 0;
}
  1. loop子目录下创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(async)
add_executable(async async.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 14")
target_link_libraries(async pthread -static-libstdc++)

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

您可以构建和运行应用。

它是如何运作的..。

在我们的应用中,我们定义了一个应该需要很长时间才能运行的calculate函数。 从技术上讲,我们的函数计算整数参数的平方,但我们添加了一个人为延迟,使其运行 1 秒。 我们使用sleep_for标准库函数向应用添加延迟:

std::this_thread::sleep_for(std::chrono::seconds(1));

除了计算之外,该函数还会记录开始工作的时间、完成的时间和花费的时间。

接下来,我们定义了一个调用calculate函数的test函数,以演示异步调用是如何工作的。

该函数有两个参数。 第一个参数是传递给calculate函数的值。 第二个参数是test函数在运行calculate函数之后和请求结果之前将要花费的时间。 通过这种方式,我们对函数可以与其请求的计算并行执行的有用工作进行建模。

test函数通过在异步模式下运行calculate函数并向其传递第一个参数value开始工作:

std::future<int> fut = std::async (calculate, value);

async函数隐式产生一个线程并开始执行calculate函数。

因为我们异步运行函数,所以结果还没有准备好。 相反,async函数返回std::future的一个实例,该对象将在结果可用时保存该结果。

接下来,我们模拟有用的工作。 在我们的示例中,它是指定时间间隔的暂停。 在可以并行完成的工作完成之后,我们需要得到calculate函数的结果才能继续。 要请求结果,我们使用std::future对象的get方法,如下所示:

int result = fut.get();

get方法会一直阻塞,直到结果可用。 然后,我们可以计算等待结果所花费的时间,并将结果和等待时间一起输出到控制台。

main函数中,我们运行test函数来评估两个场景:

  • 有用的工作比结果的计算花费更少的时间。
  • 有用的工作比结果的计算花费更多的时间。

运行应用会产生以下输出。

在第一个场景中,我们可以看到我们正在开始计算,然后在计算完成之前开始等待结果。 结果,get方法被阻塞了 600 毫秒,直到结果准备就绪:

在第二个场景中,有用的工作花费了1200毫秒。 正如我们所看到的,在请求结果之前已经完成了计算,因此,get方法没有阻塞,并立即返回结果。

还有更多的..。

期货和异步函数为编写并行且易于理解的代码提供了强大的机制。 异步功能灵活,支持不同的执行策略。 承诺是另一种使开发人员能够克服异步编程复杂性的机制。 更多信息可在位于[https://en.cppreference.com/w/cpp/thread/future]的std::future、位于[https://en.cppreference.com/w/cpp/thread/promise]的std::promise以及位于[https://en.cppreference.com/w/cpp/thread/async]的std::async的参考页中找到。