Skip to content

Latest commit

 

History

History
1444 lines (1084 loc) · 67 KB

File metadata and controls

1444 lines (1084 loc) · 67 KB

十、性能优化

性能是选择 C++ 作为项目编程语言的关键驱动因素之一。现在是时候讨论当我们以功能性风格构建代码时,如何提高性能了。

虽然性能是一个巨大的话题,我们显然不能在一章中完全涵盖,但我们将研究提高性能的关键思想,纯函数式语言如何优化性能,以及如何将这些优化转化为 C++。

本章将涵盖以下主题:

  • 交付绩效的流程
  • 如何使用并行/异步来提高性能
  • 了解什么是尾部递归以及如何激活它
  • 使用函数构造时如何提高内存消耗
  • 功能异步代码

技术要求

您将需要一个支持 C++ 17 的编译器。我用的是 GCC 7.3.0。

代码可以在的 GitHub 上找到。com/ PacktPublishing/动手-函数-用- Cpp 编程Chapter10文件夹中的。它包含并使用了doctest,这是一个单头开源单元测试库。你可以在它的 GitHub 资源库上找到它。com/ onqtam/ doctest

性能优化

谈论性能优化就像谈论披萨。有些人喜欢并搜索菠萝比萨。其他人只吃传统的意大利比萨饼(或来自特定地区)。有的只吃素食披萨,有的喜欢各种披萨。关键是,性能优化与您的代码库和产品息息相关。你在看什么样的表演?对用户来说,性能中最有价值的部分是什么?你需要考虑哪些限制因素?

与我一起工作的客户通常有一些性能要求,具体取决于主题:

  • 嵌入式产品(例如汽车、能源或电信)往往需要在内存限制内工作。堆栈和堆通常很小,因此限制了长期变量的数量。增加内存的成本可能高得令人望而却步(一位客户告诉我们,他们需要 1000 多万欧元才能在所有设备上增加 1 MB 的内存)。因此,程序员需要通过尽可能避免不必要的内存分配来克服这些限制。这可以包括初始化、通过复制传递参数(尤其是较大的结构)、避免需要消耗内存的特定算法等等。
  • 工程应用(例如计算机辅助设计或 CAD)需要在非常大的数据集上使用从数学、物理和工程导出的特定算法,并尽快返回结果。处理通常是在现代个人电脑上完成的,所以内存不成问题;然而,中央处理器是。随着多核处理器、可以接管部分处理的专用 GPU 以及允许在多个强大或专用服务器之间分配工作负载的云技术的出现,开发人员的工作通常会变成在并行和异步环境中优化速度。
  • 桌面游戏和游戏引擎都有自己特别关注的地方。图形必须看起来尽可能好,以便在中低端机器上优雅地缩小比例,并避免滞后。游戏通常会接管它们运行的机器,所以它们只需要与操作系统和系统应用(如反病毒软件或防火墙)争夺资源。它们还可以采用特定级别的图形处理器、中央处理器和内存。优化变成了关于并行性(因为需要多个内核)和避免浪费,以便在整个游戏中保持流畅的体验。
  • 游戏服务器然而,是一个不同的野兽。像暴雪的 Battle.net 这样的服务(我经常作为一名星际争霸 2玩家使用的服务)需要快速响应,即使是在压力下。在云计算时代,使用的服务器数量和它们的能力并不重要;我们可以轻松地放大或缩小它们。主要关注的是尽可能快地响应主要是输入/输出的工作负载。
  • 未来令人振奋。游戏的趋势是将处理转移到服务器,从而允许玩家甚至在低端机器上玩。这将为未来的游戏打开惊人的机会。(用 10 个 GPU 代替一个能做什么?100 块怎么样?)但也会导致需要优化游戏引擎进行服务器端、多机、并行处理。为了远离游戏,物联网行业为嵌入式软件和可扩展的服务器端处理开辟了更多机会。

考虑到所有这些可能性,我们能做些什么来在代码库中提供性能呢?

交付绩效的流程

如您所见,性能优化在很大程度上取决于您试图实现的目标。接下来的步骤可以很快总结如下:

  1. 定义一个明确的绩效目标,包括衡量标准以及如何衡量它们。
  2. 为性能定义一些编码准则。保持它们清晰,并针对代码的特定部分进行定制。
  3. 让代码发挥作用。
  4. 在需要的地方衡量和提高绩效。
  5. 监控和改进。

在我们更详细地研究这些步骤之前,理解性能优化的一个重要注意事项是很重要的——优化有两种类型。第一个来自干净的设计和干净的代码。例如,通过从代码中移除某些类型的相似性,您可能最终会减小可执行文件的大小,从而为数据留出更多空间;数据在代码中的传播可能会减少,从而避免不必要的复制或间接访问;或者,它将允许编译器更好地理解代码,并为您优化代码。从我的经验来看,将代码重构为简单的设计通常也能提高性能。

第二种提高性能的方法是使用点优化。这些都是非常具体的方法,我们可以重写一个函数或流程,让代码更快或更少地工作,通常是针对特定的编译器和平台。生成的代码通常看起来很聪明,但很难理解和更改。

点优化与编写易于更改和维护的代码有着天然的冲突。这导致唐纳德·克努特(Donald Knuth)说过早优化是所有邪恶的根源。这并不意味着我们应该编写明显缓慢的代码,例如通过复制传递大型集合。然而,这确实意味着我们应该首先针对可变性优化设计,然后测量性能,然后优化它,并且只有在绝对必要的情况下才使用点优化。平台中的怪癖、特定的编译器版本或使用的库可能需要不时进行点优化;把它们分开,少用。

现在让我们来看看优化性能的过程。

为绩效定义一个明确的目标,包括指标以及如何衡量它们

如果我们不知道我们要去哪里,我们去哪个方向并不重要——我是在转述《爱丽丝梦游仙境》。因此,我们应该知道我们要去哪里。我们需要一份符合产品需求的性能指标清单。此外,对于每个性能指标,我们需要一个范围来定义什么是指标的良好值,什么是可接受的值。我们来看几个例子。

如果您正在为一个具有 4 MB 内存的设备构建一个嵌入式产品,您可以查看以下指标:

  • 内存消耗:
    • 很好:1-3 MB
    • 好:3-4 MB
  • 设备启动时间:
    • 很好:< 1s
    • 好:1-3 秒

如果你正在构建一个桌面计算机辅助设计应用,通过一个建筑设计来模拟声波,其他的度量标准也很有趣。

模拟声波的计算时间:

  • 对于小房间:
    • 很好:< 1 分钟
    • 良好:< 5 分钟
  • 对于中型房间:
    • 很好:< 2 分钟
    • 好:< 10 分钟

这里的数字只是说明性的;您需要为您的产品找到自己的指标。

有了这些指标和良好/良好的范围,我们就可以在添加新功能后衡量性能,并进行相应的优化。它还允许我们简单地向利益相关者或业务人员解释产品的性能。

为性能定义一些编码准则——保持它们清晰,并针对代码的特定部分进行定制

如果你问 50 个不同的 C++ 程序员优化性能的技巧,你很快就会被建议淹没。如果你开始调查这个建议,会发现有些是过时的,有些是非常具体的,有些是很棒的。

因此,为性能制定编码准则是很重要的,但是有一个警告。C++ 代码库往往非常庞大,因为它们已经开发了很多年。如果你批判性地审视你的代码库,你会发现只有部分代码是性能的瓶颈。举个例子,只有当一个数学运算被多次调用时,计算速度快 1 毫秒才有意义;如果只调用一两次,或者很少调用,就没必要优化了。事实上,下一个版本的编译器或中央处理器在优化方面可能会比你做得更好。

由于这个事实,您应该理解代码的哪些部分对于您定义的性能标准是至关重要的。找出什么样的设计最适合那段特定的代码;有明确的指导方针,并遵循它们。虽然const&在任何地方都很有用,但也许你可以避免浪费开发人员的时间来整理一个只做了一次的非常小的集合。

让代码工作

