Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <folly/futures/Future.h>
#include <arcticdb/async/tasks.hpp>
#include <arcticdb/storage/key_segment_pair.hpp>
#include <arcticdb/log/log.hpp>
#include <pthread.h>

Check failure on line 19 in cpp/arcticdb/async/async_store.hpp

View workflow job for this annotation

GitHub Actions / Windows C++ Tests / compile (windows, windows-cl, win_amd64, C:/cpp_build, C:/vcpkg_packages, *.pdb, *.lib, *.ilk, *....

Cannot open include file: 'pthread.h': No such file or directory

namespace arcticdb::toolbox::apy {
class LibraryTool;
Expand Down Expand Up @@ -439,11 +441,17 @@
[this, columns_to_decode](pipelines::RangesAndKey&& ranges_and_key) {
const auto key = ranges_and_key.key_;
return read_and_continue(
key,
library_,
storage::ReadKeyOpts{},
DecodeSliceTask{std::move(ranges_and_key), columns_to_decode}
);
key,
library_,
storage::ReadKeyOpts{},
DecodeSliceTask{std::move(ranges_and_key), columns_to_decode}
)
.thenValueInline([](pipelines::SegmentAndSlice&& s) {
log::version().info(
"READ_DONE tid={}", static_cast<unsigned long>(pthread_self())

Check failure on line 451 in cpp/arcticdb/async/async_store.hpp

View workflow job for this annotation

GitHub Actions / macOS C++ Tests / compile (macos, macos, macosx_arm64, /tmp/cpp_build, *.dSYM, *.dylib, *.[ao], vcpkg_installed, ma...

static_cast from 'pthread_t _Nonnull' (aka '_opaque_pthread_t *') to 'unsigned long' is not allowed
);
return std::move(s);
});
},
2 * async::TaskScheduler::instance()->io_thread_count()
);
Expand Down
51 changes: 47 additions & 4 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <pipeline/frame_slice.hpp>
#include <arcticdb/codec/codec.hpp>
#include <folly/futures/FutureSplitter.h>
#include <folly/executors/InlineExecutor.h>
#include <arcticdb/pipeline/write_options.hpp>
#include <arcticdb/stream/index.hpp>
#include <arcticdb/pipeline/query.hpp>
Expand Down Expand Up @@ -768,26 +769,50 @@ std::shared_ptr<std::vector<folly::Future<std::vector<EntityId>>>> schedule_firs
) {
// TODO 11961775873: remove this kill switch
static const bool process_on_cpu_executor = ConfigsMap::instance()->get_int("Storage.ProcessOnCpuExecutor", 1) == 1;
log::version().info(
"Storage.ProcessOnCpuExecutor={}: scheduling first iteration on {} executor",
process_on_cpu_executor ? 1 : 0,
process_on_cpu_executor ? "CPU" : "IO");
auto* processing_executor = process_on_cpu_executor ? dynamic_cast<folly::Executor*>(&async::cpu_executor())
: dynamic_cast<folly::Executor*>(&async::io_executor());
: dynamic_cast<folly::Executor*>(&folly::InlineExecutor::instance());
// Used to make sure each entity is only added into the component manager once
auto slice_added_mtx = std::make_shared<std::vector<std::mutex>>(num_segments);
auto slice_added = std::make_shared<std::vector<bool>>(num_segments, false);
auto futures = std::make_shared<std::vector<folly::Future<std::vector<EntityId>>>>();

// ===== DEBUG: count how many read futures are already fulfilled at entry =====
{
size_t ready_at_entry = 0;
for (const auto& fos : segment_and_slice_future_splitters) {
if (std::holds_alternative<folly::Future<pipelines::SegmentAndSlice>>(fos)) {
if (std::get<folly::Future<pipelines::SegmentAndSlice>>(fos).isReady()) ready_at_entry++;
}
}
log::version().info(
"schedule_first_iteration ENTRY: {}/{} read futures already fulfilled",
ready_at_entry,
segment_and_slice_future_splitters.size());
}
auto lambda_run_count = std::make_shared<std::atomic<size_t>>(0);
const size_t total_work_units = entities_by_work_unit.size();

size_t work_unit_idx = 0;
for (auto& entity_ids : entities_by_work_unit) {
std::vector<folly::Future<pipelines::SegmentAndSlice>> local_futs;
local_futs.reserve(entity_ids.size());
bool all_ready = true;
for (auto id : entity_ids) {
const auto pos = id_to_pos->at(id);
auto& future_or_splitter = segment_and_slice_future_splitters[pos];
// Some of the entities for this unit of work may be shared with other units of work
util::variant_match(
future_or_splitter,
[&local_futs](folly::Future<pipelines::SegmentAndSlice>& fut) {
[&local_futs, &all_ready](folly::Future<pipelines::SegmentAndSlice>& fut) {
if (!fut.isReady()) all_ready = false;
local_futs.emplace_back(std::move(fut));
},
[&local_futs](folly::FutureSplitter<pipelines::SegmentAndSlice>& splitter) {
[&local_futs, &all_ready](folly::FutureSplitter<pipelines::SegmentAndSlice>& splitter) {
all_ready = false;
local_futs.emplace_back(splitter.getFuture());
}
);
Expand All @@ -803,9 +828,17 @@ std::shared_ptr<std::vector<folly::Future<std::vector<EntityId>>>> schedule_firs
slice_added_mtx,
slice_added,
clauses,
lambda_run_count,
work_unit_idx,
entity_ids = std::move(entity_ids
)](std::vector<pipelines::SegmentAndSlice>&& segment_and_slices
) mutable {
auto n = lambda_run_count->fetch_add(1) + 1;
log::version().info(
"schedule_first_iteration LAMBDA unit={} run_count={} tid={}",
work_unit_idx,
n,
static_cast<unsigned long>(pthread_self()));
for (auto&& [idx, segment_and_slice] : folly::enumerate(segment_and_slices)) {
auto entity_id = entity_ids[idx];
auto pos = id_to_pos->at(entity_id);
Expand All @@ -823,7 +856,17 @@ std::shared_ptr<std::vector<folly::Future<std::vector<EntityId>>>> schedule_firs
}
return async::MemSegmentProcessingTask(*clauses, std::move(entity_ids))();
}));
}
log::version().info(
"schedule_first_iteration ATTACH unit={} all_ready_before_attach={} lambdas_run_so_far={}",
work_unit_idx,
all_ready,
lambda_run_count->load());
work_unit_idx++;
}
log::version().info(
"schedule_first_iteration EXIT: {}/{} lambdas fired during attach loop",
lambda_run_count->load(),
total_work_units);
return futures;
}

