Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "tent/common/concurrent/thread_pool.h"

#include "tent/runtime/transport.h"
#include <chrono>

// Beta version -- use at your own risk

Expand All @@ -29,6 +30,58 @@ class TransferEngineImpl;
class TaskInfo;
struct StageBufferCache;

// Lightweight, thread-safe statistics for ProxyManager transfer activity.
//
// All counters are relaxed atomics: they are observability data only and
// impose no ordering on the transfer path. The throughput window runs from
// markStartTime() (invoked on metrics reset) to the most recent
// markEndTime() (invoked after each completed transfer), i.e. it is a
// wall-clock window on the steady_clock timeline, not a sum of latencies.
struct ProxyManagerMetrics {
    std::atomic<uint64_t> total_transfers{0};          // completed transfers
    std::atomic<uint64_t> total_bytes_transferred{0};  // payload bytes moved
    std::atomic<uint64_t> total_latency_us{0};         // summed per-transfer latency (us)
    std::atomic<uint64_t> remote_staging_count{0};     // remote stage submissions
    std::atomic<uint64_t> pipeline_parallel_chunks{0}; // chunks driven through the pipeline
    std::atomic<uint64_t> retry_count{0};              // remote-staging retries
    // Timestamps in MICROSECONDS on the steady_clock timeline (see
    // nowMicros()). 0 means "never marked".
    std::atomic<int64_t> start_time{0};
    std::atomic<int64_t> end_time{0};

    // Current steady_clock time expressed in microseconds.
    //
    // steady_clock's native tick period is implementation-defined (commonly
    // nanoseconds), so the raw time_since_epoch().count() must NOT be used
    // as if it were microseconds; convert explicitly here so that the
    // arithmetic in getThroughputGBps() is unit-correct.
    static int64_t nowMicros() {
        return std::chrono::duration_cast<std::chrono::microseconds>(
                   std::chrono::steady_clock::now().time_since_epoch())
            .count();
    }

    // Zero every counter and timestamp. Each store is individually atomic,
    // but the reset as a whole is not; callers should quiesce transfers (or
    // tolerate a torn snapshot) while resetting.
    void reset() {
        total_transfers = 0;
        total_bytes_transferred = 0;
        total_latency_us = 0;
        remote_staging_count = 0;
        pipeline_parallel_chunks = 0;
        retry_count = 0;
        start_time = 0;
        end_time = 0;
    }

    // Record the start of the measurement window (microseconds).
    void markStartTime() {
        start_time.store(nowMicros(), std::memory_order_relaxed);
    }

    // Record the end of the measurement window; called after each completed
    // transfer so the window always extends to the latest completion.
    void markEndTime() {
        end_time.store(nowMicros(), std::memory_order_relaxed);
    }

    // Mean per-transfer latency in milliseconds; 0.0 when nothing has
    // completed yet (avoids divide-by-zero).
    double getAvgLatencyMs() const {
        uint64_t count = total_transfers.load();
        return count > 0 ? (total_latency_us.load() / 1000.0) / count : 0.0;
    }

    // Aggregate throughput in GB/s over the wall-clock window
    // [start_time, end_time]. Returns 0.0 until both marks have been set
    // (start_time is only populated via markStartTime(), e.g. from
    // ProxyManager::resetMetrics()) or if the window is empty/inverted.
    double getThroughputGBps() const {
        auto start = start_time.load();
        auto end = end_time.load();
        if (start == 0 || end == 0) return 0.0;

        auto duration_us = end - start;  // both sides are microseconds
        if (duration_us <= 0) return 0.0;

        double duration_s = duration_us / 1e6;
        double bytes_gb = total_bytes_transferred.load() / 1e9;
        return bytes_gb / duration_s;
    }
};

