Debug logging and fixes for column stats write memory #3136

Draft
poodlewars wants to merge 1 commit into master from aseaton/column-stats-write-perf-mem

@poodlewars (Collaborator) commented May 22, 2026

  • LMDB can mmap in large amounts of data.

  • With random (incompressible) data, the amount mapped is the size of the dataframe we're testing.

  • The benchmark therefore uses all zeros, which compress well, so the peak RSS measurement is not affected by LMDB's mmap (the mapped data is compressed, so it is now tiny). Alternatively, we could do this testing against S3.

  • Changing to InlineExecutor here is important:

    // TODO 11961775873: remove this kill switch
    static const bool process_on_cpu_executor = ConfigsMap::instance()->get_int("Storage.ProcessOnCpuExecutor", 1) == 1;
    log::version().info(
            "Storage.ProcessOnCpuExecutor={}: scheduling first iteration on {} executor",
            process_on_cpu_executor ? 1 : 0,
            process_on_cpu_executor ? "CPU" : "IO");
    auto* processing_executor = process_on_cpu_executor ? dynamic_cast<folly::Executor*>(&async::cpu_executor())
                                                        : dynamic_cast<folly::Executor*>(&folly::InlineExecutor::instance());

It guarantees that the processing (column stats creation) is done immediately after the segment load. The .via(io_executor).thenValueInline(process) approach still queued the processing after all the segment loads.

  • There's also a bunch of debug logging here from Claude.
  • With the inline executor the peak RSS goes to:
  ┌─────┬─────┬──────────────────────┬────────────────────┐
  │ IO  │ CPU │ CPU-exec wall / peak │ Inline wall / peak │
  ├─────┼─────┼──────────────────────┼────────────────────┤
  │ 1   │ 1   │ 3.05 s / 3.18 GiB    │ 3.85 s / 0.26 GiB  │
  ├─────┼─────┼──────────────────────┼────────────────────┤
  │ 1   │ 16  │ 1.67 s / 0.40 GiB    │ 3.79 s / 0.26 GiB  │
  ├─────┼─────┼──────────────────────┼────────────────────┤
  │ 8   │ 1   │ 3.40 s / 7.04 GiB    │ 0.54 s / 0.79 GiB  │
  ├─────┼─────┼──────────────────────┼────────────────────┤
  │ 8   │ 16  │ 0.46 s / 1.60 GiB    │ 0.55 s / 0.80 GiB  │
  ├─────┼─────┼──────────────────────┼────────────────────┤
  │ 24  │ 1   │ 3.28 s / 7.47 GiB    │ 0.28 s / 2.02 GiB  │
  ├─────┼─────┼──────────────────────┼────────────────────┤
  │ 24  │ 16  │ 0.38 s / 4.76 GiB    │ 0.29 s / 2.02 GiB  │
  └─────┴─────┴──────────────────────┴────────────────────┘

There are advantages to implementing this with the semaphore though, because then we can keep the whole IO pool busy in the case where the stats generation is fast relative to IO, and we can utilize the CPU pool (which is not being used at all with this idea). Having said that, in the long run column stats generation will be automatic - we don't need to get this manual pipeline too perfect. The semaphore idea is only clearly safe in the special case where processing consumes a single segment at a time - otherwise we need to worry about deadlocks.

I think something like this approach should be fine.

@github-actions

Label error. Requires exactly 1 of: patch, minor, major. Found:

@poodlewars (Collaborator, Author) commented May 22, 2026

Big explanation from Claude about why swapping to the IO executor wasn't enough:

  The mechanism

  Three folly files are relevant: Future-inl.h (the public API), detail/Core.cpp (the FSM that decides inline-vs-queue), and QueuedImmediateExecutor.{h,cpp} / InlineExecutor.h (the executor implementations).

  What thenValueInline actually promises
  
  In detail/Core.cpp::doCallback (lines 576-592), the lambda doAdd makes the decision:

  if (addCompletingKA.get() == currentKeepAlive.get()) {
      keepAliveFunc(std::move(currentKeepAlive));      // INLINE
  } else {
      std::move(currentKeepAlive).add(std::move(keepAliveFunc));   // QUEUE
  }

  The inline path is only taken when:
  1. priorState == OnlyCallbackAllowInline (i.e., thenValueInline was used) — see line 596, otherwise completingKA is cleared and the inline path can't fire.
  2. addCompletingKA.get() == currentKeepAlive.get() — a raw pointer comparison between the "completing" executor (the one that ran the predecessor's callback) and the "current" executor (the destination set by via).

  If either fails, the callback gets enqueued on the destination executor.
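The two conditions above can be modelled in a few lines. This is a minimal sketch, not folly's code: `ToyExecutor` and `toy_dispatch` are hypothetical names invented for illustration, and the `allow_inline` flag stands in for the `OnlyCallbackAllowInline` state.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a pool executor: add() only records the task.
struct ToyExecutor {
    std::vector<std::function<void()>> queue;
    void add(std::function<void()> f) { queue.push_back(std::move(f)); }
};

// Runs the callback inline only when inlining was permitted AND the executor
// that completed the predecessor is the same object as the destination.
// Otherwise the callback is handed to the destination's add().
// Returns "inline" or "queued" so the outcome is easy to inspect.
std::string toy_dispatch(ToyExecutor* completing, ToyExecutor* destination,
                         bool allow_inline, std::function<void()> callback) {
    if (allow_inline && completing == destination) {
        callback();
        return "inline";
    }
    destination->add(std::move(callback));
    return "queued";
}
```

Calling `toy_dispatch(&qi, &io, true, cb)` with two distinct executors queues the callback even though inlining was requested, which is the situation described for our chain in the next section.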

  Why the pointer comparison fails for our chain

  Walk it back from the lambda in schedule_first_iteration:
  
  1. batch_read_uncompressed wraps reads in folly::window. Future-inl.h:1900:
  auto executor = &QueuedImmediateExecutor::instance();
  2. The window uses QueuedImmediateExecutor (let's call it QI) as the carrier executor between read completion and downstream consumers. Each future that window returns is promise.getSemiFuture().via(QI) (line 1963).
  3. The chain that propagates the read result to our user-visible futures passes ka = QI as the completing KA at every hop. By the time those futures arrive in schedule_first_iteration, they have destination executor = QI.
  4. folly::collect(local_futs) (Future-inl.h:1551) creates a new promise/core. The Context destructor (line 1572) fulfills it with setTry(p, ka=ctx->ka, finalResult), where ctx->ka was captured from the last input's callback, and that captures the QI keepalive (line 1597).
  5. The user's .via(processing_executor) then sets that core's destination to processing_executor (which is io_executor in the --no-cpu-executor test).
  6. .thenValueInline(lambda) calls thenImplementation (Future-inl.h:432), which calls setCallback_(..., InlineContinuation::permit) on that core. The core is now in OnlyCallbackAllowInline.
  7. When the collect-core fulfills, doCallback runs:
    - completingKA = QI
    - currentKeepAlive = io_executor (set by via)
    - QI.get() != io_executor.get() → queue path.

  So io_executor.add(lambda) is called. The lambda gets pushed onto the IO pool's task queue. The IO thread that just completed the read returns to its task loop and picks up the next pending task, which is the next read submitted by folly::window::spawn (which already ran inline on this same thread before we got here).

  Net result: 2 reads complete before the first lambda runs, even though everything is happening on the single IO thread.

  Why InlineExecutor fixes it
  
  InlineExecutor.h:42:
  void add(Func f) override { f(); }
  
  It doesn't use the pointer-comparison fast path. It wins because its add() is synchronous — even when doAdd takes the queue branch, the "queue" runs the function immediately on the calling thread.

  There's no task ever placed onto the IO pool's queue. The IO thread, having just completed a read, runs the MINMAX lambda to completion before returning to its task loop. By the time it picks the next task, the segment is freed.

  (QueuedImmediateExecutor is structurally similar: add runs the function immediately on the calling thread, with a per-thread queue to avoid re-entrance / stack overflow when the function itself enqueues more work. So switching to it would behave the same way. InlineExecutor is the simpler choice unless you have re-entrant adds.)
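The ordering difference between the two executors can be reproduced with a toy single-threaded task loop. This is a sketch under stated assumptions, not the real pipeline: `simulate_io_thread` is a hypothetical helper, the deque stands in for the IO pool's task queue, and each "read" submits the next read before handing its segment to processing, as described above for spawn.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simulates ONE IO thread draining a FIFO task queue (hypothetical model).
// Returns the order in which reads and processing steps ran.
std::vector<std::string> simulate_io_thread(std::size_t num_segments,
                                            bool inline_processing) {
    std::deque<std::function<void()>> io_queue;
    std::vector<std::string> events;

    std::function<void(std::size_t)> submit_read = [&](std::size_t i) {
        io_queue.push_back([&, i] {
            events.push_back("read" + std::to_string(i));
            // The next read is submitted first, mirroring spawn(next).
            if (i + 1 < num_segments) submit_read(i + 1);
            auto process = [&events, i] {
                events.push_back("process" + std::to_string(i));
            };
            if (inline_processing) {
                process();                   // synchronous add, like InlineExecutor
            } else {
                io_queue.push_back(process); // queued behind the next read
            }
        });
    };

    submit_read(0);
    while (!io_queue.empty()) {
        auto task = std::move(io_queue.front());
        io_queue.pop_front();
        task();
    }
    return events;
}
```

In this model the queued variant runs `read0, read1, process0, ...`, so two segments are resident before the first one is processed, while the inline variant alternates `read0, process0, read1, process1, ...`.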

  The corollary

  The pointer-comparison fast path (case 2 above) would also fix this, but only if everything in the chain used the same thread-pool executor instance. Once folly::window injects QI into the chain, the rest of the chain has lost the ability to inline against IO or CPU pools: addCompletingKA is QI all the way through, so no pool-executor will ever pointer-match.

  That means: thenValueInline as used in schedule_first_iteration was never going to work as backpressure for this code path, because window's QueuedImmediateExecutor plumbing has already changed the completing-KA. The only way to keep it inline through to the lambda is to use an inline-by-construction executor at the lambda's via. InlineExecutor (or QueuedImmediateExecutor) is that.

  Sources:
  - Future-inl.h:1551-1614 (collect)
  - Future-inl.h:1900-1967 (window)
  - Future-inl.h:386-462 (thenImplementation)
  - Future-inl.h:903-911 (Future::via)
  - detail/Core.cpp:568-641 (doCallback / doAdd / pointer comparison)
  - detail/Core.h:286-296 (OnlyCallbackAllowInline state docs)
  - QueuedImmediateExecutor.cpp:31-55 (synchronous add)
  - InlineExecutor.h:42 (synchronous add)

❯ Is there a way to use the IO executor rather than the QueuedImmediateExecutor for the window? What does this carrier executor do exactly, why do we need it? Tell me more about the QueuedImmediateExecutor. Is it OK to run our processing on it, it doesn't end up with all our processing being done on a single thread does it?
  
● Yes you can: folly::window has an overload that takes any executor as its first argument (Future-inl.h:1913):
  
  std::vector<Future<Result>> window(
      Executor::KeepAlive<> executor, Collection input, F func, size_t n);

  The form without an executor argument (which batch_read_uncompressed calls) hardcodes QI:

  auto executor = &QueuedImmediateExecutor::instance();
  return window(executor, std::move(input), std::move(func), n);

  If we call the executor-taking overload with io_executor instead, the pointer-match fast path I described earlier would succeed end-to-end and thenValueInline would do what you originally expected, with no InlineExecutor patch needed.

  What the carrier executor does
  
  In window (Future-inl.h:1913-1967) it's used in three places:

  1. Initial spawn submission (line 1957): executor->add([ctx]() { spawn(ctx); }) schedules the first n spawns.
  2. Destination of the inner fut inside spawn (line 1933): func(...).via(ctx->executor.get()). This is the future whose setCallback_ (line 1935) holds the "fulfill the user's promise, then call spawn(next)" logic — so this controls where the recursive spawn happens.
  3. Destination of every user-visible future (line 1963): promise.getSemiFuture().via(executor.copy()).

  That last one is the load-bearing one for us. Whatever the carrier executor is, that's the destination executor of the futures the user receives. And as we saw, the inline-path pointer comparison requires completingKA == destination, so the carrier has to match whatever the user's downstream code is via-ing to.

  Why a carrier at all (and why QI specifically)

  From QueuedImmediateExecutor.h:30-33:
  
  ▎ Runs inline like InlineExecutor, but with a queue so that any tasks added to this executor by one of its own callbacks will be queued instead of executed inline (nested). This is usually better behavior than Inline.

  The hazard QI exists to prevent: spawn calls itself recursively (line 1944). If func ever returns an already-ready future (sync work, prefetched data, etc.), the setCallback_ callback fires synchronously, which calls spawn(next), which can call its own callback synchronously, and so on: unbounded recursion that blows the stack.

  QI's mechanism (per-thread queue + a running flag, see QueuedImmediateExecutor.cpp:31-55):

  - If you add while not already running on this thread: set running=true, execute synchronously, then drain any tasks that got queued during the execution.
  - If you add while running on this thread: push onto this thread's queue. The outer call will drain it after returning.

  So nested re-entrant adds execute iteratively rather than recursively. Same memory model as InlineExecutor (functions run on the calling thread, no thread hop), but it cannot stack-overflow.
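The per-thread queue and running flag can be sketched as follows. This is a simplified model for illustration, not folly's source: `ToyQueuedImmediateExecutor`, `toy_qi_order` and `toy_qi_chain` are hypothetical names, and exception safety is ignored.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Simplified model of a queued-immediate executor (assumption, not folly code).
class ToyQueuedImmediateExecutor {
  public:
    void add(std::function<void()> f) {
        State& st = state();
        st.pending.push(std::move(f));
        if (st.running) return;  // re-entrant add: the outer call drains it
        st.running = true;
        while (!st.pending.empty()) {
            auto task = std::move(st.pending.front());
            st.pending.pop();
            task();              // runs on the calling thread, no thread hop
        }
        st.running = false;
    }

  private:
    struct State {
        std::queue<std::function<void()>> pending;
        bool running = false;
    };
    // One queue and one flag per thread, so threads never share work.
    static State& state() {
        thread_local State s;
        return s;
    }
};

// A nested add is deferred until the outer task returns.
std::vector<std::string> toy_qi_order() {
    ToyQueuedImmediateExecutor qi;
    std::vector<std::string> order;
    qi.add([&] {
        order.push_back("outer-start");
        qi.add([&] { order.push_back("nested"); });
        order.push_back("outer-end");
    });
    return order;
}

// A task that re-adds itself n times runs as a loop, not as n stack frames.
int toy_qi_chain(int n) {
    ToyQueuedImmediateExecutor qi;
    int count = 0;
    std::function<void()> step = [&] {
        if (++count < n) qi.add(step);
    };
    qi.add(step);
    return count;
}
```

In this model `toy_qi_order()` yields `outer-start, outer-end, nested`, and a long self-re-adding chain completes without growing the stack, which is the property the window's recursive spawn relies on.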

  Is it OK to run processing on QI?
  
  Yes, and it's not single-threaded.

  The queue is per-thread (folly::ThreadLocal at QueuedImmediateExecutor.h:47). Whatever thread calls add runs the function on that thread. There's no central worker thread. So when the IO pool has 8 threads and each one completes a read concurrently, each IO thread independently calls QI.add(lambda) and runs its own lambda on itself, in parallel.

  What "running processing on QI" means in our chain: the processing lambda runs on whichever IO-pool thread completed the read. Same threading model as the InlineExecutor patch I already have in place; the only difference vs. InlineExecutor is the recursion guard.

  The thing to watch is that this uses the IO pool's threads for processing. If processing is heavy (resample/asof-join), you may want it on a separate CPU pool so the IO threads stay free to pull more reads. For MINMAX-style column-stats work, processing is cheap and running it on the IO thread is the right call: it gives natural backpressure, as our zeros measurements showed.

  Practical recommendation

  Two equivalent fixes for the create_column_stats peak-RSS issue:

  1. What I already have: via(InlineExecutor::instance()).thenValueInline(...) in schedule_first_iteration. Minimal change.
  2. Alternative: change batch_read_uncompressed to use folly::window(&async::io_executor(), ...) (the overload that takes an executor). Then the existing via(processing_executor).thenValueInline(...) would inline naturally when processing_executor == io_executor, because the pointer-match fast path succeeds. This keeps the kill-switch semantics (CPU vs IO executor) but only the IO-executor branch benefits from inlining.

  Option 2 is arguably cleaner because it doesn't bypass the via machinery, just makes the executor pointers line up. But it does affect every caller of batch_read_uncompressed, not just column stats. Option 1 is more surgical.
