Skip to content

Latest commit

 

History

History
1073 lines (782 loc) · 40.4 KB

File metadata and controls

1073 lines (782 loc) · 40.4 KB

六、内存管理

内存效率是嵌入式应用的主要要求之一。 由于目标嵌入式平台的性能和内存能力通常有限,因此开发人员需要知道如何以最有效的方式使用可用内存。

令人惊讶的是,最有效的方法并不一定意味着使用最少的内存量。 由于嵌入式系统是专门化的,开发人员提前知道哪些应用或组件将在系统上执行。 除非在同一系统中运行的另一个应用可以使用额外的内存,否则在一个应用中节省内存不会带来任何收益。 这就是为什么嵌入式系统中内存管理的最重要特征是确定性或可预测性。 知道应用在任何负载下都可以使用 2 兆字节的内存比知道应用在大多数情况下可以使用 1 兆字节的内存,但偶尔需要 3 兆字节要重要得多。

同样,可预测性也适用于内存分配和释放时间。 在许多情况下,嵌入式应用倾向于花费更多内存来实现确定性计时。

在本章中,我们将学习几种在嵌入式应用中广泛使用的内存管理技术。 本章介绍的食谱如下:

  • 使用动态内存分配
  • 浏览对象池
  • 使用环形缓冲区
  • 使用共享内存
  • 使用专用内存

这些方法将帮助您理解内存管理最佳实践,并可在应用中使用内存分配时作为构建块使用。

使用动态内存分配

动态内存分配是 C++ 开发人员的一种普遍做法,在 C++ 标准库中得到了广泛的应用,但在嵌入式系统环境中,它往往成为难以发现和难以避免的问题的根源。

最值得注意的问题是时机。 内存分配的最坏情况时间是不受限制的;但是,嵌入式系统,特别是那些控制真实进程或设备的系统,通常需要在特定的时间内做出响应。

另一个问题是支离破碎。 当分配和释放不同大小的内存块时,出现的内存区域在技术上是空闲的,但由于它们太小而无法满足应用请求,因此无法分配。 内存碎片会随着时间的推移而增长,并可能导致内存分配请求失败,尽管可用内存总量相当大。

避免此类问题的一个简单而强大的策略是在编译时或启动时预先分配应用可能需要的所有内存。 然后,应用根据需要使用该内存。 此内存一旦分配,在应用终止之前永远不会被释放。

这种方法的一个缺点是,应用分配的内存比此时实际使用的内存多,而不是让其他应用使用。 实际上,这对于嵌入式应用来说不是问题,因为它们在受控环境中运行,在该环境中,所有应用及其内存需求都是事先知道的。

怎么做……

在本食谱中,我们将学习如何预分配内存并在稍后的应用中使用:

  1. 在您的工作~/test目录中,创建一个名为prealloc的子目录。
  2. 使用您喜欢的文本编辑器在prealloc子目录中创建名为prealloc.cpp的文件。 将以下代码片段复制到prealloc.cpp文件中以定义SerialDevice类:
#include <cstdint>
#include <string.h>

constexpr size_t kMaxFileNameSize = 256;
constexpr size_t kBufferSize = 4096;
constexpr size_t kMaxDevices = 16;

class SerialDevice {
    char device_file_name[256];
    uint8_t input_buffer[kBufferSize];
    uint8_t output_buffer[kBufferSize];
    int file_descriptor;
    size_t input_length;
    size_t output_length;

  public:
    SerialDevice():
      file_descriptor(-1), input_length(0), output_length(0) {}

    bool Init(const char* name) {
      strncpy(device_file_name, name, sizeof(device_file_name));
    }

    bool Write(const uint8_t* data, size_t size) {
      if (size > sizeof(output_buffer)) {
        throw "Data size exceeds the limit";
      }
      memcpy(output_buffer, data, size);
    }

    size_t Read(uint8_t* data, size_t size) {
      if (size < input_length) {
        throw "Read buffer is too small";
      }
      memcpy(data, input_buffer, input_length);
      return input_length;
    }
};
  1. 添加使用SerialDevice类的main函数:
int main() {
  SerialDevice devices[kMaxDevices];
  size_t number_of_devices = 0;

  uint8_t data[] = "Hello";
  devices[0].Init("test");
  devices[0].Write(data, sizeof(data));
  number_of_devices = 1;

  return 0;
}
  1. loop子目录中创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(prealloc)