struct StagingTask {
TaskInfo* native{nullptr};
std::vector<std::string> params;
Expand All @@ -44,8 +97,8 @@ class ProxyManager {

public:
explicit ProxyManager(TransferEngineImpl* impl,
size_t chunk_size = 8 * 1024 * 1024,
size_t chunk_count = 32);
size_t chunk_size = 4 * 1024 * 1024,
size_t chunk_count = 64, int max_retries = 3);

~ProxyManager();

Expand All @@ -59,6 +112,13 @@ class ProxyManager {

Status unpinStageBuffer(uint64_t addr);

const ProxyManagerMetrics& getMetrics() const { return metrics_; }

void resetMetrics() {
metrics_.reset();
metrics_.markStartTime();
}

private:
void runner(size_t id);

Expand Down Expand Up @@ -97,9 +157,11 @@ class ProxyManager {
private:
const size_t chunk_size_;
const size_t chunk_count_;
const int max_retries_;
TransferEngineImpl* impl_;
std::unordered_map<std::string, StageBuffers> stage_buffers_;
std::atomic<bool> running_;
ProxyManagerMetrics metrics_;
struct WorkerShard {
std::thread thread;
std::mutex mu;
Expand Down
61 changes: 57 additions & 4 deletions mooncake-transfer-engine/tent/src/runtime/proxy_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
namespace mooncake {
namespace tent {
ProxyManager::ProxyManager(TransferEngineImpl* impl, size_t chunk_size,
size_t chunk_count)
: chunk_size_(chunk_size), chunk_count_(chunk_count), impl_(impl) {
size_t chunk_count, int max_retries)
: chunk_size_(chunk_size),
chunk_count_(chunk_count),
max_retries_(max_retries),
impl_(impl) {
running_ = true;

for (size_t i = 0; i < kShards; ++i) {
shards_[i].thread = std::thread(&ProxyManager::runner, this, i);
}
Expand Down Expand Up @@ -241,6 +245,8 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
const static size_t kStageBuffers = 4;
uint64_t local_stage_buffer[kStageBuffers],
remote_stage_buffer[kStageBuffers];

auto start_time = std::chrono::steady_clock::now();
if (local_staging) {
for (size_t i = 0; i < kStageBuffers; ++i) {
local_stage_buffer[i] =
Expand Down Expand Up @@ -278,6 +284,7 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
StageState prev_state;
StageState state;
BatchID batch;
int retry_count = 0;
};

std::queue<size_t> event_queue;
Expand Down Expand Up @@ -329,6 +336,8 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
remote_futures[id]);
chunk.prev_state = chunk.state;
chunk.state = StageState::INFLIGHT_REMOTE;
metrics_.remote_staging_count.fetch_add(
1, std::memory_order_relaxed);
} else {
chunk.state = StageState::CROSS;
}
Expand Down Expand Up @@ -366,6 +375,8 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
remote_futures[id]);
chunk.prev_state = chunk.state;
chunk.state = StageState::INFLIGHT_REMOTE;
metrics_.remote_staging_count.fetch_add(
1, std::memory_order_relaxed);
} else if (request.opcode == Request::READ && local_staging) {
chunk.batch = submitLocalStage(request, chunk.local_buf,
chunk.length, chunk.offset);
Expand Down Expand Up @@ -416,6 +427,13 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
}

case StageState::FAILED: {
// Free all in-flight batches before returning
for (auto& chunk : chunks) {
if (chunk.batch != 0) {
impl_->freeBatch(chunk.batch);
chunk.batch = 0;
}
}
return Status::InternalError(
"Proxy event loop in failed state");
}
Expand All @@ -424,14 +442,35 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
auto& fut = remote_futures[id];
if (!fut.valid()) {
chunk.state = StageState::FAILED;
event_queue.push(id);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This call triggers the StageState::FAILED case, which returns an error immediately. However, the current implementation of the FAILED case (line 432) does not free any in-flight batches stored in the chunks vector. This leads to a resource leak in the TransferEngineImpl. Before returning an error, the loop should ensure all allocated batches for all chunks are freed.

break;
}
if (fut.wait_for(std::chrono::seconds(0)) ==
std::future_status::ready) {
Status rs = fut.get();
if (!rs.ok()) {
chunk.state = StageState::FAILED;
break;
if (chunk.retry_count < max_retries_) {
chunk.retry_count++;
metrics_.retry_count.fetch_add(
1, std::memory_order_relaxed);
LOG(WARNING)
<< "Remote staging failed for chunk " << id
<< ", retrying (attempt " << chunk.retry_count
<< "/" << max_retries_
<< "): " << rs.ToString();
submitRemoteStage(server_addr, request,
chunk.remote_buf, chunk.length,
chunk.offset, remote_futures[id]);
event_queue.push(id);
break;
} else {
LOG(ERROR) << "Remote staging failed for chunk "
<< id << " after " << max_retries_
<< " retries: " << rs.ToString();
chunk.state = StageState::FAILED;
event_queue.push(id);
break;
}
}
if (chunk.prev_state == StageState::PRE) {
chunk.state = StageState::CROSS;
Expand All @@ -451,6 +490,20 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
}
}

auto end_time = std::chrono::steady_clock::now();
auto latency_us = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time)
.count();

// Update metrics only after successful completion
metrics_.total_transfers.fetch_add(1, std::memory_order_relaxed);
metrics_.total_bytes_transferred.fetch_add(request.length,
std::memory_order_relaxed);
metrics_.total_latency_us.fetch_add(latency_us, std::memory_order_relaxed);
metrics_.pipeline_parallel_chunks.fetch_add(chunks.size(),
std::memory_order_relaxed);
metrics_.markEndTime();

return Status::OK();
}

Expand Down
Loading