Skip to content

Commit 5057b9a

Browse files
committed
edits logs and added statements
1 parent 06d44cd commit 5057b9a

1 file changed

Lines changed: 22 additions & 25 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,6 @@ public void setupOpenLineage(DDTraceId traceId) {
233233
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
234234
this.applicationStart = applicationStart;
235235

236-
if (isRunningOnDatabricks) {
237-
log.info("databricksClusterName{}", databricksClusterName);
238-
}
239236
if (openLineageSparkListener == null) {
240237
openLineageSparkListener =
241238
InstanceStore.of(SparkListenerInterface.class).get("openLineageListener");
@@ -249,7 +246,6 @@ public synchronized void onApplicationStart(SparkListenerApplicationStart applic
249246
.orElse(predeterminedTraceIdContext.getTraceId()));
250247
}
251248
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
252-
log.info("end of application start");
253249
}
254250

255251
private void initApplicationSpanIfNotInitialized() {
@@ -258,25 +254,25 @@ private void initApplicationSpanIfNotInitialized() {
258254
}
259255

260256
log.debug("Starting tracer application span.");
261-
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
262-
263-
if (applicationStart != null) {
264-
String ddTags =
265-
Config.get().getGlobalTags().entrySet().stream()
266-
.sorted(Map.Entry.comparingByKey())
267-
.map(e -> e.getKey() + ":" + e.getValue())
268-
.collect(Collectors.joining(","));
269-
270-
builder
271-
.withStartTimestamp(applicationStart.time() * 1000)
272-
.withTag("application_name", applicationStart.appName())
273-
.withTag("djm.tags", ddTags)
274-
.withTag("spark_user", applicationStart.sparkUser());
275-
276-
if (applicationStart.appAttemptId().isDefined()) {
277-
builder.withTag("app_attempt_id", applicationStart.appAttemptId().get());
278-
}
257+
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
258+
259+
if (applicationStart != null) {
260+
String ddTags =
261+
Config.get().getGlobalTags().entrySet().stream()
262+
.sorted(Map.Entry.comparingByKey())
263+
.map(e -> e.getKey() + ":" + e.getValue())
264+
.collect(Collectors.joining(","));
265+
266+
builder
267+
.withStartTimestamp(applicationStart.time() * 1000)
268+
.withTag("application_name", applicationStart.appName())
269+
.withTag("djm.tags", ddTags)
270+
.withTag("spark_user", applicationStart.sparkUser());
271+
272+
if (applicationStart.appAttemptId().isDefined()) {
273+
builder.withTag("app_attempt_id", applicationStart.appAttemptId().get());
279274
}
275+
}
280276

281277
captureApplicationParameters(builder);
282278
captureEmrStepId(builder);
@@ -312,9 +308,6 @@ private void captureOpenlineageContextIfPresent(
312308

313309
@Override
314310
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
315-
if (isRunningOnDatabricks) {
316-
log.info("On Application End");
317-
}
318311
log.info(
319312
"Received spark application end event, finish trace on this event: {}",
320313
finishTraceOnApplicationEnd);
@@ -330,6 +323,10 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
330323
public synchronized void finishApplication(
331324
long time, Throwable throwable, int exitCode, String msg) {
332325
log.info("Finishing spark application trace");
326+
327+
if(applicationEnded){
328+
return;
329+
}
333330
applicationEnded = true;
334331

335332
if (applicationSpan == null && jobCount > 0) {

0 commit comments

Comments
 (0)