diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumer.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumer.java
new file mode 100644
index 000000000..1a7f86be8
--- /dev/null
+++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumer.java
@@ -0,0 +1,157 @@
+package gov.cdc.nbs.report.pipeline.observation;
+
+import static gov.cdc.etldatapipeline.commonutil.UtilHelper.errorMessage;
+import static gov.cdc.etldatapipeline.commonutil.UtilHelper.extractChangeDataCaptureOperation;
+import static gov.cdc.etldatapipeline.commonutil.UtilHelper.extractUid;
+import static gov.cdc.etldatapipeline.commonutil.UtilHelper.extractValue;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import gov.cdc.etldatapipeline.commonutil.DataProcessingException;
+import gov.cdc.etldatapipeline.commonutil.NoDataException;
+import gov.cdc.nbs.report.pipeline.observation.service.ObservationProcessor;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.ToLongFunction;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.RetryableTopic;
+import org.springframework.kafka.retrytopic.DltStrategy;
+import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.retry.annotation.Backoff;
+import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service class for processing Observation-related change events in the Real Time Reporting (RTR)
+ * pipeline. This service handles the "hydration" of data for Observations by consuming Kafka events
+ * from transactional source topics, transforming them, and producing them to reporting topics. This
+ * service operates differently from other RTR services in that it also inserts directly
+ * into the nrt_ database tables to eliminate a race condition between the PostProcessingService and
+ * the Kafka Sink Connector. More info can be found here:
+ * https://cdc-nbs.atlassian.net/browse/APP-519
+ *
+ * <p>Key responsibilities include:
+ *
+ * <ul>
+ *   <li>Consuming CDC (Change Data Capture) events for Observations and Act Relationships.
+ *   <li>Fetching enriched data from the database using stored procedures.
+ *   <li>Transforming raw data into reporting-optimized formats for Observation and its related
+ *       entities (Coded, Date, EDX, Material, Numeric, Reason, Txt).
+ *   <li>Persisting transformed data to corresponding nrt_observation_* tables.
+ *   <li>Pushing transformed data to the corresponding output topic in Kafka.
+ *   <li>Handling retries and dead-letter topics (DLT) for resilient processing.
+ * </ul>
+ */
+@Service
+public class ObservationConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(ObservationConsumer.class);
+ private static final String BEFORE_PATH = "before";
+ private static final String TOPIC_DEBUG_LOG = "Received Observation with id: {} from topic: {}";
+
+ private final String observationTopic;
+ private final String actRelationshipTopic;
+ private final ObservationProcessor observationProcessor;
+ private final ExecutorService obsExecutor;
+
+ public ObservationConsumer(
+ final ObservationProcessor observationProcessor,
+ @Value("${spring.kafka.topics.nbs.observation}") final String observationTopic,
+ @Value("${spring.kafka.topics.nbs.act-relationship}") final String actRelationshipTopic,
+ @Value("${featureFlag.thread-pool-size:1}") final int threadPoolSize) {
+ this.observationProcessor = observationProcessor;
+ this.observationTopic = observationTopic;
+ this.actRelationshipTopic = actRelationshipTopic;
+
+ obsExecutor =
+ Executors.newFixedThreadPool(threadPoolSize, new CustomizableThreadFactory("obs-"));
+ }
+
+ public static final ToLongFunction<ConsumerRecord<String, String>> toBatchId =
+ rec -> rec.timestamp() + rec.offset() + rec.partition();
+
+ @RetryableTopic(
+ attempts = "${spring.kafka.consumer.max-retry}",
+ autoCreateTopics = "false",
+ dltStrategy = DltStrategy.FAIL_ON_ERROR,
+ retryTopicSuffix = "${spring.kafka.dlq.retry-suffix}",
+ dltTopicSuffix = "${spring.kafka.dlq.dlq-suffix}",
+ // retry topic name, such as topic-retry-1, topic-retry-2, etc
+ topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
+ // time to wait before attempting to retry
+ backoff = @Backoff(delay = 1000, multiplier = 2.0),
+ exclude = {
+ SerializationException.class,
+ DeserializationException.class,
+ RuntimeException.class,
+ NoDataException.class
+ },
+ kafkaTemplate = "observationKafkaTemplate")
+ @KafkaListener(
+ topics = {
+ "${spring.kafka.topics.nbs.observation}",
+ "${spring.kafka.topics.nbs.act-relationship}"
+ },
+ containerFactory = "observationKafkaListenerContainerFactory")
+ public CompletableFuture<Void> processMessage(ConsumerRecord<String, String> rec) {
+
+ long batchId = toBatchId.applyAsLong(rec);
+ String topic = rec.topic();
+ String message = rec.value();
+ logger.debug(TOPIC_DEBUG_LOG, message, topic);
+
+ if (topic.equals(observationTopic)) {
+ return CompletableFuture.runAsync(() -> handleObservation(message, batchId), obsExecutor);
+ } else if (topic.equals(actRelationshipTopic) && message != null) {
+ return CompletableFuture.runAsync(() -> handleActRelationship(message, batchId), obsExecutor);
+ } else {
+ return CompletableFuture.failedFuture(
+ new DataProcessingException(
+ "Received data from an unknown topic: " + topic, new NoSuchElementException()));
+ }
+ }
+
+ private void handleObservation(String message, long batchId) {
+ try {
+ String observationUid = extractUid(message, "observation_uid");
+ observationProcessor.process(batchId, observationUid);
+ } catch (JsonProcessingException e) {
+ throw new DataProcessingException(errorMessage("Observation", "", e), e);
+ }
+ }
+
+ private void handleActRelationship(String value, long batchId) {
+ String sourceActUid = "";
+
+ try {
+ String typeCd;
+ String targetClassCd;
+ String operationType = extractChangeDataCaptureOperation(value);
+
+ if (operationType.equals("d")) {
+ sourceActUid = extractUid(value, "source_act_uid", BEFORE_PATH);
+ typeCd = extractValue(value, "type_cd", BEFORE_PATH);
+ targetClassCd = extractValue(value, "target_class_cd", BEFORE_PATH);
+ } else {
+ return;
+ }
+
+ logger.info(TOPIC_DEBUG_LOG, sourceActUid, actRelationshipTopic);
+ // For LabReport values, we only need to trigger reprocessing when the relationship is
+ // deleted (deletes are not covered by updates to Observation). PHC targets are excluded
+ // from the LabReport association updates, as the LabReport will receive an update in
+ // Observation.
+ if (typeCd.equals("LabReport") && targetClassCd.equals("OBS")) {
+ observationProcessor.process(batchId, sourceActUid);
+ }
+ } catch (Exception e) {
+ throw new DataProcessingException(errorMessage("ActRelationship", sourceActUid, e), e);
+ }
+ }
+}
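A note on `toBatchId`: the batch id is derived purely from the record's Kafka coordinates, so every row persisted for a single consumed message carries the same tag; the sum is a correlation tag, not a globally unique id. A minimal sketch (hypothetical standalone class, assuming only kafka-clients on the classpath):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

class BatchIdSketch {
  public static void main(String[] args) {
    // topic, partition, offset, key, value; this constructor leaves the timestamp at -1
    ConsumerRecord<String, String> rec =
        new ConsumerRecord<>("nbs.observation", 2, 41L, null, "{\"payload\":{}}");

    // Mirrors ObservationConsumer.toBatchId: timestamp + offset + partition
    long batchId = rec.timestamp() + rec.offset() + rec.partition();
    System.out.println(batchId); // -1 + 41 + 2 = 42
  }
}
```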
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationCodedKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationCodedKey.java
deleted file mode 100644
index 71162fa64..000000000
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationCodedKey.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;
-
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import com.fasterxml.jackson.databind.annotation.JsonNaming;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class ObservationCodedKey {
- private Long observationUid;
- private String ovcCode;
-}
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationEdxKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationEdxKey.java
deleted file mode 100644
index de559e4ca..000000000
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationEdxKey.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;
-
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import com.fasterxml.jackson.databind.annotation.JsonNaming;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.NonNull;
-
-@Data
-@NoArgsConstructor
-@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class ObservationEdxKey {
- @NonNull private Long edxDocumentUid;
-}
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java
index fd7e0fdc7..c0b225c1f 100644
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java
+++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java
@@ -1,17 +1,5 @@
package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import com.fasterxml.jackson.databind.annotation.JsonNaming;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.NonNull;
-@Data
-@NoArgsConstructor
-@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class ObservationKey {
- @NonNull
- @JsonProperty("observation_uid")
- private Long observationUid;
-}
+public record ObservationKey(@JsonProperty("observation_uid") Long observationUid) {}
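The Lombok key classes above collapse into records here; `@JsonProperty` keeps the snake_case wire format that the `@JsonNaming` strategy used to provide. A quick round-trip sketch (assuming Jackson 2.12+, which binds record components through the canonical constructor):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

class ObservationKeySketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Serialization uses the annotated component name
    String json = mapper.writeValueAsString(new ObservationKey(10054243L));
    System.out.println(json); // {"observation_uid":10054243}

    // Deserialization binds back through the canonical constructor
    ObservationKey key = mapper.readValue(json, ObservationKey.class);
    System.out.println(key.observationUid()); // 10054243
  }
}
```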
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationMaterialKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationMaterialKey.java
deleted file mode 100644
index de2770bfb..000000000
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationMaterialKey.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import com.fasterxml.jackson.databind.annotation.JsonNaming;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.NonNull;
-
-@Data
-@NoArgsConstructor
-@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class ObservationMaterialKey {
- @NonNull
- @JsonProperty("material_id")
- private Long materialId;
-}
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationReasonKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationReasonKey.java
deleted file mode 100644
index da8f43c23..000000000
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationReasonKey.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;
-
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import com.fasterxml.jackson.databind.annotation.JsonNaming;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class ObservationReasonKey {
- private Long observationUid;
- private String reasonCd;
-}
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationTxtKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationTxtKey.java
deleted file mode 100644
index 911f51cc7..000000000
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationTxtKey.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;
-
-import com.fasterxml.jackson.databind.PropertyNamingStrategies;
-import com.fasterxml.jackson.databind.annotation.JsonNaming;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class ObservationTxtKey {
- private Long observationUid;
- private Integer ovtSeq;
-}
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ParsedObservation.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ParsedObservation.java
new file mode 100644
index 000000000..bd97b18bb
--- /dev/null
+++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ParsedObservation.java
@@ -0,0 +1,30 @@
+package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Output of the ObservationParser that contains the ObservationTransformed object as well as
+ * lists of entities that need to be persisted to the database.
+ */
+public record ParsedObservation(
+ ObservationTransformed transformed,
+ List<ObservationMaterial> materialEntries,
+ List<ObservationCoded> codedEntries,
+ List<ObservationDate> dateEntries,
+ List<ObservationEdx> edxEntries,
+ List<ObservationNumeric> numericEntries,
+ List<ObservationReason> reasonEntries,
+ List<ObservationTxt> textEntries) {
+ public ParsedObservation(ObservationTransformed observationTransformed) {
+ this(
+ observationTransformed,
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new ArrayList<>());
+ }
+}
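`ParsedObservation` pairs the transformed row with mutable lists the parser appends to as it walks the raw JSON; the convenience constructor starts every list empty. Roughly how it is meant to be used (illustrative only; the DTO setters are assumed from the Lombok-style getters used in `NrtObservationWriter`):

```java
package gov.cdc.nbs.report.pipeline.observation.model.dto.observation;

class ParsedObservationSketch {
  static ParsedObservation buildExample() {
    ObservationTransformed transformed = new ObservationTransformed();
    transformed.setObservationUid(10054243L);

    // Convenience constructor: the transformed object plus seven empty lists
    ParsedObservation parsed = new ParsedObservation(transformed);

    ObservationCoded coded = new ObservationCoded();
    coded.setObservationUid(10054243L);
    coded.setOvcCode("LAB278");
    parsed.codedEntries().add(coded); // record accessors expose the live, mutable lists

    return parsed; // NrtObservationWriter.persist(parsed) then upserts each list
  }
}
```

Mutable list components in a record trade strict immutability for a simple accumulator, which fits the single-threaded parse-then-persist flow here.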
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriter.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriter.java
new file mode 100644
index 000000000..843a6faac
--- /dev/null
+++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriter.java
@@ -0,0 +1,456 @@
+package gov.cdc.nbs.report.pipeline.observation.repository;
+
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation;
+import java.util.List;
+import org.springframework.jdbc.core.simple.JdbcClient;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Responsible for upserting Observation data to the following tables:
+ *
+ * <ul>
+ *   <li>nrt_observation_material
+ *   <li>nrt_observation_coded
+ *   <li>nrt_observation_date
+ *   <li>nrt_observation_edx
+ *   <li>nrt_observation_numeric
+ *   <li>nrt_observation_reason
+ *   <li>nrt_observation_txt
+ * </ul>
+ */
+@Component
+@Transactional
+public class NrtObservationWriter {
+
+ private final JdbcClient client;
+
+ public NrtObservationWriter(final JdbcClient client) {
+ this.client = client;
+ }
+
+ public void persist(ParsedObservation parsedObservation) {
+ persistMaterials(parsedObservation.materialEntries());
+ persistCoded(parsedObservation.codedEntries());
+ persistDate(parsedObservation.dateEntries());
+ persistEdx(parsedObservation.edxEntries());
+ persistNumeric(parsedObservation.numericEntries());
+ persistReason(parsedObservation.reasonEntries());
+ persistText(parsedObservation.textEntries());
+ }
+
+ private static final String UPSERT_MATERIAL =
+ """
+ MERGE INTO nrt_observation_material
+ USING (
+ SELECT
+ :act_uid AS act_uid,
+ :material_id AS material_id,
+ :type_cd AS type_cd,
+ :subject_class_cd AS subject_class_cd,
+ :record_status AS record_status,
+ :type_desc_txt AS type_desc_txt,
+ :last_chg_time AS last_chg_time,
+ :material_cd AS material_cd,
+ :material_nm AS material_nm,
+ :material_details AS material_details,
+ :material_collection_vol AS material_collection_vol,
+ :material_collection_vol_unit AS material_collection_vol_unit,
+ :material_desc AS material_desc,
+ :risk_cd AS risk_cd,
+ :risk_desc_txt AS risk_desc_txt
+ ) AS source
+ ON nrt_observation_material.act_uid = source.act_uid AND nrt_observation_material.material_id = source.material_id
+ WHEN MATCHED THEN
+ UPDATE SET
+ type_cd = source.type_cd,
+ subject_class_cd = source.subject_class_cd,
+ record_status = source.record_status,
+ type_desc_txt = source.type_desc_txt,
+ last_chg_time = source.last_chg_time,
+ material_cd = source.material_cd,
+ material_nm = source.material_nm,
+ material_details = source.material_details,
+ material_collection_vol = source.material_collection_vol,
+ material_collection_vol_unit = source.material_collection_vol_unit,
+ material_desc = source.material_desc,
+ risk_cd = source.risk_cd,
+ risk_desc_txt = source.risk_desc_txt
+ WHEN NOT MATCHED THEN
+ INSERT(
+ act_uid,
+ type_cd,
+ material_id,
+ subject_class_cd,
+ record_status,
+ type_desc_txt,
+ last_chg_time,
+ material_cd,
+ material_nm,
+ material_details,
+ material_collection_vol,
+ material_collection_vol_unit,
+ material_desc,
+ risk_cd,
+ risk_desc_txt
+ ) VALUES (
+ source.act_uid,
+ source.type_cd,
+ source.material_id,
+ source.subject_class_cd,
+ source.record_status,
+ source.type_desc_txt,
+ source.last_chg_time,
+ source.material_cd,
+ source.material_nm,
+ source.material_details,
+ source.material_collection_vol,
+ source.material_collection_vol_unit,
+ source.material_desc,
+ source.risk_cd,
+ source.risk_desc_txt
+ );
+ """;
+
+ void persistMaterials(List<ObservationMaterial> materials) {
+ materials.forEach(
+ m ->
+ client
+ .sql(UPSERT_MATERIAL)
+ .param("act_uid", m.getActUid())
+ .param("material_id", m.getMaterialId())
+ .param("type_cd", m.getTypeCd())
+ .param("subject_class_cd", m.getSubjectClassCd())
+ .param("record_status", m.getRecordStatus())
+ .param("type_desc_txt", m.getTypeDescTxt())
+ .param("last_chg_time", m.getLastChgTime())
+ .param("material_cd", m.getMaterialCd())
+ .param("material_nm", m.getMaterialNm())
+ .param("material_details", m.getMaterialDetails())
+ .param("material_collection_vol", m.getMaterialCollectionVol())
+ .param("material_collection_vol_unit", m.getMaterialCollectionVolUnit())
+ .param("material_desc", m.getMaterialDesc())
+ .param("risk_cd", m.getRiskCd())
+ .param("risk_desc_txt", m.getRiskDescTxt())
+ .update());
+ }
+
+ private static final String UPSERT_CODED =
+ """
+ MERGE INTO nrt_observation_coded
+ USING (
+ SELECT
+ :observation_uid AS observation_uid,
+ :ovc_code AS ovc_code,
+ :ovc_code_system_cd AS ovc_code_system_cd,
+ :ovc_code_system_desc_txt AS ovc_code_system_desc_txt,
+ :ovc_display_name AS ovc_display_name,
+ :ovc_alt_cd AS ovc_alt_cd,
+ :ovc_alt_cd_desc_txt AS ovc_alt_cd_desc_txt,
+ :ovc_alt_cd_system_cd AS ovc_alt_cd_system_cd,
+ :ovc_alt_cd_system_desc_txt AS ovc_alt_cd_system_desc_txt,
+ :batch_id AS batch_id
+ ) AS source
+ ON nrt_observation_coded.observation_uid = source.observation_uid AND nrt_observation_coded.ovc_code = source.ovc_code
+ WHEN MATCHED THEN
+ UPDATE SET
+ ovc_code_system_cd = source.ovc_code_system_cd,
+ ovc_code_system_desc_txt = source.ovc_code_system_desc_txt,
+ ovc_display_name = source.ovc_display_name,
+ ovc_alt_cd = source.ovc_alt_cd,
+ ovc_alt_cd_desc_txt = source.ovc_alt_cd_desc_txt,
+ ovc_alt_cd_system_cd = source.ovc_alt_cd_system_cd,
+ ovc_alt_cd_system_desc_txt = source.ovc_alt_cd_system_desc_txt,
+ batch_id = source.batch_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ observation_uid,
+ ovc_code,
+ ovc_code_system_cd,
+ ovc_code_system_desc_txt,
+ ovc_display_name,
+ ovc_alt_cd,
+ ovc_alt_cd_desc_txt,
+ ovc_alt_cd_system_cd,
+ ovc_alt_cd_system_desc_txt,
+ batch_id
+ ) VALUES (
+ source.observation_uid,
+ source.ovc_code,
+ source.ovc_code_system_cd,
+ source.ovc_code_system_desc_txt,
+ source.ovc_display_name,
+ source.ovc_alt_cd,
+ source.ovc_alt_cd_desc_txt,
+ source.ovc_alt_cd_system_cd,
+ source.ovc_alt_cd_system_desc_txt,
+ source.batch_id
+ );
+ """;
+
+ void persistCoded(List<ObservationCoded> codedEntries) {
+ codedEntries.forEach(
+ c ->
+ client
+ .sql(UPSERT_CODED)
+ .param("observation_uid", c.getObservationUid())
+ .param("ovc_code", c.getOvcCode())
+ .param("ovc_code_system_cd", c.getOvcCodeSystemCd())
+ .param("ovc_code_system_desc_txt", c.getOvcCodeSystemDescTxt())
+ .param("ovc_display_name", c.getOvcDisplayName())
+ .param("ovc_alt_cd", c.getOvcAltCd())
+ .param("ovc_alt_cd_desc_txt", c.getOvcAltCdDescTxt())
+ .param("ovc_alt_cd_system_cd", c.getOvcAltCdSystemCd())
+ .param("ovc_alt_cd_system_desc_txt", c.getOvcAltCdSystemDescTxt())
+ .param("batch_id", c.getBatchId())
+ .update());
+ }
+
+ private static final String UPSERT_DATE =
+ """
+ MERGE INTO nrt_observation_date
+ USING (
+ SELECT
+ :observation_uid AS observation_uid,
+ :ovd_from_date AS ovd_from_date,
+ :ovd_to_date AS ovd_to_date,
+ :ovd_seq AS ovd_seq,
+ :batch_id AS batch_id
+ ) AS source
+ ON nrt_observation_date.observation_uid = source.observation_uid
+ WHEN MATCHED THEN
+ UPDATE SET
+ ovd_from_date = source.ovd_from_date,
+ ovd_to_date = source.ovd_to_date,
+ ovd_seq = source.ovd_seq,
+ batch_id = source.batch_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ observation_uid,
+ ovd_from_date,
+ ovd_to_date,
+ ovd_seq,
+ batch_id
+ ) VALUES (
+ source.observation_uid,
+ source.ovd_from_date,
+ source.ovd_to_date,
+ source.ovd_seq,
+ source.batch_id
+ );
+ """;
+
+ void persistDate(List<ObservationDate> dateEntries) {
+ dateEntries.forEach(
+ d ->
+ client
+ .sql(UPSERT_DATE)
+ .param("observation_uid", d.getObservationUid())
+ .param("ovd_from_date", d.getOvdFromDate())
+ .param("ovd_to_date", d.getOvdToDate())
+ .param("ovd_seq", d.getOvdSeq())
+ .param("batch_id", d.getBatchId())
+ .update());
+ }
+
+ private static final String UPSERT_EDX =
+ """
+ MERGE INTO nrt_observation_edx
+ USING (
+ SELECT
+ :edx_document_uid AS edx_document_uid,
+ :edx_act_uid AS edx_act_uid,
+ :edx_add_time AS edx_add_time
+ ) AS source
+ ON nrt_observation_edx.edx_document_uid = source.edx_document_uid AND nrt_observation_edx.edx_act_uid = source.edx_act_uid
+ WHEN MATCHED THEN
+ UPDATE SET
+ edx_add_time = source.edx_add_time
+ WHEN NOT MATCHED THEN
+ INSERT (
+ edx_document_uid,
+ edx_act_uid,
+ edx_add_time
+ ) VALUES (
+ source.edx_document_uid,
+ source.edx_act_uid,
+ source.edx_add_time
+ );
+ """;
+
+ void persistEdx(List<ObservationEdx> edxEntries) {
+ edxEntries.forEach(
+ e ->
+ client
+ .sql(UPSERT_EDX)
+ .param("edx_document_uid", e.getEdxDocumentUid())
+ .param("edx_act_uid", e.getEdxActUid())
+ .param("edx_add_time", e.getEdxAddTime())
+ .update());
+ }
+
+ private static final String UPSERT_NUMERIC =
+ """
+ MERGE INTO nrt_observation_numeric
+ USING (
+ SELECT
+ :observation_uid AS observation_uid,
+ :ovn_high_range AS ovn_high_range,
+ :ovn_low_range AS ovn_low_range,
+ :ovn_comparator_cd_1 AS ovn_comparator_cd_1,
+ :ovn_numeric_value_1 AS ovn_numeric_value_1,
+ :ovn_numeric_value_2 AS ovn_numeric_value_2,
+ :ovn_numeric_unit_cd AS ovn_numeric_unit_cd,
+ :ovn_separator_cd AS ovn_separator_cd,
+ :ovn_seq AS ovn_seq,
+ :batch_id AS batch_id
+ ) AS source
+ ON nrt_observation_numeric.observation_uid = source.observation_uid AND nrt_observation_numeric.ovn_seq = source.ovn_seq
+ WHEN MATCHED THEN
+ UPDATE SET
+ ovn_high_range = source.ovn_high_range,
+ ovn_low_range = source.ovn_low_range,
+ ovn_comparator_cd_1 = source.ovn_comparator_cd_1,
+ ovn_numeric_value_1 = source.ovn_numeric_value_1,
+ ovn_numeric_value_2 = source.ovn_numeric_value_2,
+ ovn_numeric_unit_cd = source.ovn_numeric_unit_cd,
+ ovn_separator_cd = source.ovn_separator_cd,
+ batch_id = source.batch_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ observation_uid,
+ ovn_high_range,
+ ovn_low_range,
+ ovn_comparator_cd_1,
+ ovn_numeric_value_1,
+ ovn_numeric_value_2,
+ ovn_numeric_unit_cd,
+ ovn_separator_cd,
+ ovn_seq,
+ batch_id
+ ) VALUES (
+ source.observation_uid,
+ source.ovn_high_range,
+ source.ovn_low_range,
+ source.ovn_comparator_cd_1,
+ source.ovn_numeric_value_1,
+ source.ovn_numeric_value_2,
+ source.ovn_numeric_unit_cd,
+ source.ovn_separator_cd,
+ source.ovn_seq,
+ source.batch_id
+ );
+ """;
+
+ void persistNumeric(List<ObservationNumeric> numericEntries) {
+ numericEntries.forEach(
+ n ->
+ client
+ .sql(UPSERT_NUMERIC)
+ .param("observation_uid", n.getObservationUid())
+ .param("ovn_seq", n.getOvnSeq())
+ .param("ovn_high_range", n.getOvnHighRange())
+ .param("ovn_low_range", n.getOvnLowRange())
+ .param("ovn_comparator_cd_1", n.getOvnComparatorCd1())
+ .param("ovn_numeric_value_1", n.getOvnNumericValue1())
+ .param("ovn_numeric_value_2", n.getOvnNumericValue2())
+ .param("ovn_numeric_unit_cd", n.getOvnNumericUnitCd())
+ .param("ovn_separator_cd", n.getOvnSeparatorCd())
+ .param("batch_id", n.getBatchId())
+ .update());
+ }
+
+ private static final String UPSERT_REASON =
+ """
+ MERGE INTO nrt_observation_reason
+ USING (
+ SELECT
+ :observation_uid as observation_uid,
+ :reason_cd as reason_cd,
+ :reason_desc_txt as reason_desc_txt,
+ :batch_id as batch_id
+ ) AS source
+ ON nrt_observation_reason.observation_uid = source.observation_uid AND nrt_observation_reason.reason_cd = source.reason_cd
+ WHEN MATCHED THEN
+ UPDATE SET
+ reason_desc_txt = source.reason_desc_txt,
+ batch_id = source.batch_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ observation_uid,
+ reason_cd,
+ reason_desc_txt,
+ batch_id
+ ) VALUES (
+ source.observation_uid,
+ source.reason_cd,
+ source.reason_desc_txt,
+ source.batch_id
+ );
+ """;
+
+ void persistReason(List<ObservationReason> reasonEntries) {
+ reasonEntries.forEach(
+ r ->
+ client
+ .sql(UPSERT_REASON)
+ .param("observation_uid", r.getObservationUid())
+ .param("reason_cd", r.getReasonCd())
+ .param("reason_desc_txt", r.getReasonDescTxt())
+ .param("batch_id", r.getBatchId())
+ .update());
+ }
+
+ private static final String UPSERT_TEXT =
+ """
+ MERGE INTO nrt_observation_txt
+ USING (
+ SELECT
+ :observation_uid AS observation_uid,
+ :ovt_seq AS ovt_seq,
+ :ovt_txt_type_cd AS ovt_txt_type_cd,
+ :ovt_value_txt AS ovt_value_txt,
+ :batch_id AS batch_id
+ ) AS source
+ ON nrt_observation_txt.observation_uid = source.observation_uid AND nrt_observation_txt.ovt_seq = source.ovt_seq
+ WHEN MATCHED THEN
+ UPDATE SET
+ ovt_txt_type_cd = source.ovt_txt_type_cd,
+ ovt_value_txt = source.ovt_value_txt,
+ batch_id = source.batch_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ observation_uid,
+ ovt_seq,
+ ovt_txt_type_cd,
+ ovt_value_txt,
+ batch_id
+ ) VALUES (
+ source.observation_uid,
+ source.ovt_seq,
+ source.ovt_txt_type_cd,
+ source.ovt_value_txt,
+ source.batch_id
+ );
+ """;
+
+ void persistText(List<ObservationTxt> textEntries) {
+ textEntries.forEach(
+ t ->
+ client
+ .sql(UPSERT_TEXT)
+ .param("observation_uid", t.getObservationUid())
+ .param("ovt_seq", t.getOvtSeq())
+ .param("ovt_txt_type_cd", t.getOvtTxtTypeCd())
+ .param("ovt_value_txt", t.getOvtValueTxt())
+ .param("batch_id", t.getBatchId())
+ .update());
+ }
+}
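Every persist* method follows the same shape: one MERGE per entry, keyed on the table's natural key, with named parameters bound through Spring's JdbcClient (Spring Framework 6.1+). A stripped-down sketch of the pattern against the reason table (hypothetical standalone class; column list shortened, insert-only branch shown):

```java
import javax.sql.DataSource;
import org.springframework.jdbc.core.simple.JdbcClient;

class UpsertSketch {
  // Returns the number of affected rows (1 when the row was inserted, 0 when it already existed)
  static int insertReasonIfAbsent(DataSource dataSource, long observationUid, String reasonCd) {
    JdbcClient client = JdbcClient.create(dataSource);
    return client
        .sql(
            """
            MERGE INTO nrt_observation_reason
            USING (SELECT :observation_uid AS observation_uid, :reason_cd AS reason_cd) AS source
            ON nrt_observation_reason.observation_uid = source.observation_uid
               AND nrt_observation_reason.reason_cd = source.reason_cd
            WHEN NOT MATCHED THEN
              INSERT (observation_uid, reason_cd)
              VALUES (source.observation_uid, source.reason_cd);
            """)
        .param("observation_uid", observationUid)
        .param("reason_cd", reasonCd)
        .update();
  }
}
```

Each entry is its own statement and round-trip; if these lists grow large, batching (for example `JdbcTemplate.batchUpdate`) would cut round-trips, at the cost of the explicit per-row MERGE keys used here.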
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessor.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessor.java
new file mode 100644
index 000000000..fead27561
--- /dev/null
+++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessor.java
@@ -0,0 +1,120 @@
+package gov.cdc.nbs.report.pipeline.observation.service;
+
+import static gov.cdc.etldatapipeline.commonutil.UtilHelper.errorMessage;
+
+import gov.cdc.etldatapipeline.commonutil.DataProcessingException;
+import gov.cdc.etldatapipeline.commonutil.NoDataException;
+import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl;
+import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation;
+import gov.cdc.nbs.report.pipeline.observation.repository.NrtObservationWriter;
+import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository;
+import gov.cdc.nbs.report.pipeline.observation.transformer.ObservationParser;
+import io.micrometer.core.instrument.Counter;
+import jakarta.persistence.EntityNotFoundException;
+import java.util.Optional;
+import org.modelmapper.ModelMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+/** Handles the processing of Observation data */
+@Service
+public class ObservationProcessor {
+ private static final Logger logger = LoggerFactory.getLogger(ObservationProcessor.class);
+ private final ModelMapper modelMapper = new ModelMapper();
+ private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl();
+
+ private final CustomMetrics metrics;
+ private static final String[] TAGS = {"service", "observation-reporting"};
+ private Counter msgProcessed;
+ private Counter msgSuccess;
+ private Counter msgFailure;
+
+ private final ObservationRepository observationRepository;
+ private final KafkaTemplate<String, String> kafkaTemplate;
+ private final NrtObservationWriter nrtWriter;
+ private final String nrtObservationTopic;
+
+ public ObservationProcessor(
+ final CustomMetrics metrics,
+ final ObservationRepository observationRepository,
+ @Qualifier("observationKafkaTemplate") final KafkaTemplate kafkaTemplate,
+ @Value("${spring.kafka.topics.nrt.observation}") final String nrtObservationTopic,
+ final NrtObservationWriter nrtWriter) {
+ this.metrics = metrics;
+ this.observationRepository = observationRepository;
+ this.kafkaTemplate = kafkaTemplate;
+ this.nrtObservationTopic = nrtObservationTopic;
+ this.nrtWriter = nrtWriter;
+
+ msgProcessed = metrics.counter("obs_msg_processed", TAGS);
+ msgSuccess = metrics.counter("obs_msg_success", TAGS);
+ msgFailure = metrics.counter("obs_msg_failure", TAGS);
+ }
+
+ public void process(final long batchId, final String observationUid) {
+ msgProcessed.increment();
+
+ metrics.recordTime(
+ "obs_msg_processing_seconds",
+ () -> {
+ try {
+ logger.info("Received Observation with id: {}", observationUid);
+
+ // Query NBS_ODSE for observation data
+ Optional<Observation> observationData =
+ observationRepository.computeObservations(observationUid);
+
+ // Ensure data is returned
+ if (observationData.isEmpty()) {
+ throw new EntityNotFoundException(
+ "Unable to find Observation with id: " + observationUid);
+ }
+
+ // Convert Entity to reporting object that will be sent to nrt_observation
+ ObservationReporting reportingModel =
+ modelMapper.map(observationData.get(), ObservationReporting.class);
+
+ // Parse all fields from the incoming message
+ ParsedObservation parsed = ObservationParser.parse(observationData.get(), batchId);
+
+ // Push parsed fields into reporting object
+ modelMapper.map(parsed.transformed(), reportingModel);
+
+ // Insert parsed data into nrt_observation_* database tables
+ nrtWriter.persist(parsed);
+
+ // Send observation data to nrt_observation kafka topic
+ ObservationKey observationKey = new ObservationKey(Long.valueOf(observationUid));
+ pushKeyValuePairToKafka(observationKey, reportingModel, nrtObservationTopic);
+ logger.info(
+ "Observation data (uid={}) sent to {}", observationUid, nrtObservationTopic);
+
+ msgSuccess.increment();
+
+ } catch (EntityNotFoundException ex) {
+ msgFailure.increment();
+ throw new NoDataException(ex.getMessage(), ex);
+
+ } catch (Exception e) {
+ msgFailure.increment();
+ throw new DataProcessingException(errorMessage("Observation", observationUid, e), e);
+ }
+ },
+ TAGS);
+ }
+
+ private void pushKeyValuePairToKafka(
+ ObservationKey observationKey, Object model, String topicName) {
+ String jsonKey = jsonGenerator.generateStringJson(observationKey);
+ String jsonValue = jsonGenerator.generateStringJson(model);
+ kafkaTemplate.send(topicName, jsonKey, jsonValue);
+ }
+}
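The processor wraps the whole hydrate-persist-publish path in `CustomMetrics.recordTime`, whose API is only visible through its use above; persisting to the nrt_* tables before the Kafka publish is what closes the sink-connector race described in the consumer Javadoc. For orientation, a rough plain-Micrometer equivalent of the counters and timer (not the project's wrapper):

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class MetricsSketch {
  public static void main(String[] args) {
    MeterRegistry registry = new SimpleMeterRegistry();
    Counter processed = registry.counter("obs_msg_processed", "service", "observation-reporting");
    Timer timer = registry.timer("obs_msg_processing_seconds", "service", "observation-reporting");

    processed.increment();
    timer.record(() -> {
      // hydrate from NBS_ODSE, persist to nrt_* tables, publish to Kafka
    });

    System.out.println(timer.count()); // 1 recorded timing
  }
}
```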
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationService.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationService.java
deleted file mode 100644
index 57dfb08a1..000000000
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationService.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package gov.cdc.nbs.report.pipeline.observation.service;
-
-import static gov.cdc.etldatapipeline.commonutil.UtilHelper.*;
-
-import gov.cdc.etldatapipeline.commonutil.DataProcessingException;
-import gov.cdc.etldatapipeline.commonutil.NoDataException;
-import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl;
-import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTransformed;
-import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository;
-import gov.cdc.nbs.report.pipeline.observation.transformer.ProcessObservationDataUtil;
-import io.micrometer.core.instrument.Counter;
-import jakarta.annotation.PostConstruct;
-import jakarta.persistence.EntityNotFoundException;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.ToLongFunction;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.errors.SerializationException;
-import org.modelmapper.ModelMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.annotation.RetryableTopic;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.retrytopic.DltStrategy;
-import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
-import org.springframework.kafka.support.serializer.DeserializationException;
-import org.springframework.retry.annotation.Backoff;
-import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
-import org.springframework.stereotype.Service;
-
-/**
- * Service class for processing Observation-related change events in the Real Time Reporting (RTR)
- * pipeline. This service handles the "hydration" of data for Observations by consuming Kafka events
- * from transactional source topics, transforming them, and producing them to reporting topics.
- *
- * <p>Key responsibilities include:
- *
- * <ul>
- *   <li>Consuming CDC (Change Data Capture) events for Observations and Act Relationships.
- *   <li>Fetching enriched data from the database using stored procedures.
- *   <li>Transforming raw data into reporting-optimized formats for Observation and its related
- *       entities (Coded, Date, EDX, Material, Numeric, Reason, Txt).
- *   <li>Pushing transformed data to corresponding output topics in Kafka.
- *   <li>Handling retries and dead-letter topics (DLT) for resilient processing.
- * </ul>
- */
-@Service
-@Setter
-@RequiredArgsConstructor
-public class ObservationService {
- private static final Logger logger = LoggerFactory.getLogger(ObservationService.class);
- private static final String BEFORE_PATH = "before";
-
- @Value("${spring.kafka.topics.nbs.observation}")
- private String observationTopic;
-
- @Value("${spring.kafka.topics.nbs.act-relationship}")
- private String actRelationshipTopic;
-
- @Value("${spring.kafka.topics.nrt.observation}")
- private String observationTopicOutputReporting;
-
- @Value("${featureFlag.thread-pool-size:1}")
- private int threadPoolSize;
-
- private final ObservationRepository observationRepository;
-
- @Qualifier("observationKafkaTemplate")
- private final KafkaTemplate<String, String> kafkaTemplate;
-
- private final ProcessObservationDataUtil processObservationDataUtil;
- private final ModelMapper modelMapper = new ModelMapper();
- private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl();
-
- private ExecutorService obsExecutor;
-
- private static String topicDebugLog = "Received Observation with id: {} from topic: {}";
- public static final ToLongFunction<ConsumerRecord<String, String>> toBatchId =
- rec -> rec.timestamp() + rec.offset() + rec.partition();
-
- ObservationKey observationKey = new ObservationKey();
-
- private static final String SERVICE_NAME = "observation-reporting";
-
- private final CustomMetrics metrics;
-
- private Counter msgProcessed;
- private Counter msgSuccess;
- private Counter msgFailure;
-
- @PostConstruct
- void initMetrics() {
- String[] tags = {"service", SERVICE_NAME};
-
- msgProcessed = metrics.counter("obs_msg_processed", tags);
- msgSuccess = metrics.counter("obs_msg_success", tags);
- msgFailure = metrics.counter("obs_msg_failure", tags);
-
- obsExecutor =
- Executors.newFixedThreadPool(threadPoolSize, new CustomizableThreadFactory("obs-"));
- }
-
- @RetryableTopic(
- attempts = "${spring.kafka.consumer.max-retry}",
- autoCreateTopics = "false",
- dltStrategy = DltStrategy.FAIL_ON_ERROR,
- retryTopicSuffix = "${spring.kafka.dlq.retry-suffix}",
- dltTopicSuffix = "${spring.kafka.dlq.dlq-suffix}",
- // retry topic name, such as topic-retry-1, topic-retry-2, etc
- topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
- // time to wait before attempting to retry
- backoff = @Backoff(delay = 1000, multiplier = 2.0),
- exclude = {
- SerializationException.class,
- DeserializationException.class,
- RuntimeException.class,
- NoDataException.class
- },
- kafkaTemplate = "observationKafkaTemplate")
- @KafkaListener(
- topics = {
- "${spring.kafka.topics.nbs.observation}",
- "${spring.kafka.topics.nbs.act-relationship}"
- },
- containerFactory = "observationKafkaListenerContainerFactory")
- public CompletableFuture<Void> processMessage(ConsumerRecord<String, String> rec) {
-
- long batchId = toBatchId.applyAsLong(rec);
- String topic = rec.topic();
- String message = rec.value();
- logger.debug(topicDebugLog, message, topic);
-
- if (topic.equals(observationTopic)) {
- return CompletableFuture.runAsync(
- () -> processObservation(message, batchId, true, ""), obsExecutor);
- } else if (topic.equals(actRelationshipTopic) && message != null) {
- return CompletableFuture.runAsync(
- () -> processActRelationship(message, batchId), obsExecutor);
- } else {
- return CompletableFuture.failedFuture(
- new DataProcessingException(
- "Received data from an unknown topic: " + topic, new NoSuchElementException()));
- }
- }
-
- private void processObservation(
- String value,
- long batchId,
- boolean isFromObservationTopic,
- String actRelationshipSourceActUid) {
- msgProcessed.increment();
- metrics.recordTime(
- "obs_msg_processing_seconds",
- () -> {
- String observationUid = "";
- try {
- observationUid =
- isFromObservationTopic
- ? extractUid(value, "observation_uid")
- : actRelationshipSourceActUid;
- observationKey.setObservationUid(Long.valueOf(observationUid));
- logger.info(topicDebugLog, observationUid, observationTopic);
- Optional<Observation> observationData =
- observationRepository.computeObservations(observationUid);
- if (observationData.isPresent()) {
- ObservationReporting reportingModel =
- modelMapper.map(observationData.get(), ObservationReporting.class);
- ObservationTransformed observationTransformed =
- processObservationDataUtil.transformObservationData(
- observationData.get(), batchId);
- modelMapper.map(observationTransformed, reportingModel);
- pushKeyValuePairToKafka(
- observationKey, reportingModel, observationTopicOutputReporting);
- logger.info(
- "Observation data (uid={}) sent to {}",
- observationUid,
- observationTopicOutputReporting);
- msgSuccess.increment();
- } else {
- throw new EntityNotFoundException(
- "Unable to find Observation with id: " + observationUid);
- }
- } catch (EntityNotFoundException ex) {
- msgFailure.increment();
- throw new NoDataException(ex.getMessage(), ex);
- } catch (Exception e) {
- msgFailure.increment();
- throw new DataProcessingException(errorMessage("Observation", observationUid, e), e);
- }
- },
- "service",
- SERVICE_NAME);
- }
-
- private void processActRelationship(String value, long batchId) {
- String sourceActUid = "";
-
- try {
- String typeCd;
- String targetClassCd;
- String operationType = extractChangeDataCaptureOperation(value);
-
- if (operationType.equals("d")) {
- sourceActUid = extractUid(value, "source_act_uid", BEFORE_PATH);
- typeCd = extractValue(value, "type_cd", BEFORE_PATH);
- targetClassCd = extractValue(value, "target_class_cd", BEFORE_PATH);
- } else {
- return;
- }
-
- logger.info(topicDebugLog, "Act_relationship", sourceActUid, actRelationshipTopic);
- // For LabReport values, we only need to trigger if the relationship is deleted (not covered
- // in updates to Observation)
- // PHC targets are excluded from the LabReport association updates, as the LabReport will
- // receive
- // an update in Observation
- if (typeCd.equals("LabReport") && targetClassCd.equals("OBS")) {
- processObservation(value, batchId, false, sourceActUid);
- }
- } catch (Exception e) {
- throw new DataProcessingException(errorMessage("ActRelationship", sourceActUid, e), e);
- }
- }
-
- // This same method can be used for elastic search as well and that is why the generic model is
- // present
- private void pushKeyValuePairToKafka(
- ObservationKey observationKey, Object model, String topicName) {
- String jsonKey = jsonGenerator.generateStringJson(observationKey);
- String jsonValue = jsonGenerator.generateStringJson(model);
- kafkaTemplate.send(topicName, jsonKey, jsonValue);
- }
-}
diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ProcessObservationDataUtil.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParser.java
similarity index 51%
rename from reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ProcessObservationDataUtil.java
rename to reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParser.java
index 9066b9d21..507ed4c05 100644
--- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ProcessObservationDataUtil.java
+++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParser.java
@@ -4,60 +4,30 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.*;
import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason;
import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTransformed;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.function.Consumer;
import java.util.function.Function;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-@Setter
-public class ProcessObservationDataUtil {
- private static final Logger logger = LoggerFactory.getLogger(ProcessObservationDataUtil.class);
+
+public class ObservationParser {
+ private static final Logger logger = LoggerFactory.getLogger(ObservationParser.class);
private static final ObjectMapper objectMapper =
new ObjectMapper().registerModule(new JavaTimeModule());
- @Qualifier("observationKafkaTemplate")
- private final KafkaTemplate<String, String> kafkaTemplate;
-
- private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl();
-
- @Value("${spring.kafka.topics.nrt.observation-coded}")
- public String codedTopicName;
-
- @Value("${spring.kafka.topics.nrt.observation-date}")
- public String dateTopicName;
-
- @Value("${spring.kafka.topics.nrt.observation-edx}")
- public String edxTopicName;
-
- @Value("${spring.kafka.topics.nrt.observation-material}")
- public String materialTopicName;
-
- @Value("${spring.kafka.topics.nrt.observation-numeric}")
- public String numericTopicName;
-
- @Value("${spring.kafka.topics.nrt.observation-reason}")
- public String reasonTopicName;
-
- @Value("${spring.kafka.topics.nrt.observation-txt}")
- public String txtTopicName;
-
- ObservationKey observationKey = new ObservationKey();
-
private static final String SUBJECT_CLASS_CD = "subject_class_cd";
public static final String TYPE_CD = "type_cd";
public static final String ENTITY_ID = "entity_id";
@@ -66,39 +36,69 @@ public class ProcessObservationDataUtil {
public static final String ACT_ID_SEQ = "act_id_seq";
public static final String ROOT_EXTENSION_TXT = "root_extension_txt";
- public ObservationTransformed transformObservationData(Observation observation, long batchId) {
- ObservationTransformed observationTransformed = new ObservationTransformed();
+ private ObservationParser() {}
+
+ public static ParsedObservation parse(final Observation observation, final long batchId) {
+ ParsedObservation parsedObservation = new ParsedObservation(new ObservationTransformed());
+ ObservationTransformed observationTransformed = parsedObservation.transformed();
+
observationTransformed.setObservationUid(observation.getObservationUid());
observationTransformed.setReportObservationUid(observation.getObservationUid());
observationTransformed.setBatchId(batchId);
- observationKey.setObservationUid(observation.getObservationUid());
- String obsDomainCdSt1 = observation.getObsDomainCdSt1();
-
- transformPersonParticipations(
- observation.getPersonParticipations(), obsDomainCdSt1, observationTransformed);
- transformOrganizationParticipations(
- observation.getOrganizationParticipations(), obsDomainCdSt1, observationTransformed);
- transformMaterialParticipations(
- observation.getMaterialParticipations(), obsDomainCdSt1, observationTransformed);
- transformFollowupObservations(
- observation.getFollowupObservations(), obsDomainCdSt1, observationTransformed);
- transformParentObservations(observation.getParentObservations(), observationTransformed);
- transformActIds(observation.getActIds(), observationTransformed);
- transformObservationCoded(observation.getObsCode(), batchId);
- transformObservationDate(observation.getObsDate(), batchId);
- transformObservationEdx(observation.getEdxIds());
- transformObservationNumeric(observation.getObsNum(), batchId);
- transformObservationReasons(observation.getObsReason(), batchId);
- transformObservationTxt(observation.getObsTxt(), batchId);
-
- return observationTransformed;
+ // Person Participations
+ parsePersonParticipations(
+ observation.getPersonParticipations(),
+ observation.getObsDomainCdSt1(),
+ observationTransformed);
+
+ // Organization Participations
+ parseOrganizationParticipations(
+ observation.getOrganizationParticipations(),
+ observation.getObsDomainCdSt1(),
+ observationTransformed);
+
+ // Material Participations
+ parseMaterialParticipations(
+ observation.getMaterialParticipations(),
+ observation.getObsDomainCdSt1(),
+ parsedObservation);
+
+ // Follow up Observations
+ parseFollowupObservations(
+ observation.getFollowupObservations(),
+ observation.getObsDomainCdSt1(),
+ observationTransformed);
+
+ // Parent Observations
+ parseParentObservations(observation.getParentObservations(), observationTransformed);
+
+ // Act Ids
+ parseActIds(observation.getActIds(), observationTransformed);
+
+ // Observation Coded data
+ parseObservationCoded(observation.getObsCode(), parsedObservation);
+
+ // Observation Date data
+ parseObservationDate(observation.getObsDate(), parsedObservation);
+
+ // Observation Edx data
+ parseObservationEdx(observation.getEdxIds(), parsedObservation);
+
+ // Observation Numeric data
+ parseObservationNumeric(observation.getObsNum(), parsedObservation);
+
+ // Observation Reason data
+ parseObservationReasons(observation.getObsReason(), parsedObservation);
+
+ // Observation Text data
+ parseObservationTxt(observation.getObsTxt(), parsedObservation);
+
+ return parsedObservation;
}
- private void transformPersonParticipations(
- String personParticipations,
- String obsDomainCdSt1,
- ObservationTransformed observationTransformed) {
+ private static void parsePersonParticipations(
+ String personParticipations, String obsDomainCdSt1, ObservationTransformed transformed) {
try {
JsonNode personParticipationsJsonArray = parseJsonArray(personParticipations);
@@ -110,7 +110,7 @@ private void transformPersonParticipations(
Long entityId = getNodeValue(jsonNode, ENTITY_ID, JsonNode::asLong);
if (typeCd.equals("PATSBJ")) {
- transformPersonParticipationRoles(jsonNode, observationTransformed, entityId);
+ setPersonParticipationRoles(jsonNode, transformed, entityId);
}
if (ORDER.equals(obsDomainCdSt1)) {
@@ -121,61 +121,45 @@ private void transformPersonParticipations(
orderers.add(String.valueOf(entityId));
break;
case "PATSBJ", "SubjOfMorbReport":
- observationTransformed.setPatientId(entityId);
+ transformed.setPatientId(entityId);
break;
case "PhysicianOfMorb":
- observationTransformed.setMorbPhysicianId(entityId);
+ transformed.setMorbPhysicianId(entityId);
break;
case "ReporterOfMorbReport":
- observationTransformed.setMorbReporterId(entityId);
+ transformed.setMorbReporterId(entityId);
break;
case "ENT":
- observationTransformed.setTranscriptionistId(entityId);
- Optional.ofNullable(jsonNode.get("first_nm"))
- .filter(n -> !n.isNull())
- .ifPresent(n -> observationTransformed.setTranscriptionistFirstNm(n.asText()));
- Optional.ofNullable(jsonNode.get("last_nm"))
- .filter(n -> !n.isNull())
- .ifPresent(n -> observationTransformed.setTranscriptionistLastNm(n.asText()));
- Optional.ofNullable(jsonNode.get("person_id_val"))
- .filter(n -> !n.isNull())
- .ifPresent(n -> observationTransformed.setTranscriptionistVal(n.asText()));
- Optional.ofNullable(jsonNode.get("person_id_assign_auth_cd"))
- .filter(n -> !n.isNull())
- .ifPresent(
- n -> observationTransformed.setTranscriptionistIdAssignAuth(n.asText()));
- Optional.ofNullable(jsonNode.get("person_id_type_desc"))
- .filter(n -> !n.isNull())
- .ifPresent(n -> observationTransformed.setTranscriptionistAuthType(n.asText()));
+ transformed.setTranscriptionistId(entityId);
+
+ ifPresentSet(jsonNode, "first_nm", transformed::setTranscriptionistFirstNm);
+ ifPresentSet(jsonNode, "last_nm", transformed::setTranscriptionistLastNm);
+ ifPresentSet(jsonNode, "person_id_val", transformed::setTranscriptionistVal);
+ ifPresentSet(
+ jsonNode,
+ "person_id_assign_auth_cd",
+ transformed::setTranscriptionistIdAssignAuth);
+ ifPresentSet(
+ jsonNode, "person_id_type_desc", transformed::setTranscriptionistAuthType);
break;
case "ASS":
- observationTransformed.setAssistantInterpreterId(entityId);
- Optional.ofNullable(jsonNode.get("first_nm"))
- .filter(n -> !n.isNull())
- .ifPresent(
- n -> observationTransformed.setAssistantInterpreterFirstNm(n.asText()));
- Optional.ofNullable(jsonNode.get("last_nm"))
- .filter(n -> !n.isNull())
- .ifPresent(
- n -> observationTransformed.setAssistantInterpreterLastNm(n.asText()));
- Optional.ofNullable(jsonNode.get("person_id_val"))
- .filter(n -> !n.isNull())
- .ifPresent(n -> observationTransformed.setAssistantInterpreterVal(n.asText()));
- Optional.ofNullable(jsonNode.get("person_id_assign_auth_cd"))
- .filter(n -> !n.isNull())
- .ifPresent(
- n ->
- observationTransformed.setAssistantInterpreterIdAssignAuth(n.asText()));
- Optional.ofNullable(jsonNode.get("person_id_type_desc"))
- .filter(n -> !n.isNull())
- .ifPresent(
- n -> observationTransformed.setAssistantInterpreterAuthType(n.asText()));
+ transformed.setAssistantInterpreterId(entityId);
+
+ ifPresentSet(jsonNode, "first_nm", transformed::setAssistantInterpreterFirstNm);
+ ifPresentSet(jsonNode, "last_nm", transformed::setAssistantInterpreterLastNm);
+ ifPresentSet(jsonNode, "person_id_val", transformed::setAssistantInterpreterVal);
+ ifPresentSet(
+ jsonNode,
+ "person_id_assign_auth_cd",
+ transformed::setAssistantInterpreterIdAssignAuth);
+ ifPresentSet(
+ jsonNode, "person_id_type_desc", transformed::setAssistantInterpreterAuthType);
break;
case "VRF":
- observationTransformed.setResultInterpreterId(entityId);
+ transformed.setResultInterpreterId(entityId);
break;
case "PRF":
- observationTransformed.setLabTestTechnicianId(entityId);
+ transformed.setLabTestTechnicianId(entityId);
break;
default:
}
@@ -183,7 +167,7 @@ private void transformPersonParticipations(
}
}
if (!orderers.isEmpty()) {
- observationTransformed.setOrderingPersonId(String.join(",", orderers));
+ transformed.setOrderingPersonId(String.join(",", orderers));
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "PersonParticipations", personParticipations);
@@ -194,26 +178,10 @@ private void transformPersonParticipations(
}
}
- private void transformPersonParticipationRoles(
- JsonNode node, ObservationTransformed observationTransformed, Long entityId) {
- String roleSubject = node.path("role_subject_class_cd").asText();
- if ("PROV".equals(roleSubject)) {
- String roleCd = node.path("role_cd").asText();
- if ("SPP".equals(roleCd)) {
- String roleScoping = node.path("role_scoping_class_cd").asText();
- if ("PSN".equals(roleScoping)) {
- observationTransformed.setSpecimenCollectorId(entityId);
- }
- } else if ("CT".equals(roleCd)) {
- observationTransformed.setCopyToProviderId(entityId);
- }
- }
- }
-
- private void transformOrganizationParticipations(
+ private static void parseOrganizationParticipations(
String organizationParticipations,
String obsDomainCdSt1,
- ObservationTransformed observationTransformed) {
+ ObservationTransformed transformed) {
try {
JsonNode organizationParticipationsJsonArray = parseJsonArray(organizationParticipations);
@@ -225,26 +193,25 @@ private void transformOrganizationParticipations(
Long entityId = getNodeValue(jsonNode, ENTITY_ID, JsonNode::asLong);
if (subjectClassCd.equals("ORG")) {
- if (RESULT.equals(obsDomainCdSt1)) {
- if ("PRF".equals(typeCd)) {
- observationTransformed.setPerformingOrganizationId(entityId);
- }
+
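+          // Performing organization is only captured for Result observations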
+ if (RESULT.equals(obsDomainCdSt1) && "PRF".equals(typeCd)) {
+ transformed.setPerformingOrganizationId(entityId);
} else if (ORDER.equals(obsDomainCdSt1)) {
switch (typeCd) {
case "AUT":
- observationTransformed.setAuthorOrganizationId(entityId);
+ transformed.setAuthorOrganizationId(entityId);
break;
case "ORD":
- observationTransformed.setOrderingOrganizationId(entityId);
+ transformed.setOrderingOrganizationId(entityId);
break;
case "HCFAC":
- observationTransformed.setHealthCareId(entityId);
+ transformed.setHealthCareId(entityId);
break;
case "ReporterOfMorbReport":
- observationTransformed.setMorbHospReporterId(entityId);
+ transformed.setMorbHospReporterId(entityId);
break;
case "HospOfMorbObs":
- observationTransformed.setMorbHospId(entityId);
+ transformed.setMorbHospId(entityId);
break;
default:
break;
@@ -261,10 +228,8 @@ private void transformOrganizationParticipations(
}
}
- private void transformMaterialParticipations(
- String materialParticipations,
- String obsDomainCdSt1,
- ObservationTransformed observationTransformed) {
+ private static void parseMaterialParticipations(
+ String materialParticipations, String obsDomainCdSt1, ParsedObservation parsedObservation) {
try {
JsonNode materialParticipationsJsonArray = parseJsonArray(materialParticipations);
@@ -275,19 +240,14 @@ private void transformMaterialParticipations(
assertDomainCdMatches(obsDomainCdSt1, ORDER);
if ("SPC".equals(typeCd) && "MAT".equals(subjectClassCd)) {
Long materialId = jsonNode.get(ENTITY_ID).asLong();
- observationTransformed.setMaterialId(materialId);
+ parsedObservation.transformed().setMaterialId(materialId);
ObservationMaterial material =
objectMapper.treeToValue(jsonNode, ObservationMaterial.class);
material.setMaterialId(materialId);
- ObservationMaterialKey key = new ObservationMaterialKey();
- key.setMaterialId(observationTransformed.getMaterialId());
- sendToKafka(
- key,
- material,
- materialTopicName,
- materialId,
- "Observation Material data (uid={}) sent to {}");
+
+ // Add material to list that will be persisted to the database
+ parsedObservation.materialEntries().add(material);
}
}
} catch (IllegalArgumentException ex) {
@@ -299,20 +259,34 @@ private void transformMaterialParticipations(
}
}
- private void transformFollowupObservations(
- String followupObservations,
- String obsDomainCdSt1,
- ObservationTransformed observationTransformed) {
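+ // Maps provider (PROV) roles: an SPP role scoped to a person identifies the specimen
+ // collector; a CT role identifies the copy-to provider.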
+ private static void setPersonParticipationRoles(
+ JsonNode node, ObservationTransformed observationTransformed, Long entityId) {
+ String roleSubject = fieldAsText(node, "role_subject_class_cd");
+ if ("PROV".equals(roleSubject)) {
+ String roleCd = fieldAsText(node, "role_cd");
+ if ("SPP".equals(roleCd)) {
+ String roleScoping = fieldAsText(node, "role_scoping_class_cd");
+ if ("PSN".equals(roleScoping)) {
+ observationTransformed.setSpecimenCollectorId(entityId);
+ }
+ } else if ("CT".equals(roleCd)) {
+ observationTransformed.setCopyToProviderId(entityId);
+ }
+ }
+ }
+
+ private static void parseFollowupObservations(
+ String followupObservations, String obsDomainCdSt1, ObservationTransformed transformed) {
try {
JsonNode followupObservationsJsonArray = parseJsonArray(followupObservations);
List<String> results = new ArrayList<>();
List<String> followUps = new ArrayList<>();
for (JsonNode jsonNode : followupObservationsJsonArray) {
- Optional<JsonNode> domainCd = Optional.ofNullable(jsonNode.get("domain_cd_st_1"));
+ String domainCd = fieldAsText(jsonNode, "domain_cd_st_1");
assertDomainCdMatches(obsDomainCdSt1, ORDER);
- if (domainCd.isPresent() && RESULT.equals(domainCd.get().asText())) {
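+        // Result-domain children are collected as results; all others as follow-ups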
+ if (RESULT.equals(domainCd)) {
Optional.ofNullable(jsonNode.get("result_observation_uid"))
.ifPresent(r -> results.add(r.asText()));
} else {
@@ -322,10 +296,10 @@ private void transformFollowupObservations(
}
if (!results.isEmpty()) {
- observationTransformed.setResultObservationUid(String.join(",", results));
+ transformed.setResultObservationUid(String.join(",", results));
}
if (!followUps.isEmpty()) {
- observationTransformed.setFollowUpObservationUid(String.join(",", followUps));
+ transformed.setFollowUpObservationUid(String.join(",", followUps));
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "FollowupObservations", followupObservations);
@@ -336,24 +310,24 @@ private void transformFollowupObservations(
}
}
- private void transformParentObservations(
- String parentObservations, ObservationTransformed observationTransformed) {
+ private static void parseParentObservations(
+ String parentObservations, ObservationTransformed transformed) {
try {
JsonNode parentObservationsJsonArray = parseJsonArray(parentObservations);
for (JsonNode jsonNode : parentObservationsJsonArray) {
Long parentUid = getNodeValue(jsonNode, "parent_uid", JsonNode::asLong);
- String parentTypeCd = jsonNode.path("parent_type_cd").asText();
- String parentDomainCd = jsonNode.path("parent_domain_cd_st_1").asText();
+ String parentTypeCd = fieldAsText(jsonNode, "parent_type_cd");
+ String parentDomainCd = fieldAsText(jsonNode, "parent_domain_cd_st_1");
- if (parentTypeCd.equals("SPRT")) {
- observationTransformed.setReportSprtUid(parentUid);
- } else if (parentTypeCd.equals("REFR")) {
- observationTransformed.setReportRefrUid(parentUid);
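+        // SPRT and REFR are act-relationship type codes for supporting and referring reports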
+ if ("SPRT".equals(parentTypeCd)) {
+ transformed.setReportSprtUid(parentUid);
+ } else if ("REFR".equals(parentTypeCd)) {
+ transformed.setReportRefrUid(parentUid);
}
if (parentDomainCd.contains(ORDER)) {
- observationTransformed.setReportObservationUid(parentUid);
+ transformed.setReportObservationUid(parentUid);
}
}
} catch (IllegalArgumentException ex) {
@@ -365,7 +339,7 @@ private void transformParentObservations(
}
}
- private void transformActIds(String actIds, ObservationTransformed observationTransformed) {
+ private static void parseActIds(String actIds, ObservationTransformed observationTransformed) {
try {
JsonNode actIdsJsonArray = parseJsonArray(actIds);
@@ -392,22 +366,16 @@ private void transformActIds(String actIds, ObservationTransformed observationTr
}
}
- private void transformObservationCoded(String observationCoded, long batchId) {
+ private static void parseObservationCoded(
+ String observationCoded, ParsedObservation parsedObservation) {
try {
JsonNode observationCodedJsonArray = parseJsonArray(observationCoded);
- ObservationCodedKey codedKey = new ObservationCodedKey();
for (JsonNode jsonNode : observationCodedJsonArray) {
ObservationCoded coded = objectMapper.treeToValue(jsonNode, ObservationCoded.class);
- coded.setBatchId(batchId);
- codedKey.setObservationUid(coded.getObservationUid());
- codedKey.setOvcCode(coded.getOvcCode());
- sendToKafka(
- codedKey,
- coded,
- codedTopicName,
- coded.getObservationUid(),
- "Observation Coded data (uid={}) sent to {}");
+ coded.setBatchId(parsedObservation.transformed().getBatchId());
+
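+        // Coded entries are collected and persisted to nrt_observation_coded rather than
+        // published to a dedicated Kafka topic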
+ parsedObservation.codedEntries().add(coded);
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "ObservationCoded");
@@ -418,19 +386,16 @@ private void transformObservationCoded(String observationCoded, long batchId) {
}
}
- private void transformObservationDate(String observationDate, long batchId) {
+ private static void parseObservationDate(
+ String observationDate, ParsedObservation parsedObservation) {
try {
JsonNode observationDateJsonArray = parseJsonArray(observationDate);
for (JsonNode jsonNode : observationDateJsonArray) {
ObservationDate obsDate = objectMapper.treeToValue(jsonNode, ObservationDate.class);
- obsDate.setBatchId(batchId);
- sendToKafka(
- observationKey,
- obsDate,
- dateTopicName,
- obsDate.getObservationUid(),
- "Observation Date data (uid={}) sent to {}");
+ obsDate.setBatchId(parsedObservation.transformed().getBatchId());
+
+ parsedObservation.dateEntries().add(obsDate);
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "ObservationDate");
@@ -440,20 +405,14 @@ private void transformObservationDate(String observationDate, long batchId) {
}
}
- private void transformObservationEdx(String observationEdx) {
+ private static void parseObservationEdx(
+ String observationEdx, ParsedObservation parsedObservation) {
try {
JsonNode observationEdxJsonArray = parseJsonArray(observationEdx);
- ObservationEdxKey edxKey = new ObservationEdxKey();
-
for (JsonNode jsonNode : observationEdxJsonArray) {
ObservationEdx edx = objectMapper.treeToValue(jsonNode, ObservationEdx.class);
- edxKey.setEdxDocumentUid(edx.getEdxDocumentUid());
- sendToKafka(
- edxKey,
- edx,
- edxTopicName,
- edx.getEdxDocumentUid(),
- "Observation Edx data (edx doc uid={}) sent to {}");
+
+ parsedObservation.edxEntries().add(edx);
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "ObservationEdx");
@@ -463,19 +422,15 @@ private void transformObservationEdx(String observationEdx) {
}
}
- private void transformObservationNumeric(String observationNumeric, long batchId) {
+ private static void parseObservationNumeric(
+ String observationNumeric, ParsedObservation parsedObservation) {
try {
JsonNode observationNumericJsonArray = parseJsonArray(observationNumeric);
for (JsonNode jsonNode : observationNumericJsonArray) {
ObservationNumeric numeric = objectMapper.treeToValue(jsonNode, ObservationNumeric.class);
- numeric.setBatchId(batchId);
- sendToKafka(
- observationKey,
- numeric,
- numericTopicName,
- numeric.getObservationUid(),
- "Observation Numeric data (uid={}) sent to {}");
+ numeric.setBatchId(parsedObservation.transformed().getBatchId());
+ parsedObservation.numericEntries().add(numeric);
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "ObservationNumeric");
@@ -486,22 +441,15 @@ private void transformObservationNumeric(String observationNumeric, long batchId
}
}
- private void transformObservationReasons(String observationReasons, long batchId) {
+ private static void parseObservationReasons(
+ String observationReasons, ParsedObservation parsedObservation) {
try {
JsonNode observationReasonsJsonArray = parseJsonArray(observationReasons);
- ObservationReasonKey reasonKey = new ObservationReasonKey();
for (JsonNode jsonNode : observationReasonsJsonArray) {
ObservationReason reason = objectMapper.treeToValue(jsonNode, ObservationReason.class);
- reason.setBatchId(batchId);
- reasonKey.setObservationUid(reason.getObservationUid());
- reasonKey.setReasonCd(reason.getReasonCd());
- sendToKafka(
- reasonKey,
- reason,
- reasonTopicName,
- reason.getObservationUid(),
- "Observation Reason data (uid={}) sent to {}");
+ reason.setBatchId(parsedObservation.transformed().getBatchId());
+ parsedObservation.reasonEntries().add(reason);
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "ObservationReasons");
@@ -512,22 +460,16 @@ private void transformObservationReasons(String observationReasons, long batchId
}
}
- private void transformObservationTxt(String observationTxt, long batchId) {
+ private static void parseObservationTxt(
+ String observationTxt, ParsedObservation parsedObservation) {
try {
JsonNode observationTxtJsonArray = parseJsonArray(observationTxt);
- ObservationTxtKey txtKey = new ObservationTxtKey();
for (JsonNode jsonNode : observationTxtJsonArray) {
ObservationTxt txt = objectMapper.treeToValue(jsonNode, ObservationTxt.class);
- txt.setBatchId(batchId);
- txtKey.setObservationUid(txt.getObservationUid());
- txtKey.setOvtSeq(txt.getOvtSeq());
- sendToKafka(
- txtKey,
- txt,
- txtTopicName,
- txt.getObservationUid(),
- "Observation Txt data (uid={}) sent to {}");
+ txt.setBatchId(parsedObservation.transformed().getBatchId());
+
+ parsedObservation.textEntries().add(txt);
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "ObservationTxt");
@@ -537,21 +479,7 @@ private void transformObservationTxt(String observationTxt, long batchId) {
}
}
- private void sendToKafka(Object key, Object value, String topicName, Long uid, String message) {
- String jsonKey = jsonGenerator.generateStringJson(key);
- String jsonValue =
- Optional.ofNullable(value).map(jsonGenerator::generateStringJson).orElse(null);
- kafkaTemplate
- .send(topicName, jsonKey, jsonValue)
- .whenComplete(
- (res, e) -> {
- if (message != null) {
- logger.info(message, uid, topicName);
- }
- });
- }
-
- private JsonNode parseJsonArray(String jsonString)
+ private static JsonNode parseJsonArray(String jsonString)
throws JsonProcessingException, IllegalArgumentException {
JsonNode jsonArray = jsonString != null ? objectMapper.readTree(jsonString) : null;
if (jsonArray != null && jsonArray.isArray()) {
@@ -561,7 +489,8 @@ private JsonNode parseJsonArray(String jsonString)
}
}
- private <T> T getNodeValue(JsonNode jsonNode, String fieldName, Function<JsonNode, T> mapper) {
+ private static <T> T getNodeValue(
+     JsonNode jsonNode, String fieldName, Function<JsonNode, T> mapper) {
JsonNode node = jsonNode.get(fieldName);
if (node == null || node.isNull()) {
throw new IllegalArgumentException("Field " + fieldName + " is null or not found in {}: {}");
@@ -569,9 +498,20 @@ private <T> T getNodeValue(
+
+ private static String fieldAsText(JsonNode node, String field) {
+ JsonNode fieldNode = node.get(field);
+ return (fieldNode == null || fieldNode.isNull()) ? null : fieldNode.asText();
+ }
+
+ private static void ifPresentSet(JsonNode node, String field, Consumer<String> setter) {
+ String value = fieldAsText(node, field);
+ if (value != null) {
+ setter.accept(value);
+ }
+ }
}
diff --git a/reporting-pipeline-service/src/main/resources/application.yaml b/reporting-pipeline-service/src/main/resources/application.yaml
index ea8a680ff..d76d1938c 100644
--- a/reporting-pipeline-service/src/main/resources/application.yaml
+++ b/reporting-pipeline-service/src/main/resources/application.yaml
@@ -45,13 +45,6 @@ spring:
ldf-data: ${TOPIC_NRT_LDF_DATA:nrt_ldf_data}
metadata-columns: ${TOPIC_NRT_METADATA_COLUMNS:nrt_metadata_columns}
observation: ${TOPIC_NRT_OBSERVATION:nrt_observation}
- observation-coded: ${TOPIC_NRT_OBSERVATION_CODED:nrt_observation_coded}
- observation-date: ${TOPIC_NRT_OBSERVATION_DATE:nrt_observation_date}
- observation-edx: ${TOPIC_NRT_OBSERVATION_EDX:nrt_observation_edx}
- observation-material: ${TOPIC_NRT_OBSERVATION_MATERIAL:nrt_observation_material}
- observation-numeric: ${TOPIC_NRT_OBSERVATION_NUMERIC:nrt_observation_numeric}
- observation-reason: ${TOPIC_NRT_OBSERVATION_REASON:nrt_observation_reason}
- observation-txt: ${TOPIC_NRT_OBSERVATION_TXT:nrt_observation_txt}
odse-nbs-page: ${TOPIC_NRT_ODSE_NBS_PAGE:nrt_odse_NBS_page}
odse-state-defined-field-metadata: ${TOPIC_NRT_ODSE_STATE_DEFINED_FIELD_METADATA:nrt_odse_state_defined_field_metadata}
organization: ${TOPIC_NRT_ORGANIZATION:nrt_organization}
diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java
index b2b34176f..4740dd1b9 100644
--- a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java
+++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java
@@ -28,7 +28,7 @@ public DataSource dataSource(DataSourceProperties properties) {
}
@Primary
- @Bean
+ @Bean("rtrClient")
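+ // Naming the bean lets tests inject this client with @Qualifier("rtrClient")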
public JdbcClient jdbcClient(DataSource dataSource) {
return JdbcClient.create(dataSource);
}
diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumerTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumerTest.java
new file mode 100644
index 000000000..a72d77417
--- /dev/null
+++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumerTest.java
@@ -0,0 +1,149 @@
+package gov.cdc.nbs.report.pipeline.observation;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.*;
+
+import gov.cdc.nbs.report.pipeline.observation.service.ObservationProcessor;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+import org.mockito.*;
+
+class ObservationConsumerTest {
+
+ private final String observationTopic = "Observation";
+ private final String actRelationshipTopic = "Act_relationship";
+
+ private final ObservationProcessor processor = Mockito.mock(ObservationProcessor.class);
+ private final ObservationConsumer consumer =
+     new ObservationConsumer(processor, observationTopic, actRelationshipTopic, 1);
+
+ @Test
+ void processesObservationMessage() {
+ String message =
+ """
+ {
+ "payload" : {
+ "op": "c",
+ "after": {
+ "observation_uid": "123"
+ }
+ }
+ }
+ """;
+ ConsumerRecord<String, String> consumerRecord =
+     new ConsumerRecord<>(observationTopic, 0, 1L, null, message);
+ // receives valid observation message
+ consumer.processMessage(consumerRecord).join();
+
+ // sends to ObservationProcessor
+ verify(processor, times(1)).process(0, "123");
+ }
+
+ @Test
+ void processesActRelationshipMessage() {
+ String message =
+ """
+ {
+ "payload" : {
+ "op": "d",
+ "before": {
+ "source_act_uid": "1",
+ "type_cd": "LabReport",
+ "target_class_cd": "OBS"
+ }
+ }
+ }
+ """;
+ ConsumerRecord<String, String> consumerRecord =
+     new ConsumerRecord<>(actRelationshipTopic, 0, 1L, null, message);
+ // receives valid act_relationship message
+ consumer.processMessage(consumerRecord).join();
+
+ // sends to ObservationProcessor
+ verify(processor, times(1)).process(0, "1");
+ }
+
+ @Test
+ void doesNotProcessActRelationshipMessageBadOp() {
+ String message =
+ """
+ {
+ "payload" : {
+ "op": "c",
+ "before": {
+ "source_act_uid": "1",
+ "type_cd": "LabReport",
+ "target_class_cd": "OBS"
+ }
+ }
+ }
+ """;
+ ConsumerRecord<String, String> consumerRecord =
+     new ConsumerRecord<>(actRelationshipTopic, 0, 1L, null, message);
+ // receives non 'delete' act_relationship message
+ consumer.processMessage(consumerRecord).join();
+
+ // does not send to ObservationProcessor
+ verifyNoInteractions(processor);
+ }
+
+ @Test
+ void doesNotProcessActRelationshipMessageBadTypeCd() {
+ String message =
+ """
+ {
+ "payload" : {
+ "op": "d",
+ "before": {
+ "source_act_uid": "1",
+ "type_cd": "BadValue",
+ "target_class_cd": "OBS"
+ }
+ }
+ }
+ """;
+ ConsumerRecord<String, String> consumerRecord =
+     new ConsumerRecord<>(actRelationshipTopic, 0, 1L, null, message);
+ // receives act_relationship message with a type_cd other than 'LabReport'
+ consumer.processMessage(consumerRecord).join();
+
+ // does not send to ObservationProcessor
+ verifyNoInteractions(processor);
+ }
+
+ @Test
+ void throwsExceptionForBadTopic() {
+ ConsumerRecord<String, String> consumerRecord =
+     new ConsumerRecord<>("bad_topic", 0, 1L, null, "");
+ CompletableFuture<Void> future = consumer.processMessage(consumerRecord);
+
+ CompletionException ex = assertThrows(CompletionException.class, future::join);
+ assertThat(ex.getCause().getMessage())
+ .isEqualTo("Received data from an unknown topic: bad_topic");
+ }
+
+ @Test
+ void throwsExceptionForBadActRelationshipMessage() {
+ String message =
+ """
+ {
+ "payload" : {
+ "op": "d",
+ "before": {
+ }
+ }
+ }
+ """;
+ ConsumerRecord<String, String> consumerRecord =
+     new ConsumerRecord<>(actRelationshipTopic, 0, 1L, null, message);
+ CompletableFuture<Void> future = consumer.processMessage(consumerRecord);
+
+ CompletionException ex = assertThrows(CompletionException.class, future::join);
+ assertThat(ex.getCause().getMessage())
+ .isEqualTo(
+ "Error processing ActRelationship data: The source_act_uid field is missing in the message payload.");
+ }
+}
diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriterTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriterTest.java
new file mode 100644
index 000000000..fe52cc054
--- /dev/null
+++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriterTest.java
@@ -0,0 +1,603 @@
+package gov.cdc.nbs.report.pipeline.observation.repository;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import gov.cdc.nbs.report.pipeline.integration.unit.UnitTest;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import org.json.JSONException;
+import org.junit.jupiter.api.Test;
+import org.skyscreamer.jsonassert.Customization;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.skyscreamer.jsonassert.JSONCompareMode;
+import org.skyscreamer.jsonassert.comparator.CustomComparator;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.jdbc.core.simple.JdbcClient;
+
+class NrtObservationWriterTest extends UnitTest {
+
+ private final JdbcClient client;
+ private final NrtObservationWriter writer;
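+ // Dates serialize as ISO-style strings so entity JSON matches rows read from the database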
+ private static final ObjectMapper mapper =
+ new ObjectMapper()
+ .enable(SerializationFeature.INDENT_OUTPUT)
+ .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+ .setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+
+ public NrtObservationWriterTest(@Qualifier("rtrClient") final JdbcClient client) {
+ this.client = client;
+ this.writer = new NrtObservationWriter(client);
+ }
+
+ @Test
+ void insertsMaterialData() throws JsonProcessingException, JSONException {
+ // Insert material data
+ ObservationMaterial material = new ObservationMaterial();
+ material.setActUid(2L);
+ material.setTypeCd("SPC");
+ material.setMaterialId(2L);
+ material.setSubjectClassCd("MAT");
+ material.setRecordStatus("ACTIVE");
+ material.setTypeDescTxt("Specimen");
+ material.setLastChgTime("2024-01-01T00:00:00.000");
+ material.setMaterialCd("UNK");
+ material.setMaterialNm("name");
+ material.setMaterialDetails("Details");
+ material.setMaterialCollectionVol("36");
+ material.setMaterialCollectionVolUnit("ML");
+ material.setMaterialDesc("Lymphocytes");
+ material.setRiskCd("rsk");
+ material.setRiskDescTxt("rskDesc");
+
+ writer.persistMaterials(List.of(material));
+
+ // Verify data is as expected
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_material WHERE act_uid = 2 AND material_id = 2")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(material);
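+ // refresh_datetime is assigned at write time, so the comparator accepts any value for it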
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void updatesMaterialData() throws JsonProcessingException, JSONException {
+ // Insert material data
+ ObservationMaterial material = new ObservationMaterial();
+ material.setActUid(3L);
+ material.setTypeCd("SPC");
+ material.setMaterialId(3L);
+ material.setSubjectClassCd("MAT");
+ material.setRecordStatus("ACTIVE");
+ material.setTypeDescTxt("Specimen");
+ material.setLastChgTime("2024-01-01T00:00:00.000");
+ material.setMaterialCd("UNK");
+ material.setMaterialNm("name");
+ material.setMaterialDetails("Details");
+ material.setMaterialCollectionVol("36");
+ material.setMaterialCollectionVolUnit("ML");
+ material.setMaterialDesc("Lymphocytes");
+ material.setRiskCd("rsk");
+ material.setRiskDescTxt("rskDesc");
+
+ writer.persistMaterials(List.of(material));
+
+ // Upsert material data
+ material.setTypeCd("CPS");
+ material.setSubjectClassCd("TAM");
+ material.setRecordStatus("INACTIVE");
+ material.setTypeDescTxt("NEW");
+ material.setLastChgTime("2025-01-01T00:00:00.000");
+ material.setMaterialCd("ST");
+ material.setMaterialNm("NEW_NAME");
+ material.setMaterialDetails("Updated Details");
+ material.setMaterialCollectionVol("34");
+ material.setMaterialCollectionVolUnit("LM");
+ material.setMaterialDesc("Something");
+ material.setRiskCd("UpdatedRisk");
+ material.setRiskDescTxt("NewRiskDesc");
+ writer.persistMaterials(List.of(material));
+
+ // Verify data is as expected
+ Integer count =
+ client
+ .sql(
+ "SELECT COUNT(*) FROM nrt_observation_material WHERE act_uid = 3 AND material_id = 3")
+ .query(Integer.class)
+ .single();
+
+ assertThat(count).isEqualTo(1);
+
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_material WHERE act_uid = 3 AND material_id = 3")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(material);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void insertsCodedEntry() throws JSONException, JsonProcessingException {
+ // Insert coded data
+ ObservationCoded coded = new ObservationCoded();
+ coded.setObservationUid(4L);
+ coded.setBatchId(0L);
+ coded.setOvcCode("CE04");
+ coded.setOvcCodeSystemCd("SNM");
+ coded.setOvcCodeSystemDescTxt("SNOMED");
+ coded.setOvcDisplayName("Normal");
+ coded.setOvcAltCd("A-124");
+ coded.setOvcAltCdDescTxt("NORMAL");
+
+ writer.persistCoded(List.of(coded));
+
+ // Verify data is as expected
+ Map<String, Object> data =
+ client
+ .sql(
+ "SELECT * FROM nrt_observation_coded WHERE observation_uid = 4 AND ovc_code = 'CE04'")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(coded);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void updatesCodedEntry() throws JSONException, JsonProcessingException {
+ // Insert coded data
+ ObservationCoded coded = new ObservationCoded();
+ coded.setObservationUid(5L);
+ coded.setBatchId(0L);
+ coded.setOvcCode("CE05");
+ coded.setOvcCodeSystemCd("SNM");
+ coded.setOvcCodeSystemDescTxt("SNOMED");
+ coded.setOvcDisplayName("Normal");
+ coded.setOvcAltCd("A-124");
+ coded.setOvcAltCdDescTxt("NORMAL");
+
+ writer.persistCoded(List.of(coded));
+
+ // Upsert coded data
+ coded.setBatchId(1L);
+ coded.setOvcCodeSystemCd("MNS");
+ coded.setOvcCodeSystemDescTxt("LOINC");
+ coded.setOvcDisplayName("NotNormal");
+ coded.setOvcAltCd("D-444");
+ coded.setOvcAltCdDescTxt("ABOVE");
+
+ writer.persistCoded(List.of(coded));
+
+ // Verify data is as expected
+ Integer count =
+ client
+ .sql(
+ "SELECT COUNT(*) FROM nrt_observation_coded WHERE observation_uid = 5 AND ovc_code = 'CE05'")
+ .query(Integer.class)
+ .single();
+
+ assertThat(count).isEqualTo(1);
+
+ Map<String, Object> data =
+ client
+ .sql(
+ "SELECT * FROM nrt_observation_coded WHERE observation_uid = 5 AND ovc_code = 'CE05'")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(coded);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void insertDateEntry() throws JsonProcessingException, JSONException {
+ // Insert Date data
+ ObservationDate date = new ObservationDate();
+ date.setObservationUid(6L);
+ date.setBatchId(0L);
+ date.setOvdFromDate("2024-08-16T00:00:00.000");
+ date.setOvdSeq(1);
+
+ writer.persistDate(List.of(date));
+
+ // Verify data is as expected
+ Integer count =
+ client
+ .sql("SELECT COUNT(*) FROM nrt_observation_date WHERE observation_uid = 6")
+ .query(Integer.class)
+ .single();
+
+ assertThat(count).isEqualTo(1);
+
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_date WHERE observation_uid = 6")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(date);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void updatesDateEntry() throws JsonProcessingException, JSONException {
+ // Insert Date data
+ ObservationDate date = new ObservationDate();
+ date.setObservationUid(7L);
+ date.setBatchId(0L);
+ date.setOvdFromDate("2024-08-16T00:00:00.000");
+ date.setOvdSeq(1);
+
+ writer.persistDate(List.of(date));
+
+ // Upsert date data
+ date.setBatchId(2L);
+ date.setOvdFromDate("2025-08-16T00:00:00.000");
+ date.setOvdSeq(2);
+
+ writer.persistDate(List.of(date));
+
+ // Verify data is as expected
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_date WHERE observation_uid = 7")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(date);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void insertEdx() throws JsonProcessingException, JSONException {
+ // Insert edx data
+ ObservationEdx edx = new ObservationEdx();
+ edx.setEdxActUid(8L);
+ edx.setEdxDocumentUid(9L);
+ edx.setEdxAddTime("2024-09-30T21:08:19.017");
+
+ writer.persistEdx(List.of(edx));
+
+ // Verify data is as expected
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_edx WHERE edx_act_uid = 8 AND edx_document_uid = 9")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(edx);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void updatesEdx() throws JsonProcessingException, JSONException {
+ // Insert edx data
+ ObservationEdx edx = new ObservationEdx();
+ edx.setEdxActUid(9L);
+ edx.setEdxDocumentUid(10L);
+ edx.setEdxAddTime("2024-09-30T21:08:19.017");
+
+ writer.persistEdx(List.of(edx));
+
+ // Upsert edx data
+ edx.setEdxAddTime("2025-10-02T20:04:09.000");
+
+ writer.persistEdx(List.of(edx));
+
+ // Verify data is as expected
+ Integer count =
+ client
+ .sql(
+ "SELECT COUNT(*) FROM nrt_observation_edx WHERE edx_act_uid = 9 AND edx_document_uid = 10")
+ .query(Integer.class)
+ .single();
+
+ assertThat(count).isEqualTo(1);
+
+ Map<String, Object> data =
+ client
+ .sql(
+ "SELECT * FROM nrt_observation_edx WHERE edx_act_uid = 9 AND edx_document_uid = 10")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(edx);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void insertNumeric() {
+ // Insert numeric data
+ ObservationNumeric numeric = new ObservationNumeric();
+ numeric.setObservationUid(10L);
+ numeric.setOvnSeq(1);
+ numeric.setBatchId(0L);
+ numeric.setOvnComparatorCd1("100");
+ numeric.setOvnLowRange("10-100");
+ numeric.setOvnHighRange("100-1000");
+ numeric.setOvnNumericValue1("23.10000");
+ numeric.setOvnNumericValue2("1.00000");
+ numeric.setOvnNumericUnitCd("mL");
+ numeric.setOvnSeparatorCd(":");
+
+ writer.persistNumeric(List.of(numeric));
+
+ // Verify data is as expected
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_numeric WHERE observation_uid = 10 AND ovn_seq = 1")
+ .query()
+ .singleRow();
+
+ // Compare fields individually: the numeric value fields are Strings in the DTO but
+ // numeric(15,5) in the database
+ assertThat(data)
+ .containsEntry("observation_uid", numeric.getObservationUid())
+ .containsEntry("ovn_high_range", numeric.getOvnHighRange())
+ .containsEntry("ovn_low_range", numeric.getOvnLowRange())
+ .containsEntry("ovn_comparator_cd_1", numeric.getOvnComparatorCd1())
+ .containsEntry("ovn_numeric_unit_cd", numeric.getOvnNumericUnitCd())
+ .containsEntry("ovn_separator_cd", numeric.getOvnSeparatorCd())
+ .containsEntry("batch_id", numeric.getBatchId());
+
+ assertThat(data.get("ovn_seq")).hasToString(numeric.getOvnSeq().toString());
+ assertThat(data.get("ovn_numeric_value_1")).hasToString(numeric.getOvnNumericValue1());
+ assertThat(data.get("ovn_numeric_value_2")).hasToString(numeric.getOvnNumericValue2());
+ }
+
+ @Test
+ void updatesNumeric() {
+ // Insert numeric data
+ ObservationNumeric numeric = new ObservationNumeric();
+ numeric.setObservationUid(11L);
+ numeric.setOvnSeq(2);
+ numeric.setBatchId(0L);
+ numeric.setOvnComparatorCd1("100");
+ numeric.setOvnLowRange("10-100");
+ numeric.setOvnHighRange("100-1000");
+ numeric.setOvnNumericValue1("23");
+ numeric.setOvnNumericValue2("1.0");
+ numeric.setOvnNumericUnitCd("mL");
+ numeric.setOvnSeparatorCd(":");
+
+ writer.persistNumeric(List.of(numeric));
+
+ // Upsert numeric data
+ numeric.setBatchId(1L);
+ numeric.setOvnComparatorCd1("200");
+ numeric.setOvnLowRange("20-200");
+ numeric.setOvnHighRange("200-2000");
+ numeric.setOvnNumericValue1("34.00000");
+ numeric.setOvnNumericValue2("2.00000");
+ numeric.setOvnNumericUnitCd("LM");
+ numeric.setOvnSeparatorCd("!");
+
+ writer.persistNumeric(List.of(numeric));
+
+ // Verify data is as expected
+ Integer count =
+ client
+ .sql(
+ "SELECT COUNT(*) FROM nrt_observation_numeric WHERE observation_uid = 11 AND ovn_seq = 2")
+ .query(Integer.class)
+ .single();
+
+ assertThat(count).isEqualTo(1);
+
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_numeric WHERE observation_uid = 11 AND ovn_seq = 2")
+ .query()
+ .singleRow();
+
+ assertThat(data)
+ .containsEntry("observation_uid", numeric.getObservationUid())
+ .containsEntry("ovn_high_range", numeric.getOvnHighRange())
+ .containsEntry("ovn_low_range", numeric.getOvnLowRange())
+ .containsEntry("ovn_comparator_cd_1", numeric.getOvnComparatorCd1())
+ .containsEntry("ovn_numeric_unit_cd", numeric.getOvnNumericUnitCd())
+ .containsEntry("ovn_separator_cd", numeric.getOvnSeparatorCd())
+ .containsEntry("batch_id", numeric.getBatchId());
+
+ assertThat(data.get("ovn_seq")).hasToString(numeric.getOvnSeq().toString());
+ assertThat(data.get("ovn_numeric_value_1")).hasToString(numeric.getOvnNumericValue1());
+ assertThat(data.get("ovn_numeric_value_2")).hasToString(numeric.getOvnNumericValue2());
+ }
+
+ @Test
+ void insertsReason() throws JsonProcessingException, JSONException {
+ // Insert reason data
+ ObservationReason reason = new ObservationReason();
+ reason.setObservationUid(12L);
+ reason.setReasonCd("80008");
+ reason.setReasonDescTxt("PRESENCE OF REASON");
+ reason.setBatchId(12L);
+
+ writer.persistReason(List.of(reason));
+
+ // Verify data is as expected
+ Map<String, Object> data =
+ client
+ .sql(
+ "SELECT * FROM nrt_observation_reason WHERE observation_uid = 12 AND reason_cd = '80008'")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(reason);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void updatesReason() throws JsonProcessingException, JSONException {
+ // Insert reason data
+ ObservationReason reason = new ObservationReason();
+ reason.setObservationUid(13L);
+ reason.setReasonCd("9009");
+ reason.setReasonDescTxt("PRESENCE OF REASON");
+ reason.setBatchId(13L);
+
+ writer.persistReason(List.of(reason));
+
+ // Upsert reason data
+ reason.setReasonDescTxt("A DIFFERENT REASON");
+ reason.setBatchId(14L);
+
+ writer.persistReason(List.of(reason));
+
+ // Verify data is as expected
+ Integer count =
+ client
+ .sql(
+ "SELECT COUNT(*) FROM nrt_observation_reason WHERE observation_uid = 13 AND reason_cd = '9009'")
+ .query(Integer.class)
+ .single();
+
+ assertThat(count).isEqualTo(1);
+
+ Map<String, Object> data =
+ client
+ .sql(
+ "SELECT * FROM nrt_observation_reason WHERE observation_uid = 13 AND reason_cd = '9009'")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(reason);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void insertsText() throws JsonProcessingException, JSONException {
+ // Insert text data
+ ObservationTxt txt = new ObservationTxt();
+ txt.setObservationUid(14L);
+ txt.setOvtSeq(1);
+ txt.setBatchId(14L);
+ txt.setOvtTxtTypeCd("N");
+ txt.setOvtValueTxt("RECOMMENDED IN SUCH INSTANCES.");
+
+ writer.persistText(List.of(txt));
+
+ // Verify data is as expected
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_txt WHERE observation_uid = 14 AND ovt_seq = 1")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(txt);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+
+ @Test
+ void updatesText() throws JsonProcessingException, JSONException {
+ // Insert text data
+ ObservationTxt txt = new ObservationTxt();
+ txt.setObservationUid(15L);
+ txt.setOvtSeq(2);
+ txt.setBatchId(15L);
+ txt.setOvtTxtTypeCd("N");
+ txt.setOvtValueTxt("RECOMMENDED IN SUCH INSTANCES.");
+
+ writer.persistText(List.of(txt));
+
+ // Update text data
+ txt.setBatchId(16L);
+ txt.setOvtTxtTypeCd("J");
+ txt.setOvtValueTxt("UPDATED VALUE");
+
+ writer.persistText(List.of(txt));
+
+ // Verify data is as expected
+ Integer count =
+ client
+ .sql(
+ "SELECT COUNT(*) FROM nrt_observation_txt WHERE observation_uid = 15 AND ovt_seq = 2")
+ .query(Integer.class)
+ .single();
+
+ assertThat(count).isEqualTo(1);
+
+ Map<String, Object> data =
+ client
+ .sql("SELECT * FROM nrt_observation_txt WHERE observation_uid = 15 AND ovt_seq = 2")
+ .query()
+ .singleRow();
+
+ String actual = mapper.writeValueAsString(data);
+ String expected = mapper.writeValueAsString(txt);
+ JSONAssert.assertEquals(
+ expected,
+ actual,
+ new CustomComparator(
+ JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true)));
+ }
+}
diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessorTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessorTest.java
new file mode 100644
index 000000000..a5cb952a1
--- /dev/null
+++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessorTest.java
@@ -0,0 +1,166 @@
+package gov.cdc.nbs.report.pipeline.observation.service;
+
+import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import gov.cdc.etldatapipeline.commonutil.NoDataException;
+import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting;
+import gov.cdc.nbs.report.pipeline.observation.repository.NrtObservationWriter;
+import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import java.util.Optional;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.kafka.core.KafkaTemplate;
+
+@ExtendWith(MockitoExtension.class)
+class ObservationProcessorTest {
+
+ @Spy private CustomMetrics metrics = new CustomMetrics(new SimpleMeterRegistry());
+ @Mock private ObservationRepository repository;
+ @Mock private NrtObservationWriter writer;
+ @Mock private KafkaTemplate<String, String> kafkaTemplate;
+ @Captor private ArgumentCaptor<String> keyCaptor;
+ @Captor private ArgumentCaptor<String> messageCaptor;
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final String nrtObservationTopic = "nrt_observation";
+
+ private ObservationProcessor processor;
+
+ @BeforeEach
+ void init() {
+ processor =
+ new ObservationProcessor(metrics, repository, kafkaTemplate, nrtObservationTopic, writer);
+ }
+
+ @Test
+ void successfullyProcessesObservation() throws JsonProcessingException {
+ // Mock database response
+ Observation observation = constructObservation(123L, "Order");
+ when(repository.computeObservations("123")).thenReturn(Optional.of(observation));
+
+ // Act
+ processor.process(1L, "123");
+
+ // Verify
+ verify(repository, times(1)).computeObservations("123");
+ verify(kafkaTemplate, times(1))
+ .send(eq("nrt_observation"), keyCaptor.capture(), messageCaptor.capture());
+
+ ObservationKey expectedKey = new ObservationKey(observation.getObservationUid());
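+ // Messages are wrapped in a "payload" envelope; unwrap before deserializing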
+ ObservationKey actualKey =
+ mapper.readValue(
+ mapper.readTree(keyCaptor.getValue()).path("payload").toString(), ObservationKey.class);
+ assertThat(actualKey).isEqualTo(expectedKey);
+
+ var reportingModel =
+ constructObservationReporting(
+ observation.getObservationUid(), observation.getObsDomainCdSt1());
+
+ var actualReporting =
+ mapper.readValue(
+ mapper.readTree(messageCaptor.getValue()).path("payload").toString(),
+ ObservationReporting.class);
+
+ assertThat(reportingModel).isEqualTo(actualReporting);
+ }
+
+ @Test
+ void verifyThrowsExceptionWhenNoDataFound() {
+ // Mock database response
+ when(repository.computeObservations("123")).thenReturn(Optional.empty());
+
+ // Act + Verify
+ NoDataException ex = assertThrows(NoDataException.class, () -> processor.process(1L, "123"));
+ assertThat(ex.getMessage()).isEqualTo("Unable to find Observation with id: 123");
+ }
+
+ private Observation constructObservation(Long observationUid, String obsDomainCdSt1) {
+ String filePathPrefix = "rawDataFiles/observation/";
+ Observation observation = new Observation();
+ observation.setObservationUid(observationUid);
+ observation.setActUid(observationUid);
+ observation.setClassCd("OBS");
+ observation.setMoodCd("ENV");
+ observation.setLocalId("OBS10003388MA01");
+ observation.setActivityFromTime("2021-01-28 16:06:03.000");
+ observation.setObsDomainCdSt1(obsDomainCdSt1);
+ observation.setPersonParticipations(readFileData(filePathPrefix + "PersonParticipations.json"));
+ observation.setOrganizationParticipations(
+ readFileData(filePathPrefix + "OrganizationParticipations.json"));
+ observation.setMaterialParticipations(
+ readFileData(filePathPrefix + "MaterialParticipations.json"));
+ observation.setFollowupObservations(readFileData(filePathPrefix + "FollowupObservations.json"));
+ observation.setParentObservations(readFileData(filePathPrefix + "ParentObservations.json"));
+ observation.setActIds(readFileData(filePathPrefix + "ActIds.json"));
+ return observation;
+ }
+
+ private ObservationReporting constructObservationReporting(
+ Long observationUid, String obsDomainCdSt1) {
+ ObservationReporting observation = new ObservationReporting();
+ observation.setBatchId(1L);
+ observation.setObservationUid(observationUid);
+ observation.setObsDomainCdSt1(obsDomainCdSt1);
+ observation.setActUid(observationUid);
+ observation.setClassCd("OBS");
+ observation.setMoodCd("ENV");
+ observation.setLocalId("OBS10003388MA01");
+ observation.setOrderingPersonId("10000055");
+ observation.setPatientId(10000066L);
+ observation.setPerformingOrganizationId(null); // not null when obsDomainCdSt1=Result
+ observation.setAuthorOrganizationId(34567890L); // null when obsDomainCdSt1=Result
+ observation.setOrderingOrganizationId(23456789L); // null when obsDomainCdSt1=Result
+ observation.setHealthCareId(56789012L); // null when obsDomainCdSt1=Result
+ observation.setMorbHospReporterId(67890123L); // null when obsDomainCdSt1=Result
+ observation.setMorbHospId(78901234L); // null when obsDomainCdSt1=Result
+ observation.setMaterialId(10000005L);
+ observation.setResultObservationUid("56789012,56789013");
+ observation.setFollowupObservationUid("56789014,56789015");
+ observation.setReportObservationUid(123456788L);
+ observation.setReportRefrUid(123456790L);
+ observation.setReportSprtUid(123456788L);
+
+ observation.setAssistantInterpreterId(10000077L);
+ observation.setAssistantInterpreterVal("22582");
+ observation.setAssistantInterpreterFirstNm("Cara");
+ observation.setAssistantInterpreterLastNm("Dune");
+ observation.setAssistantInterpreterIdAssignAuth("22D7377772");
+ observation.setAssistantInterpreterAuthType("Employee number");
+
+ observation.setTranscriptionistId(10000088L);
+ observation.setTranscriptionistVal("34344355455144");
+ observation.setTranscriptionistFirstNm("Moff");
+ observation.setTranscriptionistLastNm("Gideon");
+ observation.setTranscriptionistIdAssignAuth("18D8181818");
+ observation.setTranscriptionistAuthType("Employee number");
+
+ observation.setResultInterpreterId(10000022L);
+ observation.setLabTestTechnicianId(10000011L);
+
+ observation.setSpecimenCollectorId(10000033L);
+ observation.setCopyToProviderId(10000044L);
+ observation.setAccessionNumber("20120601114");
+ observation.setActivityFromTime("2021-01-28 16:06:03.000");
+ observation.setDeviceInstanceId1("No Equipment");
+ observation.setDeviceInstanceId2("NEW TOOLS");
+
+ return observation;
+ }
+}
diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationServiceTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationServiceTest.java
deleted file mode 100644
index 356e65446..000000000
--- a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationServiceTest.java
+++ /dev/null
@@ -1,309 +0,0 @@
-package gov.cdc.nbs.report.pipeline.observation.service;
-
-import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData;
-import static gov.cdc.nbs.report.pipeline.observation.service.ObservationService.toBatchId;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.*;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import gov.cdc.etldatapipeline.commonutil.NoDataException;
-import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting;
-import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository;
-import gov.cdc.nbs.report.pipeline.observation.transformer.ProcessObservationDataUtil;
-import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
-import org.mockito.*;
-import org.springframework.kafka.core.KafkaTemplate;
-
-class ObservationServiceTest {
-
- @InjectMocks private ObservationService observationService;
-
- @Mock private ObservationRepository observationRepository;
-
- @Mock private KafkaTemplate<String, String> kafkaTemplate;
-
- @Captor private ArgumentCaptor<String> topicCaptor;
-
- @Captor private ArgumentCaptor<String> keyCaptor;
-
- @Captor private ArgumentCaptor<String> messageCaptor;
-
- private final ObjectMapper objectMapper = new ObjectMapper();
- private AutoCloseable closeable;
-
- private final String inputTopicNameObservation = "Observation";
- private final String outputTopicNameObservation = "ObservationOutput";
-
- private final String inputTopicNameActRelationship = "Act_relationship";
-
- @BeforeEach
- void setUp() {
- closeable = MockitoAnnotations.openMocks(this);
- ProcessObservationDataUtil transformer = new ProcessObservationDataUtil(kafkaTemplate);
- transformer.setMaterialTopicName("materialTopic");
- observationService =
- new ObservationService(
- observationRepository,
- kafkaTemplate,
- transformer,
- new CustomMetrics(new SimpleMeterRegistry()));
- observationService.setObservationTopic(inputTopicNameObservation);
- observationService.setActRelationshipTopic(inputTopicNameActRelationship);
- observationService.setObservationTopicOutputReporting(outputTopicNameObservation);
- observationService.setThreadPoolSize(1);
- observationService.initMetrics();
-
- transformer.setCodedTopicName("ObservationCoded");
- transformer.setReasonTopicName("ObservationReason");
- transformer.setTxtTopicName("ObservationTxt");
- }
-
- @AfterEach
- void closeService() throws Exception {
- closeable.close();
- }
-
- @Test
- void testProcessMessage() throws JsonProcessingException {
- // Mocked input data
- Long observationUid = 123456789L;
- String obsDomainCdSt = "Order";
- String payload =
- "{\"payload\": {\"after\": {\"observation_uid\": \"" + observationUid + "\"}}}";
-
- Observation observation = constructObservation(observationUid, obsDomainCdSt);
- when(observationRepository.computeObservations(String.valueOf(observationUid)))
- .thenReturn(Optional.of(observation));
- when(kafkaTemplate.send(anyString(), anyString(), anyString()))
- .thenReturn(CompletableFuture.completedFuture(null));
- when(kafkaTemplate.send(anyString(), anyString(), isNull()))
- .thenReturn(CompletableFuture.completedFuture(null));
-
- validateData(payload, observation, inputTopicNameObservation);
-
- verify(observationRepository).computeObservations(String.valueOf(observationUid));
- }
-
- @ParameterizedTest
- @CsvSource({
- "d,LabReport,OBS",
- "d,LabReport,OTHER",
- "c,LabReport,OBS",
- "c,LabReport,OTHER",
- "d,OTHER,OBS",
- "d,OTHER,OTHER"
- })
- void testProcessActRelationship(String op, String typeCd, String targetClassCd)
- throws JsonProcessingException {
- Long sourceActUid = 123456789L;
- String obsDomainCdSt = "Order";
- String payload =
- "{\"payload\": {\"before\": {\"source_act_uid\": \""
- + sourceActUid
- + "\", \"type_cd\": \""
- + typeCd
- + "\", \"target_class_cd\": \""
- + targetClassCd
- + "\"},"
- + "\"after\": {\"source_act_uid\": \"123\"},"
- + "\"op\": \""
- + op
- + "\"}}";
-
- if (typeCd.equals("OTHER") || !op.equals("d") || targetClassCd.equals("OTHER")) {
- ConsumerRecord<String, String> rec = getRecord(payload, inputTopicNameActRelationship);
-
- observationService.processMessage(rec);
- verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString());
- } else {
- Observation observation = constructObservation(sourceActUid, obsDomainCdSt);
- when(observationRepository.computeObservations(String.valueOf(sourceActUid)))
- .thenReturn(Optional.of(observation));
- when(kafkaTemplate.send(anyString(), anyString(), anyString()))
- .thenReturn(CompletableFuture.completedFuture(null));
- when(kafkaTemplate.send(anyString(), anyString(), isNull()))
- .thenReturn(CompletableFuture.completedFuture(null));
-
- validateData(payload, observation, inputTopicNameActRelationship);
-
- verify(observationRepository).computeObservations(String.valueOf(sourceActUid));
- }
- }
-
- @Test
- void testProcessActRelationshipNullPayload() {
- ConsumerRecord<String, String> rec = getRecord(null, inputTopicNameActRelationship);
-
- observationService.processMessage(rec);
-
- verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString());
- }
-
- @Test
- void testProcessMessageUnknownTopic() {
- ConsumerRecord<String, String> rec = getRecord(null, "dummyTopicName");
-
- observationService.processMessage(rec);
-
- verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString());
- }
-
- @ParameterizedTest
- @CsvSource({
- "{\"payload\": {\"after\": {}}},Error",
- "{\"payload\": {\"after\": {}}},Observation",
- "{\"payload\": {\"after\": {\"source_act_uid\": \"123\"}, \"op\": \"d\"}}}, Act_relationship"
- })
- void testProcessMessageException(String payload, String topic) {
-
- ConsumerRecord<String, String> rec = getRecord(payload, topic);
-
- CompletableFuture<Void> future = observationService.processMessage(rec);
- CompletionException ex = assertThrows(CompletionException.class, future::join);
- assertEquals(NoSuchElementException.class, ex.getCause().getCause().getClass());
- }
-
- @Test
- void testProcessMessageNoDataException() {
- Long observationUid = 123456789L;
- String payload =
- "{\"payload\": {\"after\": {\"observation_uid\": \"" + observationUid + "\"}}}";
- ConsumerRecord<String, String> rec = getRecord(payload, inputTopicNameObservation);
-
- when(observationRepository.computeObservations(String.valueOf(observationUid)))
- .thenReturn(Optional.empty());
- CompletableFuture<Void> future = observationService.processMessage(rec);
- CompletionException ex = assertThrows(CompletionException.class, future::join);
- assertEquals(NoDataException.class, ex.getCause().getClass());
- }
-
- private void validateData(String payload, Observation observation, String inputTopic)
- throws JsonProcessingException {
- ConsumerRecord<String, String> rec = getRecord(payload, inputTopic);
- observationService.processMessage(rec);
-
- ObservationKey observationKey = new ObservationKey();
- observationKey.setObservationUid(observation.getObservationUid());
-
- var reportingModel =
- constructObservationReporting(
- observation.getObservationUid(), observation.getObsDomainCdSt1());
- reportingModel.setBatchId(toBatchId.applyAsLong(rec));
-
- Awaitility.await()
- .atMost(1, TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- verify(kafkaTemplate, times(2))
- .send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()));
- String actualTopic = topicCaptor.getValue();
- String actualKey = keyCaptor.getValue();
- String actualValue = messageCaptor.getValue();
-
- var actualReporting =
- objectMapper.readValue(
- objectMapper.readTree(actualValue).path("payload").toString(),
- ObservationReporting.class);
-
- var actualObservationKey =
- objectMapper.readValue(
- objectMapper.readTree(actualKey).path("payload").toString(), ObservationKey.class);
-
- assertEquals(outputTopicNameObservation, actualTopic);
- assertEquals(observationKey, actualObservationKey);
- assertEquals(reportingModel, actualReporting);
- }
-
- private Observation constructObservation(Long observationUid, String obsDomainCdSt1) {
- String filePathPrefix = "rawDataFiles/observation/";
- Observation observation = new Observation();
- observation.setObservationUid(observationUid);
- observation.setActUid(observationUid);
- observation.setClassCd("OBS");
- observation.setMoodCd("ENV");
- observation.setLocalId("OBS10003388MA01");
- observation.setActivityFromTime("2021-01-28 16:06:03.000");
- observation.setObsDomainCdSt1(obsDomainCdSt1);
- observation.setPersonParticipations(readFileData(filePathPrefix + "PersonParticipations.json"));
- observation.setOrganizationParticipations(
- readFileData(filePathPrefix + "OrganizationParticipations.json"));
- observation.setMaterialParticipations(
- readFileData(filePathPrefix + "MaterialParticipations.json"));
- observation.setFollowupObservations(readFileData(filePathPrefix + "FollowupObservations.json"));
- observation.setParentObservations(readFileData(filePathPrefix + "ParentObservations.json"));
- observation.setActIds(readFileData(filePathPrefix + "ActIds.json"));
- return observation;
- }
-
- private ObservationReporting constructObservationReporting(
- Long observationUid, String obsDomainCdSt1) {
- ObservationReporting observation = new ObservationReporting();
- observation.setObservationUid(observationUid);
- observation.setObsDomainCdSt1(obsDomainCdSt1);
- observation.setActUid(observationUid);
- observation.setClassCd("OBS");
- observation.setMoodCd("ENV");
- observation.setLocalId("OBS10003388MA01");
- observation.setOrderingPersonId("10000055");
- observation.setPatientId(10000066L);
- observation.setPerformingOrganizationId(null); // not null when obsDomainCdSt1=Result
- observation.setAuthorOrganizationId(34567890L); // null when obsDomainCdSt1=Result
- observation.setOrderingOrganizationId(23456789L); // null when obsDomainCdSt1=Result
- observation.setHealthCareId(56789012L); // null when obsDomainCdSt1=Result
- observation.setMorbHospReporterId(67890123L); // null when obsDomainCdSt1=Result
- observation.setMorbHospId(78901234L); // null when obsDomainCdSt1=Result
- observation.setMaterialId(10000005L);
- observation.setResultObservationUid("56789012,56789013");
- observation.setFollowupObservationUid("56789014,56789015");
- observation.setReportObservationUid(123456788L);
- observation.setReportRefrUid(123456790L);
- observation.setReportSprtUid(123456788L);
-
- observation.setAssistantInterpreterId(10000077L);
- observation.setAssistantInterpreterVal("22582");
- observation.setAssistantInterpreterFirstNm("Cara");
- observation.setAssistantInterpreterLastNm("Dune");
- observation.setAssistantInterpreterIdAssignAuth("22D7377772");
- observation.setAssistantInterpreterAuthType("Employee number");
-
- observation.setTranscriptionistId(10000088L);
- observation.setTranscriptionistVal("34344355455144");
- observation.setTranscriptionistFirstNm("Moff");
- observation.setTranscriptionistLastNm("Gideon");
- observation.setTranscriptionistIdAssignAuth("18D8181818");
- observation.setTranscriptionistAuthType("Employee number");
-
- observation.setResultInterpreterId(10000022L);
- observation.setLabTestTechnicianId(10000011L);
-
- observation.setSpecimenCollectorId(10000033L);
- observation.setCopyToProviderId(10000044L);
- observation.setAccessionNumber("20120601114");
- observation.setActivityFromTime("2021-01-28 16:06:03.000");
- observation.setDeviceInstanceId1("No Equipment");
- observation.setDeviceInstanceId2("NEW TOOLS");
-
- return observation;
- }
-
- private ConsumerRecord<String, String> getRecord(String payload, String inputTopic) {
- return new ConsumerRecord<>(inputTopic, 0, 11L, null, payload);
- }
-}
diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationDataProcessTests.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParserTest.java
similarity index 55%
rename from reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationDataProcessTests.java
rename to reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParserTest.java
index ec9a6f265..8aea0d29b 100644
--- a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationDataProcessTests.java
+++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParserTest.java
@@ -1,21 +1,24 @@
-package gov.cdc.nbs.report.pipeline.observation;
+package gov.cdc.nbs.report.pipeline.observation.transformer;
import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData;
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.*;
import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason;
import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTransformed;
-import gov.cdc.nbs.report.pipeline.observation.transformer.ProcessObservationDataUtil;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt;
+import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -23,68 +26,26 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
import org.slf4j.LoggerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.shaded.org.checkerframework.checker.nullness.qual.NonNull;
-class ObservationDataProcessTests {
- @Mock KafkaTemplate<String, String> kafkaTemplate;
-
- @Captor private ArgumentCaptor<String> topicCaptor;
-
- @Captor private ArgumentCaptor<String> keyCaptor;
-
- @Captor private ArgumentCaptor<String> messageCaptor;
+class ObservationParserTest {
private static final String FILE_PREFIX = "rawDataFiles/observation/";
- private static final String CODED_TOPIC = "codedTopic";
- private static final String DATE_TOPIC = "dateTopic";
- private static final String EDX_TOPIC = "edxTopic";
- private static final String MATERIAL_TOPIC = "materialTopic";
- private static final String NUMERIC_TOPIC = "numericTopic";
- private static final String REASON_TOPIC = "reasonTopic";
- private static final String TXT_TOPIC = "txtTopic";
-
private static final Long BATCH_ID = 11L;
-
- ProcessObservationDataUtil transformer;
-
- private AutoCloseable closeable;
private final ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
- private final ObjectMapper objectMapper = new ObjectMapper();
@BeforeEach
void setUp() {
- closeable = MockitoAnnotations.openMocks(this);
- transformer = new ProcessObservationDataUtil(kafkaTemplate);
-
- transformer.setCodedTopicName(CODED_TOPIC);
- transformer.setEdxTopicName(EDX_TOPIC);
- transformer.setDateTopicName(DATE_TOPIC);
- transformer.setMaterialTopicName(MATERIAL_TOPIC);
- transformer.setNumericTopicName(NUMERIC_TOPIC);
- transformer.setReasonTopicName(REASON_TOPIC);
- transformer.setTxtTopicName(TXT_TOPIC);
-
- Logger logger = (Logger) LoggerFactory.getLogger(ProcessObservationDataUtil.class);
+ Logger logger = (Logger) LoggerFactory.getLogger(ObservationParser.class);
listAppender.start();
logger.addAppender(listAppender);
-
- when(kafkaTemplate.send(anyString(), anyString(), isNull()))
- .thenReturn(CompletableFuture.completedFuture(null));
- when(kafkaTemplate.send(anyString(), anyString(), anyString()))
- .thenReturn(CompletableFuture.completedFuture(null));
}
@AfterEach
- void tearDown() throws Exception {
- Logger logger = (Logger) LoggerFactory.getLogger(ProcessObservationDataUtil.class);
+ void tearDown() {
+ Logger logger = (Logger) LoggerFactory.getLogger(ObservationParser.class);
logger.detachAppender(listAppender);
- closeable.close();
}
@Test
@@ -100,16 +61,15 @@ void consolidatedDataTransformationTest() {
readFileData(FILE_PREFIX + "MaterialParticipations.json"));
observation.setFollowupObservations(readFileData(FILE_PREFIX + "FollowupObservations.json"));
- ObservationTransformed observationTransformed =
- transformer.transformObservationData(observation, BATCH_ID);
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
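+ // parse() replaces the KafkaTemplate-backed transformObservationData(): instead of
+ // publishing child records as a side effect, it returns a container that exposes the
+ // transformed parent plus per-type child entries. Illustrative shape only; the
+ // accessor names match the ones these tests exercise, while the actual definition
+ // lives in the main-source part of this change:
+ //   record ParsedObservation(ObservationTransformed transformed,
+ //       List<ObservationCoded> codedEntries, List<ObservationDate> dateEntries,
+ //       List<ObservationEdx> edxEntries, List<ObservationMaterial> materialEntries,
+ //       List<ObservationNumeric> numericEntries, List<ObservationReason> reasonEntries,
+ //       List<ObservationTxt> textEntries) {}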
- Long patId = observationTransformed.getPatientId();
- String ordererId = observationTransformed.getOrderingPersonId();
- Long authorOrgId = observationTransformed.getAuthorOrganizationId();
- Long ordererOrgId = observationTransformed.getOrderingOrganizationId();
- Long performerOrgId = observationTransformed.getPerformingOrganizationId();
- Long materialId = observationTransformed.getMaterialId();
- String resultObsUid = observationTransformed.getResultObservationUid();
+ Long patId = parsedObservation.transformed().getPatientId();
+ String ordererId = parsedObservation.transformed().getOrderingPersonId();
+ Long authorOrgId = parsedObservation.transformed().getAuthorOrganizationId();
+ Long ordererOrgId = parsedObservation.transformed().getOrderingOrganizationId();
+ Long performerOrgId = parsedObservation.transformed().getPerformingOrganizationId();
+ Long materialId = parsedObservation.transformed().getMaterialId();
+ String resultObsUid = parsedObservation.transformed().getResultObservationUid();
Assertions.assertEquals("10000055", ordererId);
Assertions.assertEquals(10000066L, patId);
@@ -129,9 +89,9 @@ void testPersonParticipationTransformation() {
final var expected = getObservationTransformed();
observation.setPersonParticipations(readFileData(FILE_PREFIX + "PersonParticipations.json"));
- ObservationTransformed observationTransformed =
- transformer.transformObservationData(observation, BATCH_ID);
- Assertions.assertEquals(expected, observationTransformed);
+
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
+ Assertions.assertEquals(expected, parsedObservation.transformed());
}
@Test
@@ -151,9 +111,8 @@ void testMorbReportTransformation() {
observation.setPersonParticipations(
readFileData(FILE_PREFIX + "PersonParticipationsMorb.json"));
- ObservationTransformed observationTransformed =
- transformer.transformObservationData(observation, BATCH_ID);
- Assertions.assertEquals(expected, observationTransformed);
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
+ Assertions.assertEquals(expected, parsedObservation.transformed());
}
@Test
@@ -165,11 +124,10 @@ void testOrganizationParticipationTransformation() {
observation.setOrganizationParticipations(
readFileData(FILE_PREFIX + "OrganizationParticipations.json"));
- ObservationTransformed observationTransformed =
- transformer.transformObservationData(observation, BATCH_ID);
- Long authorOrgId = observationTransformed.getAuthorOrganizationId();
- Long ordererOrgId = observationTransformed.getOrderingOrganizationId();
- Long performerOrgId = observationTransformed.getPerformingOrganizationId();
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
+ Long authorOrgId = parsedObservation.transformed().getAuthorOrganizationId();
+ Long ordererOrgId = parsedObservation.transformed().getOrderingOrganizationId();
+ Long performerOrgId = parsedObservation.transformed().getPerformingOrganizationId();
Assertions.assertNull(authorOrgId);
Assertions.assertNull(ordererOrgId);
@@ -177,7 +135,7 @@ void testOrganizationParticipationTransformation() {
}
@Test
- void testObservationMaterialTransformation() throws JsonProcessingException {
+ void testObservationMaterialTransformation() {
Observation observation = new Observation();
observation.setObservationUid(100000003L);
observation.setObsDomainCdSt1("Order");
@@ -185,27 +143,11 @@ void testObservationMaterialTransformation() throws JsonProcessingException {
readFileData(FILE_PREFIX + "MaterialParticipations.json"));
ObservationMaterial material = constructObservationMaterial(100000003L);
- ObservationTransformed observationTransformed =
- transformer.transformObservationData(observation, BATCH_ID);
- verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
- assertEquals(MATERIAL_TOPIC, topicCaptor.getValue());
- assertEquals(10000005L, observationTransformed.getMaterialId());
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
- List<ILoggingEvent> logs = listAppender.list;
- assertTrue(
- logs.get(2)
- .getFormattedMessage()
- .contains("Observation Material data (uid=10000005) sent to " + MATERIAL_TOPIC));
-
- var actualMaterial =
- objectMapper.readValue(
- objectMapper
- .readTree(messageCaptor.getAllValues().getFirst())
- .path("payload")
- .toString(),
- ObservationMaterial.class);
-
- assertEquals(material, actualMaterial);
+ assertEquals(10000005L, parsedObservation.transformed().getMaterialId());
+
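+ // Child Material rows are read from materialEntries() here instead of being
+ // captured from kafkaTemplate.send(...) invocations as before.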
+ assertEquals(material, parsedObservation.materialEntries().get(0));
}
@ParameterizedTest
@@ -217,15 +159,14 @@ void testParentObservationsTransformation(String domainCd) {
"[{\"parent_type_cd\":\"MorbFrmQ\",\"parent_uid\":234567888,\"parent_domain_cd_st_1\":\"R_Order\"}]");
observation.setObsDomainCdSt1(domainCd);
- ObservationTransformed observationTransformed =
- transformer.transformObservationData(observation, BATCH_ID);
- assertEquals(234567888L, observationTransformed.getReportObservationUid());
- assertNull(observationTransformed.getReportRefrUid());
- assertNull(observationTransformed.getReportSprtUid());
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
+ assertEquals(234567888L, parsedObservation.transformed().getReportObservationUid());
+ assertNull(parsedObservation.transformed().getReportRefrUid());
+ assertNull(parsedObservation.transformed().getReportSprtUid());
}
@Test
- void testObservationCodedTransformation() throws JsonProcessingException {
+ void testObservationCodedTransformation() {
Observation observation = new Observation();
observation.setObservationUid(10001234L);
observation.setObsCode(readFileData(FILE_PREFIX + "ObservationCoded.json"));
@@ -240,25 +181,13 @@ void testObservationCodedTransformation() throws JsonProcessingException {
coded.setOvcAltCdDescTxt("NORMAL");
coded.setBatchId(BATCH_ID);
- transformer.transformObservationData(observation, BATCH_ID);
- verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
- assertEquals(CODED_TOPIC, topicCaptor.getValue());
- List<ILoggingEvent> logs = listAppender.list;
- assertTrue(
- logs.get(6)
- .getFormattedMessage()
- .contains("Observation Coded data (uid=10001234) sent to " + CODED_TOPIC));
-
- var actualCoded =
- objectMapper.readValue(
- objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(),
- ObservationCoded.class);
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
- assertEquals(coded, actualCoded);
+ assertEquals(coded, parsedObservation.codedEntries().get(0));
}
@Test
- void testObservationDateTransformation() throws JsonProcessingException {
+ void testObservationDateTransformation() {
Observation observation = new Observation();
observation.setObservationUid(10001234L);
observation.setObsDate(readFileData(FILE_PREFIX + "ObservationDate.json"));
@@ -269,25 +198,13 @@ void testObservationDateTransformation() throws JsonProcessingException {
obd.setOvdSeq(1);
obd.setBatchId(BATCH_ID);
- transformer.transformObservationData(observation, BATCH_ID);
- verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
- assertEquals(DATE_TOPIC, topicCaptor.getValue());
- List<ILoggingEvent> logs = listAppender.list;
- assertTrue(
- logs.get(7)
- .getFormattedMessage()
- .contains("Observation Date data (uid=10001234) sent to " + DATE_TOPIC));
-
- var actualObd =
- objectMapper.readValue(
- objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(),
- ObservationDate.class);
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
- assertEquals(obd, actualObd);
+ assertEquals(obd, parsedObservation.dateEntries().get(0));
}
@Test
- void testObservationEdxTransformation() throws JsonProcessingException {
+ void testObservationEdxTransformation() {
Observation observation = new Observation();
observation.setActUid(10001234L);
observation.setObservationUid(10001234L);
@@ -298,29 +215,13 @@ void testObservationEdxTransformation() throws JsonProcessingException {
edx.setEdxActUid(observation.getActUid());
edx.setEdxAddTime("2024-09-30T21:08:19.017");
- transformer.transformObservationData(observation, BATCH_ID);
- verify(kafkaTemplate, times(2))
- .send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
- assertEquals(EDX_TOPIC, topicCaptor.getValue());
- List<ILoggingEvent> logs = listAppender.list;
- assertTrue(
- logs.get(8)
- .getFormattedMessage()
- .contains("Observation Edx data (edx doc uid=10101) sent to " + EDX_TOPIC));
-
- var actualEdx =
- objectMapper.readValue(
- objectMapper
- .readTree(messageCaptor.getAllValues().getFirst())
- .path("payload")
- .toString(),
- ObservationEdx.class);
-
- assertEquals(edx, actualEdx);
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
+
+ assertEquals(edx, parsedObservation.edxEntries().get(0));
}
@Test
- void testObservationNumericTransformation() throws JsonProcessingException {
+ void testObservationNumericTransformation() {
Observation observation = new Observation();
observation.setObservationUid(10001234L);
observation.setObsNum(readFileData(FILE_PREFIX + "ObservationNumeric.json"));
@@ -337,25 +238,13 @@ void testObservationNumericTransformation() throws JsonProcessingException {
numeric.setOvnSeq(1);
numeric.setBatchId(BATCH_ID);
- transformer.transformObservationData(observation, BATCH_ID);
- verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
- assertEquals(NUMERIC_TOPIC, topicCaptor.getValue());
- List<ILoggingEvent> logs = listAppender.list;
- assertTrue(
- logs.get(9)
- .getFormattedMessage()
- .contains("Observation Numeric data (uid=10001234) sent to " + NUMERIC_TOPIC));
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
- var actualNumeric =
- objectMapper.readValue(
- objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(),
- ObservationNumeric.class);
-
- assertEquals(numeric, actualNumeric);
+ assertEquals(numeric, parsedObservation.numericEntries().get(0));
}
@Test
- void testObservationReasonTransformation() throws JsonProcessingException {
+ void testObservationReasonTransformation() {
Observation observation = new Observation();
observation.setObservationUid(10001234L);
observation.setObsReason(readFileData(FILE_PREFIX + "ObservationReason.json"));
@@ -366,25 +255,13 @@ void testObservationReasonTransformation() throws JsonProcessingException {
reason.setReasonDescTxt("PRESENCE OF REASON");
reason.setBatchId(BATCH_ID);
- transformer.transformObservationData(observation, BATCH_ID);
- verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
- assertEquals(REASON_TOPIC, topicCaptor.getValue());
- List<ILoggingEvent> logs = listAppender.list;
- assertTrue(
- logs.get(10)
- .getFormattedMessage()
- .contains("Observation Reason data (uid=10001234) sent to " + REASON_TOPIC));
-
- var actualReason =
- objectMapper.readValue(
- objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(),
- ObservationReason.class);
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
- assertEquals(reason, actualReason);
+ assertEquals(reason, parsedObservation.reasonEntries().get(0));
}
@Test
- void testObservationTxtTransformation() throws JsonProcessingException {
+ void testObservationTxtTransformation() {
Observation observation = new Observation();
observation.setObservationUid(10001234L);
observation.setObsTxt(readFileData(FILE_PREFIX + "ObservationTxt.json"));
@@ -396,25 +273,9 @@ void testObservationTxtTransformation() throws JsonProcessingException {
txt.setOvtValueTxt("RECOMMENDED IN SUCH INSTANCES.");
txt.setBatchId(BATCH_ID);
- transformer.transformObservationData(observation, BATCH_ID);
- verify(kafkaTemplate, times(2))
- .send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
- assertEquals(TXT_TOPIC, topicCaptor.getValue());
- List<ILoggingEvent> logs = listAppender.list;
- assertTrue(
- logs.get(11)
- .getFormattedMessage()
- .contains("Observation Txt data (uid=10001234) sent to " + TXT_TOPIC));
-
- var actualTxt =
- objectMapper.readValue(
- objectMapper
- .readTree(messageCaptor.getAllValues().getFirst())
- .path("payload")
- .toString(),
- ObservationTxt.class);
-
- assertEquals(txt, actualTxt);
+ ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID);
+
+ assertEquals(txt, parsedObservation.textEntries().get(0));
}
@Test
@@ -422,7 +283,7 @@ void testTransformNoObservationData() {
Observation observation = new Observation();
observation.setObservationUid(10001234L);
observation.setOrganizationParticipations("{\"act_uid\": 10000003}");
- transformer.transformObservationData(observation, BATCH_ID);
+ ObservationParser.parse(observation, BATCH_ID);
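+ // The return value is deliberately ignored; this test only asserts the
+ // "<field> array is null." entries the parser logs for each missing array.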
List<ILoggingEvent> logs = listAppender.list;
logs.forEach(le -> assertTrue(le.getFormattedMessage().matches("^\\w+ array is null.")));
@@ -447,7 +308,7 @@ void testTransformObservationDataError() {
observation.setObsReason(invalidJSON);
observation.setObsTxt(invalidJSON);
- transformer.transformObservationData(observation, BATCH_ID);
+ ObservationParser.parse(observation, BATCH_ID);
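+ // parse() does not propagate JSON failures; each one is logged with the offending
+ // payload, which the assertions below depend on.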
List<ILoggingEvent> logs = listAppender.list;
logs.forEach(le -> assertTrue(le.getFormattedMessage().contains(invalidJSON)));
@@ -467,7 +328,7 @@ void testTransformObservationInvalidDomainError() {
observation.setMaterialParticipations(dummyJSON);
observation.setFollowupObservations(dummyJSON);
- transformer.transformObservationData(observation, BATCH_ID);
+ ObservationParser.parse(observation, BATCH_ID);
List<ILoggingEvent> logs = listAppender.list.subList(0, 4);
logs.forEach(
@@ -491,7 +352,7 @@ void testTransformObservationNullError(String payload) {
observation.setFollowupObservations(payload);
observation.setParentObservations(payload);
- transformer.transformObservationData(observation, BATCH_ID);
+ ObservationParser.parse(observation, BATCH_ID);
List<ILoggingEvent> logs = listAppender.list.subList(0, 4);
logs.forEach(