考虑到这些指导原则,并考虑到要实现的新特性,第一步应该始终是让代码工作。此外,对其进行结构化,以便在您的限制范围内进行更改。不要试图在这里优化性能;同样,编译器和 CPU 可能比你想象的更聪明,做的工作也比你预期的多。了解情况是否如此的唯一方法是衡量绩效。

根据需要衡量和改进绩效

您的代码按照您的指导方针工作和结构化,并针对变化进行了优化。是时候写下一些关于优化它的假设,然后测试它们了。

因为您有明确的性能指标,所以验证它们相对容易。当然,这需要正确的基础设施和适当的测量过程。有了这些,您可以根据您的绩效指标来衡量自己的地位。

这里应该欢迎更多的假设。类似于——如果我们像这样重组这段代码,我预计指标 X 会有所改进。然后,您可以继续并测试您的假设——创建一个分支,更改代码,构建产品,通过性能指标测量过程,并查看结果。当然,它比我说的要复杂——有时它可能需要用不同的编译器、不同的优化选项或统计数据来构建。如果你想做出明智的决定,所有这些都是必要的。最好在度量上投入一些时间,而不是改变代码并使其更难理解。否则,你最终会欠下一笔技术债务,并为此支付长期利息。

然而,如果你必须做点优化,没有解决办法。只要确保尽可能详细地记录它们。既然你之前已经验证了你的假设,你会有很多东西要写,不是吗?

监控和改进

我们从定义性能指标开始循环。是时候结束它了——我们需要监控这些指标(可能还有其他指标),并根据我们学到的知识调整我们的间隔和编码准则。性能优化是一个持续的过程,因为目标设备也在不断发展。

我们已经研究了交付性能的过程,但是这与函数式编程有什么关系呢?哪些用例让功能代码结构闪闪发光,哪些没有那么好用?是时候深入研究我们的代码结构了。

并行性——利用不变性

编写并行运行的代码一直是软件开发中许多痛苦的根源。多线程、多进程或多服务器环境产生的问题似乎从根本上难以解决。死锁、饥饿、数据竞争、锁或调试多线程代码只是让我们这些见过它们的人害怕再次遇到它们的几个术语。然而,由于多核 CPU、GPU 和多服务器,我们不得不面对并行代码。函数式编程对此有帮助吗?

每个人都同意这是函数式编程的优点之一,特别是源自不变性。如果您的数据从未改变,那么就没有锁,并且同步非常简单,可以一般化。如果你只使用纯函数和函数转换(当然除了输入输出),你就可以免费获得并行化(几乎)。

事实上,C++ 17 标准包含了 STL 高级函数的执行策略,允许我们仅用一个参数就可以将算法从顺序改为并行。让我们并行检查一个向量的所有数字是否都大于5。我们只需要将execution::par作为all_of的执行政策:

auto aVector = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
auto all_of_parallel = [&aVector](){
    return all_of(execution::par, aVector.begin(), aVector.end(),  
        [](auto value){return value > 5;});
};

然后,我们可以从chrono命名空间测量使用高分辨率计时器的算法的顺序和并行版本之间的差异,如下所示:

auto measureExecutionTimeForF = [](auto f){
    auto t1 = high_resolution_clock::now();
    f();
    auto t2 = high_resolution_clock::now();
    chrono::nanoseconds duration = t2 - t1;
    return duration;
};

通常情况下,我现在会根据我的实验向您展示执行中的差异。不幸的是,在这种情况下,我不能这样做。在撰写本文时,实现执行策略的编译器只有 MSVC 和英特尔 C++,但两者都不符合标准。但是,如下面的代码片段所示,我在parallelExecution.cpp源文件中编写了代码,允许您在编译器支持该标准时通过取消注释一行来启用它,如下所示:

// At the time when I created this file, only MSVC had implementation  
    for execution policies.
// Since you're seeing this in the future, you can enable the parallel 
    execution code by uncommenting the following line 
//#define PARALLEL_ENABLED

执行此操作时,您将运行的代码将显示顺序和并行运行all_of的相对持续时间,如下所示:

TEST_CASE("all_of with sequential execution policy"){
    auto aVector = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    auto all_of_sequential = [&aVector](){
        return all_of(execution::seq, aVector.begin(), aVector.end(), 
            [](auto value){return value > 5;});
    };

    auto sequentialDuration = 
        measureExecutionTimeForF(all_of_sequential);
        cout << "Execution time for sequential policy:" << 
            sequentialDuration.count() << " ns" << endl;

    auto all_of_parallel = [&aVector](){
        return all_of(execution::par, aVector.begin(), aVector.end(), 
            [](auto value){return value > 5;});
    };

    auto parallelDuration = measureExecutionTimeForF(all_of_parallel);
    cout << "Execution time for parallel policy:" <<   
        parallelDuration.count() << " ns" << endl;
}

虽然我很想在这里分析一些执行数据,但这可能是最好的,因为本章最重要的信息是测量,测量,测量,然后优化。希望到时候你会做一些自我测量。

C++ 17 标准支持很多 STL 函数的执行策略,包括sortfindcopytransformreduce。也就是说,如果你正在链接这些函数并使用纯函数,你只需要向所有调用(或bind更高级的函数)传递一个额外的参数来实现并行执行!我甚至可以说,对于任何尝试过自己管理线程或调试奇怪的同步问题的人来说,这就像变魔术一样。事实上,只要编译器支持完整的 C++ 17 标准,我们在前面几章中为井字游戏和扑克之手编写的所有代码都可以轻松切换到并行执行。

但是这是如何工作的呢?all_of在多线程中运行相当容易;它们中的每一个都对集合中的特定元素执行谓词,返回一个布尔值,当第一个谓词返回False时,该过程停止。只有当谓词是纯函数时,这才是可能的;以任何方式修改结果或向量都会造成竞争条件。文档特别声明程序员负责保持谓词函数的纯粹性——不会有警告或编译错误。除了纯粹之外,您的谓词不能假定元素的处理顺序。

如果并行执行策略无法启动(例如,由于缺乏资源),执行将退回到顺序调用。在衡量性能时,这是一件需要记住的有用的事情:如果它比预期的低得多,首先检查程序是否可以并行执行。

该选项对于使用多个 CPU 的计算密集型应用非常有用。如果你对它的内存命中感兴趣,你就必须测量它,因为它取决于你使用的编译器和标准库。

记忆化

纯函数有一个有趣的性质。对于相同的输入值,它们返回相同的输出。这使得它们相当于一个大的值表,其输出值对应于输入参数的每个值组合。有时候,记住这个表的某些部分比做计算更快。这种技术被称为记忆

纯函数式编程语言,以及 Python 和 Groovy 等语言,都有办法在特定的函数调用上实现记忆化,从而提供高级别的控制。不幸的是,C++ 没有这个功能,所以我们必须自己编写。

实现记忆

为了开始我们的实现,我们需要一个函数;理想情况下,计算量很大。让我们选择power功能。一个简单的实现只是标准pow函数的包装器,如下面的代码片段所示:

function<long long(int, int)> power = [](auto base, auto exponent){
    return pow(base, exponent);
};

我们如何开始实施记忆化?嗯,记忆的核心是缓存。每当第一次调用函数时,它会正常运行,但也会将结果与输入值一起存储。在随后的调用中,该函数将在映射中进行搜索,以查看该值是否被缓存,如果被缓存,则返回该值。

这意味着我们将需要一个缓存,它将参数作为关键字,并将计算结果作为值。要将参数组合在一起,我们可以简单地使用一对或一个元组:

tuple<int, int> parameters

因此,缓存将是:

    map<tuple<int, int>, long long> cache;

