@@ -46,6 +46,11 @@ S3MotrKVSWriter::S3MotrKVSWriter(std::shared_ptr<RequestObject> req,
4646 } else {
4747 s3_motr_api = std::make_shared<ConcreteMotrAPI>();
4848 }
49+ on_success_for_parallelkv =
50+ std::bind (&S3MotrKVSWriter::parallel_put_kv_successful, this ,
51+ std::placeholders::_1);
52+ on_failure_for_parallelkv = std::bind (
53+ &S3MotrKVSWriter::parallel_put_kv_failed, this , std::placeholders::_1);
4954}
5055
5156S3MotrKVSWriter::S3MotrKVSWriter (std::string req_id,
@@ -61,6 +66,11 @@ S3MotrKVSWriter::S3MotrKVSWriter(std::string req_id,
6166 } else {
6267 s3_motr_api = std::make_shared<ConcreteMotrAPI>();
6368 }
69+ on_success_for_parallelkv =
70+ std::bind (&S3MotrKVSWriter::parallel_put_kv_successful, this ,
71+ std::placeholders::_1);
72+ on_failure_for_parallelkv = std::bind (
73+ &S3MotrKVSWriter::parallel_put_kv_failed, this , std::placeholders::_1);
6474}
6575
6676S3MotrKVSWriter::~S3MotrKVSWriter () {
@@ -479,11 +489,88 @@ void S3MotrKVSWriter::delete_indices_failed() {
479489 s3_log (S3_LOG_DEBUG, " " , " %s Exit" , __func__);
480490}
481491
492+ void S3MotrKVSWriter::parallel_put_kv_successful (unsigned int processed_count) {
493+ s3_log (S3_LOG_DEBUG, request_id, " %s Entry\n " , __func__);
494+ next_key_offset += processed_count;
495+ kvop_keys_in_flight -= 1 ;
496+ unsigned int total_keys = kv_list.size ();
497+ s3_log (S3_LOG_DEBUG, request_id, " Total writer contexts = %zu" ,
498+ parallel_writer_contexts.size ());
499+ #if 0
500+ // Early free of writer context
501+ if ((next_key_offset - 1) < parallel_writer_contexts.size()) {
502+ s3_log(S3_LOG_DEBUG, request_id, "Freeing writer context at index = %d",
503+ (next_key_offset - 1));
504+ parallel_writer_contexts[next_key_offset - 1].reset(NULL);
505+ s3_log(S3_LOG_DEBUG, request_id, "Outstanding writer contexts = %zu",
506+ parallel_writer_contexts.size() - next_key_offset);
507+ }
508+ #endif
509+
510+ if ((next_key_offset + kvop_keys_in_flight) < total_keys &&
511+ !atleast_one_failed) {
512+ // Still more to launch PUT KV
513+ if (kvop_keys_in_flight < max_parallel_kv) {
514+ // Current number of parallel KVs is less than max allowed.
515+ // Launch next PUT KV operation
516+ put_partial_keyval (idx_los[0 ], kv_list, on_success_for_parallelkv,
517+ on_failure_for_parallelkv,
518+ next_key_offset + kvop_keys_in_flight, 1 , false );
519+ ++kvop_keys_in_flight;
520+ // Save writer context
521+ parallel_writer_contexts.push_back (std::move (this ->get_writer_context ()));
522+ }
523+ } else {
524+ // All PUT KVs done(all success or atleast one fail). Perform next step
525+ if (atleast_one_failed && kvop_keys_in_flight == 0 ) {
526+ // One of PUT KV failed previously, and there is no outstanding PUT KV
527+ // that we need to wait from the batch of parallel PUT KV.
528+ // Perform next step: Failure
529+ // TODO: Need to rollback all successfull PUT KV
530+ handler_on_failed ();
531+ return ;
532+ }
533+ if (!atleast_one_failed && kvop_keys_in_flight == 0 ) {
534+ // All keys are done, and all PUT KV in parallel
535+ // operation are done. Perform next step: Success
536+ handler_on_success ();
537+ return ;
538+ }
539+ // Need to wait for reminaing PUT KV from the batch of parallel
540+ // PUT KV
541+ }
542+ s3_log (S3_LOG_DEBUG, " " , " %s Exit" , __func__);
543+ }
544+
545+ void S3MotrKVSWriter::parallel_put_kv_failed (unsigned int processed_count) {
546+ s3_log (S3_LOG_DEBUG, request_id, " %s Entry\n " , __func__);
547+ next_key_offset += processed_count;
548+ atleast_one_failed = true ;
549+ kvop_keys_in_flight -= 1 ;
550+ s3_log (S3_LOG_DEBUG, request_id, " Total writer contexts = %zu" ,
551+ parallel_writer_contexts.size ());
552+ #if 0
553+ // Early free of writer context
554+ if ((next_key_offset - 1) < parallel_writer_contexts.size()) {
555+ s3_log(S3_LOG_DEBUG, request_id, "Freeing writer context at index = %d",
556+ (next_key_offset - 1));
557+ parallel_writer_contexts[next_key_offset - 1].reset(NULL);
558+ s3_log(S3_LOG_DEBUG, request_id, "Outstanding writer contexts = %zu",
559+ parallel_writer_contexts.size() - next_key_offset);
560+ }
561+ #endif
562+ if (kvop_keys_in_flight == 0 ) {
563+ // All the keys in parallel batch are done, perform next step
564+ handler_on_failed ();
565+ }
566+ s3_log (S3_LOG_DEBUG, " " , " %s Exit" , __func__);
567+ }
568+
482569void S3MotrKVSWriter::put_keyval (
483570 const struct s3_motr_idx_layout &idx_lo,
484571 const std::map<std::string, std::string> &kv_list,
485572 std::function<void (void )> on_success, std::function<void (void )> on_failed,
486- S3MotrKVSWriter:: CallbackType callback) {
573+ CallbackType callback, bool parallel_mode ) {
487574
488575 s3_log (S3_LOG_INFO, stripped_request_id,
489576 " %s Entry with oid = %" SCNx64 " : %" SCNx64
@@ -499,24 +586,47 @@ void S3MotrKVSWriter::put_keyval(
499586 s3_log (S3_LOG_ERROR, request_id, " Empty key in PUT KV\n " );
500587 }
501588 }
502- idx_los.clear ();
503- idx_los.push_back (idx_lo);
504589
505590 this ->handler_on_success = std::move (on_success);
506591 this ->handler_on_failed = std::move (on_failed);
507592
508- if (idx_ctx) {
509- // clean up any old allocations
510- clean_up_contexts ();
511- }
512- idx_ctx = create_idx_context (1 );
593+ if (!this ->parallel_run || parallel_mode) {
594+ // Initialize once in parallel mode
595+ idx_los.clear ();
596+ idx_los.push_back (idx_lo);
513597
598+ if (idx_ctx) {
599+ // clean up any old allocations
600+ clean_up_contexts ();
601+ }
602+ idx_ctx = create_idx_context (1 );
603+ }
604+ if (parallel_mode && kv_list.size () > 1 ) {
605+ // Parallel mode PUT kv, provided list contains at least 2 keys
606+ this ->kv_list = kv_list;
607+ next_key_offset = 0 ;
608+ parallel_writer_contexts.clear ();
609+ const int parallel_kvs =
610+ (kv_list.size () <= max_parallel_kv) ? kv_list.size () : max_parallel_kv;
611+ this ->parallel_run = parallel_mode;
612+ s3_log (S3_LOG_DEBUG, request_id, " Parallel PUT kv, with %d kv in parallel" ,
613+ parallel_kvs);
614+ for (int i = 0 ; i < parallel_kvs; ++i) {
615+ // Call below in non-parallel mode
616+ put_partial_keyval (idx_lo, kv_list, on_success_for_parallelkv,
617+ on_failure_for_parallelkv, i, 1 , false );
618+ ++kvop_keys_in_flight;
619+ // Save writer context
620+ parallel_writer_contexts.push_back (std::move (this ->get_writer_context ()));
621+ }
622+ // End of Parallel mode
623+ return ;
624+ }
514625 writer_context.reset (new S3AsyncMotrKVSWriterContext (
515626 request, std::bind (&S3MotrKVSWriter::put_keyval_successful, this ),
516627 std::bind (&S3MotrKVSWriter::put_keyval_failed, this ), 1 , s3_motr_api));
517628
518629 writer_context->init_kvs_write_op_ctx (kv_list.size ());
519-
520630 // Ret code can be ignored as its already handled in async case.
521631 put_keyval_impl (kv_list, true , false , 0 , 0 , callback);
522632}
@@ -526,7 +636,7 @@ void S3MotrKVSWriter::put_partial_keyval(
526636 const std::map<std::string, std::string> &kv_list,
527637 std::function<void (unsigned int )> on_success,
528638 std::function<void (unsigned int )> on_failed, unsigned int offset,
529- unsigned int how_many) {
639+ unsigned int how_many, bool parallel_mode ) {
530640 std::map<std::string, std::string>::const_iterator it;
531641 unsigned int record_count = 0 ;
532642 assert (how_many != 0 );
@@ -557,17 +667,41 @@ void S3MotrKVSWriter::put_partial_keyval(
557667 break ;
558668 }
559669 }
560- idx_los.clear ();
561- idx_los.push_back (idx_lo);
562-
563670 this ->on_success_for_extends = std::move (on_success);
564671 this ->on_failure_for_extends = std::move (on_failed);
565672
566- if (idx_ctx) {
567- // clean up any old allocations
568- clean_up_contexts ();
673+ if (!this ->parallel_run || parallel_mode) {
674+ // Initialize once in parallel mode
675+ idx_los.clear ();
676+ idx_los.push_back (idx_lo);
677+
678+ if (idx_ctx) {
679+ // clean up any old allocations
680+ clean_up_contexts ();
681+ }
682+ idx_ctx = create_idx_context (1 );
683+ }
684+ if (parallel_mode && how_many > 1 ) {
685+ // Parallel mode PUT partial kv, provided list contains at least 2 keys
686+ this ->kv_list = kv_list;
687+ next_key_offset = 0 ;
688+ parallel_writer_contexts.clear ();
689+ const int parallel_kvs =
690+ (how_many <= max_parallel_kv) ? how_many : max_parallel_kv;
691+ this ->parallel_run = parallel_mode;
692+ s3_log (S3_LOG_DEBUG, request_id, " Parallel PUT kv, with %d kv in parallel" ,
693+ parallel_kvs);
694+ for (int i = 0 ; i < parallel_kvs; ++i) {
695+ // Call below in non-parallel mode
696+ put_partial_keyval (idx_lo, kv_list, on_success_for_parallelkv,
697+ on_failure_for_parallelkv, i, 1 , false );
698+ ++kvop_keys_in_flight;
699+ // Save writer context
700+ parallel_writer_contexts.push_back (std::move (this ->get_writer_context ()));
701+ }
702+ // End of Parallel mode
703+ return ;
569704 }
570- idx_ctx = create_idx_context (1 );
571705 writer_context.reset (new S3AsyncMotrKVSWriterContext (
572706 request, std::bind (&S3MotrKVSWriter::put_partial_keyval_successful, this ),
573707 std::bind (&S3MotrKVSWriter::put_partial_keyval_failed, this ), 1 ,
@@ -720,7 +854,7 @@ void S3MotrKVSWriter::put_keyval(const struct s3_motr_idx_layout &idx_lo,
720854 const std::string &key, const std::string &val,
721855 std::function<void (void )> on_success,
722856 std::function<void (void )> on_failed,
723- S3MotrKVSWriter:: CallbackType callback) {
857+ CallbackType callback) {
724858 s3_log (S3_LOG_INFO, stripped_request_id,
725859 " %s Entry with oid = %" SCNx64 " : %" SCNx64
726860 " key = %s and value = %s\n " ,
0 commit comments