diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index d19c838..bff39af 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -8,6 +8,11 @@ import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.XMessage; import com.uci.dao.utils.XMessageDAOUtils; + +import io.grpc.Context; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import messagerosa.xml.XMessageParser; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationStartedEvent; @@ -31,6 +36,9 @@ public class OutboundKafkaController { @Autowired private XMessageRepository xMessageRepo; + + @Autowired + private Tracer tracer; @EventListener(ApplicationStartedEvent.class) public void onMessage() { @@ -38,8 +46,10 @@ public void onMessage() { .doOnNext(new Consumer>() { @Override public void accept(ReceiverRecord msg) { + Span rootSpan = tracer.spanBuilder("outbound-processMessage").startSpan(); + Context currentContext = Context.current(); XMessage currentXmsg = null; - try { + try(Scope scope = rootSpan.makeCurrent()) { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); @@ -54,6 +64,7 @@ public void accept(XMessage xMessage) { @Override public void accept(XMessageDAO xMessageDAO) { log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId()); + rootSpan.end(); } }); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5b9cebd..c5de8f1 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -51,3 +51,13 @@ spring.r2dbc.password=${FORMS_DB_PASSWORD} #Caffeine Cache caffeine.cache.max.size=${CAFFEINE_CACHE_MAX_SIZE:#{1000}} caffeine.cache.exprie.duration.seconds=${CAFFEINE_CACHE_EXPIRE_DURATION:#{300}} + +#Opentelemetry Lighstep Config +opentelemetry.lightstep.tracer=${OPENTELEMETERY_LIGHTSTEP_TRACER} +opentelemetry.lightstep.tracer.version=${OPENTELEMETERY_LIGHTSTEP_TRACER_VERSION} +opentelemetry.lightstep.service=${OPENTELEMETERY_LIGHTSTEP_SERVICE} +opentelemetry.lightstep.access.token=${OPENTELEMETERY_LIGHTSTEP_ACCESS_TOKEN} +opentelemetry.lightstep.end.point=${OPENTELEMETERY_LIGHTSTEP_END_POINT} + +#for log4j2 vulnerability +log4j2.formatMsgNoLookups=true