让我们改变我们的power函数来使用这个缓存。首先,我们需要在缓存中查找一个结果:

    function<long long(int, int)> memoizedPower = [&cache](int base, 
        int exponent){
            tuple<int, int> parameters(base, exponent);
            auto valueIterator = cache.find(parameters);

如果什么都没有找到,我们计算结果并将其存储在缓存中。如果发现了什么,那就是我们返回的值:

        if(valueIterator == cache.end()){
            result = pow(base, exponent);
            cache[parameters] = result;
        } else{
            result = valueIterator -> second;
        }
        return result; 

为了检查这种方法是否工作正常,让我们运行一些测试:

    CHECK_EQ(power(1, 1), memoizedPower(1, 1));
    CHECK_EQ(power(3, 19), memoizedPower(3, 19));
    CHECK_EQ(power(2, 25), memoizedPower(2, 25));

一切正常。现在让我们比较一下 power 的两个版本,在下面的代码片段中有记忆和没有记忆。下面的代码展示了我们如何提取一种更通用的方法来记忆函数:

    function<long long(int, int)> power = [](int base, int exponent){
        return pow(base, exponent);
    };

    map<tuple<int, int>, long long> cache;

    function<long long(int, int)> memoizedPower = [&cache](int base, 
        int exponent){
            tuple<int, int> parameters(base, exponent);
            auto valueIterator = cache.find(parameters);
            long long result;
            if(valueIterator == cache.end()){
 result = pow(base, exponent);
            cache[parameters] = result;
        } else{
            result = valueIterator -> second;
        }
        return result; 
    };

第一个观察结果是,我们可以用对原始幂函数的调用来替换粗线,所以让我们这样做:

    function<long long(int, int)> memoizedPower = [&cache, &power](int 
        base, int exponent){
            tuple<int, int> parameters(base, exponent);
            auto valueIterator = cache.find(parameters);
            long long result;
            if(valueIterator == cache.end()){
 result = power(base, exponent);
            cache[parameters] = result;
        } else{
            result = valueIterator -> second;
        }
        return result; 
    };

如果我们传入在记忆过程中需要调用的函数,我们会得到一个更一般的解:

    auto memoize = [&cache](int base, int exponent, auto 
        functionToMemoize){
            tuple<int, int> parameters(base, exponent);
            auto valueIterator = cache.find(parameters);
            long long result;
            if(valueIterator == cache.end()){
            result = functionToMemoize(base, exponent);
            cache[parameters] = result;
        } else{
            result = valueIterator -> second;
        }
        return result; 
    };

    CHECK_EQ(power(1, 1), memoize(1, 1, power));
    CHECK_EQ(power(3, 19), memoize(3, 19, power));
    CHECK_EQ(power(2, 25), memoize(2, 25, power));

但是返回一个记忆化的函数不是很好吗?我们可以修改我们的memoize函数来接收一个函数,并返回一个被记忆的函数,它接收与初始函数相同的参数:

    auto memoize = [](auto functionToMemoize){
        map<tuple<int, int>, long long> cache;
 return [&](int base, int exponent) {
            tuple<int, int> parameters(base, exponent);
            auto valueIterator = cache.find(parameters);
            long long result;
            if(valueIterator == cache.end()){
                result = functionToMemoize(base, exponent);
                cache[parameters] = result;
            } else{
                result = valueIterator -> second;
            }
            return result; 
            };
    };
    auto memoizedPower = memoize(power);

这种变化最初不起作用——我遇到了分割错误。原因是我们正在更改 lambda 内部的缓存。为了使它工作,我们需要使 lambda 可变并按值捕获:

    auto memoize = [](auto functionToMemoize){
        map<tuple<int, int>, long long> cache;
 return [=](int base, int exponent) mutable {
            tuple<int, int> parameters(base, exponent);
            auto valueIterator = cache.find(parameters);
            long long result;
            if(valueIterator == cache.end()){
                result = functionToMemoize(base, exponent);
                cache[parameters] = result;
            } else{
                result = valueIterator -> second;
            }
            return result; 
            };
    };

我们现在有了一个函数,它可以记住任何带有两个整数参数的函数。借助几个类型参数,很容易使它更通用。我们需要返回值的类型、第一个参数的类型和第二个参数的类型:

template<typename ReturnType, typename FirstArgType, typename 
    SecondArgType>
auto memoizeTwoParams = [](function<ReturnType(FirstArgType, SecondArgType)> functionToMemoize){
    map<tuple<FirstArgType, SecondArgType>, ReturnType> cache;
    return [=](FirstArgType firstArg, SecondArgType secondArg) mutable {
        tuple<FirstArgType, SecondArgType> parameters(firstArg, 
    secondArg);
        auto valueIterator = cache.find(parameters);
        ReturnType result;
        if(valueIterator == cache.end()){
            result = functionToMemoize(firstArg, secondArg);
            cache[parameters] = result;
        } else{
            result = valueIterator -> second;
        }
        return result; 
    };
};

对于任何有两个参数的函数,我们都得到了一个记忆函数。我们可以做得更好。C++ 允许我们使用带有不确定数量的类型参数的模板——所谓的变量模板。利用它们的魔力,我们最终得到了一个记忆化的实现,它可以与具有任意数量参数的任何函数一起工作:

template<typename ReturnType, typename... Args>
function<ReturnType(Args...)> memoize(function<ReturnType(Args...)> f){
    map<tuple<Args...>, ReturnType> cache;
    return ([=](Args... args) mutable  {
            tuple<Args...> theArguments(args...);
            auto cached = cache.find(theArguments);
            if(cached != cache.end()) return cached -> second;
            auto result = f(args...);
            cache[theArguments] = result;
            return result;
    });
};

该函数有助于缓存任何其他函数;然而,有一个问题。直到现在,我们一直在使用权力的包装实现。下面是一个例子,说明如果我们编写自己的代码会是什么样子:

function<long long(int, int)> power = [&](auto base, auto exponent) 
{
    return (exponent == 0) ? 1 : base * power(base, exponent - 1);
};

记住这个函数只会缓存最终结果。然而,该函数是递归的,对我们的memoize函数的调用不会记住递归的中间结果。为此,我们需要告诉我们的记忆化幂函数不要调用幂函数,而要调用记忆化power函数。

不幸的是,没有简单的方法可以做到这一点。我们可以将递归调用的函数作为参数传递,但是由于实现的原因,这将改变原始的函数签名。或者我们可以重写函数来利用记忆。

尽管如此,我们最终还是找到了一个很好的解决方案。让我们来测试一下。

使用记忆

让我们使用measureExecutionTimeForF函数来测量对我们的power函数进行各种调用所需的时间。也是时候想想我们期待的结果了。我们确实缓存了重复调用的值,但是这需要在每次调用函数时都有自己的处理和内存。所以,也许会有帮助,也许不会。除非我们尝试,否则我们不会知道:

TEST_CASE("Pow vs memoized pow"){
    function<int(int, int)> power = [](auto first, auto second){
        return pow(first, second);
    };

    cout << "Computing pow" << endl;
    printDuration("First call no memoization: ",  [&](){ return 
        power(5, 24);});
    printDuration("Second call no memoization: ", [&](){return power(3, 
        1024);});
    printDuration("Third call no memoization: ", [&](){return power(9, 
        176);});
    printDuration("Fourth call no memoization (same as first call): ", 
        [&](){return power(5, 24);});

    auto powerWithMemoization = memoize(power);
    printDuration("First call with memoization: ",  [&](){ return 
        powerWithMemoization(5, 24);});
    printDuration("Second call with memoization: ", [&](){return 
        powerWithMemoization(3, 1024);});
    printDuration("Third call with memoization: ", [&](){return 
        powerWithMemoization(9, 176);});
    printDuration("Fourth call with memoization (same as first call): 
        ", [&](){return powerWithMemoization(5, 24);});
    cout << "DONE computing pow" << endl;

    CHECK_EQ(power(5, 24),  powerWithMemoization(5, 24));
    CHECK_EQ(power(3, 1024),  powerWithMemoization(3, 1024));
    CHECK_EQ(power(9, 176),  powerWithMemoization(9, 176));
}

这段代码用相同的值调用power函数,最后一次调用返回第一个值。然后它继续做同样的事情,但是在创建了power的记忆版本之后。最后,进行健全性检查——比较power函数和记忆化power函数的结果,以确保我们在memoize函数中没有错误。

问题是——记忆是否改善了执行系列中最后一个调用的时间(与系列中第一个调用完全相同)?在我的配置中,结果是混合的,如下面的代码片段所示:

Computing pow
First call no memoization: 26421 ns
Second call no memoization: 5207 ns
Third call no memoization: 2058 ns
Fourth call no memoization (same as first call): 179 ns
First call with memoization: 2380 ns
Second call with memoization: 2207 ns
Third call with memoization: 1539 ns
Fourth call with memoization (same as first call): 936 ns
DONE computing pow

或者,为了更好地查看(首先是没有记忆的调用),有以下内容:

First call: 26421 ns > 2380 ns
Second call: 5207 ns > 2207 ns
Third call: 2058 ns > 1539 ns
Fourth call: 179 ns < 936 ns

总的来说,记忆化的调用更好,除了我们重复第一次调用的时候。当然,当重复运行测试时,结果会有所不同,但这表明提高性能并不像仅仅使用缓存那么容易。幕后发生了什么?我认为最有可能的解释是另一种缓存机制在起作用——CPU 或其他。

如果有什么不同的话,这证明了测量的重要性。毫不奇怪,CPU 和编译器已经做了相当多的优化,我们只能在代码中做这么多。

如果我们尝试递归记忆呢?我重写了power函数来递归使用 memoization,它将缓存和递归调用混合在一起。下面是代码:

    map<tuple<int, int>, long long> cache;
    function<long long(int, int)> powerWithMemoization = [&](auto base, 
        auto exponent) -> long long{
            if(exponent == 0) return 1;
            long long value;

            tuple<int, int> parameters(base, exponent);
            auto valueIterator = cache.find(parameters);
            if(valueIterator == cache.end()){
            value = base * powerWithMemoization(base, exponent - 1);
            cache[parameters] = value;
            } else {
            value = valueIterator->second;
        };
        return value;
    };

当我们运行它时,结果如下:

Computing pow
First call no memoization: 1761 ns
Second call no memoization: 106994 ns
Third call no memoization: 8718 ns
Fourth call no memoization (same as first call): 1395 ns
First call with recursive memoization: 30921 ns
Second call with recursive memoization: 2427337 ns
Third call with recursive memoization: 482062 ns
Fourth call with recursive memoization (same as first call): 1721 ns
DONE computing pow

或者,在压缩视图中(首先是没有记忆的调用),有以下内容:

First call: 1761 ns < 30921 ns
Second call: 106994 ns < 2427337 ns
Third call: 8718 ns < 482062 ns
Fourth call: 1395 ns < 1721 ns

如您所见,构建缓存的时间非常长。然而,它为重复调用提供了回报,但在这种情况下,它仍然无法击败中央处理器和编译器的优化。

记忆有帮助吗?当我们使用一个更复杂的函数时,它就会发生。接下来让我们试着计算两个数的阶乘之差。我们将使用阶乘的简单实现,我们将尝试首先记住阶乘函数,然后计算差的函数。为了保持一致,我们将使用与之前相同的数字对。让我们看看下面代码片段中的代码:

TEST_CASE("Factorial difference vs memoized"){
    function<int(int)> fact = [&fact](int n){
        if(n == 0) return 1;
        return n * fact(n-1);
    };

    function<int(int, int)> factorialDifference = [&fact](auto first, 
        auto second){
            return fact(second) - fact(first);
    };
    cout << "Computing factorial difference" << endl;
    printDuration("First call no memoization: ",  [&](){ return 
        factorialDifference(5, 24);});
    printDuration("Second call no memoization: ", [&](){return 
        factorialDifference(3, 1024);});
    printDuration("Third call no memoization: ", [&](){return 
        factorialDifference(9, 176);});
    printDuration("Fourth call no memoization (same as first call): ", 
        [&](){return factorialDifference(5, 24);});

    auto factWithMemoization = memoize(fact);
    function<int(int, int)> factorialMemoizedDifference = 
        [&factWithMemoization](auto first, auto second){
        return factWithMemoization(second) - 
            factWithMemoization(first);
    };
    printDuration("First call with memoized factorial: ",  [&](){ 
        return factorialMemoizedDifference(5, 24);});
    printDuration("Second call with memoized factorial: ", [&](){return 
        factorialMemoizedDifference(3, 1024);});
    printDuration("Third call with memoized factorial: ", [&](){return 
        factorialMemoizedDifference(9, 176);});
    printDuration("Fourth call with memoized factorial (same as first 
        call): ", [&](){return factorialMemoizedDifference(5, 24);});

    auto factorialDifferenceWithMemoization = 
        memoize(factorialDifference);
    printDuration("First call with memoization: ",  [&](){ return 
        factorialDifferenceWithMemoization(5, 24);});
    printDuration("Second call with memoization: ", [&](){return 
        factorialDifferenceWithMemoization(3, 1024);});
    printDuration("Third call with memoization: ", [&](){return 
        factorialDifferenceWithMemoization(9, 176);});
    printDuration("Fourth call with memoization (same as first call): 
        ", [&](){return factorialDifferenceWithMemoization(5, 24);});

    cout << "DONE computing factorial difference" << endl;

    CHECK_EQ(factorialDifference(5, 24),  
        factorialMemoizedDifference(5, 24));
    CHECK_EQ(factorialDifference(3, 1024),  
        factorialMemoizedDifference(3, 1024));
    CHECK_EQ(factorialDifference(9, 176),        
        factorialMemoizedDifference(9, 176));

    CHECK_EQ(factorialDifference(5, 24),  
        factorialDifferenceWithMemoization(5, 24));
    CHECK_EQ(factorialDifference(3, 1024),  
        factorialDifferenceWithMemoization(3, 1024));
    CHECK_EQ(factorialDifference(9, 176),  
        factorialDifferenceWithMemoization(9, 176));
}

结果如何?让我们首先看看普通函数和使用记忆化阶乘的函数之间的区别:

Computing factorial difference
First call no memoization: 1727 ns
Second call no memoization: 79908 ns
Third call no memoization: 8037 ns
Fourth call no memoization (same as first call): 1539 ns
First call with memoized factorial: 4672 ns
Second call with memoized factorial: 41183 ns
Third call with memoized factrorial: 10029 ns
Fourth call with memoized factorial (same as first call): 1105 ns

让我们再一次并排比较它们:

First call: 1727 ns < 4672 ns
Second call: 79908 ns > 41183 ns
Third call: 8037 ns < 10029 ns
Fourth call: 1539 ns > 1105 ns

虽然其他调用的结果是混合的,但是当达到缓存值时,memoized 函数比非 memoized 函数有大约 20%的改进。这似乎是一个小小的改进,因为阶乘是递归的,所以,理论上,记忆应该有很大的帮助。然而,我们没有记住递归。相反,阶乘函数仍在递归调用非 memoized 版本。我们稍后再来讨论这个问题;现在,让我们来看看记忆factorialDifference功能时会发生什么:

First call no memoization: 1727 ns
Second call no memoization: 79908 ns
Third call no memoization: 8037 ns
Fourth call no memoization (same as first call): 1539 ns
First call with memoization: 2363 ns
Second call with memoization: 39700 ns
Third call with memoization: 8678 ns
Fourth call with memoization (same as first call): 704 ns

让我们一起看看结果:

First call: 1727 ns < 2363 ns
Second call: 79908 ns > 39700 ns
Third call: 8037 ns < 8678 ns
Fourth call: 1539 ns > 704 ns

在缓存值上,内存化版本的速度是非内存化版本的两倍!这是巨大的!然而,当我们没有缓存该值时,我们会用性能上的提升来买单。还有,第二次通话时发生了一些奇怪的事情;某种缓存可能会干扰我们的结果。

我们能通过优化阶乘函数的所有递归来改善这一点吗?让我想想。我们需要更改我们的阶乘函数,以便缓存适用于每个调用。为此,我们需要递归调用 memoized 阶乘函数,而不是普通的阶乘函数,如下所示:

    map<int, int> cache;
    function<int(int)> recursiveMemoizedFactorial = 
        [&recursiveMemoizedFactorial, &cache](int n) mutable{
        auto value = cache.find(n); 
        if(value != cache.end()) return value->second;
        int result;

        if(n == 0) 
            result = 1;
        else 
            result = n * recursiveMemoizedFactorial(n-1);

        cache[n] = result;
        return result;
    };

我们使用差分函数,它递归地记住对阶乘的两次调用:

    function<int(int, int)> factorialMemoizedDifference =  
        [&recursiveMemoizedFactorial](auto first, auto second){
                return recursiveMemoizedFactorial(second) -  
                    recursiveMemoizedFactorial(first);
    };

通过并行运行没有记忆的初始函数和具有相同数据的前一个函数,我得到了以下输出:

Computing factorial difference
First call no memoization: 1367 ns
Second call no memoization: 58045 ns
Third call no memoization: 16167 ns
Fourth call no memoization (same as first call): 1334 ns
First call with recursive memoized factorial: 16281 ns
Second call with recursive memoized factorial: 890056 ns
Third call with recursive memoized factorial: 939 ns
Fourth call with recursive memoized factorial (same as first call): 798 ns 

我们可以一起看看这个:

First call: 1,367 ns < 16,281 ns
Second call: 58,045 ns < 890,056 ns Third call: 16,167 ns > 939 ns Fourth call: 1,334 ns > 798 ns

正如我们所看到的,缓存正在积累,第一次大型计算就有大量的惩罚命中;第二个电话涉及 1024!但是,由于缓存命中,后续调用要快得多。

总之,我们可以说,当有足够的可用内存时,记忆对于加速重复的复杂计算是有用的。这可能需要一些调整,因为缓存大小和缓存命中取决于对函数的调用次数和重复调用次数。所以,不要认为这是理所当然的——衡量,衡量,衡量。

尾部递归优化

递归算法在函数编程中非常常见。事实上,我们的许多命令循环可以用纯函数重写为递归算法。

然而,递归在命令式编程中不是很流行,因为它有一些问题。首先,与命令式循环相比,开发人员倾向于较少练习递归算法。第二,可怕的堆栈溢出——默认情况下,递归调用放在堆栈上,如果迭代次数太多,堆栈就会溢出一个难看的错误。

幸运的是,编译器很聪明,可以为我们解决这个问题,同时优化递归函数。进入尾部递归优化。

让我们来看看一个简单的递归函数。我们将重用上一节中的阶乘,如下所示:

    function<int(int)> fact = [&fact](int n){
        if(n == 0) return 1;
        return n * fact(n-1);
    };

通常,每个调用都会放在堆栈上,因此您的堆栈会随着每个调用而增长。让我们想象一下:

Stack content fact(1024)
1024 * fact(1023)
1023 * fact(1022)
...
1 * fact(0)
fact(0) = 1 => unwind the stack

我们可以通过重写代码来避免堆栈。我们注意到递归调用出现在最后;因此,我们可以重写函数,类似于下面的伪代码:

    function<int(int)> fact = [&fact](int n){
        if(n == 0) return 1;
        return n * (n-1) * (n-1-1) * (n-1-1-1) * ... * fact(0);
    };

简而言之,如果我们启用正确的优化标志,这就是编译器可以为我们做的。这种调用不仅占用更少的内存,避免堆栈溢出,而且速度更快。

到现在,你应该知道不要相信任何人的主张——包括我的——而不去衡量它们。那么,让我们来验证这个假设。

首先,我们需要一个测试来测量多次调用阶乘函数的时间。我选择了一些值来进行测试:

TEST_CASE("Factorial"){
    function<int(int)> fact = [&fact](int n){
        if(n == 0) return 1;
        return n * fact(n-1);
    };

    printDuration("Duration for 0!: ", [&](){return fact(0);});
    printDuration("Duration for 1!: ", [&](){return fact(1);});
    printDuration("Duration for 10!: ", [&](){return fact(10);});
    printDuration("Duration for 100!: ", [&](){return fact(100);});
    printDuration("Duration for 1024!: ", [&](){return fact(1024);});
}

然后,我们需要在禁用和启用优化的情况下编译这个函数。优化尾部递归的 GNU 编译器集合 ( GCC )标志为-foptimize-sibling-calls;该名称指的是这样一个事实,即该标志优化了同级调用和尾部调用。我不会详细讨论兄弟调用优化的作用;这么说吧,这丝毫不影响我们的测试。

是时候运行这两个程序了。首先,让我们看看原始输出:

  • 这是没有优化的程序:
Duration for 0!: 210 ns
Duration for 1!: 152 ns
Duration for 10!: 463 ns
Duration for 100!: 10946 ns
Duration for 1024!: 82683 ns
  • 这是带有优化的程序:
Duration for 0!: 209 ns
Duration for 1!: 152 ns
Duration for 10!: 464 ns
Duration for 100!: 6455 ns
Duration for 1024!: 75602 ns

现在让我们一起看看结果;没有优化的持续时间在左边:

Duration for 0!: 210 ns > 209 ns
Duration for 1!: 152 ns  = 152 ns
Duration for 10!: 463 ns < 464 ns
Duration for 100!: 10946 ns > 6455 ns
Duration for 1024!: 82683 ns > 75602 ns

看来优化真的在我的机器上实现了更大的价值。这再一次证明了无论什么时候性能都很重要。

在接下来的部分中,我们将以各种方式对代码进行实验,并测量结果。

完全优化的呼叫

出于好奇,我决定在打开所有安全优化标志的情况下运行同一个程序。在 GCC 中,这个选项是-O3。至少可以说,结果是惊人的:

Duration for 0!: 128 ns
Duration for 1!: 96 ns
Duration for 10!: 96 ns
Duration for 100!: 405 ns
Duration for 1024!: 17249 ns

让我们将启用所有优化标志(下一个片段中的第二个值)的结果与仅尾部递归优化的结果进行比较:

Duration for 0!: 209 ns > 128 ns
Duration for 1!: 152 ns > 96 ns
Duration for 10!: 464 ns > 96 ns
Duration for 100!: 6455 ns > 405 ns
Duration for 1024!: 75602 ns > 17249 ns

正如你所看到的,差别是惊人的。结论是,虽然尾部递归优化是有用的,但更好的是让 CPU 缓存命中和所有好东西都由编译器启用。

但是我们使用的是if语句;当我们使用?:运算符时,这是否会有所不同?

If vs?:

出于好奇,我决定使用?:运算符而不是if语句重写代码,如下所示:

    function<int(int)> fact = [&fact](int n){
        return (n == 0) ? 1 : (n * fact(n-1));
    };

我不知道会发生什么,结果很有趣。让我们看看原始输出:

  • 没有优化标志:
Duration for 0!: 633 ns
Duration for 1!: 561 ns
Duration for 10!: 1441 ns
Duration for 100!: 20407 ns
Duration for 1024!: 215600 ns
  • 尾部递归标志打开时:
Duration for 0!: 277 ns
Duration for 1!: 214 ns
Duration for 10!: 578 ns
Duration for 100!: 9573 ns
Duration for 1024!: 81182 ns

让我们来看看结果的比较;没有优化的持续时间排在第一位:

Duration for 0!: 633 ns > 277 ns
Duration for 1!: 561 ns > 214 ns
Duration for 10!: 1441 ns > 578 ns
Duration for 100!: 20407 ns > 9573 ns
Duration for 1024!: 75602 ns > 17249 ns

两个版本的差别非常大,这是我没有完全预料到的。和往常一样,这很可能是 GCC 编译器的结果,您应该自行测试。然而,这个版本似乎更适合我的编译器的尾部优化——至少可以说是一个有趣的结果。

双递归

尾部递归对双递归有效吗?我们需要想出一个例子,将递归从一个函数传递到另一个函数来检查这一点。我决定写两个函数,f1f2,递归调用对方。f1将当前参数乘以f2(n - 1 ),而f2f1(n)加到f1(n-1)。下面是代码:

    function<int(int)> f2;
    function<int(int)> f1 = [&f2](int n){
        return (n == 0) ? 1 : (n * f2(n-1));
    };

    f2 = [&f1](int n){
        return (n == 0) ? 2 : (f1(n) + f1(n-1));
    };

让我们用从08的值来检查呼叫f1的时间:

    printDuration("Duration for f1(0): ", [&](){return f1(0);});
    printDuration("Duration for f1(1): ", [&](){return f1(1);});
    printDuration("Duration for f1(2): ", [&](){return f1(2);});
    printDuration("Duration for f1(3): ", [&](){return f1(3);});
    printDuration("Duration for f1(4): ", [&](){return f1(4);});
    printDuration("Duration for f1(5): ", [&](){return f1(5);});
    printDuration("Duration for f1(6): ", [&](){return f1(6);});
    printDuration("Duration for f1(7): ", [&](){return f1(7);});
    printDuration("Duration for f1(8): ", [&](){return f1(8);});

以下是我们获得的信息:

  • 没有尾部调用优化:
Duration for f1(0): 838 ns
Duration for f1(1): 825 ns
Duration for f1(2): 1218 ns
Duration for f1(3): 1515 ns
Duration for f1(4): 2477 ns
Duration for f1(5): 3919 ns
Duration for f1(6): 5809 ns
Duration for f1(7): 9354 ns
Duration for f1(8): 14884 ns
  • 通过呼叫优化:
Duration for f1(0): 206 ns
Duration for f1(1): 327 ns
Duration for f1(2): 467 ns
Duration for f1(3): 642 ns
Duration for f1(4): 760 ns
Duration for f1(5): 1155 ns
Duration for f1(6): 2023 ns
Duration for f1(7): 3849 ns
Duration for f1(8): 4986 ns

让我们一起来看看结果;没有尾部优化的呼叫持续时间在左边:

f1(0): 838 ns > 206 ns
f1(1): 825 ns > 327 ns
f1(2): 1218 ns > 467 ns
f1(3): 1515 ns > 642 ns
f1(4): 2477 ns > 760 ns
f1(5): 3919 ns > 1155 ns
f1(6): 5809 ns > 2023 ns
f1(7): 9354 ns > 3849 ns
f1(8): 14884 ns > 4986 ns

差异确实非常大,表明代码得到了极大的优化。但是,请记住,对于 GCC,我们使用的是-foptimize-sibling-calls优化标志。此标志执行两种类型的优化:尾部调用和同级调用。同级调用是对具有相同大小的返回类型和相同总大小的参数列表的函数的调用,因此允许编译器用尾部调用类似地处理它们。很有可能,在我们的例子中,两种优化都被应用了。

用异步代码优化执行时间

当我们有多个线程时,我们可以使用两种关闭技术来优化执行时间:并行执行和异步执行。我们在前面的章节中已经看到了并行执行是如何工作的;异步调用呢?

首先,让我们提醒自己什么是异步调用。我们想打一个电话,在主线程上正常继续,并在未来的某个时候得到结果。对我来说,这听起来是一份完美的职能工作。我们只需要调用函数,让它们执行,过一会儿再和它们对话。

既然我们已经谈到了未来,那么我们就来谈谈 C++ 中的future构造。

期货

我们已经确定,避免管理程序中的线程是理想的,除非是在做非常专业的工作时,但是我们需要并行执行,并且经常需要同步来从另一个线程获得结果。一个典型的例子是一个长计算,它会阻塞主线程,除非我们在它自己的线程中运行它。我们如何知道计算何时完成,如何得到计算结果?

在 1976-1977 年,计算机科学中提出了两个概念来简化这个问题的解决方案——未来和承诺。虽然这些概念在各种技术中经常互换使用,但在 C++ 中,它们有特定的含义:

  • 未来可以在处理同步的同时从提供程序中检索值
  • 承诺存储未来的价值,此外还提供同步点

由于其性质,一个future对象在 C++ 中是有限制的。它不能复制,只能移动,并且只有在与共享状态相关联时才有效。这意味着我们只能通过调用asyncpromise.get_future()packaged_task.get_future()来创建一个有效的未来对象。

还值得一提的是,承诺和未来在实现中使用线程库;因此,您可能需要向另一个库添加依赖项。在我的系统(Ubuntu 18.04,64 位)上,用 g++ 编译的时候,不得不向pthread库添加链接依赖;如果您在 mingw 或 cygwin 配置上使用 g++ 的话,我想您也会需要同样的东西。

我们先来看看如何串联使用futurepromise。首先,我们将为一个秘密信息创建一个promise:

    promise<string> secretMessagePromise;

然后,让我们创建一个future并使用它启动一个新线程。该线程将使用一个简单打印秘密消息的 lambda:

    future<string> secretMessageFuture = 
        secretMessagePromise.get_future();
    thread isPrimeThread(printSecretMessage, ref(secretMessageFuture));

注意我们需要避免复制future;在这种情况下,我们使用未来的引用包装器。

我们暂时坚持这条线索;接下来的事情就是履行承诺,也就是设定一个值:

    secretMessagePromise.set_value("It's a secret");
    isPrimeThread.join();

同时,另一个线程会做一些事情,然后会要求我们遵守诺言。嗯,不完全是;它会询问promise的值,这会阻止它,直到调用join():

auto printSecretMessage = [](future<string>& secretMessageFuture) {
    string secretMessage = secretMessageFuture.get();
    cout << "The secret message: " << secretMessage << '\n';
};

正如您可能注意到的,这个方法设置了在主线程中计算值的责任。如果我们想让它在第二线程上呢?我们只需要使用async

假设我们想检查一个数是否是质数。我们首先编写一个 lambda,它将以一种天真的方式检查从2x-1的每个可能除数,并检查x是否能被它整除。如果它不能被任何值整除,它就是一个质数:

auto is_prime = [](int x) {
    auto xIsDivisibleBy = bind(isDivisibleBy, x, _1);
    return none_of_collection(
            rangeFrom2To(x - 1), 
            xIsDivisibleBy
        );
};

使用了几个辅助 lambdas。一个用于生成这样的范围:

auto rangeFromTo = [](const int start, const int end){
    vector<int> aVector(end);
    iota(aVector.begin(), aVector.end(), start);
    return aVector;
};

然后,它专门用于生成以2开始的范围:

auto rangeFrom2To = bind(rangeFromTo, 2, _1);

然后,检查两个数字是否可分的谓词:

auto isDivisibleBy = [](auto value, auto factor){
    return value % factor == 0;
};

为了在独立于主线程的线程中运行这个函数,我们需要使用async声明一个future:

    future<bool> futureIsPrime(async(is_prime, 2597));

async的第二个参数是我们函数的输入参数。允许多个参数。

然后,我们可以做其他事情,最后,询问结果:

TEST_CASE("Future with async"){
    future<bool> futureIsPrime(async(is_prime, 7757));
    cout << "doing stuff ..." << endl;
 bool result = futureIsPrime.get();

    CHECK(result);
}

加粗的代码行标记了主线程停止等待辅助线程结果的时间点。

如果需要多个future,可以使用。在下面的例子中,我们将在四个不同的线程中用四个不同的值运行is_prime,如下所示:

TEST_CASE("more futures"){
    future<bool> future1(async(is_prime, 2));
    future<bool> future2(async(is_prime, 27));
    future<bool> future3(async(is_prime, 1977));
    future<bool> future4(async(is_prime, 7757));

    CHECK(future1.get());
    CHECK(!future2.get());
    CHECK(!future3.get());
    CHECK(future4.get());
}

功能异步代码

我们已经看到线程最简单的实现是 lambda,但是我们可以做得更多。最后一个示例使用多个线程对不同的值异步运行相同的操作,它可以变成一个功能性的高阶函数。

但是让我们从几个简单的循环开始。首先,我们将输入值和预期结果转换成向量:

    vector<int> values{2, 27, 1977, 7757};
    vector<bool> expectedResults{true, false, false, true};

然后,我们需要一个for循环来创建期货。重要的是不要调用future()构造函数,因为这将由于试图将新构造的future对象复制到容器中而失败。相反,将async()的结果直接添加到容器中:

    vector<future<bool>> futures;
    for(auto value : values){
        futures.push_back(async(is_prime, value));
    }

然后,我们需要从线程中获取结果。同样,我们需要避免复制future,所以我们将在迭代时使用引用:

    vector<bool> results;
    for(auto& future : futures){
        results.push_back(future.get());
    }

让我们看看整个测试:

TEST_CASE("more futures with loops"){
    vector<int> values{2, 27, 1977, 7757};
    vector<bool> expectedResults{true, false, false, true};

    vector<future<bool>> futures;
    for(auto value : values){
        futures.push_back(async(is_prime, value));
    }

    vector<bool> results;
    for(auto& future : futures){
        results.push_back(future.get());
    }

    CHECK_EQ(results, expectedResults);
}

很明显,我们可以把它变成一些转换调用。但是,我们需要特别注意避免期货的复制。首先,我创建了一个有助于创建future的 lambda:

    auto makeFuture = [](auto value){
        return async(is_prime, value);
    };

第一个for循环然后变成一个transformAll调用:

    vector<future<bool>> futures = transformAll<vector<future<bool>>>
       (values, makeFuture);

第二部分比预期的要棘手。我们对transformAll的实现不起作用,所以我将把transform改为内联调用:

    vector<bool> results(values.size());
    transform(futures.begin(), futures.end(), results.begin(), []
        (future<bool>& future){ return future.get();});

我们最终通过了以下测试:

TEST_CASE("more futures functional"){
    vector<int> values{2, 27, 1977, 7757};

    auto makeFuture = [](auto value){
        return async(is_prime, value);
    };

    vector<future<bool>> futures = transformAll<vector<future<bool>>>
        (values, makeFuture);
    vector<bool> results(values.size());
    transform(futures.begin(), futures.end(), results.begin(), []
        (future<bool>& future){ return future.get();});

    vector<bool> expectedResults{true, false, false, true};

    CHECK_EQ(results, expectedResults);
}

老实说,这是迄今为止最难正确实现的代码。在从事期货工作时,会有很多事情出错,原因并不明显。这些错误消息毫无帮助,至少对我的 g++ 版本来说是如此。正如我在本节中向您展示的那样,我成功完成这项工作的唯一方法是一步一步来。

然而,这个代码示例显示了一个重要事实;通过对 futures 的深入思考和测试,我们可以并行化高阶函数。因此,如果您需要更好的性能,可以使用多个内核,并且不能等待标准中并行运行策略的实现,这是一个可能的解决方案。如果只是为了这个,我想我的努力是有用的!

既然我们讨论的是异步调用,我们也可以快速浏览一下反应式编程的世界。

反应式编程的味道

反应式编程是一种专注于处理数据流的代码编写范式。想象一下,必须分析一系列温度值,来自安装在自动驾驶汽车上的传感器的值,或者分享特定公司的值。在反应式编程中,我们接收这种连续的数据流并运行分析它的函数。由于新数据可能不可预测地到达流中,编程模型必须是异步的;也就是说,主线程持续等待新数据,当它到达时,处理被委托给辅助流。结果通常也是异步收集的——要么被推送到用户界面,保存在数据存储中,要么被传递给其他数据流。

我们已经看到函数式编程的主要焦点是数据。因此,函数式编程是处理实时数据流的一个很好的候选对象,这并不奇怪。mapreducefilter等高阶函数的可组合性,加上并行处理的机会,使得函数式设计成为反应式编程的绝佳解决方案。

我们不会详细讨论反应式编程。通常,使用特定的库或框架来促进这种数据流处理的实现,但是使用我们到目前为止拥有的元素,我们可以编写一个小规模的示例。

我们需要一些东西。首先,数据流;第二,接收数据并立即将其传递到处理流水线的主线程;第三,获取输出的方法。

对于这个例子的目标,我将简单地使用标准输入作为输入流。我们将从键盘输入数字,并以反应的方式检查它们是否是质数,从而保持主线程始终响应。这意味着我们将使用async功能为从键盘上读取的每个数字创建一个future。输出将被简单地写入输出流。

我们将使用与之前相同的is_prime函数,但是添加另一个函数,无论值是否为质数,都会打印到标准输出中:

auto printIsPrime = [](int value){
    cout << value << (is_prime(value) ? " is prime" : " is not prime")  
    << endl;
};

main函数是一个无限循环,它从输入流中读取数据,并在每次有新值进入时启动一个future:

int main(){
    int number;

    while(true){
        cin >> number;
        async(printIsPrime, number);
    }
}

用一些随机类型的值运行此代码会产生以下输出:

23423
23423 is not prime
453576
453576 is not prime
53
53 is prime
2537
2537 is not prime
364544366
5347
54
534532
436
364544366 is not prime
5347 is prime
54 is not prime
534532 is not prime
436 is not prime

如您所见,结果会尽快返回,但程序允许随时引入新数据。

不得不提的是,为了避免每次编译本章代码时出现无限循环,反应式示例可以用make reactive编译运行。因为它是一个无限循环,所以你必须用一个中断来停止它。

这是一个基本的反应式编程示例。随着数据量的增加、复杂的流水线以及每个流水线的并行化,它显然会变得更加复杂。然而,我们实现了本节的目标——让您体验反应式编程,以及我们如何使用函数构造和异步调用来使其工作。

我们已经讨论了很多关于优化执行时间的问题,探讨了帮助我们实现更快性能的各种方法。现在是时候看看我们想要减少程序内存使用的情况了。

优化内存使用

到目前为止,我们讨论的以函数方式构造代码的方法包括多次传递被视为不可变的集合。因此,这可能会导致集合的副本。例如,让我们看一个简单的代码示例,它使用transform来增加向量的所有元素:

template<typename DestinationType>
auto transformAll = [](const auto source, auto lambda){
    DestinationType result;
    transform(source.begin(), source.end(), back_inserter(result), 
        lambda);
    return result;
};

TEST_CASE("Memory"){
    vector<long long> manyNumbers(size);
    fill_n(manyNumbers.begin(), size, 1000L);

    auto result = transformAll<vector<long long>>(manyNumbers, 
        increment);

    CHECK_EQ(result[0], 1001);
}

这种实现会导致大量内存分配。首先将manyNumbers向量复制到transformAll中。然后,result.push_back()被自动调用,可能导致内存分配。最后,result被返回,但是初始的manyNumbers向量仍然被分配。

我们可以立即改善其中的一些问题,但也值得讨论它们与其他可能的优化相比如何。

为了执行测试,我们需要处理大型集合和一种方法来测量进程的内存分配。第一部分很简单——只需分配大量 64 位值(我的编译器上的长整型);足以分配 1 GB 内存:

const long size_1GB_64Bits = 125000000;
TEST_CASE("Memory"){
    auto size = size_1GB_64Bits;
    vector<long long> manyNumbers(size);
    fill_n(manyNumbers.begin(), size, 1000L);

    auto result = transformAll<vector<long long>>(manyNumbers, 
        increment);

    CHECK_EQ(result[0], 1001);
}

第二部分有点难。幸运的是,在我的 Ubuntu 18.04 系统上,我可以在/proc/PID/status中的一个文件中查看一个进程的内存,其中 PID 是进程标识符。有了一点 Bash 魔法,我可以创建一个makefile配方,将每 0.1 s 获取的内存值输出到一个文件中,如下所示:

memoryConsumptionNoMoveIterator: .outputFolder 
    g++ -DNO_MOVE_ITERATOR -std=c++ 17 memoryOptimization.cpp -Wall -
        Wextra -Werror -o out/memoryOptimization
    ./runWithMemoryConsumptionMonitoring memoryNoMoveIterator.log

你会注意到-DNO_MOVE_ITERATOR的论点;这是一个编译指令,允许我为不同的目标编译同一个文件,以便检查多个解决方案的内存占用。这意味着我们之前的测试是在#if NO_MOVE_ITERATOR指令中编写的。

只有一个警告——由于我使用了 bash watch命令来生成输出,所以在运行make memoryConsumptionNoMoveIterator之后,您将需要按下一个键,对于每隔一个内存日志配方也是如此。

有了这个设置,让我们改进transformAll使用更少的内存,看看输出。我们需要从一开始就使用引用类型并为结果分配内存,如下所示:

template<typename DestinationType>
auto transformAll = [](const auto& source, auto lambda){
    DestinationType result;
    result.resize(source.size());
    transform(source.begin(), source.end(), result.begin(), lambda);
    return result;
};

不出所料,改进的结果是,最大分配从 0.99 GB 开始,但跃升至 1.96 GB,大致翻了一番。

我们需要把这个价值放在上下文中。让我们首先测量一个简单的for循环能做什么,并将结果与用transform实现的相同算法进行比较。

测量简单 for 循环的内存

带有for循环的解决方案非常简单:

TEST_CASE("Memory"){
    auto size = size_1GB_64Bits;
    vector<long long> manyNumbers(size);
    fill_n(manyNumbers.begin(), size, 1000L);

    for(auto iter = manyNumbers.begin(); iter != manyNumbers.end(); 
        ++ iter){
            ++(*iter);
    };

    CHECK_EQ(manyNumbers[0], 1001);
}

测量内存时,没有什么好惊讶的——整个过程中占用空间保持在 0.99 GB。我们也可以用transform达到这个结果吗?嗯,transform有一个版本可以就地修改集合。让我们来测试一下。

测量就地转换的内存

要使用transform,我们需要提供目标迭代器参数source.begin(),如下所示:

auto increment = [](const auto value){
    return value + 1;
};

auto transformAllInPlace = [](auto& source, auto lambda){
    transform(source.begin(), source.end(), source.begin(), lambda);
};

TEST_CASE("Memory"){
    auto size = size_1GB_64Bits;
    vector<long long> manyNumbers(size);
    fill_n(manyNumbers.begin(), size, 1000L);

    transformAllInPlace(manyNumbers, increment);

    CHECK_EQ(manyNumbers[0], 1001);
}

根据文档,这应该在同一个集合中改变;因此,它不应该分配更多的内存。不出所料,它具有与简单的for循环相同的行为,并且在整个程序期间内存占用保持在 0.99 GB。

但是,您可能会注意到,我们现在不返回值以避免复制。不过,我喜欢从转换到返回值,我们还有另一个选择,使用移动语义:

template<typename SourceType>
auto transformAllInPlace = [](auto& source, auto lambda) -> SourceType&& {
    transform(source.begin(), source.end(), source.begin(), lambda);
    return move(source);
};

为了进行调用编译,我们需要在调用transformAllInPlace时传入源的类型,因此我们的测试更改为:

TEST_CASE("Memory"){
    auto size = size_1GB_64Bits;
    vector<long long> manyNumbers(size);
    fill_n(manyNumbers.begin(), size, 1000L);

    auto result = transformAllInPlace<vector<long long>>(manyNumbers, 
        increment);

    CHECK_EQ(result[0], 1001);
}

让我们衡量一下移动语义是否有任何帮助。结果果然不出所料;在整个运行期间,内存占用量保持在 0.99 GB。

这引出了一个有趣的想法。如果我们在对transform的调用中使用移动语义呢?

用移动迭代器转换

我们可以重写我们的transform函数来使用移动迭代器,如下所示:

template<typename DestinationType>
auto transformAllWithMoveIterator = [](auto& source, auto lambda){
    DestinationType result(source.size());
    transform(make_move_iterator(source.begin()), 
        make_move_iterator(source.end()), result.begin(), lambda);
    source.clear();
    return result;
};

理论上,这应该做的是将值移动到目的地,而不是复制它们,从而保持低内存占用。为了进行测试,我们在记录内存的同时运行相同的测试:

TEST_CASE("Memory"){
    auto size = size_1GB_64Bits;
    vector<long long> manyNumbers(size);
    fill_n(manyNumbers.begin(), size, 1000L);

    auto result = transformAllWithMoveIterator<vector<long long>>
        (manyNumbers, increment);

    CHECK_EQ(result[0], 1001);
}

结果出乎意料;内存从 0.99 GB 开始,上升到 1.96 GB(可能是在transform调用之后),然后又回到 0.99 GB(很可能是source.clear()的结果)。我尝试了多种变体来避免这种行为,但找不到将内存占用保持在 0.99 GB 的解决方案。这似乎是移动迭代器实现的一个问题;我建议你在你的编译器上测试一下,看看它是否有效。

比较解决方案

使用就地或移动语义的解决方案在减少内存占用的同时,仅在额外计算不需要源数据时才起作用。如果您计划在其他计算中重用数据,就没有办法保留初始集合。此外,不清楚这些调用是否可以并行运行;由于 g++ 还没有实现并行执行策略,所以我无法测试它们,所以我将这个问题留给读者作为练习。

但是函数式编程语言为了减少内存占用做了什么呢?答案很有意思。

不可变的数据结构

纯函数式编程语言使用不可变数据结构和垃圾收集的组合。每次修改数据结构的调用都会创建一个初始数据结构的副本,只改变一个元素。初始结构不受任何影响。但是,这是使用指针完成的;基本上,新的数据结构与初始数据结构相同,只是有一个指向已更改值的指针。丢弃初始集合时,旧值不再使用,垃圾收集器会自动将其从内存中移除。

这种机制充分利用了不变性,允许 C++ 无法实现的优化。此外,实现通常是递归的,这也利用了尾部递归优化。

然而,在 C++ 中实现这样的数据结构是可能的。一个例子是名为音麦的图书馆,你可以在https://github.com/arximboldi/immer的 GitHub 上找到。Immer 实现了许多不可变的集合。我们来看看immer::vector;每次我们调用一个通常会修改向量的操作(比如push_back ), immer::vector都会返回一个新的集合。返回的每个值都可以是常量,因为它从不改变。我在章节代码中用 imme 0 . 5 . 0 写了一个小测试,展示了immer::vector的用法,可以在下面的代码中看到:

TEST_CASE("Check immutable vector"){
    const auto empty = immer::vector<int>{};
    const auto withOneElement = empty.push_back(42);

    CHECK_EQ(0, empty.size());
    CHECK_EQ(1, withOneElement.size());
    CHECK_EQ(42, withOneElement[0]);
}

关于不可变的数据结构,我不再赘述;不过,我强烈建议你看看音麦网站(https://sinusoid.es/immer/introduction.html)上的文档,玩玩图书馆。

摘要

我们已经看到性能优化是一个复杂的话题。作为 C++ 程序员,我们已经准备好要求代码有更高的性能;我们在这一章中问的问题是:有没有可能优化以函数风格编写的代码?

答案是——是的,如果你衡量,如果你有一个明确的目标。我们需要一个特定的计算来更快地完成吗?我们需要减少内存占用吗?应用的哪个领域最需要性能改进?我们想要做多少怪异的点优化,可能需要用下一个编译器、库或平台版本重写?在继续优化代码之前,这些都是您需要回答的问题。

然而,我们已经看到,在使用计算机上的所有内核时,函数式编程有着巨大的好处。当我们等待高阶函数并行执行的标准实现时,我们可以通过编写自己的并行算法来利用不变性。递归是函数式编程的另一个主要部分,无论何时使用,我们都可以利用尾部递归优化。

至于内存消耗,在第三方库中实现的不可变数据结构,以及根据目标仔细优化我们正在使用的高阶函数,可以帮助我们保持代码的简单性,而复杂性发生在代码的特定位置。当我们丢弃源集合时,可以使用移动语义,但是要记得检查它是否与并行执行一起工作。

最重要的是,我希望您已经了解了测量是性能优化最重要的部分。毕竟,如果你不知道你在哪里,你需要去哪里,你怎么能去旅行?

我们将利用数据生成器进行测试,继续我们的函数式编程之旅。是时候看看基于属性的测试了。