add_executable(prealloc prealloc.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 17")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)

现在您可以构建和运行应用了。 它不输出任何数据,因为它的目的是演示我们如何在不知道设备数量和与设备交换的消息大小的情况下预先分配内存。

它是如何运作的..。

在本配方中,我们定义了封装与串行设备的数据交换的对象。 设备由可变长度的设备文件名字符串标识。 我们可以向设备发送和从设备接收长度可变的消息。

由于我们只能在运行时发现连接到系统的设备数量,因此我们可能会在发现设备对象时创建它。 同样,由于我们不知道发送和接收的消息的大小,因此为消息动态分配内存是很自然的。

相反,我们预先分配未初始化设备对象的数组:

  SerialDevice devices[kMaxDevices];

反过来,每个对象预分配足够数量的内存来存储消息和设备文件名:

  char device_file_name[kMaxFileNameSize];
  uint8_t input_buffer[kBufferSize];
  uint8_t output_buffer[kBufferSize];

我们使用局部变量来跟踪输入和输出缓冲区中的实际数据大小。 不需要跟踪文件名的大小,因为它应该是以零结尾的:

  size_t input_length;
  size_t output_length;

同样,我们跟踪发现的实际设备数量:

  size_t number_of_devices = 0;

这样,我们就避免了动态内存分配。 然而,这是有代价的:我们人为地限制了设备的最大数量和我们支持的消息的最大大小。 其次,大量分配的内存从未被使用过。 例如,如果我们最多支持 16 个设备,而系统中只有 1 个设备,那么我们实际上只使用了已分配内存的 1/16。 如前所述,这对于嵌入式系统来说不是问题,因为所有应用及其需求都是预定义的。 没有任何应用可以从它可以分配的额外内存中受益。

浏览对象池

正如我们在本章的第一个配方中讨论的那样,预先分配应用使用的所有内存是一种有效的策略,可以帮助嵌入式应用避免与内存碎片和分配时间相关的各种陷阱。

自组织存储器预分配的一个缺点是应用现在负责跟踪预分配的对象使用情况。

对象池旨在通过提供通用且方便的接口来隐藏对象跟踪的负担,该接口类似于动态内存分配,但使用预先分配的数组中的对象。

怎么做……

在本食谱中,我们将创建一个简单的对象池实现,并学习如何在您的应用中使用它:

  1. 在您的工作~/test目录中,创建一个名为objpool的子目录。
  2. 使用您喜欢的文本编辑器在objpool子目录中创建objpool.cpp文件。 让我们定义一个模板化的ObjectPool类。 我们从私有数据成员和构造函数开始:
#include <iostream>

template<class T, size_t N>
class ObjectPool {
  private:
    T objects[N];
    size_t available[N];
    size_t top = 0;
  public:
    ObjectPool(): top(0) {
      for (size_t i = 0; i < N; i++) {
        available[i] = i;
      }
    }
  1. 现在,让我们添加一个从池中获取元素的方法:
    T& get() {
      if (top < N) {
        size_t idx = available[top++ ];
        return objects[idx];
      } else {
        throw std::runtime_error("All objects are in use");
      }
    }
  1. 接下来,我们添加一个将元素返回到池的方法:
    void free(const T& obj) {
      const T* ptr = &obj;
      size_t idx = (ptr - objects) / sizeof(T);
      if (idx < N) {
        if (top) {
          top--;
          available[top] = idx;
        } else {
          throw std::runtime_error("Some object was freed more than once");
        }
      } else {
        throw std::runtime_error("Freeing object that does not belong to
       the pool");
      }
     }
  1. 然后,用一个小函数来包装类定义,该函数返回池中请求的元素数量:
    size_t requested() const { return top; }
    };
  1. 定义要存储在对象池中的数据类型,如以下代码所示:
struct Point {
  int x, y;
};
  1. 然后添加使用对象池的代码:
int main() {
  ObjectPool<Point, 10> points;

  Point& a = points.get();
  a.x = 10; a.y=20;
  std::cout << "Point a (" << a.x << ", " << a.y << ") initialized, requested "        <<
    points.requested() << std::endl;

  Point& b = points.get();
  std::cout << "Point b (" << b.x << ", " << b.y << ") not initialized, requested " <<
    points.requested() << std::endl;

  points.free(a);
  std::cout << "Point a(" << a.x << ", " << a.y << ") returned, requested " <<
    points.requested() << std::endl;

  Point& c = points.get();
  std::cout << "Point c(" << c.x << ", " << c.y << ") not intialized, requested " <<
    points.requested() << std::endl;

  Point local;
  try {
    points.free(local);
  } catch (std::runtime_error e) {
    std::cout << "Exception caught: " << e.what() << std::endl;
  }
  }
  1. loop子目录中创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(objpool)
add_executable(objpool objpool.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
  1. 构建应用并将生成的可执行二进制文件复制到目标系统。 使用第 2 章设置环境中的食谱来完成。
  2. 切换到目标系统终端。 如果需要,使用用户凭据登录。
  3. 运行二进制文件。

它是如何运作的..。

在这个应用中,我们使用与第一个配方中相同的概念(预先分配的对象的静态数组);但是,我们将其包装到一个模板化的ObjectPool类中,以提供一个通用接口来处理不同类型的对象。

我们的模板有两个参数-存储在ObjectPool类的实例中的对象的类或数据类型,以及池大小。 这些参数用于定义类的两个私有数据字段-对象数组和自由索引数组:

     T objects[N];
     size_t available[N];

由于模板参数是在编译时解析的,因此这些数组是静态分配的。 此外,该类有一个名为top的私有数据成员,它充当available数组中的索引,并指向下一个可用对象。

可用数组包含objects数组中当前可供使用的所有对象的索引。 一开始,所有对象都是空闲的,可用数组中所有元素的索引填充到可用数组中:

      for (size_t i = 0; i < N; i++) {
        available[i] = i;
      }

当应用需要从池中获取元素时,它会调用get方法。 此方法使用 top 变量获取池中下一个可用元素的索引:

      size_t idx = available[top++ ];
      return objects[idx];

top索引达到数组的大小时,意味着不能再分配更多的元素,因此该方法抛出一个异常来指示错误情况:

      throw std::runtime_error("All objects are in use");

可以使用free将对象返回到池中。 首先,它根据元素的地址检测元素的索引。 索引计算为对象地址和池起始地址之间的差值。 由于池对象是连续存储在内存中的,因此我们可以很容易地筛选出相同类型的对象,但不能筛选出源自该池的对象:

      const T* ptr = &obj;
      size_t idx = (ptr - objects) / sizeof(T);

注意,因为size_t类型是无符号的,所以我们不需要检查结果索引是否小于零-这是不可能的。 如果我们尝试将一个对象返回到不属于它的池中,并且该对象的地址小于池的起始地址,那么它无论如何都会被视为正索引。

如果我们返回的对象属于池,我们将更新顶部计数器,并将结果索引放入可用数组中以供进一步使用:

  top--;
  available[top] = idx;

否则,我们抛出一个异常,指示我们试图返回一个不是从该池中取出的对象:

     throw std::runtime_error("Freeing object that does not belong to the pool");

请求的方法用于跟踪池对象的使用情况。 它返回 top 变量,该变量有效地跟踪已声明但尚未返回池的对象的数量。

     size_t requested() const { return top; }

让我们定义一个数据类型,并尝试使用池中的对象。 我们声明一个名为Point的结构,该结构包含两个int字段,如以下代码所示:

 struct Point {
  int x, y;
 };

现在,我们创建一个大小为10Point对象池:

    ObjectPool<Point, 10> points;

我们从池中获取一个对象并填充其数据字段:

 Point& a = points.get();
 a.x = 10; a.y=20;

该程序会生成以下输出:

输出的第一行根据请求报告一个对象。

我们再请求一个对象并按原样打印其数据字段,而不进行任何初始化。 不出所料,池报告请求了两个对象。

现在,我们将第一个对象返回到池中,并确保请求的对象数量减少。 我们还可以注意到,即使在将对象返回到池之后,我们也可以从其中读取数据。

让我们再从池子里认领一件物品。 请求的计数增加,但请求的对象与我们在上一步返回的对象相同。

我们可以看到,Point c在从池中取出后没有初始化,但是它的字段包含与Point a相同的值。 实际上,现在ac是对池中同一对象的引用,因此修改变量a会影响变量c。 这是我们实现对象池的限制之一。

最后,我们创建一个本地Point对象,并尝试将其返回到池中:

  Point local;
  try {
    points.free(local);
  } catch (std::runtime_error e) {
    std::cout << "Exception caught: " << e.what() << std::endl;
  }

预计它会因例外而失败,事实也的确如此。 在程序输出中,您可以看到Exception caught: Freeing object that does not belong to the pool消息。

还有更多的..。

尽管对象池的实现简化了对预分配对象的处理,但它也有许多限制。

首先,所有对象都是从一开始就创建的。 因此,调用池的get方法不会触发对象构造函数,调用free方法也不会调用析构函数。 开发人员需要使用各种解决方法来初始化和取消初始化对象。

一种可能的解决方法是定义目标对象的特殊方法,如initializedeinitialize,它们将分别由ObjectPool类的getfree方法调用。 然而,这种方法将类的实现耦合到ObjectPool实现。 在本章的后面部分,我们将介绍克服这一限制的更高级技术。

我们的池实现不会检测是否为一个对象多次调用了free方法。 这是一个错误,但它很常见,并且会导致难以调试的问题。 虽然在技术上可行,但它增加了实现的额外复杂性,这对于本例来说是不必要的。

使用环形缓冲区

环形缓冲区或循环缓冲区是嵌入式世界中广泛使用的数据结构。 它的工作方式是放置在固定大小内存阵列顶部的队列。 缓冲区可以包含固定数量的元素。 生成这些元素的函数按顺序逐个将它们放入缓冲区。 当到达缓冲区的末尾时,它切换到缓冲区的开头,就好像它的第一个元素跟在最后一个元素之后一样。

在组织独立且不能相互等待的数据生产者和消费者之间的数据交换时,这种设计已被证明是非常高效的,这是嵌入式开发中的常见场景。 例如,当中断被禁用时,中断服务例程应该快速将来自设备的数据排队以进行进一步处理。 如果它落后,它就不能等待处理数据的函数。 同时,处理功能不需要与中断服务例程(ISR)完全同步;它可以一次处理多个元素,并在稍后赶上 ISR。

这一点,再加上环可以静态预分配的事实,使得环缓冲区在许多情况下成为最佳选择。

怎么做……

在本食谱中,我们将学习如何在 C++ 数组之上创建和使用环形缓冲区:

  1. 在您的工作~/test目录中,创建一个名为ringbuf的子目录。
  2. 使用您喜欢的文本编辑器在ringbuf子目录中创建ringbuf.cpp文件。
  3. private数据字段开始定义RingBuffer类:
#include <iostream>

template<class T, size_t N>
class RingBuffer {
  private:
    T objects[N];
    size_t read;
    size_t write;
    size_t queued;
  public:
    RingBuffer(): read(0), write(0), queued(0) {}
  1. 现在我们添加一个将数据推送到缓冲区的方法:
    T& push() {
      T& current = objects[write];
      write = (write + 1) % N;
      queued++ ;
      if (queued > N) {
        queued = N;
        read = write;
      }
      return current;
    }
  1. 接下来,我们添加一个从缓冲区拉取数据的方法:
    const T& pull() {
      if (!queued) {
        throw std::runtime_error("No data in the ring buffer");
      }
      T& current = objects[read];
      read = (read + 1) % N;
      queued--;
      return current;
    }
  1. 让我们添加一个小方法来检查缓冲区是否包含任何数据,并结束类定义:
bool has_data() {
  return queued != 0;
}
};
  1. 定义了RingBuffer之后,我们现在可以添加使用它的代码。 首先,让我们定义要使用的数据类型:
struct Frame {
  uint32_t index;
  uint8_t data[1024];
};
  1. 其次,添加main函数并将RingBuffer的实例定义为其变量,以及尝试使用空缓冲区的代码:
int main() {
  RingBuffer<Frame, 10> frames;

  std::cout << "Frames " << (frames.has_data() ? "" : "do not ")
      << "contain data" << std::endl;
  try {
    const Frame& frame = frames.pull();
  } catch (std::runtime_error e) {
    std::cout << "Exception caught: " << e.what() << std::endl;
  }
  1. 接下来,在缓冲区中添加使用五个元素的代码:
for (size_t i = 0; i < 5; i++) {
Frame& out = frames.push();
out.index = i;
out.data[0] = 'a' + i;
out.data[1] = '\0';
  }
std::cout << "Frames " << (frames.has_data() ? "" : "do not ")
<< "contain data" << std::endl;
while (frames.has_data()) {
const Frame& in = frames.pull();
    std::cout << "Frame " << in.index << ": " << in.data << std::endl;
  }
  1. 在此之后,添加处理大量可以添加的元素的类似代码:
    for (size_t i = 0; i < 26; i++) {
    Frame& out = frames.push();
    out.index = i;
    out.data[0] = 'a' + i;
    out.data[1] = '\0';
    }
    std::cout << "Frames " << (frames.has_data() ? "" : "do not ")
      << "contain data" << std::endl;
    while (frames.has_data()) {
    const Frame& in = frames.pull();
    std::cout << "Frame " << in.index << ": " << in.data << std::endl;
    }
    }
  1. loop子目录中创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(ringbuf)
add_executable(ringbuf ringbuf.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
  1. 构建应用并将生成的可执行二进制文件复制到目标系统。 使用第 2 章设置环境中的食谱来完成。
  2. 切换到目标系统终端。 如果需要,使用用户凭据登录。
  3. 运行二进制文件。

它是如何运作的..。

我们将环形缓冲区实现为具有三个私有数据字段的模板化 C++ 类:

  • objects:类型为TN元素的静态数组
  • read:从中读取元素的索引
  • write:要将元素写入的索引

RingBuffer类公开三个公共方法:

  • push():将数据写入缓冲区
  • pull():从缓冲区读取数据
  • has_data():检查缓冲区是否包含数据

让我们仔细看看它们是如何工作的。

push()方法旨在由函数用于在缓冲区中存储数据。 与动态队列或动态堆栈的类似push()方法(接受将值存储为参数)不同,我们的实现不接受任何参数。 由于所有元素都是在编译时预分配的,因此它返回对缓冲区中要更新的值的引用。

push()方法的实现很简单;它通过write索引获取指向元素的指针,然后推进write索引并增加存储在缓冲区中的元素数量。 请注意,当write索引达到大小限制时,如何使用除法余数运算符将write索引换行到数组的开头:

T& current = objects[write];
write = (write + 1) % N;
queued++ ;

如果我们试图推送超过objects数组容量所能处理的元素,会发生什么情况呢? 这取决于我们计划存储在缓冲区中的数据的性质。 在我们的实现中,我们假设接收方对最近的数据感兴趣,并且如果不能赶上发送方,可以容忍中间数据的丢失。 如果接收方速度太慢,在接收方read数据之前,发送方运行多少圈都无关紧要:此时所有超过N步的数据都会被覆盖。 这就是为什么,一旦存储的元素数量超过N,我们就开始将read索引与write索引一起前进,以使它们正好保持N步的距离:

 if (queued > N) {
  queued = N;
  read = write;
 }

pull()方法由从缓冲区读取数据的函数使用。 与push()方法类似,它不接受任何参数,并返回对缓冲区中元素的引用。 不过,与push()方法不同的是,它返回一个常量引用(如下面的代码所示),以指示它不应该修改缓冲区中的数据:

 const T& pull() {

首先,它检查缓冲区中是否有数据,如果缓冲区不包含元素,则抛出异常:

  if (!queued) {
   throw std::runtime_error("No data in the ring buffer");
  }

它通过读取索引获取对元素的引用,然后推进read索引,应用与push()方法对write索引相同的除法余数运算符:

  read = (read + 1) % N;
  queued--;

has_data()方法的实现很简单。 如果对象计数器为零,则返回false,否则返回true

  bool has_data() {
  return queued != 0;
  }

现在,让我们在行动中试一试。 我们声明一个简单的数据结构Frame,它模仿设备生成的数据。 它包含一个帧索引和一个不透明的数据缓冲区:

  uint32_t index;
  uint8_t data[1024];
  };

我们定义一个容量为10frame 类型元素的环形缓冲区:

  RingBuffer<Frame, 10> frames;

让我们来看看程序输出:

首先,如预期的那样,我们尝试从空缓冲区读取并获得异常。

然后,我们使用拉丁字母字符作为数据有效负载,将五个元素写入缓冲区:

  for (size_t i = 0; i < 5; i++) {
    Frame& out = frames.push();
    out.index = i;
    out.data[0] = 'a' + i;
    out.data[1] = '\0';
  }

注意我们如何获取对元素的引用,然后就地更新它,而不是将frame的本地副本推入环形缓冲区。 然后我们读取缓冲区中的所有数据并将其打印在屏幕上:

  while (frames.has_data()) {
    const Frame& in = frames.pull();
    std::cout << "Frame " << in.index << ": " << in.data << std::endl;
  }

程序输出表明,我们可以成功读取所有五个元素。 现在,我们尝试将拉丁字母表中的 26 个字母全部写入数组,远远超出了数组的容量。

 for (size_t i = 0; i < 26; i++) {
    Frame& out = frames.push();
    out.index = i;
    out.data[0] = 'a' + i;
    out.data[1] = '\0';
  }

然后,我们以与读取五种元素相同的方式读取数据。 读取成功,但我们只收到写入的最后 10 个元素;此时所有其他帧都已丢失并被覆盖。 这对于我们的示例应用并不重要,但对于许多其他应用来说可能是不可接受的。 确保数据不会丢失的最佳方法是保证接收方比发送方更频繁地激活。 有时,如果缓冲区中没有可用的数据,接收器将被激活,但为了避免数据丢失,这是可以接受的代价。

使用共享内存

在支持MMU(简写为内存管理单元)的硬件上运行的现代操作系统中,每个应用都作为一个进程运行,并将其内存与其他应用隔离。

这种隔离带来了重要的可靠性好处。 一个应用不可能意外损坏另一个应用的内存。 同样,意外损坏自身内存并崩溃的应用可以由操作系统关闭,而不会影响系统中的其他应用。 将嵌入式系统的功能解耦到几个孤立的应用中,这些应用通过定义明确的 API 相互通信,显著降低了实现的复杂性,从而提高了稳定性。

然而,与世隔绝是要付出代价的。 由于每个进程都有自己的独立地址空间,因此两个应用之间的数据交换意味着数据复制、上下文切换和操作系统内核同步机制的使用,这可能相对昂贵。

共享内存是许多操作系统提供的一种机制,用于将某些内存区域声明为共享。 这样,应用可以在不复制的情况下交换数据。 这对于交换大型数据对象(例如视频帧或音频样本)尤其重要。

怎么做……

在本食谱中,我们将学习如何使用 Linux 共享内存 API 在两个或多个应用之间进行数据交换:

  1. 在您的工作~/test目录中,创建一个名为shmem的子目录。
  2. 使用您喜欢的文本编辑器在shmem子目录中创建shmem.cpp文件。 定义SharedMem类,从公共标头和常量开始:
#include <algorithm>
#include <iostream>
#include <chrono>
#include <thread>

#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>

const char* kSharedMemPath = "/sample_point";
const size_t kPayloadSize = 16;

using namespace std::literals;

template<class T>
class SharedMem {
  int fd;
  T* ptr;
  const char* name;

  public:
  1. 然后,定义一个完成大部分工作的构造函数:
SharedMem(const char* name, bool owner=false) {
fd = shm_open(name, O_RDWR | O_CREAT, 0600);
if (fd == -1) {
throw std::runtime_error("Failed to open a shared memory region");
}
if (ftruncate(fd, sizeof(T)) < 0) {
close(fd);
throw std::runtime_error("Failed to set size of a shared memory 
region");
};
ptr = (T*)mmap(nullptr, sizeof(T), PROT_READ | PROT_WRITE, 
MAP_SHARED, fd, 0);
if (!ptr) {
close(fd);
    throw std::runtime_error("Failed to mmap a shared memory region");
}
    this->name = owner ? name : nullptr;
    std::cout << "Opened shared mem instance " << name << std::endl;
}
  1. 添加析构函数的定义:
    ~SharedMem() {
      munmap(ptr, sizeof(T));
      close(fd);
      if (name) {
        std::cout << "Remove shared mem instance " << name << std::endl;
        shm_unlink(name);
      }
      }
  1. 使用一个返回对共享对象的引用的小方法完成类定义:
    T& get() const {
      return *ptr;
    }
    };
  1. 我们的SharedMem类可以使用不同的数据类型。 让我们声明一个我们想要使用的自定义数据结构:
struct Payload {
  uint32_t index;
  uint8_t raw[kPayloadSize];
};
  1. 现在添加将数据写入共享内存的代码:
void producer() {
  SharedMem<Payload> writer(kSharedMemPath);
  Payload& pw = writer.get();
  for (int i = 0; i < 5; i++) {
    pw.index = i;
    std::fill_n(pw.raw, sizeof(pw.raw) - 1, 'a' + i);
    pw.raw[sizeof(pw.raw) - 1] = '\0';
    std::this_thread::sleep_for(150ms);
  }
}
  1. 另外,添加从共享内存读取数据的代码:
void consumer() {
  SharedMem<Payload> point_reader(kSharedMemPath, true);
  Payload& pr = point_reader.get();
  for (int i = 0; i < 10; i++) {
    std::cout << "Read data frame " << pr.index << ": " << pr.raw << std::endl;
    std::this_thread::sleep_for(100ms);
  }
  }
  1. 添加main函数将所有内容绑定在一起,如以下代码所示:
int main() {

  if (fork()) {
    consumer();
  } else {
    producer();
  }
  }
  1. loop子目录中创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(shmem)
add_executable(shmem shmem.cpp)
target_link_libraries(shmem rt)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 14")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
  1. 构建应用并将生成的可执行二进制文件复制到目标系统。 使用第 2 章设置环境中的食谱来完成。
  2. 切换到目标系统终端。 如果需要,使用用户凭据登录。
  3. 运行二进制文件。

它是如何运作的..。

在本配方中,我们使用POSIX(Portable Operating System Interface)API 来处理共享内存。 这是一个灵活且细粒度的 C API,有许多可以调优或配置的参数。 我们的目标是通过在底层 API 上实现一个更方便、类型安全的 C++ 包装器来隐藏它的复杂性。 我们将使用RAII(资源获取的缩写是 Initialization)习惯用法来确保所有分配的资源都被正确释放,并且我们的应用中不会有内存或文件描述符泄漏。

我们定义了一个模板化的SharedMem类。 模板参数定义了存储在共享内存实例中的数据类型。 这样,我们使SharedMem类类型的实例变得安全。 与我们在应用代码中使用空指针和强制转换类型不同,C++ 编译器会自动为我们执行此操作:

template<class T>
class SharedMem {

所有共享内存分配和初始化都在SharedMem构造函数中实现。 它接受两个参数:

  • 共享内存对象名称
  • 所有权标志

POSIX 定义了一个shm_openAPI,其中共享内存对象由名称标识,类似于文件名。 这样,使用相同名称的两个独立进程可以引用相同的共享内存对象。 共享对象的生命周期是多少? 当为同一对象名调用shm_unlink函数时,共享对象被销毁。 如果该对象由多个进程使用,则第一个调用shm_open的进程将创建该对象,其他进程将重用同一对象。 但它们中的哪一个要为它的删除负责呢? 这就是所有权标志的用途。 当设置为true时,它表示SharedMem实例在被销毁时负责清除共享对象。

构造函数顺序调用三个 POSIX API 函数。 首先,它使用shm_open创建一个共享对象。 虽然函数接受访问标志和文件权限作为参数,但我们始终使用读写访问模式,对当前用户进行读写访问:

fd = shm_open(name, O_RDWR | O_CREAT, 0600);

接下来,我们使用ftruncate调用定义共享区域的大小。 为此,我们使用模板数据类型的大小:

if (ftruncate(fd, sizeof(T)) < 0) {

最后,我们使用mmap函数将共享区域映射到我们的进程内存地址空间。 它返回一个指针,我们可以使用该指针引用数据实例:

ptr = (T*)mmap(nullptr, sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

该对象将共享内存块的文件描述符和指向内存区域的指针作为其私有成员。 当对象被销毁时,析构函数释放它们。 如果设置了所有者标志,我们还会保留对象名称,以便可以将其删除:

int fd;
T* ptr;
const char* name;

SharedMem析构函数将共享内存对象从地址空间取消映射:

 munmap(ptr, sizeof(T));

如果对象是所有者,我们可以使用shm_unlink调用删除它。 请注意,我们不再需要 Owner 标志,因为名称设置为nullptr,除非对象是 Owner:

 if (name) {
   std::cout << "Remove shared mem instance " << name << std::endl;
 shm_unlink(name);
 }

要访问共享数据,该类提供了一个简单的get方法。 它返回对存储在共享内存中的对象的引用:

  T& get() const {
      return *ptr;
  }

让我们创建两个使用我们创建的共享内存 API 的独立进程。 我们使用 POSIXfork函数来派生子进程。 子流程将是数据生产者,父流程将是数据使用者:

  if (fork()) {
    consumer();
  } else {
    producer();
  }

我们定义了Payload数据类型,生产者和消费者都使用该数据类型进行数据交换:

  struct Payload {
  uint32_t index;
  uint8_t raw[kPayloadSize];
  };

数据生成器创建一个SharedMem实例:

  SharedMem<Payload> writer(kSharedMemPath);

它使用通过get方法接收到的引用,每 150 毫秒更新一次共享对象。 每次,它都会递增有效负载的索引字段,并使用与索引匹配的拉丁字母填充其数据。

消费者和生产者一样简单。 它创建一个与生产者同名的SharedMem实例,但声明该对象的所有权。 这意味着它将负责删除它,如以下代码所示:

  SharedMem<Payload> point_reader(kSharedMemPath, true);

运行应用并观察以下输出:

每隔 100 毫秒,应用就会从共享对象读取数据并将其打印到屏幕上。 在消费者输出中,我们可以看到它接收生产者写入的数据。 由于消费者周期和生产者周期的持续时间不匹配,我们可以看到,有时会读取相同的数据两次

本例中有意省略的一个重要逻辑部分是生产者和消费者的同步。 因为它们作为独立的项目运行,所以不能保证生产者在消费者尝试读取数据时已经更新了任何数据。 以下是我们在结果输出中看到的内容:

Opened shared mem instance /sample_point
Read data frame 0: 
Opened shared mem instance /sample_point

我们可以看到,在生产者打开相同的对象之前,消费者打开了共享内存对象并读取了一些数据。

同样,不能保证当消费者尝试读取数据字段时,生产者会完全更新数据字段。 我们将在下一章更详细地讨论这个主题。

还有更多的..。

共享内存本身是一种快速高效的进程间通信机制,但当与环形缓冲区结合使用时,它确实大放异彩。 通过将环形缓冲区放入共享内存,开发人员允许独立的数据生产者和数据消费者异步交换数据,并且同步开销最小。

使用专用内存

嵌入式系统通常在特定的存储器地址范围内提供对其外部设备的访问。 当程序访问此类区域中的地址时,它不会读取或写入内存中的值。 相反,数据被发送到设备或从映射到该地址的设备读取。

该技术通常命名为MMIO(缩写为内存映射输入/输出)。 在本食谱中,我们将学习如何从用户空间 Linux 应用使用 MMIO 访问 Raspberry PI 的外部设备。

怎么做……

Raspberry PI 有许多可通过 MMIO 访问的外部设备。 为了演示 MMIO 的工作原理,我们的应用将访问系统计时器:

  1. 在您的工作~/test目录中,创建一个名为timer的子目录。
  2. 使用您喜欢的文本编辑器在timer子目录中创建名为timer.cpp的文件。
  3. 将所需的标头、常量和类型声明放入timer.cpp
#include <iostream>
#include <chrono>
#include <system_error>
#include <thread>

#include <fcntl.h>
#include <sys/mman.h>

constexpr uint32_t kTimerBase = 0x3F003000;

struct SystemTimer {
  uint32_t CS;
  uint32_t counter_lo;
  uint32_t counter_hi;
};
  1. 添加main函数,该函数包含程序的所有逻辑:
int main() {

  int memfd = open("/dev/mem", O_RDWR | O_SYNC);
  if (memfd < 0) {
  throw std::system_error(errno, std::generic_category(),
  "Failed to open /dev/mem. Make sure you run as root.");
  }

  SystemTimer *timer = (SystemTimer*)mmap(NULL, sizeof(SystemTimer),
  PROT_READ|PROT_WRITE, MAP_SHARED,
  memfd, kTimerBase);
  if (timer == MAP_FAILED) {
  throw std::system_error(errno, std::generic_category(),
  "Memory mapping failed");
  }

  uint64_t prev = 0;
  for (int i = 0; i < 10; i++) {
   uint64_t time = ((uint64_t)timer->counter_hi << 32) + timer->counter_lo;
   std::cout << "System timer: " << time;
   if (i > 0) {
   std::cout << ", diff " << time - prev;
    }
    prev = time;
    std::cout << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  return 0;
 }
  1. timer子目录中创建名为CMakeLists.txt的文件,内容如下:
cmake_minimum_required(VERSION 3.5.1)
project(timer)
add_executable(timer timer.cpp)

set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

SET(CMAKE_CXX_FLAGS "--std=c++ 11")

set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
  1. 现在您可以构建和运行应用了。

Please note that it should be run under root on a real Raspberry PI 3 device.

它是如何运作的..。

系统定时器是使用 MMIO 接口连接到处理器的外部设备。 这意味着它有一个专用的物理地址范围,每个地址都有特定的格式和用途。

我们的应用使用表示为两个 32 位值的定时器计数器。 它们组合在一起,形成一个 64 位只读计数器,在系统运行时始终递增。

对于 Raspberry PI 3,分配给系统定时器的物理内存地址范围偏移了以下值-0x3F003000(根据 Raspberry PI 硬件版本的不同,可能会有所不同)。 我们把它定义为一个常数。

constexpr uint32_t kTimerBase = 0x3F003000;

要访问区域内的各个字段,我们定义了一个SystemTimer结构:

struct SystemTimer {
  uint32_t CS;
  uint32_t counter_lo;
  uint32_t counter_hi;
};

现在,我们需要获取指向定时器地址范围的指针,并将其转换为指向SystemTimer的指针。 这样,我们就可以通过读取SystemTimer数据字段来访问计数器的地址。

然而,有一个问题我们需要解决。 我们知道物理地址空间中的偏移量,但是我们的 Linux 应用在虚拟地址空间中工作。 我们需要找到一种将物理地址映射到虚拟地址的方法。

Linux 使用特殊的/proc/mem文件提供对物理内存地址的访问。 由于它包含所有物理内存的快照,因此只能通过root访问。

我们使用open函数将其作为常规文件打开:

int memfd = open("/dev/mem", O_RDWR | O_SYNC);

一旦文件打开,并且我们知道它的描述符,我们就可以将其映射到我们的虚拟地址空间。 我们不需要映射整个物理内存。 与计时器相关的区域就足够了;这就是为什么我们将系统计时器范围开始作为偏移量参数,将SystemTimer结构的大小作为大小参数:

SystemTimer *timer = (SystemTimer*)mmap(NULL, sizeof(SystemTimer),
PROT_READ|PROT_WRITE, MAP_SHARED, memfd, kTimerBase);

现在我们可以访问计时器字段了。 我们读取循环中的计时器计数器,并显示其当前值及其与前一个值的方差。 当我们以root身份运行应用时,会得到以下输出:

正如我们所看到的,从该内存地址读取将返回递增的值。 差值在 10,000 左右,而且相当恒定。 由于我们在计数器读取循环中添加了 10 毫秒的延迟,因此我们可以推断内存地址与计时器关联,而不是与常规内存关联,并且计时器计数器的粒度为 1 微秒。

还有更多的..。

Raspberry PI 有许多可通过 MMIO 访问的外部设备。 您可以在位于https://www.raspberrypi.org/documentation/hardware/raspberrypi/bcm2835/BCM2835-ARM-Peripherals.pdfBCM2835ARM 外设手册中找到有关其地址范围和访问语义的详细信息

请注意,开发人员在使用可由多个设备同时访问的内存时必须格外小心。 当内存可由多个处理器或同一处理器的多个内核访问时,您可能需要使用高级同步技术(如内存屏障)来避免同步问题。 我们将在下一章中讨论其中的一些问题。 如果使用直接内存访问(DMA)或 MMIO,事情会变得更加复杂。 由于 CPU 可能不知道外部硬件更改了内存,因此其高速缓存可能不同步,从而导致数据一致性问题。