Expand Down
72 changes: 72 additions & 0 deletions scripts/column_stats_memory/measure_create_column_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Stage B: measure peak RSS for create_column_stats on the symbol written by write_data.py.

Prints VmRSS just before the call (baseline) and ru_maxrss at exit (peak high-water).
For a cross-check, run this script wrapped in `/usr/bin/time -v`.
"""
import argparse
import resource
import time
from pathlib import Path

from arcticdb import Arctic
from arcticdb_ext import set_config_int
from arcticdb_ext import cpp_async as adb_async

from write_data import LIB_NAME, SYMBOL, SYMBOL_ZEROS, DEFAULT_LMDB


def read_vmrss_kb() -> int:
    """Return the numeric value on the ``VmRSS:`` line of ``/proc/self/status``.

    The second whitespace-separated token of the first line that starts with
    ``VmRSS:`` is parsed as an integer. If no such line exists, ``-1`` is
    returned so the caller can still print something.
    """
    with open("/proc/self/status") as status_file:
        # Lazily scan the file; stop at the first matching entry.
        rss_tokens = next(
            (entry.split() for entry in status_file if entry.startswith("VmRSS:")),
            None,
        )
    if rss_tokens is None:
        return -1
    return int(rss_tokens[1])


def main():
    """Time ``create_column_stats`` on the pre-written symbol and report memory use.

    Command-line flags select the symbol, the number of ``f_<i>`` columns to
    request MINMAX stats for, the LMDB path, and optional executor / thread-count
    config overrides. Column stats are dropped before the timed call. The script
    prints the VmRSS read just before the call, the wall time of the call, and
    ``ru_maxrss`` read right after it.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-cols", type=int, default=100)
    parser.add_argument("--lmdb-path", default=str(DEFAULT_LMDB))
    parser.add_argument(
        "--no-cpu-executor",
        action="store_true",
        help="set Storage.ProcessOnCpuExecutor=0",
    )
    parser.add_argument("--io-threads", type=int, default=None)
    parser.add_argument("--cpu-threads", type=int, default=None)
    parser.add_argument("--zeros", action="store_true", help="use the compressible-zeros symbol")
    args = parser.parse_args()

    if args.zeros:
        target_symbol = SYMBOL_ZEROS
    else:
        target_symbol = SYMBOL

    # Config overrides are applied before the Arctic instance is created.
    if args.no_cpu_executor:
        set_config_int("Storage.ProcessOnCpuExecutor", 0)

    # Only thread counts that were explicitly passed are written to the config;
    # the scheduler is re-initialised once if at least one of them was set.
    thread_overrides = {
        "VersionStore.NumIOThreads": args.io_threads,
        "VersionStore.NumCPUThreads": args.cpu_threads,
    }
    requested_overrides = {
        config_name: count
        for config_name, count in thread_overrides.items()
        if count is not None
    }
    for config_name, count in requested_overrides.items():
        set_config_int(config_name, count)
    if requested_overrides:
        adb_async.reinit_task_scheduler()

    arctic_client = Arctic(f"lmdb://{Path(args.lmdb_path)}")
    native_store = arctic_client[LIB_NAME]._nvs

    # Drop existing stats first so the timed call below recreates them.
    native_store.drop_column_stats(target_symbol)

    # One independent {"MINMAX"} set per column f_0 .. f_{num_cols - 1}.
    stats_request = {}
    for i in range(args.num_cols):
        stats_request[f"f_{i}"] = {"MINMAX"}

    baseline_kb = read_vmrss_kb()
    print(f"baseline VmRSS = {baseline_kb / 1024:.1f} MiB", flush=True)

    started_at = time.perf_counter()
    native_store.create_column_stats(target_symbol, stats_request)
    dt = time.perf_counter() - started_at

    # NOTE(review): the MiB conversion assumes ru_maxrss is reported in KiB
    # (Linux behaviour) — confirm before running on other platforms.
    peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(f"create_column_stats wall = {dt:.2f}s")
    print(f"peak ru_maxrss = {peak_kb / 1024:.1f} MiB")
    print(f"peak - baseline = {(peak_kb - baseline_kb) / 1024:.1f} MiB")


if __name__ == "__main__":
main()
Loading