4949#include " ngraph_bridge/ngraph_prefetch_shared_data.h"
5050#include " ngraph_bridge/ngraph_timer.h"
5151#include " ngraph_bridge/ngraph_utils.h"
52+ #include " ngraph_bridge/ngraph_var.h"
5253
5354#if defined(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS)
54- #include " ngraph_bridge/enable_variable_ops/ngraph_var.h"
5555#include " ngraph_bridge/ngraph_catalog.h"
5656#endif
5757
@@ -88,13 +88,8 @@ NGraphEncapsulateOp::NGraphEncapsulateOp(OpKernelConstruction* ctx)
8888 ctx, backend != nullptr ,
8989 errors::Internal (" Cannot get the backend object for BE: " , be_name));
9090
91- // If we have the VARIABLE capture on then we can't use the
92- // parallel executor until that support is added.
93- #if !defined(NGRAPH_TF_ENABLE_VARIABLES_AND_OPTIMIZERS)
91+ // If backend executable can create tensors we use parallel executor
9492 m_use_parallel_executor = backend->executable_can_create_tensors ();
95- #else
96- m_use_parallel_executor = false ;
97- #endif
9893
9994 // Override the switch for debugging/testing
10095 if (std::getenv (" NGRAPH_TF_USE_LEGACY_EXECUTOR" ) != nullptr ) {
@@ -402,7 +397,7 @@ NGraphEncapsulateOp::~NGraphEncapsulateOp() {
402397// OpKernel::Compute
403398// ---------------------------------------------------------------------------
404399void NGraphEncapsulateOp::Compute (OpKernelContext* ctx) {
405- ngraph::Event event_compute (" Compute" , " " , " " );
400+ ngraph::Event event_compute (" NGEncap:: Compute:: " + name (), name () , " " );
406401
407402 if (m_use_parallel_executor) {
408403 NGRAPH_VLOG (1 ) << " NGraphEncapsulateOp::Compute: Using Parallel Executor" ;
@@ -459,6 +454,7 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) {
459454 m_parallel_executor->GetTensorPipelineDepth ()));
460455
461456 // Get Tensor Manager and some error checking
457+ ngraph::Event event_prepare_ng_tensors (" Prepare NG In/Out Tensors" , " " , " " );
462458 auto tensor_manager = m_parallel_executor->GetTensorManager ();
463459 int num_of_inputs = tensor_manager->GetNumberOfInputs ();
464460 int num_of_outputs = tensor_manager->GetNumberOfOutputs ();
@@ -499,14 +495,18 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) {
499495 vector<shared_ptr<ng::runtime::Tensor>> ng_inputs (num_of_inputs);
500496 vector<shared_ptr<ng::runtime::Tensor>> ng_outputs (num_of_outputs);
501497
502- // All inputs and outputs are pipelined.
503- // Of all these pipelined inputs some are prefetched
504- // TODO: Fit in variables
505- ng_inputs = get<1 >(pipelined_io_tensors);
506- ng_outputs = get<2 >(pipelined_io_tensors);
498+ // Prepare NG Input Output Tensors
499+ // Assemble Variable tensors and pipelined tensors to ng_inputs and ng_outputs
500+ OP_REQUIRES_OK (ctx, GetIOTensorsReadyForExecution (
501+ ctx, tensor_manager, get<1 >(pipelined_io_tensors),
502+ get<2 >(pipelined_io_tensors), ng_inputs, ng_outputs));
503+ event_prepare_ng_tensors.Stop ();
504+ ngraph::Event::write_trace (event_prepare_ng_tensors);
507505
508506 // And execute
509- ngraph::Event event_execute_graph (" Execute Graph" , " " , " " );
507+ ngraph::Event event_execute_graph (
508+ " Execute Graph Pipeline Indx" + to_string (current_iter_pipeline_depth),
509+ " " , " " );
510510
511511 BackendManager::LockBackend (m_parallel_executor->GetOpBackendName ());
512512 NGRAPH_VLOG (4 ) << " NGraphEncapsulateOp::Compute call starting for cluster "
@@ -540,12 +540,14 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) {
540540 ngraph::Event::write_trace (event_execute_graph);
541541
542542 // Now prepare the output
543- ngraph::Event event_copy_output_tensor (" Copy Output Tensor" , " " , " " );
543+ // Allocate TF Tensors
544+ NGRAPH_VLOG (4 ) << " NGraphEncapsulateOp::Compute Allocating TF Output Tensors "
545+ << m_parallel_executor->GetNgraphClusterId ();
544546
545- std::vector<std::unique_ptr<ngraph::Event>> output_copy_events;
547+ ngraph::Event event_prepare_tf_output_tensors (" Prepare TF Output Tensor" , " " ,
548+ " " );
549+ vector<Tensor*> tf_output_tensors;
546550 for (auto i = 0 ; i < ng_exec->get_results ().size (); i++) {
547- std::unique_ptr<ngraph::Event> event_copy_prep (
548- new ngraph::Event (" Copy Prep" , " " , " " ));
549551 auto ng_element = ng_exec->get_results ()[i];
550552 auto ng_shape = ng_element->get_shape ();
551553 auto ng_element_type = ng_element->get_element_type ();
@@ -558,7 +560,7 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) {
558560 TensorShape tf_shape (dims);
559561 Tensor* tf_output_tensor = nullptr ;
560562 OP_REQUIRES_OK (ctx, ctx->allocate_output (i, tf_shape, &tf_output_tensor));
561-
563+ tf_output_tensors. push_back (tf_output_tensor);
562564 // Make sure the nGraph-inferred element type agrees with what TensorFlow
563565 // expected.
564566 ng::element::Type expected_elem_type;
@@ -569,28 +571,45 @@ void NGraphEncapsulateOp::ComputeUsingParallelExecutor(OpKernelContext* ctx) {
569571 ctx, ng_element_type == expected_elem_type,
570572 errors::Internal (" Element type inferred by nGraph does not match "
571573 " the element type expected by TensorFlow" ));
572- event_copy_prep->Stop ();
573- output_copy_events.push_back (std::move (event_copy_prep));
574+ }
574575
575- // Now copy the nGraph Tensor to Host Tensor
576- std::unique_ptr<ngraph::Event> event_copy_d2h (
577- new ngraph::Event (" Device to Host Copy" , " " , " " ));
578- void * dst_ptr = DMAHelper::base (tf_output_tensor);
576+ // Copy Tensors that are required
577+ NGRAPH_VLOG (4 ) << " NGraphEncapsulateOp::Compute Read NG Output Tensors "
578+ << m_parallel_executor->GetNgraphClusterId ();
579579
580- ng_outputs[i]->read (
581- dst_ptr, ng_outputs[i]->get_element_count () * ng_element_type.size ());
580+ std::vector<std::unique_ptr<ngraph::Event>> output_copy_events;
581+
582+ auto output_indexes_to_be_copied =
583+ tensor_manager->GetOutputIndexesThatNeedCopy ();
584+ for (auto output_index : output_indexes_to_be_copied) {
585+ // Copy the nGraph Tensor to Host Tensor
586+ std::unique_ptr<ngraph::Event> event_copy_d2h (new ngraph::Event (
587+ " D2H_Output_" + std::to_string (output_index), " " , " " ));
588+ void * dst_ptr = (void *)DMAHelper::base (tf_output_tensors[output_index]);
589+ ng_outputs[output_index]->read (
590+ dst_ptr, ng_outputs[output_index]->get_element_count () *
591+ ng_outputs[output_index]->get_element_type ().size ());
582592 event_copy_d2h->Stop ();
583593 output_copy_events.push_back (std::move (event_copy_d2h));
584594 }
585-
586595 for (auto & next : output_copy_events) {
587596 ngraph::Event::write_trace (*next.get ());
588597 }
598+ event_prepare_tf_output_tensors.Stop ();
599+ ngraph::Event::write_trace (event_prepare_tf_output_tensors);
589600
590- event_copy_output_tensor.Stop ();
591- ngraph::Event::write_trace (event_copy_output_tensor);
601+ // Sync Var Output Tensors as required
602+ NGRAPH_VLOG (4 )
603+ << " NGraphEncapsulateOp::Compute Sync NG Output Variable Tensors "
604+ << m_parallel_executor->GetNgraphClusterId ();
605+ ngraph::Event event_update_ngvar_tensors (" Update NGVar Tensors" , " " , " " );
606+ OP_REQUIRES_OK (ctx, SyncOutputVarTensors (ctx, tensor_manager));
607+ event_update_ngvar_tensors.Stop ();
608+ ngraph::Event::write_trace (event_update_ngvar_tensors);
592609
593610 // Now return them to the cache
611+ NGRAPH_VLOG (4 ) << " NGraphEncapsulateOp::Returning Tensors "
612+ << m_parallel_executor->GetNgraphClusterId ();
594613 ngraph::Event event_return_tensor (" Return Tensor" , " " , " " );
595614 pipelined_tensor_store->return_tensors (current_iter_pipeline_depth);
596615
0 commit comments