55#include < variant>
66#include < vector>
77
8+ #include " absl/flags/flag.h"
89#include " absl/log/log.h"
910#include " absl/status/status.h"
1011#include " absl/status/statusor.h"
12+ #include " absl/strings/str_cat.h"
1113#include " absl/strings/string_view.h"
1214#include " absl/time/clock.h"
1315#include " absl/time/time.h"
1719#include " xla/tsl/platform/errors.h"
1820#include " xla/tsl/platform/threadpool.h"
1921#include " tsl/platform/path.h"
22+ #include " absl/flags/declare.h"
2023#include " xprof/convert/profile_processor.h"
2124#include " xprof/convert/profile_processor_factory.h"
2225#include " xprof/convert/repository.h"
2326#include " xprof/convert/tool_options.h"
27+ #include " xprof/convert/unified_profile_processor.h"
28+ #include " xprof/convert/unified_profile_processor_factory.h"
2429#include " plugin/xprof/protobuf/worker_service.pb.h"
2530#include " plugin/xprof/worker/grpc_utils.h"
2631#include " plugin/xprof/worker/stub_factory.h"
2732
33+ ABSL_DECLARE_FLAG (bool , enable_unified_xprof);
34+
2835namespace tensorflow {
2936namespace profiler {
37+
3038namespace {
39+ using xprof::UnifiedProfileProcessor;
3140
3241constexpr absl::string_view kXplaneFileName = " .xplane.pb" ;
3342
@@ -133,6 +142,43 @@ absl::Status ProcessSession(xprof::ProfileProcessor* processor,
133142 return absl::OkStatus ();
134143}
135144
145+ absl::Status RunUnifiedMapReduce (const SessionSnapshot& session_snapshot,
146+ absl::string_view tool_name,
147+ UnifiedProfileProcessor* processor,
148+ const ToolOptions& options) {
149+ const int num_hosts = session_snapshot.XSpaceSize ();
150+ std::vector<absl::StatusOr<std::string>> map_outputs (num_hosts);
151+
152+ {
153+ tsl::thread::ThreadPool thread_pool (tsl::Env::Default (), __FUNCTION__,
154+ num_hosts);
155+ for (int i = 0 ; i < num_hosts; ++i) {
156+ thread_pool.Schedule ([&session_snapshot, &tool_name, &options,
157+ &map_outputs, i] {
158+ std::string hostname = session_snapshot.GetHostname (i);
159+ std::string xspace_path = GetXSpaceFilePath (session_snapshot, hostname);
160+ map_outputs[i] = CallWorkerService (xspace_path, tool_name, options);
161+ });
162+ }
163+ }
164+
165+ std::vector<std::string> map_output_files;
166+ map_output_files.reserve (num_hosts);
167+ for (int i = 0 ; i < num_hosts; ++i) {
168+ TF_RETURN_IF_ERROR (map_outputs[i].status ());
169+ map_output_files.push_back (*std::move (map_outputs[i]));
170+ }
171+ LOG (INFO) << " Started reducing outputs for tool: " << tool_name
172+ << " num_hosts: " << num_hosts;
173+ absl::Time start_time = absl::Now ();
174+ absl::Status reduce_status =
175+ processor->Reduce (session_snapshot, map_output_files);
176+ absl::Duration reduce_time = absl::Now () - start_time;
177+ LOG (INFO) << " Finished reducing outputs for tool: " << tool_name
178+ << " num_hosts: " << num_hosts << " time taken: " << reduce_time;
179+ return reduce_status;
180+ }
181+
136182} // namespace
137183
138184absl::StatusOr<std::string> ConvertMultiXSpacesToToolDataWithProfileProcessor (
@@ -146,6 +192,28 @@ absl::StatusOr<std::string> ConvertMultiXSpacesToToolDataWithProfileProcessor(
146192
147193 absl::Time start_time = absl::Now ();
148194
195+ if (absl::GetFlag (FLAGS_enable_unified_xprof)) {
196+ auto unified_processor =
197+ xprof::UnifiedProfileProcessorFactory::GetInstance ().Create (
198+ tool_name, options);
199+ if (!unified_processor) {
200+ LOG (WARNING) << " Unified processor not found for tool: " << tool_name
201+ << " , falling back to legacy." ;
202+ } else {
203+ LOG (INFO) << " Using unified workflow for tool: " << tool_name;
204+ if (unified_processor->ShouldUseWorkerService (session_snapshot,
205+ options)) {
206+ TF_RETURN_IF_ERROR (RunUnifiedMapReduce (session_snapshot, tool_name,
207+ unified_processor.get (),
208+ options));
209+ } else {
210+ TF_RETURN_IF_ERROR (
211+ unified_processor->ProcessSession (session_snapshot, options));
212+ }
213+ return unified_processor->GetData ();
214+ }
215+ }
216+
149217 auto processor =
150218 xprof::ProfileProcessorFactory::GetInstance ().Create (tool_name, options);
151219 if (!processor) {
0 commit comments