1818#include " utils/scoped_vlog_timer.h"
1919#include " utils.h"
2020
21+ #include < async_simple/coro/FutureAwaiter.h>
22+
2123namespace mooncake {
2224
2325namespace {
@@ -148,6 +150,10 @@ DataManager::DataManager(std::unique_ptr<TieredBackend> tiered_backend,
148150 async_memcpy_executor_ = std::make_unique<AsyncMemcpyExecutor>(
149151 local_transfer_config_.local_memcpy_async_worker_num );
150152 }
153+ if (local_transfer_config_.te_async_poll_worker_num > 0 ) {
154+ te_poll_executor_ = std::make_unique<AsyncMemcpyExecutor>(
155+ local_transfer_config_.te_async_poll_worker_num );
156+ }
151157
152158 lease_duration_ = std::chrono::milliseconds (GetEnvOr<uint32_t >(
153159 " P2P_RPC_LEASE_DURATION_MS" , kDefaultLeaseDurationMs ));
@@ -162,8 +168,10 @@ DataManager::DataManager(std::unique_ptr<TieredBackend> tiered_backend,
162168 ? " TE"
163169 : " MEMCPY" )
164170 << " , te_endpoint=" << local_transfer_config_.te_endpoint
165- << " , async_memcpy_workers ="
171+ << " , memcpy_async_workers ="
166172 << local_transfer_config_.local_memcpy_async_worker_num
173+ << " , te_async_poll_workers="
174+ << local_transfer_config_.te_async_poll_worker_num
167175 << " , lease_duration_ms=" << lease_duration_.count ()
168176 << " , lease_scan_interval_ms=" << lease_scan_interval_.count ();
169177}
@@ -176,6 +184,9 @@ void DataManager::Stop() {
176184 if (async_memcpy_executor_) {
177185 async_memcpy_executor_->Shutdown ();
178186 }
187+ if (te_poll_executor_) {
188+ te_poll_executor_->Shutdown ();
189+ }
179190 if (tiered_backend_) {
180191 tiered_backend_->Stop ();
181192 }
@@ -485,16 +496,6 @@ tl::expected<std::unique_ptr<TaskHandle<void>>, ErrorCode> DataManager::Put(
485496 return tl::unexpected (ErrorCode::INTERNAL_ERROR);
486497}
487498
488- // TODO: The returned CallableTaskHandle's WaitAsync() falls back to a
489- // synchronous Wait() on the coroutine's current thread, because the
490- // WaitAllTransferBatches() is a loop with no async completion notification.
491- // Possible optimizations:
492- // (1) run a polling coroutine on yalantinglibs coro_io's io_context (via
493- // co_await coro_io::sleep_for(100us) + getTransferStatus), no new thread;
494- // (2) introduce a lightweight timer service to bridge cv-poll to
495- // async_simple::Promise;
496- // (3) introduce a completion callback from transfer_engine itself.
497- // Once any of these lands, switch the return type to FutureHandle.
498499tl::expected<std::unique_ptr<TaskHandle<void >>, ErrorCode>
499500DataManager::PutViaTe (std::string_view key, std::vector<Slice>& slices) {
500501 // using Te, treat local memory as remote memory
@@ -536,46 +537,51 @@ DataManager::PutViaTe(std::string_view key, std::vector<Slice>& slices) {
536537 return tl::unexpected (submit_result.error ());
537538 }
538539
539- return CallableTaskHandle<void >::Create (
540- [this , ctx = std::move (*submit_result), alloc_handle, kctx,
541- pending_write_token]() mutable -> tl::expected<void , ErrorCode> {
542- ScopedVLogTimer timer (1 , " DataManager::PutViaTe" );
543- timer.LogRequest (" key=" , kctx.key );
540+ auto te_phase = [this , ctx = std::move (*submit_result), alloc_handle, kctx,
541+ pending_write_token]() mutable -> tl::expected<void , ErrorCode> {
542+ ScopedVLogTimer timer (1 , " DataManager::PutViaTe" );
543+ timer.LogRequest (" key=" , kctx.key );
544+
545+ auto wait_result = WaitAllTransferBatches (ctx.transfer_batches );
546+ if (!wait_result) {
547+ LOG (ERROR) << " WaitAllTransferBatches failed"
548+ << " , key=" << kctx.key
549+ << " , error_code=" << toString (wait_result.error ());
550+ (void )WriteRevokeInternal (kctx, pending_write_token);
551+ return tl::unexpected (wait_result.error ());
552+ }
544553
545- auto wait_result = WaitAllTransferBatches (ctx.transfer_batches );
546- if (!wait_result) {
547- LOG (ERROR) << " WaitAllTransferBatches failed"
554+ if (ctx.handle ->loc .data .type != MemoryType::DRAM && ctx.temp_buffer ) {
555+ auto & loc_data = ctx.handle ->loc .data ;
556+ auto copy_result = CopyFromDRAMBuffer (
557+ ctx.temp_buffer .get (),
558+ reinterpret_cast <void *>(loc_data.buffer ->data ()),
559+ loc_data.type , loc_data.buffer ->size (), ctx.handle ->backend );
560+ if (!copy_result) {
561+ LOG (ERROR) << " CopyFromDRAMBuffer failed"
548562 << " , key=" << kctx.key
549- << " , error_code=" << toString (wait_result .error ());
563+ << " , error_code=" << toString (copy_result .error ());
550564 (void )WriteRevokeInternal (kctx, pending_write_token);
551- return tl::unexpected (wait_result .error ());
565+ return tl::unexpected (copy_result .error ());
552566 }
567+ }
553568
554- if (ctx.handle ->loc .data .type != MemoryType::DRAM &&
555- ctx.temp_buffer ) {
556- auto & loc_data = ctx.handle ->loc .data ;
557- auto copy_result = CopyFromDRAMBuffer (
558- ctx.temp_buffer .get (),
559- reinterpret_cast <void *>(loc_data.buffer ->data ()),
560- loc_data.type , loc_data.buffer ->size (),
561- ctx.handle ->backend );
562- if (!copy_result) {
563- LOG (ERROR)
564- << " CopyFromDRAMBuffer failed"
565- << " , key=" << kctx.key
566- << " , error_code=" << toString (copy_result.error ());
567- (void )WriteRevokeInternal (kctx, pending_write_token);
568- return tl::unexpected (copy_result.error ());
569- }
570- }
569+ auto commit_result = WriteCommitInternal (kctx, pending_write_token);
570+ if (!commit_result) {
571+ return tl::unexpected (commit_result.error ());
572+ }
573+ timer.LogResponse (" error_code=" , ErrorCode::OK);
574+ return {};
575+ };
571576
572- auto commit_result = WriteCommitInternal (kctx, pending_write_token);
573- if (!commit_result) {
574- return tl::unexpected (commit_result.error ());
575- }
576- timer.LogResponse (" error_code=" , ErrorCode::OK);
577- return {};
578- });
577+ if (te_poll_executor_) {
578+ auto future = te_poll_executor_
579+ ->SubmitSingleTask <tl::expected<void , ErrorCode>>(
580+ std::move (te_phase));
581+ return FutureHandle<void >::Create (std::shared_ptr<void >{},
582+ std::move (future));
583+ }
584+ return CallableTaskHandle<void >::Create (std::move (te_phase));
579585}
580586
581587tl::expected<std::unique_ptr<TaskHandle<void >>, ErrorCode>
@@ -756,19 +762,27 @@ tl::expected<ReadTaskHandle, ErrorCode> DataManager::BuildDataCopierViaTe(
756762
757763 ReadTaskHandle res;
758764 res.data_size = static_cast <int64_t >(source_size);
759- res.task_handle = CallableTaskHandle<void >::Create (
760- [this , ctx = std::move (submit_result.value ()),
761- h = handle]() mutable -> tl::expected<void , ErrorCode> {
762- ScopedVLogTimer timer (1 , " DataManager::BuildDataCopierViaTe" );
763- auto wait_result = WaitAllTransferBatches (ctx.transfer_batches );
764- if (!wait_result) {
765- LOG (ERROR) << " Failed to wait TE read transfer, error_code="
766- << wait_result.error ();
767- return tl::unexpected (wait_result.error ());
768- }
769- timer.LogResponse (" error_code=" , ErrorCode::OK);
770- return {};
771- });
765+ auto te_wait = [this , ctx = std::move (submit_result.value ()),
766+ h = handle]() mutable -> tl::expected<void , ErrorCode> {
767+ ScopedVLogTimer timer (1 , " DataManager::BuildDataCopierViaTe" );
768+ auto wait_result = WaitAllTransferBatches (ctx.transfer_batches );
769+ if (!wait_result) {
770+ LOG (ERROR) << " Failed to wait TE read transfer, error_code="
771+ << wait_result.error ();
772+ return tl::unexpected (wait_result.error ());
773+ }
774+ timer.LogResponse (" error_code=" , ErrorCode::OK);
775+ return {};
776+ };
777+ if (te_poll_executor_) {
778+ auto future = te_poll_executor_
779+ ->SubmitSingleTask <tl::expected<void , ErrorCode>>(
780+ std::move (te_wait));
781+ res.task_handle = FutureHandle<void >::Create (std::shared_ptr<void >{},
782+ std::move (future));
783+ } else {
784+ res.task_handle = CallableTaskHandle<void >::Create (std::move (te_wait));
785+ }
772786 return res;
773787}
774788
@@ -1375,7 +1389,9 @@ DataManager::SubmitTeTransferBatches(
13751389 return submitted_batches;
13761390}
13771391
1378- tl::expected<void , ErrorCode> DataManager::TransferWithTeNoTierStaging (
1392+ tl::expected<std::vector<std::tuple<Transport::BatchID, size_t , std::string>>,
1393+ ErrorCode>
1394+ DataManager::SubmitTeNoTierStagingBatches (
13791395 void * local_transfer_base, size_t total_size,
13801396 const std::vector<RemoteBufferDesc>& peer_buffers,
13811397 Transport::TransferRequest::OpCode opcode) {
@@ -1394,8 +1410,16 @@ tl::expected<void, ErrorCode> DataManager::TransferWithTeNoTierStaging(
13941410 LOG (ERROR) << " TransferEngine not initialized" ;
13951411 return tl::make_unexpected (ErrorCode::INTERNAL_ERROR);
13961412 }
1397- auto batches = SubmitTeTransferBatches (local_transfer_base, total_size,
1398- peer_buffers, opcode);
1413+ return SubmitTeTransferBatches (local_transfer_base, total_size, peer_buffers,
1414+ opcode);
1415+ }
1416+
1417+ tl::expected<void , ErrorCode> DataManager::TransferWithTeNoTierStaging (
1418+ void * local_transfer_base, size_t total_size,
1419+ const std::vector<RemoteBufferDesc>& peer_buffers,
1420+ Transport::TransferRequest::OpCode opcode) {
1421+ auto batches = SubmitTeNoTierStagingBatches (
1422+ local_transfer_base, total_size, peer_buffers, opcode);
13991423 if (!batches) {
14001424 return tl::unexpected (batches.error ());
14011425 }
@@ -1409,6 +1433,29 @@ tl::expected<void, ErrorCode> DataManager::TransferWithTeNoTierStaging(
14091433 return {};
14101434}
14111435
1436+ async_simple::coro::Lazy<tl::expected<void , ErrorCode>>
1437+ DataManager::TransferWithTeNoTierStagingAsync (
1438+ void * local_transfer_base, size_t total_size,
1439+ const std::vector<RemoteBufferDesc>& peer_buffers,
1440+ Transport::TransferRequest::OpCode opcode) {
1441+ if (!te_poll_executor_) {
1442+ co_return TransferWithTeNoTierStaging (local_transfer_base, total_size,
1443+ peer_buffers, opcode);
1444+ }
1445+ auto batches = SubmitTeNoTierStagingBatches (
1446+ local_transfer_base, total_size, peer_buffers, opcode);
1447+ if (!batches) {
1448+ co_return tl::make_unexpected (batches.error ());
1449+ }
1450+ auto batch_vec = std::move (batches.value ());
1451+ auto fut =
1452+ te_poll_executor_->SubmitSingleTask <tl::expected<void , ErrorCode>>(
1453+ [this , batch_vec = std::move (batch_vec)]() mutable {
1454+ return WaitAllTransferBatches (batch_vec);
1455+ });
1456+ co_return co_await std::move (fut);
1457+ }
1458+
14121459tl::expected<void , ErrorCode> DataManager::ValidateRemoteBuffers (
14131460 const std::vector<RemoteBufferDesc>& buffers) {
14141461 if (buffers.empty ()) {
0 commit comments