diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumer.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumer.java new file mode 100644 index 000000000..1a7f86be8 --- /dev/null +++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumer.java @@ -0,0 +1,157 @@ +package gov.cdc.nbs.report.pipeline.observation; + +import static gov.cdc.etldatapipeline.commonutil.UtilHelper.errorMessage; +import static gov.cdc.etldatapipeline.commonutil.UtilHelper.extractChangeDataCaptureOperation; +import static gov.cdc.etldatapipeline.commonutil.UtilHelper.extractUid; +import static gov.cdc.etldatapipeline.commonutil.UtilHelper.extractValue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import gov.cdc.etldatapipeline.commonutil.DataProcessingException; +import gov.cdc.etldatapipeline.commonutil.NoDataException; +import gov.cdc.nbs.report.pipeline.observation.service.ObservationProcessor; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.ToLongFunction; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; +import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.retry.annotation.Backoff; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.stereotype.Service; + +/** + * Service class for processing Observation-related change events in the Real Time Reporting (RTR) + * pipeline. This service handles the "hydration" of data for Observations by consuming Kafka events + * from transactional source topics, transforming them, and producing them to reporting topics. This + * service operates differently than other RTR services in the fact that it also inserts directly + * into the nrt_ database tables to eliminate a race condition between the PostProcessingService and + * the Kafka Sink Connector. More info can be found here: + * https://cdc-nbs.atlassian.net/browse/APP-519 + * + *

Key responsibilities include: + * + *

+ */ +@Service +public class ObservationConsumer { + private static final Logger logger = LoggerFactory.getLogger(ObservationConsumer.class); + private static final String BEFORE_PATH = "before"; + private static final String TOPIC_DEBUG_LOG = "Received Observation with id: {} from topic: {}"; + + private final String observationTopic; + private final String actRelationshipTopic; + private final ObservationProcessor observationProcessor; + private final ExecutorService obsExecutor; + + public ObservationConsumer( + final ObservationProcessor observationProcessor, + @Value("${spring.kafka.topics.nbs.observation}") final String observationTopic, + @Value("${spring.kafka.topics.nbs.act-relationship}") final String actRelationshipTopic, + @Value("${featureFlag.thread-pool-size:1}") final int threadPoolSize) { + this.observationProcessor = observationProcessor; + this.observationTopic = observationTopic; + this.actRelationshipTopic = actRelationshipTopic; + + obsExecutor = + Executors.newFixedThreadPool(threadPoolSize, new CustomizableThreadFactory("obs-")); + } + + public static final ToLongFunction> toBatchId = + rec -> rec.timestamp() + rec.offset() + rec.partition(); + + @RetryableTopic( + attempts = "${spring.kafka.consumer.max-retry}", + autoCreateTopics = "false", + dltStrategy = DltStrategy.FAIL_ON_ERROR, + retryTopicSuffix = "${spring.kafka.dlq.retry-suffix}", + dltTopicSuffix = "${spring.kafka.dlq.dlq-suffix}", + // retry topic name, such as topic-retry-1, topic-retry-2, etc + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + // time to wait before attempting to retry + backoff = @Backoff(delay = 1000, multiplier = 2.0), + exclude = { + SerializationException.class, + DeserializationException.class, + RuntimeException.class, + NoDataException.class + }, + kafkaTemplate = "observationKafkaTemplate") + @KafkaListener( + topics = { + "${spring.kafka.topics.nbs.observation}", + "${spring.kafka.topics.nbs.act-relationship}" + }, + containerFactory = "observationKafkaListenerContainerFactory") + public CompletableFuture processMessage(ConsumerRecord rec) { + + long batchId = toBatchId.applyAsLong(rec); + String topic = rec.topic(); + String message = rec.value(); + logger.debug(TOPIC_DEBUG_LOG, message, topic); + + if (topic.equals(observationTopic)) { + return CompletableFuture.runAsync(() -> handleObservation(message, batchId), obsExecutor); + } else if (topic.equals(actRelationshipTopic) && message != null) { + return CompletableFuture.runAsync(() -> handleActRelationship(message, batchId), obsExecutor); + } else { + return CompletableFuture.failedFuture( + new DataProcessingException( + "Received data from an unknown topic: " + topic, new NoSuchElementException())); + } + } + + private void handleObservation(String message, long batchId) { + try { + String observationUid = extractUid(message, "observation_uid"); + observationProcessor.process(batchId, observationUid); + } catch (JsonProcessingException e) { + throw new DataProcessingException(errorMessage("Observation", "", e), e); + } + } + + private void handleActRelationship(String value, long batchId) { + String sourceActUid = ""; + + try { + String typeCd; + String targetClassCd; + String operationType = extractChangeDataCaptureOperation(value); + + if (operationType.equals("d")) { + sourceActUid = extractUid(value, "source_act_uid", BEFORE_PATH); + typeCd = extractValue(value, "type_cd", BEFORE_PATH); + targetClassCd = extractValue(value, "target_class_cd", BEFORE_PATH); + } else { + return; + } + + logger.info(TOPIC_DEBUG_LOG, "Act_relationship", sourceActUid, actRelationshipTopic); + // For LabReport values, we only need to trigger if the relationship is deleted (not covered + // in updates to Observation) + // PHC targets are excluded from the LabReport association updates, as the LabReport will + // receive an update in Observation + if (typeCd.equals("LabReport") && targetClassCd.equals("OBS")) { + observationProcessor.process(batchId, sourceActUid); + } + } catch (Exception e) { + throw new DataProcessingException(errorMessage("ActRelationship", sourceActUid, e), e); + } + } +} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationCodedKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationCodedKey.java deleted file mode 100644 index 71162fa64..000000000 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationCodedKey.java +++ /dev/null @@ -1,14 +0,0 @@ -package gov.cdc.nbs.report.pipeline.observation.model.dto.observation; - -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class ObservationCodedKey { - private Long observationUid; - private String ovcCode; -} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationEdxKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationEdxKey.java deleted file mode 100644 index de559e4ca..000000000 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationEdxKey.java +++ /dev/null @@ -1,14 +0,0 @@ -package gov.cdc.nbs.report.pipeline.observation.model.dto.observation; - -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.NonNull; - -@Data -@NoArgsConstructor -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class ObservationEdxKey { - @NonNull private Long edxDocumentUid; -} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java index fd7e0fdc7..c0b225c1f 100644 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java +++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationKey.java @@ -1,17 +1,5 @@ package gov.cdc.nbs.report.pipeline.observation.model.dto.observation; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.NonNull; -@Data -@NoArgsConstructor -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class ObservationKey { - @NonNull - @JsonProperty("observation_uid") - private Long observationUid; -} +public record ObservationKey(@JsonProperty("observation_uid") Long observationUid) {} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationMaterialKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationMaterialKey.java deleted file mode 100644 index de2770bfb..000000000 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationMaterialKey.java +++ /dev/null @@ -1,17 +0,0 @@ -package gov.cdc.nbs.report.pipeline.observation.model.dto.observation; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.NonNull; - -@Data -@NoArgsConstructor -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class ObservationMaterialKey { - @NonNull - @JsonProperty("material_id") - private Long materialId; -} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationReasonKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationReasonKey.java deleted file mode 100644 index da8f43c23..000000000 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationReasonKey.java +++ /dev/null @@ -1,14 +0,0 @@ -package gov.cdc.nbs.report.pipeline.observation.model.dto.observation; - -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class ObservationReasonKey { - private Long observationUid; - private String reasonCd; -} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationTxtKey.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationTxtKey.java deleted file mode 100644 index 911f51cc7..000000000 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ObservationTxtKey.java +++ /dev/null @@ -1,14 +0,0 @@ -package gov.cdc.nbs.report.pipeline.observation.model.dto.observation; - -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class ObservationTxtKey { - private Long observationUid; - private Integer ovtSeq; -} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ParsedObservation.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ParsedObservation.java new file mode 100644 index 000000000..bd97b18bb --- /dev/null +++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/model/dto/observation/ParsedObservation.java @@ -0,0 +1,30 @@ +package gov.cdc.nbs.report.pipeline.observation.model.dto.observation; + +import java.util.ArrayList; +import java.util.List; + +/** + * Output of the ObservationTransformer that contains the ObservationTransformed object as well as + * lists of entites that need to be persisted to the database + */ +public record ParsedObservation( + ObservationTransformed transformed, + List materialEntries, + List codedEntries, + List dateEntries, + List edxEntries, + List numericEntries, + List reasonEntries, + List textEntries) { + public ParsedObservation(ObservationTransformed observationTransformed) { + this( + observationTransformed, + new ArrayList<>(), + new ArrayList<>(), + new ArrayList<>(), + new ArrayList<>(), + new ArrayList<>(), + new ArrayList<>(), + new ArrayList<>()); + } +} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriter.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriter.java new file mode 100644 index 000000000..843a6faac --- /dev/null +++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriter.java @@ -0,0 +1,456 @@ +package gov.cdc.nbs.report.pipeline.observation.repository; + +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation; +import java.util.List; +import org.springframework.jdbc.core.simple.JdbcClient; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** + * Responsible for upserting Observation data to the following tables: + * + *
    + *
  • nrt_observation_material + *
  • nrt_observation_coded + *
  • nrt_observation_date + *
  • nrt_observation_edx + *
  • nrt_observation_numeric + *
  • nrt_observation_reason + *
  • nrt_observation_txt + *
+ */ +@Component +@Transactional +public class NrtObservationWriter { + + private final JdbcClient client; + + public NrtObservationWriter(final JdbcClient client) { + this.client = client; + } + + public void persist(ParsedObservation parsedObservation) { + persistMaterials(parsedObservation.materialEntries()); + persistCoded(parsedObservation.codedEntries()); + persistDate(parsedObservation.dateEntries()); + persistEdx(parsedObservation.edxEntries()); + persistNumeric(parsedObservation.numericEntries()); + persistReason(parsedObservation.reasonEntries()); + persistText(parsedObservation.textEntries()); + } + + private static final String UPSERT_MATERIAL = + """ + MERGE INTO nrt_observation_material + USING ( + SELECT + :act_uid AS act_uid, + :material_id AS material_id, + :type_cd AS type_cd, + :subject_class_cd AS subject_class_cd, + :record_status AS record_status, + :type_desc_txt AS type_desc_txt, + :last_chg_time AS last_chg_time, + :material_cd AS material_cd, + :material_nm AS material_nm, + :material_details AS material_details, + :material_collection_vol AS material_collection_vol, + :material_collection_vol_unit AS material_collection_vol_unit, + :material_desc AS material_desc, + :risk_cd AS risk_cd, + :risk_desc_txt AS risk_desc_txt + ) AS source + ON nrt_observation_material.act_uid = source.act_uid AND nrt_observation_material.material_id = source.material_id + WHEN MATCHED THEN + UPDATE SET + type_cd = source.type_cd, + subject_class_cd = source.subject_class_cd, + record_status = source.record_status, + type_desc_txt = source.type_desc_txt, + last_chg_time = source.last_chg_time, + material_cd = source.material_cd, + material_nm = source.material_nm, + material_details = source.material_details, + material_collection_vol = source.material_collection_vol, + material_collection_vol_unit = source.material_collection_vol_unit, + material_desc = source.material_desc, + risk_cd = source.risk_cd, + risk_desc_txt = source.risk_desc_txt + WHEN NOT MATCHED THEN + INSERT( + act_uid, + type_cd, + material_id, + subject_class_cd, + record_status, + type_desc_txt, + last_chg_time, + material_cd, + material_nm, + material_details, + material_collection_vol, + material_collection_vol_unit, + material_desc, + risk_cd, + risk_desc_txt + ) VALUES ( + source.act_uid, + source.type_cd, + source.material_id, + source.subject_class_cd, + source.record_status, + source.type_desc_txt, + source.last_chg_time, + source.material_cd, + source.material_nm, + source.material_details, + source.material_collection_vol, + source.material_collection_vol_unit, + source.material_desc, + source.risk_cd, + source.risk_desc_txt + ); + """; + + void persistMaterials(List materials) { + materials.forEach( + m -> + client + .sql(UPSERT_MATERIAL) + .param("act_uid", m.getActUid()) + .param("material_id", m.getMaterialId()) + .param("type_cd", m.getTypeCd()) + .param("subject_class_cd", m.getSubjectClassCd()) + .param("record_status", m.getRecordStatus()) + .param("type_desc_txt", m.getTypeDescTxt()) + .param("last_chg_time", m.getLastChgTime()) + .param("material_cd", m.getMaterialCd()) + .param("material_nm", m.getMaterialNm()) + .param("material_details", m.getMaterialDetails()) + .param("material_collection_vol", m.getMaterialCollectionVol()) + .param("material_collection_vol_unit", m.getMaterialCollectionVolUnit()) + .param("material_desc", m.getMaterialDesc()) + .param("risk_cd", m.getRiskCd()) + .param("risk_desc_txt", m.getRiskDescTxt()) + .update()); + } + + private static final String UPSERT_CODED = + """ + MERGE INTO nrt_observation_coded + USING ( + SELECT + :observation_uid AS observation_uid, + :ovc_code AS ovc_code, + :ovc_code_system_cd AS ovc_code_system_cd, + :ovc_code_system_desc_txt AS ovc_code_system_desc_txt, + :ovc_display_name AS ovc_display_name, + :ovc_alt_cd AS ovc_alt_cd, + :ovc_alt_cd_desc_txt AS ovc_alt_cd_desc_txt, + :ovc_alt_cd_system_cd AS ovc_alt_cd_system_cd, + :ovc_alt_cd_system_desc_txt AS ovc_alt_cd_system_desc_txt, + :batch_id AS batch_id + ) AS source + ON nrt_observation_coded.observation_uid = source.observation_uid AND nrt_observation_coded.ovc_code = source.ovc_code + WHEN MATCHED THEN + UPDATE SET + ovc_code_system_cd = source.ovc_code_system_cd, + ovc_code_system_desc_txt = source.ovc_code_system_desc_txt, + ovc_display_name = source.ovc_display_name, + ovc_alt_cd = source.ovc_alt_cd, + ovc_alt_cd_desc_txt = source.ovc_alt_cd_desc_txt, + ovc_alt_cd_system_cd = source.ovc_alt_cd_system_cd, + ovc_alt_cd_system_desc_txt = source.ovc_alt_cd_system_desc_txt, + batch_id = source.batch_id + WHEN NOT MATCHED THEN + INSERT ( + observation_uid, + ovc_code, + ovc_code_system_cd, + ovc_code_system_desc_txt, + ovc_display_name, + ovc_alt_cd, + ovc_alt_cd_desc_txt, + ovc_alt_cd_system_cd, + ovc_alt_cd_system_desc_txt, + batch_id + ) VALUES ( + source.observation_uid, + source.ovc_code, + source.ovc_code_system_cd, + source.ovc_code_system_desc_txt, + source.ovc_display_name, + source.ovc_alt_cd, + source.ovc_alt_cd_desc_txt, + source.ovc_alt_cd_system_cd, + source.ovc_alt_cd_system_desc_txt, + source.batch_id + ); + """; + + void persistCoded(List codedEntries) { + codedEntries.forEach( + c -> + client + .sql(UPSERT_CODED) + .param("observation_uid", c.getObservationUid()) + .param("ovc_code", c.getOvcCode()) + .param("ovc_code_system_cd", c.getOvcCodeSystemCd()) + .param("ovc_code_system_desc_txt", c.getOvcCodeSystemDescTxt()) + .param("ovc_display_name", c.getOvcDisplayName()) + .param("ovc_alt_cd", c.getOvcAltCd()) + .param("ovc_alt_cd_desc_txt", c.getOvcAltCdDescTxt()) + .param("ovc_alt_cd_system_cd", c.getOvcAltCdSystemCd()) + .param("ovc_alt_cd_system_desc_txt", c.getOvcAltCdSystemDescTxt()) + .param("batch_id", c.getBatchId()) + .update()); + } + + private static final String UPSERT_DATE = + """ + MERGE INTO nrt_observation_date + USING ( + SELECT + :observation_uid AS observation_uid, + :ovd_from_date AS ovd_from_date, + :ovd_to_date AS ovd_to_date, + :ovd_seq AS ovd_seq, + :batch_id AS batch_id + ) AS source + ON nrt_observation_date.observation_uid = source.observation_uid + WHEN MATCHED THEN + UPDATE SET + ovd_from_date = source.ovd_from_date, + ovd_to_date = source.ovd_to_date, + ovd_seq = source.ovd_seq, + batch_id = source.batch_id + WHEN NOT MATCHED THEN + INSERT ( + observation_uid, + ovd_from_date, + ovd_to_date, + ovd_seq, + batch_id + ) VALUES ( + source.observation_uid, + source.ovd_from_date, + source.ovd_to_date, + source.ovd_seq, + source.batch_id + ); + """; + + void persistDate(List dateEntries) { + dateEntries.forEach( + d -> + client + .sql(UPSERT_DATE) + .param("observation_uid", d.getObservationUid()) + .param("ovd_from_date", d.getOvdFromDate()) + .param("ovd_to_date", d.getOvdToDate()) + .param("ovd_seq", d.getOvdSeq()) + .param("batch_id", d.getBatchId()) + .update()); + } + + private static final String UPSERT_EDX = + """ + MERGE INTO nrt_observation_edx + USING ( + SELECT + :edx_document_uid AS edx_document_uid, + :edx_act_uid AS edx_act_uid, + :edx_add_time AS edx_add_time + ) AS source + ON nrt_observation_edx.edx_document_uid = source.edx_document_uid AND nrt_observation_edx.edx_act_uid = source.edx_act_uid + WHEN MATCHED THEN + UPDATE SET + edx_add_time = source.edx_add_time + WHEN NOT MATCHED THEN + INSERT ( + edx_document_uid, + edx_act_uid, + edx_add_time + ) VALUES ( + source.edx_document_uid, + source.edx_act_uid, + source.edx_add_time + ); + """; + + void persistEdx(List edxEntries) { + edxEntries.forEach( + e -> + client + .sql(UPSERT_EDX) + .param("edx_document_uid", e.getEdxDocumentUid()) + .param("edx_act_uid", e.getEdxActUid()) + .param("edx_add_time", e.getEdxAddTime()) + .update()); + } + + private static final String UPSERT_NUMERIC = + """ + MERGE INTO nrt_observation_numeric + USING ( + SELECT + :observation_uid AS observation_uid, + :ovn_high_range AS ovn_high_range, + :ovn_low_range AS ovn_low_range, + :ovn_comparator_cd_1 AS ovn_comparator_cd_1, + :ovn_numeric_value_1 AS ovn_numeric_value_1, + :ovn_numeric_value_2 AS ovn_numeric_value_2, + :ovn_numeric_unit_cd AS ovn_numeric_unit_cd, + :ovn_separator_cd AS ovn_separator_cd, + :ovn_seq AS ovn_seq, + :batch_id AS batch_id + ) AS source + ON nrt_observation_numeric.observation_uid = source.observation_uid AND nrt_observation_numeric.ovn_seq = source.ovn_seq + WHEN MATCHED THEN + UPDATE SET + ovn_high_range = source.ovn_high_range, + ovn_low_range = source.ovn_low_range, + ovn_comparator_cd_1 = source.ovn_comparator_cd_1, + ovn_numeric_value_1 = source.ovn_numeric_value_1, + ovn_numeric_value_2 = source.ovn_numeric_value_2, + ovn_numeric_unit_cd = source.ovn_numeric_unit_cd, + ovn_separator_cd = source.ovn_separator_cd, + batch_id = source.batch_id + WHEN NOT MATCHED THEN + INSERT ( + observation_uid, + ovn_high_range, + ovn_low_range, + ovn_comparator_cd_1, + ovn_numeric_value_1, + ovn_numeric_value_2, + ovn_numeric_unit_cd, + ovn_separator_cd, + ovn_seq, + batch_id + ) VALUES ( + source.observation_uid, + source.ovn_high_range, + source.ovn_low_range, + source.ovn_comparator_cd_1, + source.ovn_numeric_value_1, + source.ovn_numeric_value_2, + source.ovn_numeric_unit_cd, + source.ovn_separator_cd, + source.ovn_seq, + source.batch_id + ); + """; + + void persistNumeric(List numericEntries) { + numericEntries.forEach( + n -> + client + .sql(UPSERT_NUMERIC) + .param("observation_uid", n.getObservationUid()) + .param("ovn_seq", n.getOvnSeq()) + .param("ovn_high_range", n.getOvnHighRange()) + .param("ovn_low_range", n.getOvnLowRange()) + .param("ovn_comparator_cd_1", n.getOvnComparatorCd1()) + .param("ovn_numeric_value_1", n.getOvnNumericValue1()) + .param("ovn_numeric_value_2", n.getOvnNumericValue2()) + .param("ovn_numeric_unit_cd", n.getOvnNumericUnitCd()) + .param("ovn_separator_cd", n.getOvnSeparatorCd()) + .param("batch_id", n.getBatchId()) + .update()); + } + + private static final String UPSERT_REASON = + """ + MERGE INTO nrt_observation_reason + USING ( + SELECT + :observation_uid as observation_uid, + :reason_cd as reason_cd, + :reason_desc_txt as reason_desc_txt, + :batch_id as batch_id + ) AS source + ON nrt_observation_reason.observation_uid = source.observation_uid AND nrt_observation_reason.reason_cd = source.reason_cd + WHEN MATCHED THEN + UPDATE SET + reason_desc_txt = source.reason_desc_txt, + batch_id = source.batch_id + WHEN NOT MATCHED THEN + INSERT ( + observation_uid, + reason_cd, + reason_desc_txt, + batch_id + ) VALUES ( + source.observation_uid, + source.reason_cd, + source.reason_desc_txt, + source.batch_id + ); + """; + + void persistReason(List reasonEntries) { + reasonEntries.forEach( + r -> + client + .sql(UPSERT_REASON) + .param("observation_uid", r.getObservationUid()) + .param("reason_cd", r.getReasonCd()) + .param("reason_desc_txt", r.getReasonDescTxt()) + .param("batch_id", r.getBatchId()) + .update()); + } + + private static final String UPSERT_TEXT = + """ + MERGE INTO nrt_observation_txt + USING ( + SELECT + :observation_uid AS observation_uid, + :ovt_seq AS ovt_seq, + :ovt_txt_type_cd AS ovt_txt_type_cd, + :ovt_value_txt AS ovt_value_txt, + :batch_id AS batch_id + ) AS source + ON nrt_observation_txt.observation_uid = source.observation_uid AND nrt_observation_txt.ovt_seq = source.ovt_seq + WHEN MATCHED THEN + UPDATE SET + ovt_txt_type_cd = source.ovt_txt_type_cd, + ovt_value_txt = source.ovt_value_txt, + batch_id = source.batch_id + WHEN NOT MATCHED THEN + INSERT ( + observation_uid, + ovt_seq, + ovt_txt_type_cd, + ovt_value_txt, + batch_id + ) VALUES ( + source.observation_uid, + source.ovt_seq, + source.ovt_txt_type_cd, + source.ovt_value_txt, + source.batch_id + ); + """; + + void persistText(List textEntries) { + textEntries.forEach( + t -> + client + .sql(UPSERT_TEXT) + .param("observation_uid", t.getObservationUid()) + .param("ovt_seq", t.getOvtSeq()) + .param("ovt_txt_type_cd", t.getOvtTxtTypeCd()) + .param("ovt_value_txt", t.getOvtValueTxt()) + .param("batch_id", t.getBatchId()) + .update()); + } +} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessor.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessor.java new file mode 100644 index 000000000..fead27561 --- /dev/null +++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessor.java @@ -0,0 +1,120 @@ +package gov.cdc.nbs.report.pipeline.observation.service; + +import static gov.cdc.etldatapipeline.commonutil.UtilHelper.errorMessage; + +import gov.cdc.etldatapipeline.commonutil.DataProcessingException; +import gov.cdc.etldatapipeline.commonutil.NoDataException; +import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl; +import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation; +import gov.cdc.nbs.report.pipeline.observation.repository.NrtObservationWriter; +import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository; +import gov.cdc.nbs.report.pipeline.observation.transformer.ObservationParser; +import io.micrometer.core.instrument.Counter; +import jakarta.persistence.EntityNotFoundException; +import java.util.Optional; +import org.modelmapper.ModelMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +/** Handles the processing of Observation data */ +@Service +public class ObservationProcessor { + private static final Logger logger = LoggerFactory.getLogger(ObservationProcessor.class); + private final ModelMapper modelMapper = new ModelMapper(); + private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl(); + + private final CustomMetrics metrics; + private static final String[] TAGS = {"service", "observation-reporting"}; + private Counter msgProcessed; + private Counter msgSuccess; + private Counter msgFailure; + + private final ObservationRepository observationRepository; + private final KafkaTemplate kafkaTemplate; + private final NrtObservationWriter nrtWriter; + private final String nrtObservationTopic; + + public ObservationProcessor( + final CustomMetrics metrics, + final ObservationRepository observationRepository, + @Qualifier("observationKafkaTemplate") final KafkaTemplate kafkaTemplate, + @Value("${spring.kafka.topics.nrt.observation}") final String nrtObservationTopic, + final NrtObservationWriter nrtWriter) { + this.metrics = metrics; + this.observationRepository = observationRepository; + this.kafkaTemplate = kafkaTemplate; + this.nrtObservationTopic = nrtObservationTopic; + this.nrtWriter = nrtWriter; + + msgProcessed = metrics.counter("obs_msg_processed", TAGS); + msgSuccess = metrics.counter("obs_msg_success", TAGS); + msgFailure = metrics.counter("obs_msg_failure", TAGS); + } + + public void process(final long batchId, final String observationUid) { + msgProcessed.increment(); + + metrics.recordTime( + "obs_msg_processing_seconds", + () -> { + try { + logger.info("Received Observation with id: {}", observationUid); + + // Query NBS_ODSE for observation data + Optional observationData = + observationRepository.computeObservations(observationUid); + + // Ensure data is returned + if (observationData.isEmpty()) { + throw new EntityNotFoundException( + "Unable to find Observation with id: " + observationUid); + } + + // Convert Entity to reporting object that will be sent to nrt_observation + ObservationReporting reportingModel = + modelMapper.map(observationData.get(), ObservationReporting.class); + + // Parse all fields from incoming mesage + ParsedObservation parsed = ObservationParser.parse(observationData.get(), batchId); + + // Push parsed fields into reporting object + modelMapper.map(parsed.transformed(), reportingModel); + + // Insert parsed data into nrt_observation_* database tables + nrtWriter.persist(parsed); + + // Send observation data to nrt_observation kafka topic + ObservationKey observationKey = new ObservationKey(Long.valueOf(observationUid)); + pushKeyValuePairToKafka(observationKey, reportingModel, nrtObservationTopic); + logger.info( + "Observation data (uid={}) sent to {}", observationUid, nrtObservationTopic); + + msgSuccess.increment(); + + } catch (EntityNotFoundException ex) { + msgFailure.increment(); + throw new NoDataException(ex.getMessage(), ex); + + } catch (Exception e) { + msgFailure.increment(); + throw new DataProcessingException(errorMessage("Observation", observationUid, e), e); + } + }, + TAGS); + } + + private void pushKeyValuePairToKafka( + ObservationKey observationKey, Object model, String topicName) { + String jsonKey = jsonGenerator.generateStringJson(observationKey); + String jsonValue = jsonGenerator.generateStringJson(model); + kafkaTemplate.send(topicName, jsonKey, jsonValue); + } +} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationService.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationService.java deleted file mode 100644 index 57dfb08a1..000000000 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationService.java +++ /dev/null @@ -1,245 +0,0 @@ -package gov.cdc.nbs.report.pipeline.observation.service; - -import static gov.cdc.etldatapipeline.commonutil.UtilHelper.*; - -import gov.cdc.etldatapipeline.commonutil.DataProcessingException; -import gov.cdc.etldatapipeline.commonutil.NoDataException; -import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl; -import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTransformed; -import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository; -import gov.cdc.nbs.report.pipeline.observation.transformer.ProcessObservationDataUtil; -import io.micrometer.core.instrument.Counter; -import jakarta.annotation.PostConstruct; -import jakarta.persistence.EntityNotFoundException; -import java.util.NoSuchElementException; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.ToLongFunction; -import lombok.RequiredArgsConstructor; -import lombok.Setter; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.errors.SerializationException; -import org.modelmapper.ModelMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.annotation.RetryableTopic; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.retrytopic.DltStrategy; -import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; -import org.springframework.kafka.support.serializer.DeserializationException; -import org.springframework.retry.annotation.Backoff; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; -import org.springframework.stereotype.Service; - -/** - * Service class for processing Observation-related change events in the Real Time Reporting (RTR) - * pipeline. This service handles the "hydration" of data for Observations by consuming Kafka events - * from transactional source topics, transforming them, and producing them to reporting topics. - * - *

Key responsibilities include: - * - *

    - *
  • Consuming CDC (Change Data Capture) events for Observations and Act Relationships. - *
  • Fetching enriched data from the database using stored procedures. - *
  • Transforming raw data into reporting-optimized formats for Observation and its related - * entities (Coded, Date, EDX, Material, Numeric, Reason, Txt). - *
  • Pushing transformed data to corresponding output topics in Kafka. - *
  • Handling retries and dead-letter topics (DLT) for resilient processing. - *
- */ -@Service -@Setter -@RequiredArgsConstructor -public class ObservationService { - private static final Logger logger = LoggerFactory.getLogger(ObservationService.class); - private static final String BEFORE_PATH = "before"; - - @Value("${spring.kafka.topics.nbs.observation}") - private String observationTopic; - - @Value("${spring.kafka.topics.nbs.act-relationship}") - private String actRelationshipTopic; - - @Value("${spring.kafka.topics.nrt.observation}") - private String observationTopicOutputReporting; - - @Value("${featureFlag.thread-pool-size:1}") - private int threadPoolSize; - - private final ObservationRepository observationRepository; - - @Qualifier("observationKafkaTemplate") - private final KafkaTemplate kafkaTemplate; - - private final ProcessObservationDataUtil processObservationDataUtil; - private final ModelMapper modelMapper = new ModelMapper(); - private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl(); - - private ExecutorService obsExecutor; - - private static String topicDebugLog = "Received Observation with id: {} from topic: {}"; - public static final ToLongFunction> toBatchId = - rec -> rec.timestamp() + rec.offset() + rec.partition(); - - ObservationKey observationKey = new ObservationKey(); - - private static final String SERVICE_NAME = "observation-reporting"; - - private final CustomMetrics metrics; - - private Counter msgProcessed; - private Counter msgSuccess; - private Counter msgFailure; - - @PostConstruct - void initMetrics() { - String[] tags = {"service", SERVICE_NAME}; - - msgProcessed = metrics.counter("obs_msg_processed", tags); - msgSuccess = metrics.counter("obs_msg_success", tags); - msgFailure = metrics.counter("obs_msg_failure", tags); - - obsExecutor = - Executors.newFixedThreadPool(threadPoolSize, new CustomizableThreadFactory("obs-")); - } - - @RetryableTopic( - attempts = "${spring.kafka.consumer.max-retry}", - autoCreateTopics = "false", - dltStrategy = DltStrategy.FAIL_ON_ERROR, - retryTopicSuffix = "${spring.kafka.dlq.retry-suffix}", - dltTopicSuffix = "${spring.kafka.dlq.dlq-suffix}", - // retry topic name, such as topic-retry-1, topic-retry-2, etc - topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, - // time to wait before attempting to retry - backoff = @Backoff(delay = 1000, multiplier = 2.0), - exclude = { - SerializationException.class, - DeserializationException.class, - RuntimeException.class, - NoDataException.class - }, - kafkaTemplate = "observationKafkaTemplate") - @KafkaListener( - topics = { - "${spring.kafka.topics.nbs.observation}", - "${spring.kafka.topics.nbs.act-relationship}" - }, - containerFactory = "observationKafkaListenerContainerFactory") - public CompletableFuture processMessage(ConsumerRecord rec) { - - long batchId = toBatchId.applyAsLong(rec); - String topic = rec.topic(); - String message = rec.value(); - logger.debug(topicDebugLog, message, topic); - - if (topic.equals(observationTopic)) { - return CompletableFuture.runAsync( - () -> processObservation(message, batchId, true, ""), obsExecutor); - } else if (topic.equals(actRelationshipTopic) && message != null) { - return CompletableFuture.runAsync( - () -> processActRelationship(message, batchId), obsExecutor); - } else { - return CompletableFuture.failedFuture( - new DataProcessingException( - "Received data from an unknown topic: " + topic, new NoSuchElementException())); - } - } - - private void processObservation( - String value, - long batchId, - boolean isFromObservationTopic, - String actRelationshipSourceActUid) { - msgProcessed.increment(); - metrics.recordTime( - "obs_msg_processing_seconds", - () -> { - String observationUid = ""; - try { - observationUid = - isFromObservationTopic - ? extractUid(value, "observation_uid") - : actRelationshipSourceActUid; - observationKey.setObservationUid(Long.valueOf(observationUid)); - logger.info(topicDebugLog, observationUid, observationTopic); - Optional observationData = - observationRepository.computeObservations(observationUid); - if (observationData.isPresent()) { - ObservationReporting reportingModel = - modelMapper.map(observationData.get(), ObservationReporting.class); - ObservationTransformed observationTransformed = - processObservationDataUtil.transformObservationData( - observationData.get(), batchId); - modelMapper.map(observationTransformed, reportingModel); - pushKeyValuePairToKafka( - observationKey, reportingModel, observationTopicOutputReporting); - logger.info( - "Observation data (uid={}) sent to {}", - observationUid, - observationTopicOutputReporting); - msgSuccess.increment(); - } else { - throw new EntityNotFoundException( - "Unable to find Observation with id: " + observationUid); - } - } catch (EntityNotFoundException ex) { - msgFailure.increment(); - throw new NoDataException(ex.getMessage(), ex); - } catch (Exception e) { - msgFailure.increment(); - throw new DataProcessingException(errorMessage("Observation", observationUid, e), e); - } - }, - "service", - SERVICE_NAME); - } - - private void processActRelationship(String value, long batchId) { - String sourceActUid = ""; - - try { - String typeCd; - String targetClassCd; - String operationType = extractChangeDataCaptureOperation(value); - - if (operationType.equals("d")) { - sourceActUid = extractUid(value, "source_act_uid", BEFORE_PATH); - typeCd = extractValue(value, "type_cd", BEFORE_PATH); - targetClassCd = extractValue(value, "target_class_cd", BEFORE_PATH); - } else { - return; - } - - logger.info(topicDebugLog, "Act_relationship", sourceActUid, actRelationshipTopic); - // For LabReport values, we only need to trigger if the relationship is deleted (not covered - // in updates to Observation) - // PHC targets are excluded from the LabReport association updates, as the LabReport will - // receive - // an update in Observation - if (typeCd.equals("LabReport") && targetClassCd.equals("OBS")) { - processObservation(value, batchId, false, sourceActUid); - } - } catch (Exception e) { - throw new DataProcessingException(errorMessage("ActRelationship", sourceActUid, e), e); - } - } - - // This same method can be used for elastic search as well and that is why the generic model is - // present - private void pushKeyValuePairToKafka( - ObservationKey observationKey, Object model, String topicName) { - String jsonKey = jsonGenerator.generateStringJson(observationKey); - String jsonValue = jsonGenerator.generateStringJson(model); - kafkaTemplate.send(topicName, jsonKey, jsonValue); - } -} diff --git a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ProcessObservationDataUtil.java b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParser.java similarity index 51% rename from reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ProcessObservationDataUtil.java rename to reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParser.java index 9066b9d21..507ed4c05 100644 --- a/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ProcessObservationDataUtil.java +++ b/reporting-pipeline-service/src/main/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParser.java @@ -4,60 +4,30 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.*; import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason; import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTransformed; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.function.Consumer; import java.util.function.Function; -import lombok.RequiredArgsConstructor; -import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -@Setter -public class ProcessObservationDataUtil { - private static final Logger logger = LoggerFactory.getLogger(ProcessObservationDataUtil.class); + +public class ObservationParser { + private static final Logger logger = LoggerFactory.getLogger(ObservationParser.class); private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); - @Qualifier("observationKafkaTemplate") - private final KafkaTemplate kafkaTemplate; - - private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl(); - - @Value("${spring.kafka.topics.nrt.observation-coded}") - public String codedTopicName; - - @Value("${spring.kafka.topics.nrt.observation-date}") - public String dateTopicName; - - @Value("${spring.kafka.topics.nrt.observation-edx}") - public String edxTopicName; - - @Value("${spring.kafka.topics.nrt.observation-material}") - public String materialTopicName; - - @Value("${spring.kafka.topics.nrt.observation-numeric}") - public String numericTopicName; - - @Value("${spring.kafka.topics.nrt.observation-reason}") - public String reasonTopicName; - - @Value("${spring.kafka.topics.nrt.observation-txt}") - public String txtTopicName; - - ObservationKey observationKey = new ObservationKey(); - private static final String SUBJECT_CLASS_CD = "subject_class_cd"; public static final String TYPE_CD = "type_cd"; public static final String ENTITY_ID = "entity_id"; @@ -66,39 +36,69 @@ public class ProcessObservationDataUtil { public static final String ACT_ID_SEQ = "act_id_seq"; public static final String ROOT_EXTENSION_TXT = "root_extension_txt"; - public ObservationTransformed transformObservationData(Observation observation, long batchId) { - ObservationTransformed observationTransformed = new ObservationTransformed(); + private ObservationParser() {} + + public static ParsedObservation parse(final Observation observation, final long batchId) { + ParsedObservation parsedObservation = new ParsedObservation(new ObservationTransformed()); + ObservationTransformed observationTransformed = parsedObservation.transformed(); + observationTransformed.setObservationUid(observation.getObservationUid()); observationTransformed.setReportObservationUid(observation.getObservationUid()); observationTransformed.setBatchId(batchId); - observationKey.setObservationUid(observation.getObservationUid()); - String obsDomainCdSt1 = observation.getObsDomainCdSt1(); - - transformPersonParticipations( - observation.getPersonParticipations(), obsDomainCdSt1, observationTransformed); - transformOrganizationParticipations( - observation.getOrganizationParticipations(), obsDomainCdSt1, observationTransformed); - transformMaterialParticipations( - observation.getMaterialParticipations(), obsDomainCdSt1, observationTransformed); - transformFollowupObservations( - observation.getFollowupObservations(), obsDomainCdSt1, observationTransformed); - transformParentObservations(observation.getParentObservations(), observationTransformed); - transformActIds(observation.getActIds(), observationTransformed); - transformObservationCoded(observation.getObsCode(), batchId); - transformObservationDate(observation.getObsDate(), batchId); - transformObservationEdx(observation.getEdxIds()); - transformObservationNumeric(observation.getObsNum(), batchId); - transformObservationReasons(observation.getObsReason(), batchId); - transformObservationTxt(observation.getObsTxt(), batchId); - - return observationTransformed; + // Person Participations + parsePersonParticipations( + observation.getPersonParticipations(), + observation.getObsDomainCdSt1(), + observationTransformed); + + // Organization Participations + parseOrganizationParticipations( + observation.getOrganizationParticipations(), + observation.getObsDomainCdSt1(), + observationTransformed); + + // Material Participations + parseMaterialParticipations( + observation.getMaterialParticipations(), + observation.getObsDomainCdSt1(), + parsedObservation); + + // Follow up Observations + parseFollowupObservations( + observation.getFollowupObservations(), + observation.getObsDomainCdSt1(), + observationTransformed); + + // Parent Observations + parseParentObservations(observation.getParentObservations(), observationTransformed); + + // Act Ids + parseActIds(observation.getActIds(), observationTransformed); + + // Observation Coded data + parseObservationCoded(observation.getObsCode(), parsedObservation); + + // Observation Date data + parseObservationDate(observation.getObsDate(), parsedObservation); + + // Observation Edx data + parseObservationEdx(observation.getEdxIds(), parsedObservation); + + // Observation Numeric data + parseObservationNumeric(observation.getObsNum(), parsedObservation); + + // Observation Reason data + parseObservationReasons(observation.getObsReason(), parsedObservation); + + // Observation Text data + parseObservationTxt(observation.getObsTxt(), parsedObservation); + + return parsedObservation; } - private void transformPersonParticipations( - String personParticipations, - String obsDomainCdSt1, - ObservationTransformed observationTransformed) { + private static void parsePersonParticipations( + String personParticipations, String obsDomainCdSt1, ObservationTransformed transformed) { try { JsonNode personParticipationsJsonArray = parseJsonArray(personParticipations); @@ -110,7 +110,7 @@ private void transformPersonParticipations( Long entityId = getNodeValue(jsonNode, ENTITY_ID, JsonNode::asLong); if (typeCd.equals("PATSBJ")) { - transformPersonParticipationRoles(jsonNode, observationTransformed, entityId); + setPersonParticipationRoles(jsonNode, transformed, entityId); } if (ORDER.equals(obsDomainCdSt1)) { @@ -121,61 +121,45 @@ private void transformPersonParticipations( orderers.add(String.valueOf(entityId)); break; case "PATSBJ", "SubjOfMorbReport": - observationTransformed.setPatientId(entityId); + transformed.setPatientId(entityId); break; case "PhysicianOfMorb": - observationTransformed.setMorbPhysicianId(entityId); + transformed.setMorbPhysicianId(entityId); break; case "ReporterOfMorbReport": - observationTransformed.setMorbReporterId(entityId); + transformed.setMorbReporterId(entityId); break; case "ENT": - observationTransformed.setTranscriptionistId(entityId); - Optional.ofNullable(jsonNode.get("first_nm")) - .filter(n -> !n.isNull()) - .ifPresent(n -> observationTransformed.setTranscriptionistFirstNm(n.asText())); - Optional.ofNullable(jsonNode.get("last_nm")) - .filter(n -> !n.isNull()) - .ifPresent(n -> observationTransformed.setTranscriptionistLastNm(n.asText())); - Optional.ofNullable(jsonNode.get("person_id_val")) - .filter(n -> !n.isNull()) - .ifPresent(n -> observationTransformed.setTranscriptionistVal(n.asText())); - Optional.ofNullable(jsonNode.get("person_id_assign_auth_cd")) - .filter(n -> !n.isNull()) - .ifPresent( - n -> observationTransformed.setTranscriptionistIdAssignAuth(n.asText())); - Optional.ofNullable(jsonNode.get("person_id_type_desc")) - .filter(n -> !n.isNull()) - .ifPresent(n -> observationTransformed.setTranscriptionistAuthType(n.asText())); + transformed.setTranscriptionistId(entityId); + + ifPresentSet(jsonNode, "first_nm", transformed::setTranscriptionistFirstNm); + ifPresentSet(jsonNode, "last_nm", transformed::setTranscriptionistLastNm); + ifPresentSet(jsonNode, "person_id_val", transformed::setTranscriptionistVal); + ifPresentSet( + jsonNode, + "person_id_assign_auth_cd", + transformed::setTranscriptionistIdAssignAuth); + ifPresentSet( + jsonNode, "person_id_type_desc", transformed::setTranscriptionistAuthType); break; case "ASS": - observationTransformed.setAssistantInterpreterId(entityId); - Optional.ofNullable(jsonNode.get("first_nm")) - .filter(n -> !n.isNull()) - .ifPresent( - n -> observationTransformed.setAssistantInterpreterFirstNm(n.asText())); - Optional.ofNullable(jsonNode.get("last_nm")) - .filter(n -> !n.isNull()) - .ifPresent( - n -> observationTransformed.setAssistantInterpreterLastNm(n.asText())); - Optional.ofNullable(jsonNode.get("person_id_val")) - .filter(n -> !n.isNull()) - .ifPresent(n -> observationTransformed.setAssistantInterpreterVal(n.asText())); - Optional.ofNullable(jsonNode.get("person_id_assign_auth_cd")) - .filter(n -> !n.isNull()) - .ifPresent( - n -> - observationTransformed.setAssistantInterpreterIdAssignAuth(n.asText())); - Optional.ofNullable(jsonNode.get("person_id_type_desc")) - .filter(n -> !n.isNull()) - .ifPresent( - n -> observationTransformed.setAssistantInterpreterAuthType(n.asText())); + transformed.setAssistantInterpreterId(entityId); + + ifPresentSet(jsonNode, "first_nm", transformed::setAssistantInterpreterFirstNm); + ifPresentSet(jsonNode, "last_nm", transformed::setAssistantInterpreterLastNm); + ifPresentSet(jsonNode, "person_id_val", transformed::setAssistantInterpreterVal); + ifPresentSet( + jsonNode, + "person_id_assign_auth_cd", + transformed::setAssistantInterpreterIdAssignAuth); + ifPresentSet( + jsonNode, "person_id_type_desc", transformed::setAssistantInterpreterAuthType); break; case "VRF": - observationTransformed.setResultInterpreterId(entityId); + transformed.setResultInterpreterId(entityId); break; case "PRF": - observationTransformed.setLabTestTechnicianId(entityId); + transformed.setLabTestTechnicianId(entityId); break; default: } @@ -183,7 +167,7 @@ private void transformPersonParticipations( } } if (!orderers.isEmpty()) { - observationTransformed.setOrderingPersonId(String.join(",", orderers)); + transformed.setOrderingPersonId(String.join(",", orderers)); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "PersonParticipations", personParticipations); @@ -194,26 +178,10 @@ private void transformPersonParticipations( } } - private void transformPersonParticipationRoles( - JsonNode node, ObservationTransformed observationTransformed, Long entityId) { - String roleSubject = node.path("role_subject_class_cd").asText(); - if ("PROV".equals(roleSubject)) { - String roleCd = node.path("role_cd").asText(); - if ("SPP".equals(roleCd)) { - String roleScoping = node.path("role_scoping_class_cd").asText(); - if ("PSN".equals(roleScoping)) { - observationTransformed.setSpecimenCollectorId(entityId); - } - } else if ("CT".equals(roleCd)) { - observationTransformed.setCopyToProviderId(entityId); - } - } - } - - private void transformOrganizationParticipations( + private static void parseOrganizationParticipations( String organizationParticipations, String obsDomainCdSt1, - ObservationTransformed observationTransformed) { + ObservationTransformed transformed) { try { JsonNode organizationParticipationsJsonArray = parseJsonArray(organizationParticipations); @@ -225,26 +193,25 @@ private void transformOrganizationParticipations( Long entityId = getNodeValue(jsonNode, ENTITY_ID, JsonNode::asLong); if (subjectClassCd.equals("ORG")) { - if (RESULT.equals(obsDomainCdSt1)) { - if ("PRF".equals(typeCd)) { - observationTransformed.setPerformingOrganizationId(entityId); - } + + if (RESULT.equals(obsDomainCdSt1) && "PRF".equals(typeCd)) { + transformed.setPerformingOrganizationId(entityId); } else if (ORDER.equals(obsDomainCdSt1)) { switch (typeCd) { case "AUT": - observationTransformed.setAuthorOrganizationId(entityId); + transformed.setAuthorOrganizationId(entityId); break; case "ORD": - observationTransformed.setOrderingOrganizationId(entityId); + transformed.setOrderingOrganizationId(entityId); break; case "HCFAC": - observationTransformed.setHealthCareId(entityId); + transformed.setHealthCareId(entityId); break; case "ReporterOfMorbReport": - observationTransformed.setMorbHospReporterId(entityId); + transformed.setMorbHospReporterId(entityId); break; case "HospOfMorbObs": - observationTransformed.setMorbHospId(entityId); + transformed.setMorbHospId(entityId); break; default: break; @@ -261,10 +228,8 @@ private void transformOrganizationParticipations( } } - private void transformMaterialParticipations( - String materialParticipations, - String obsDomainCdSt1, - ObservationTransformed observationTransformed) { + private static void parseMaterialParticipations( + String materialParticipations, String obsDomainCdSt1, ParsedObservation parsedObservation) { try { JsonNode materialParticipationsJsonArray = parseJsonArray(materialParticipations); @@ -275,19 +240,14 @@ private void transformMaterialParticipations( assertDomainCdMatches(obsDomainCdSt1, ORDER); if ("SPC".equals(typeCd) && "MAT".equals(subjectClassCd)) { Long materialId = jsonNode.get(ENTITY_ID).asLong(); - observationTransformed.setMaterialId(materialId); + parsedObservation.transformed().setMaterialId(materialId); ObservationMaterial material = objectMapper.treeToValue(jsonNode, ObservationMaterial.class); material.setMaterialId(materialId); - ObservationMaterialKey key = new ObservationMaterialKey(); - key.setMaterialId(observationTransformed.getMaterialId()); - sendToKafka( - key, - material, - materialTopicName, - materialId, - "Observation Material data (uid={}) sent to {}"); + + // Add material to list that will be persisted to the database + parsedObservation.materialEntries().add(material); } } } catch (IllegalArgumentException ex) { @@ -299,20 +259,34 @@ private void transformMaterialParticipations( } } - private void transformFollowupObservations( - String followupObservations, - String obsDomainCdSt1, - ObservationTransformed observationTransformed) { + private static void setPersonParticipationRoles( + JsonNode node, ObservationTransformed observationTransformed, Long entityId) { + String roleSubject = fieldAsText(node, "role_subject_class_cd"); + if ("PROV".equals(roleSubject)) { + String roleCd = fieldAsText(node, "role_cd"); + if ("SPP".equals(roleCd)) { + String roleScoping = fieldAsText(node, "role_scoping_class_cd"); + if ("PSN".equals(roleScoping)) { + observationTransformed.setSpecimenCollectorId(entityId); + } + } else if ("CT".equals(roleCd)) { + observationTransformed.setCopyToProviderId(entityId); + } + } + } + + private static void parseFollowupObservations( + String followupObservations, String obsDomainCdSt1, ObservationTransformed transformed) { try { JsonNode followupObservationsJsonArray = parseJsonArray(followupObservations); List results = new ArrayList<>(); List followUps = new ArrayList<>(); for (JsonNode jsonNode : followupObservationsJsonArray) { - Optional domainCd = Optional.ofNullable(jsonNode.get("domain_cd_st_1")); + String domainCd = fieldAsText(jsonNode, "domain_cd_st_1"); assertDomainCdMatches(obsDomainCdSt1, ORDER); - if (domainCd.isPresent() && RESULT.equals(domainCd.get().asText())) { + if (RESULT.equals(domainCd)) { Optional.ofNullable(jsonNode.get("result_observation_uid")) .ifPresent(r -> results.add(r.asText())); } else { @@ -322,10 +296,10 @@ private void transformFollowupObservations( } if (!results.isEmpty()) { - observationTransformed.setResultObservationUid(String.join(",", results)); + transformed.setResultObservationUid(String.join(",", results)); } if (!followUps.isEmpty()) { - observationTransformed.setFollowUpObservationUid(String.join(",", followUps)); + transformed.setFollowUpObservationUid(String.join(",", followUps)); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "FollowupObservations", followupObservations); @@ -336,24 +310,24 @@ private void transformFollowupObservations( } } - private void transformParentObservations( - String parentObservations, ObservationTransformed observationTransformed) { + private static void parseParentObservations( + String parentObservations, ObservationTransformed transformed) { try { JsonNode parentObservationsJsonArray = parseJsonArray(parentObservations); for (JsonNode jsonNode : parentObservationsJsonArray) { Long parentUid = getNodeValue(jsonNode, "parent_uid", JsonNode::asLong); - String parentTypeCd = jsonNode.path("parent_type_cd").asText(); - String parentDomainCd = jsonNode.path("parent_domain_cd_st_1").asText(); + String parentTypeCd = fieldAsText(jsonNode, "parent_type_cd"); + String parentDomainCd = fieldAsText(jsonNode, "parent_domain_cd_st_1"); - if (parentTypeCd.equals("SPRT")) { - observationTransformed.setReportSprtUid(parentUid); - } else if (parentTypeCd.equals("REFR")) { - observationTransformed.setReportRefrUid(parentUid); + if ("SPRT".equals(parentTypeCd)) { + transformed.setReportSprtUid(parentUid); + } else if ("REFR".equals(parentTypeCd)) { + transformed.setReportRefrUid(parentUid); } if (parentDomainCd.contains(ORDER)) { - observationTransformed.setReportObservationUid(parentUid); + transformed.setReportObservationUid(parentUid); } } } catch (IllegalArgumentException ex) { @@ -365,7 +339,7 @@ private void transformParentObservations( } } - private void transformActIds(String actIds, ObservationTransformed observationTransformed) { + private static void parseActIds(String actIds, ObservationTransformed observationTransformed) { try { JsonNode actIdsJsonArray = parseJsonArray(actIds); @@ -392,22 +366,16 @@ private void transformActIds(String actIds, ObservationTransformed observationTr } } - private void transformObservationCoded(String observationCoded, long batchId) { + private static void parseObservationCoded( + String observationCoded, ParsedObservation parsedObservation) { try { JsonNode observationCodedJsonArray = parseJsonArray(observationCoded); - ObservationCodedKey codedKey = new ObservationCodedKey(); for (JsonNode jsonNode : observationCodedJsonArray) { ObservationCoded coded = objectMapper.treeToValue(jsonNode, ObservationCoded.class); - coded.setBatchId(batchId); - codedKey.setObservationUid(coded.getObservationUid()); - codedKey.setOvcCode(coded.getOvcCode()); - sendToKafka( - codedKey, - coded, - codedTopicName, - coded.getObservationUid(), - "Observation Coded data (uid={}) sent to {}"); + coded.setBatchId(parsedObservation.transformed().getBatchId()); + + parsedObservation.codedEntries().add(coded); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "ObservationCoded"); @@ -418,19 +386,16 @@ private void transformObservationCoded(String observationCoded, long batchId) { } } - private void transformObservationDate(String observationDate, long batchId) { + private static void parseObservationDate( + String observationDate, ParsedObservation parsedObservation) { try { JsonNode observationDateJsonArray = parseJsonArray(observationDate); for (JsonNode jsonNode : observationDateJsonArray) { ObservationDate obsDate = objectMapper.treeToValue(jsonNode, ObservationDate.class); - obsDate.setBatchId(batchId); - sendToKafka( - observationKey, - obsDate, - dateTopicName, - obsDate.getObservationUid(), - "Observation Date data (uid={}) sent to {}"); + obsDate.setBatchId(parsedObservation.transformed().getBatchId()); + + parsedObservation.dateEntries().add(obsDate); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "ObservationDate"); @@ -440,20 +405,14 @@ private void transformObservationDate(String observationDate, long batchId) { } } - private void transformObservationEdx(String observationEdx) { + private static void parseObservationEdx( + String observationEdx, ParsedObservation parsedObservation) { try { JsonNode observationEdxJsonArray = parseJsonArray(observationEdx); - ObservationEdxKey edxKey = new ObservationEdxKey(); - for (JsonNode jsonNode : observationEdxJsonArray) { ObservationEdx edx = objectMapper.treeToValue(jsonNode, ObservationEdx.class); - edxKey.setEdxDocumentUid(edx.getEdxDocumentUid()); - sendToKafka( - edxKey, - edx, - edxTopicName, - edx.getEdxDocumentUid(), - "Observation Edx data (edx doc uid={}) sent to {}"); + + parsedObservation.edxEntries().add(edx); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "ObservationEdx"); @@ -463,19 +422,15 @@ private void transformObservationEdx(String observationEdx) { } } - private void transformObservationNumeric(String observationNumeric, long batchId) { + private static void parseObservationNumeric( + String observationNumeric, ParsedObservation parsedObservation) { try { JsonNode observationNumericJsonArray = parseJsonArray(observationNumeric); for (JsonNode jsonNode : observationNumericJsonArray) { ObservationNumeric numeric = objectMapper.treeToValue(jsonNode, ObservationNumeric.class); - numeric.setBatchId(batchId); - sendToKafka( - observationKey, - numeric, - numericTopicName, - numeric.getObservationUid(), - "Observation Numeric data (uid={}) sent to {}"); + numeric.setBatchId(parsedObservation.transformed().getBatchId()); + parsedObservation.numericEntries().add(numeric); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "ObservationNumeric"); @@ -486,22 +441,15 @@ private void transformObservationNumeric(String observationNumeric, long batchId } } - private void transformObservationReasons(String observationReasons, long batchId) { + private static void parseObservationReasons( + String observationReasons, ParsedObservation parsedObservation) { try { JsonNode observationReasonsJsonArray = parseJsonArray(observationReasons); - ObservationReasonKey reasonKey = new ObservationReasonKey(); for (JsonNode jsonNode : observationReasonsJsonArray) { ObservationReason reason = objectMapper.treeToValue(jsonNode, ObservationReason.class); - reason.setBatchId(batchId); - reasonKey.setObservationUid(reason.getObservationUid()); - reasonKey.setReasonCd(reason.getReasonCd()); - sendToKafka( - reasonKey, - reason, - reasonTopicName, - reason.getObservationUid(), - "Observation Reason data (uid={}) sent to {}"); + reason.setBatchId(parsedObservation.transformed().getBatchId()); + parsedObservation.reasonEntries().add(reason); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "ObservationReasons"); @@ -512,22 +460,16 @@ private void transformObservationReasons(String observationReasons, long batchId } } - private void transformObservationTxt(String observationTxt, long batchId) { + private static void parseObservationTxt( + String observationTxt, ParsedObservation parsedObservation) { try { JsonNode observationTxtJsonArray = parseJsonArray(observationTxt); - ObservationTxtKey txtKey = new ObservationTxtKey(); for (JsonNode jsonNode : observationTxtJsonArray) { ObservationTxt txt = objectMapper.treeToValue(jsonNode, ObservationTxt.class); - txt.setBatchId(batchId); - txtKey.setObservationUid(txt.getObservationUid()); - txtKey.setOvtSeq(txt.getOvtSeq()); - sendToKafka( - txtKey, - txt, - txtTopicName, - txt.getObservationUid(), - "Observation Txt data (uid={}) sent to {}"); + txt.setBatchId(parsedObservation.transformed().getBatchId()); + + parsedObservation.textEntries().add(txt); } } catch (IllegalArgumentException ex) { logger.info(ex.getMessage(), "ObservationTxt"); @@ -537,21 +479,7 @@ private void transformObservationTxt(String observationTxt, long batchId) { } } - private void sendToKafka(Object key, Object value, String topicName, Long uid, String message) { - String jsonKey = jsonGenerator.generateStringJson(key); - String jsonValue = - Optional.ofNullable(value).map(jsonGenerator::generateStringJson).orElse(null); - kafkaTemplate - .send(topicName, jsonKey, jsonValue) - .whenComplete( - (res, e) -> { - if (message != null) { - logger.info(message, uid, topicName); - } - }); - } - - private JsonNode parseJsonArray(String jsonString) + private static JsonNode parseJsonArray(String jsonString) throws JsonProcessingException, IllegalArgumentException { JsonNode jsonArray = jsonString != null ? objectMapper.readTree(jsonString) : null; if (jsonArray != null && jsonArray.isArray()) { @@ -561,7 +489,8 @@ private JsonNode parseJsonArray(String jsonString) } } - private T getNodeValue(JsonNode jsonNode, String fieldName, Function mapper) { + private static T getNodeValue( + JsonNode jsonNode, String fieldName, Function mapper) { JsonNode node = jsonNode.get(fieldName); if (node == null || node.isNull()) { throw new IllegalArgumentException("Field " + fieldName + " is null or not found in {}: {}"); @@ -569,9 +498,20 @@ private T getNodeValue(JsonNode jsonNode, String fieldName, Function setter) { + String value = fieldAsText(node, field); + if (value != null) { + setter.accept(value); + } + } } diff --git a/reporting-pipeline-service/src/main/resources/application.yaml b/reporting-pipeline-service/src/main/resources/application.yaml index ea8a680ff..d76d1938c 100644 --- a/reporting-pipeline-service/src/main/resources/application.yaml +++ b/reporting-pipeline-service/src/main/resources/application.yaml @@ -45,13 +45,6 @@ spring: ldf-data: ${TOPIC_NRT_LDF_DATA:nrt_ldf_data} metadata-columns: ${TOPIC_NRT_METADATA_COLUMNS:nrt_metadata_columns} observation: ${TOPIC_NRT_OBSERVATION:nrt_observation} - observation-coded: ${TOPIC_NRT_OBSERVATION_CODED:nrt_observation_coded} - observation-date: ${TOPIC_NRT_OBSERVATION_DATE:nrt_observation_date} - observation-edx: ${TOPIC_NRT_OBSERVATION_EDX:nrt_observation_edx} - observation-material: ${TOPIC_NRT_OBSERVATION_MATERIAL:nrt_observation_material} - observation-numeric: ${TOPIC_NRT_OBSERVATION_NUMERIC:nrt_observation_numeric} - observation-reason: ${TOPIC_NRT_OBSERVATION_REASON:nrt_observation_reason} - observation-txt: ${TOPIC_NRT_OBSERVATION_TXT:nrt_observation_txt} odse-nbs-page: ${TOPIC_NRT_ODSE_NBS_PAGE:nrt_odse_NBS_page} odse-state-defined-field-metadata: ${TOPIC_NRT_ODSE_STATE_DEFINED_FIELD_METADATA:nrt_odse_state_defined_field_metadata} organization: ${TOPIC_NRT_ORGANIZATION:nrt_organization} diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java index b2b34176f..4740dd1b9 100644 --- a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java +++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/integration/support/config/DataSourceConfig.java @@ -28,7 +28,7 @@ public DataSource dataSource(DataSourceProperties properties) { } @Primary - @Bean + @Bean("rtrClient") public JdbcClient jdbcClient(DataSource dataSource) { return JdbcClient.create(dataSource); } diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumerTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumerTest.java new file mode 100644 index 000000000..a72d77417 --- /dev/null +++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationConsumerTest.java @@ -0,0 +1,149 @@ +package gov.cdc.nbs.report.pipeline.observation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; + +import gov.cdc.nbs.report.pipeline.observation.service.ObservationProcessor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; +import org.mockito.*; + +class ObservationConsumerTest { + + private final String observationTopic = "Observation"; + private final String actRelationshipTopic = "Act_relationship"; + + @Mock ObservationProcessor processor = Mockito.mock(ObservationProcessor.class); + private ObservationConsumer consumer = + new ObservationConsumer(processor, observationTopic, actRelationshipTopic, 1); + + @Test + void processesObservationMessage() { // observation_uid + String message = + """ + { + "payload" : { + "op": "c", + "after": { + "observation_uid": "123" + } + } + } + """; + ConsumerRecord consumerRecord = + new ConsumerRecord<>(observationTopic, 0, 1l, null, message); + // receives valid observation message + consumer.processMessage(consumerRecord).join(); + + // sends to ObservationProcessor + verify(processor, times(1)).process(0, "123"); + } + + @Test + void processesActRelationshipMessage() { + String message = + """ + { + "payload" : { + "op": "d", + "before": { + "source_act_uid": "1", + "type_cd": "LabReport", + "target_class_cd": "OBS" + } + } + } + """; + ConsumerRecord consumerRecord = + new ConsumerRecord<>(actRelationshipTopic, 0, 1l, null, message); + // receives valid act_relationship message + consumer.processMessage(consumerRecord).join(); + + // sends to ObservationProcessor + verify(processor, times(1)).process(0, "1"); + } + + @Test + void doesNotProcessActRelationshipMessageBadOp() { + String message = + """ + { + "payload" : { + "op": "c", + "before": { + "source_act_uid": "1", + "type_cd": "LabReport", + "target_class_cd": "OBS" + } + } + } + """; + ConsumerRecord consumerRecord = + new ConsumerRecord<>(actRelationshipTopic, 0, 1l, null, message); + // receives non 'delete' act_relationship message + consumer.processMessage(consumerRecord).join(); + + // does not send to ObservationProcessor + verifyNoInteractions(processor); + } + + @Test + void doesNotProcessActRelationshipMessageBadTypeCd() { + String message = + """ + { + "payload" : { + "op": "d", + "before": { + "source_act_uid": "1", + "type_cd": "BadValue", + "target_class_cd": "OBS" + } + } + } + """; + ConsumerRecord consumerRecord = + new ConsumerRecord<>(actRelationshipTopic, 0, 1l, null, message); + // receives act_relationship message with a type_cd other than 'LabReport' + consumer.processMessage(consumerRecord).join(); + + // does not send to ObservationProcessor + verifyNoInteractions(processor); + } + + @Test + void throwsExceptionForBadTopic() { + ConsumerRecord consumerRecord = + new ConsumerRecord<>("bad_topic", 0, 1l, null, ""); + CompletableFuture future = consumer.processMessage(consumerRecord); + + CompletionException ex = assertThrows(CompletionException.class, future::join); + assertThat(ex.getCause().getMessage()) + .isEqualTo("Received data from an unknown topic: bad_topic"); + } + + @Test + void throwsExceptionForBadActRelationshipMessage() { + String message = + """ + { + "payload" : { + "op": "d", + "before": { + } + } + } + """; + ConsumerRecord consumerRecord = + new ConsumerRecord<>(actRelationshipTopic, 0, 1l, null, message); + CompletableFuture future = consumer.processMessage(consumerRecord); + + CompletionException ex = assertThrows(CompletionException.class, future::join); + assertThat(ex.getCause().getMessage()) + .isEqualTo( + "Error processing ActRelationship data: The source_act_uid field is missing in the message payload."); + } +} diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriterTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriterTest.java new file mode 100644 index 000000000..fe52cc054 --- /dev/null +++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/repository/NrtObservationWriterTest.java @@ -0,0 +1,603 @@ +package gov.cdc.nbs.report.pipeline.observation.repository; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import gov.cdc.nbs.report.pipeline.integration.unit.UnitTest; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.Map; +import org.json.JSONException; +import org.junit.jupiter.api.Test; +import org.skyscreamer.jsonassert.Customization; +import org.skyscreamer.jsonassert.JSONAssert; +import org.skyscreamer.jsonassert.JSONCompareMode; +import org.skyscreamer.jsonassert.comparator.CustomComparator; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.simple.JdbcClient; + +class NrtObservationWriterTest extends UnitTest { + + private final JdbcClient client; + private final NrtObservationWriter writer; + private static final ObjectMapper mapper = + new ObjectMapper() + .enable(SerializationFeature.INDENT_OUTPUT) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")); + + public NrtObservationWriterTest(@Qualifier("rtrClient") final JdbcClient client) { + this.client = client; + this.writer = new NrtObservationWriter(client); + } + + @Test + void insertsMaterialData() throws JsonProcessingException, JSONException { + // Insert material data + ObservationMaterial material = new ObservationMaterial(); + material.setActUid(2L); + material.setTypeCd("SPC"); + material.setMaterialId(2L); + material.setSubjectClassCd("MAT"); + material.setRecordStatus("ACTIVE"); + material.setTypeDescTxt("Specimen"); + material.setLastChgTime("2024-01-01T00:00:00.000"); + material.setMaterialCd("UNK"); + material.setMaterialNm("name"); + material.setMaterialDetails("Details"); + material.setMaterialCollectionVol("36"); + material.setMaterialCollectionVolUnit("ML"); + material.setMaterialDesc("Lymphocytes"); + material.setRiskCd("rsk"); + material.setRiskDescTxt("rskDesc"); + + writer.persistMaterials(List.of(material)); + + // Verify data is as expected + Map data = + client + .sql("SELECT * FROM nrt_observation_material WHERE act_uid = 2 AND material_id = 2") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(material); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void updatesMaterialData() throws JsonProcessingException, JSONException { + // Insert material data + ObservationMaterial material = new ObservationMaterial(); + material.setActUid(3L); + material.setTypeCd("SPC"); + material.setMaterialId(3L); + material.setSubjectClassCd("MAT"); + material.setRecordStatus("ACTIVE"); + material.setTypeDescTxt("Specimen"); + material.setLastChgTime("2024-01-01T00:00:00.000"); + material.setMaterialCd("UNK"); + material.setMaterialNm("name"); + material.setMaterialDetails("Details"); + material.setMaterialCollectionVol("36"); + material.setMaterialCollectionVolUnit("ML"); + material.setMaterialDesc("Lymphocytes"); + material.setRiskCd("rsk"); + material.setRiskDescTxt("rskDesc"); + + writer.persistMaterials(List.of(material)); + + // Upsert material data + material.setTypeCd("CPS"); + material.setSubjectClassCd("TAM"); + material.setRecordStatus("INACTIVE"); + material.setTypeDescTxt("NEW"); + material.setLastChgTime("2025-01-01T00:00:00.000"); + material.setMaterialCd("ST"); + material.setMaterialNm("NEW_NAME"); + material.setMaterialDetails("Updated Details"); + material.setMaterialCollectionVol("34"); + material.setMaterialCollectionVolUnit("LM"); + material.setMaterialDesc("Something"); + material.setRiskCd("UpdatedRisk"); + material.setRiskDescTxt("NewRiskDesc"); + writer.persistMaterials(List.of(material)); + + // Verify data is as expected + Integer count = + client + .sql( + "SELECT COUNT(*) FROM nrt_observation_material WHERE act_uid = 3 AND material_id = 3") + .query(Integer.class) + .single(); + + assertThat(count).isEqualTo(1); + + Map data = + client + .sql("SELECT * FROM nrt_observation_material WHERE act_uid = 3 AND material_id = 3") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(material); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void insertsCodedEntry() throws JSONException, JsonProcessingException { + // Insert coded data + ObservationCoded coded = new ObservationCoded(); + coded.setObservationUid(4L); + coded.setBatchId(0L); + coded.setOvcCode("CE04"); + coded.setOvcCodeSystemCd("SNM"); + coded.setOvcCodeSystemDescTxt("SNOMED"); + coded.setOvcDisplayName("Normal]"); + coded.setOvcAltCd("A-124"); + coded.setOvcAltCdDescTxt("NORMAL"); + + writer.persistCoded(List.of(coded)); + + // Verify data is as expected + Map data = + client + .sql( + "SELECT * FROM nrt_observation_coded WHERE observation_uid = 4 AND ovc_code = 'CE04'") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(coded); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void updatesCodedEntry() throws JSONException, JsonProcessingException { + // Insert coded data + ObservationCoded coded = new ObservationCoded(); + coded.setObservationUid(5L); + coded.setBatchId(0L); + coded.setOvcCode("CE05"); + coded.setOvcCodeSystemCd("SNM"); + coded.setOvcCodeSystemDescTxt("SNOMED"); + coded.setOvcDisplayName("Normal]"); + coded.setOvcAltCd("A-124"); + coded.setOvcAltCdDescTxt("NORMAL"); + + writer.persistCoded(List.of(coded)); + + // Upsert coded data + coded.setBatchId(1L); + coded.setOvcCodeSystemCd("MNS"); + coded.setOvcCodeSystemDescTxt("LOINC"); + coded.setOvcDisplayName("NotNormal"); + coded.setOvcAltCd("D-444"); + coded.setOvcAltCdDescTxt("ABOVE"); + + writer.persistCoded(List.of(coded)); + + // Verify data is as expected + Integer count = + client + .sql( + "SELECT COUNT(*) FROM nrt_observation_coded WHERE observation_uid = 5 AND ovc_code = 'CE05'") + .query(Integer.class) + .single(); + + assertThat(count).isEqualTo(1); + + Map data = + client + .sql( + "SELECT * FROM nrt_observation_coded WHERE observation_uid = 5 AND ovc_code = 'CE05'") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(coded); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void insertDateEntry() throws JsonProcessingException, JSONException { + // Insert Date data + ObservationDate date = new ObservationDate(); + date.setObservationUid(6L); + date.setBatchId(0L); + date.setOvdFromDate("2024-08-16T00:00:00.000"); + date.setOvdSeq(1); + + writer.persistDate(List.of(date)); + + // Verify data is as expected + Integer count = + client + .sql("SELECT COUNT(*) FROM nrt_observation_date WHERE observation_uid = 6") + .query(Integer.class) + .single(); + + assertThat(count).isEqualTo(1); + + Map data = + client + .sql("SELECT * FROM nrt_observation_date WHERE observation_uid = 6") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(date); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void updatesDateEntry() throws JsonProcessingException, JSONException { + // Insert Date data + ObservationDate date = new ObservationDate(); + date.setObservationUid(7L); + date.setBatchId(0L); + date.setOvdFromDate("2024-08-16T00:00:00.000"); + date.setOvdSeq(1); + + writer.persistDate(List.of(date)); + + // Upsert date data + date.setBatchId(2L); + date.setOvdFromDate("2025-08-16T00:00:00.000"); + date.setOvdSeq(2); + + writer.persistDate(List.of(date)); + + // Verify data is as expected + Map data = + client + .sql("SELECT * FROM nrt_observation_date WHERE observation_uid = 7") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(date); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void insertEdx() throws JsonProcessingException, JSONException { + // Insert edx data + ObservationEdx edx = new ObservationEdx(); + edx.setEdxActUid(8l); + edx.setEdxDocumentUid(9L); + edx.setEdxAddTime("2024-09-30T21:08:19.017"); + + writer.persistEdx(List.of(edx)); + + // Verify data is as expected + Map data = + client + .sql("SELECT * FROM nrt_observation_edx WHERE edx_act_uid = 8 AND edx_document_uid = 9") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(edx); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void updatesEdx() throws JsonProcessingException, JSONException { + // Insert edx data + ObservationEdx edx = new ObservationEdx(); + edx.setEdxActUid(9l); + edx.setEdxDocumentUid(10L); + edx.setEdxAddTime("2024-09-30T21:08:19.017"); + + writer.persistEdx(List.of(edx)); + + // Upsert edx data + edx.setEdxAddTime("2025-10-02T20:04:09.000"); + + writer.persistEdx(List.of(edx)); + + // Verify data is as expected + Integer count = + client + .sql( + "SELECT COUNT(*) FROM nrt_observation_edx WHERE edx_act_uid = 9 AND edx_document_uid = 10") + .query(Integer.class) + .single(); + + assertThat(count).isEqualTo(1); + + Map data = + client + .sql( + "SELECT * FROM nrt_observation_edx WHERE edx_act_uid = 9 AND edx_document_uid = 10") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(edx); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void insertNumeric() { + // Insert numeric data + ObservationNumeric numeric = new ObservationNumeric(); + numeric.setObservationUid(10L); + numeric.setOvnSeq(1); + numeric.setBatchId(0l); + numeric.setOvnComparatorCd1("100"); + numeric.setOvnLowRange("10-100"); + numeric.setOvnHighRange("100-1000"); + numeric.setOvnNumericValue1("23.10000"); + numeric.setOvnNumericValue2("1.00000"); + numeric.setOvnNumericUnitCd("mL"); + numeric.setOvnSeparatorCd(":"); + + writer.persistNumeric(List.of(numeric)); + + // Verify data is as expected + Map data = + client + .sql("SELECT * FROM nrt_observation_numeric WHERE observation_uid = 10 AND ovn_seq = 1") + .query() + .singleRow(); + + // Field comparison due to type mismatch from String to numeric(15,5) of numeric value fields + assertThat(data) + .containsEntry("observation_uid", numeric.getObservationUid()) + .containsEntry("ovn_high_range", numeric.getOvnHighRange()) + .containsEntry("ovn_low_range", numeric.getOvnLowRange()) + .containsEntry("ovn_comparator_cd_1", numeric.getOvnComparatorCd1()) + .containsEntry("ovn_numeric_unit_cd", numeric.getOvnNumericUnitCd()) + .containsEntry("ovn_separator_cd", numeric.getOvnSeparatorCd()) + .containsEntry("batch_id", numeric.getBatchId()); + + assertThat(data.get("ovn_seq")).hasToString(numeric.getOvnSeq().toString()); + assertThat(data.get("ovn_numeric_value_1")).hasToString(numeric.getOvnNumericValue1()); + assertThat(data.get("ovn_numeric_value_2")).hasToString(numeric.getOvnNumericValue2()); + } + + @Test + void updatesNumeric() { + // Insert numeric data + ObservationNumeric numeric = new ObservationNumeric(); + numeric.setObservationUid(11L); + numeric.setOvnSeq(2); + numeric.setBatchId(0l); + numeric.setOvnComparatorCd1("100"); + numeric.setOvnLowRange("10-100"); + numeric.setOvnHighRange("100-1000"); + numeric.setOvnNumericValue1("23"); + numeric.setOvnNumericValue2("1.0"); + numeric.setOvnNumericUnitCd("mL"); + numeric.setOvnSeparatorCd(":"); + + writer.persistNumeric(List.of(numeric)); + + // Upsert numeric data + numeric.setBatchId(1l); + numeric.setOvnComparatorCd1("200"); + numeric.setOvnLowRange("20-200"); + numeric.setOvnHighRange("200-2000"); + numeric.setOvnNumericValue1("34.00000"); + numeric.setOvnNumericValue2("2.00000"); + numeric.setOvnNumericUnitCd("LM"); + numeric.setOvnSeparatorCd("!"); + + writer.persistNumeric(List.of(numeric)); + + // Verify data is as expected + Integer count = + client + .sql( + "SELECT COUNT(*) FROM nrt_observation_numeric WHERE observation_uid = 11 AND ovn_seq = 2") + .query(Integer.class) + .single(); + + assertThat(count).isEqualTo(1); + + Map data = + client + .sql("SELECT * FROM nrt_observation_numeric WHERE observation_uid = 11 AND ovn_seq = 2") + .query() + .singleRow(); + + assertThat(data) + .containsEntry("observation_uid", numeric.getObservationUid()) + .containsEntry("ovn_high_range", numeric.getOvnHighRange()) + .containsEntry("ovn_low_range", numeric.getOvnLowRange()) + .containsEntry("ovn_comparator_cd_1", numeric.getOvnComparatorCd1()) + .containsEntry("ovn_numeric_unit_cd", numeric.getOvnNumericUnitCd()) + .containsEntry("ovn_separator_cd", numeric.getOvnSeparatorCd()) + .containsEntry("batch_id", numeric.getBatchId()); + + assertThat(data.get("ovn_seq")).hasToString(numeric.getOvnSeq().toString()); + assertThat(data.get("ovn_numeric_value_1")).hasToString(numeric.getOvnNumericValue1()); + assertThat(data.get("ovn_numeric_value_2")).hasToString(numeric.getOvnNumericValue2()); + } + + @Test + void insertsReason() throws JsonProcessingException, JSONException { + // Insert reason data + ObservationReason reason = new ObservationReason(); + reason.setObservationUid(12l); + reason.setReasonCd("80008"); + reason.setReasonDescTxt("PRESENCE OF REASON"); + reason.setBatchId(12l); + + writer.persistReason(List.of(reason)); + + // Verify data is as expected + Map data = + client + .sql( + "SELECT * FROM nrt_observation_reason WHERE observation_uid = 12 AND reason_cd = '80008'") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(reason); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void updatesReason() throws JsonProcessingException, JSONException { + // Insert reason data + ObservationReason reason = new ObservationReason(); + reason.setObservationUid(13l); + reason.setReasonCd("9009"); + reason.setReasonDescTxt("PRESENCE OF REASON"); + reason.setBatchId(13l); + + writer.persistReason(List.of(reason)); + + // Upsert reason data + reason.setReasonDescTxt("A DIFFERENT REASON"); + reason.setBatchId(14l); + + writer.persistReason(List.of(reason)); + + // Verify data is as expected + Integer count = + client + .sql( + "SELECT COUNT(*) FROM nrt_observation_reason WHERE observation_uid = 13 AND reason_cd = '9009'") + .query(Integer.class) + .single(); + + assertThat(count).isEqualTo(1); + + Map data = + client + .sql( + "SELECT * FROM nrt_observation_reason WHERE observation_uid = 13 AND reason_cd = '9009'") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(reason); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void insertsText() throws JsonProcessingException, JSONException { + // Insert text data + ObservationTxt txt = new ObservationTxt(); + txt.setObservationUid(14L); + txt.setOvtSeq(1); + txt.setBatchId(14L); + txt.setOvtTxtTypeCd("N"); + txt.setOvtValueTxt("RECOMMENDED IN SUCH INSTANCES."); + + writer.persistText(List.of(txt)); + + // Verify data is as expected + Map data = + client + .sql("SELECT * FROM nrt_observation_txt WHERE observation_uid = 14 AND ovt_seq = 1") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(txt); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } + + @Test + void updatesText() throws JsonProcessingException, JSONException { + // Insert text data + ObservationTxt txt = new ObservationTxt(); + txt.setObservationUid(15L); + txt.setOvtSeq(2); + txt.setBatchId(15L); + txt.setOvtTxtTypeCd("N"); + txt.setOvtValueTxt("RECOMMENDED IN SUCH INSTANCES."); + + writer.persistText(List.of(txt)); + + // Update text data + txt.setBatchId(16L); + txt.setOvtTxtTypeCd("J"); + txt.setOvtValueTxt("UPDATED VALUE"); + + writer.persistText(List.of(txt)); + + // Verify data is as expected + Integer count = + client + .sql( + "SELECT COUNT(*) FROM nrt_observation_txt WHERE observation_uid = 15 AND ovt_seq = 2") + .query(Integer.class) + .single(); + + assertThat(count).isEqualTo(1); + + Map data = + client + .sql("SELECT * FROM nrt_observation_txt WHERE observation_uid = 15 AND ovt_seq = 2") + .query() + .singleRow(); + + String actual = mapper.writeValueAsString(data); + String expected = mapper.writeValueAsString(txt); + JSONAssert.assertEquals( + expected, + actual, + new CustomComparator( + JSONCompareMode.LENIENT, new Customization("refresh_datetime", (a, b) -> true))); + } +} diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessorTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessorTest.java new file mode 100644 index 000000000..a5cb952a1 --- /dev/null +++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationProcessorTest.java @@ -0,0 +1,166 @@ +package gov.cdc.nbs.report.pipeline.observation.service; + +import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import gov.cdc.etldatapipeline.commonutil.NoDataException; +import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting; +import gov.cdc.nbs.report.pipeline.observation.repository.NrtObservationWriter; +import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.kafka.core.KafkaTemplate; + +@ExtendWith(MockitoExtension.class) +class ObservationProcessorTest { + + @Spy private CustomMetrics metrics = new CustomMetrics(new SimpleMeterRegistry()); + @Mock private ObservationRepository repository; + @Mock private NrtObservationWriter writer; + @Mock private KafkaTemplate kafkaTemplate; + @Captor private ArgumentCaptor keyCaptor; + @Captor private ArgumentCaptor messageCaptor; + private final ObjectMapper mapper = new ObjectMapper(); + private final String nrtObservationTopic = "nrt_observation"; + + private ObservationProcessor processor; + + @BeforeEach + void init() { + processor = + new ObservationProcessor(metrics, repository, kafkaTemplate, nrtObservationTopic, writer); + } + + @Test + void successfullyProcessesObservation() throws JsonProcessingException { + // Mock database response + Observation observation = constructObservation(123L, "Order"); + when(repository.computeObservations("123")).thenReturn(Optional.of(observation)); + + // Act + processor.process(1l, "123"); + + // Verify + verify(repository, times(1)).computeObservations("123"); + verify(kafkaTemplate, times(1)) + .send(eq("nrt_observation"), keyCaptor.capture(), messageCaptor.capture()); + + ObservationKey expectedKey = new ObservationKey(observation.getObservationUid()); + ObservationKey actualKey = + mapper.readValue( + mapper.readTree(keyCaptor.getValue()).path("payload").toString(), ObservationKey.class); + assertThat(actualKey).isEqualTo(expectedKey); + + var reportingModel = + constructObservationReporting( + observation.getObservationUid(), observation.getObsDomainCdSt1()); + + var actualReporting = + mapper.readValue( + mapper.readTree(messageCaptor.getValue()).path("payload").toString(), + ObservationReporting.class); + + assertThat(reportingModel).isEqualTo(actualReporting); + } + + @Test + void verifyThrowsExceptionWhenNoDataFound() { + // Mock database response + when(repository.computeObservations("123")).thenReturn(Optional.empty()); + + // Act + Verify + NoDataException ex = assertThrows(NoDataException.class, () -> processor.process(1l, "123")); + assertThat(ex.getMessage()).isEqualTo("Unable to find Observation with id: 123"); + } + + private Observation constructObservation(Long observationUid, String obsDomainCdSt1) { + String filePathPrefix = "rawDataFiles/observation/"; + Observation observation = new Observation(); + observation.setObservationUid(observationUid); + observation.setActUid(observationUid); + observation.setClassCd("OBS"); + observation.setMoodCd("ENV"); + observation.setLocalId("OBS10003388MA01"); + observation.setActivityFromTime("2021-01-28 16:06:03.000"); + observation.setObsDomainCdSt1(obsDomainCdSt1); + observation.setPersonParticipations(readFileData(filePathPrefix + "PersonParticipations.json")); + observation.setOrganizationParticipations( + readFileData(filePathPrefix + "OrganizationParticipations.json")); + observation.setMaterialParticipations( + readFileData(filePathPrefix + "MaterialParticipations.json")); + observation.setFollowupObservations(readFileData(filePathPrefix + "FollowupObservations.json")); + observation.setParentObservations(readFileData(filePathPrefix + "ParentObservations.json")); + observation.setActIds(readFileData(filePathPrefix + "ActIds.json")); + return observation; + } + + private ObservationReporting constructObservationReporting( + Long observationUid, String obsDomainCdSt1) { + ObservationReporting observation = new ObservationReporting(); + observation.setBatchId(1l); + observation.setObservationUid(observationUid); + observation.setObsDomainCdSt1(obsDomainCdSt1); + observation.setActUid(observationUid); + observation.setClassCd("OBS"); + observation.setMoodCd("ENV"); + observation.setLocalId("OBS10003388MA01"); + observation.setOrderingPersonId("10000055"); + observation.setPatientId(10000066L); + observation.setPerformingOrganizationId(null); // not null when obsDomainCdSt1=Result + observation.setAuthorOrganizationId(34567890L); // null when obsDomainCdSt1=Result + observation.setOrderingOrganizationId(23456789L); // null when obsDomainCdSt1=Result + observation.setHealthCareId(56789012L); // null when obsDomainCdSt1=Result + observation.setMorbHospReporterId(67890123L); // null when obsDomainCdSt1=Result + observation.setMorbHospId(78901234L); // null when obsDomainCdSt1=Result + observation.setMaterialId(10000005L); + observation.setResultObservationUid("56789012,56789013"); + observation.setFollowupObservationUid("56789014,56789015"); + observation.setReportObservationUid(123456788L); + observation.setReportRefrUid(123456790L); + observation.setReportSprtUid(123456788L); + + observation.setAssistantInterpreterId(10000077L); + observation.setAssistantInterpreterVal("22582"); + observation.setAssistantInterpreterFirstNm("Cara"); + observation.setAssistantInterpreterLastNm("Dune"); + observation.setAssistantInterpreterIdAssignAuth("22D7377772"); + observation.setAssistantInterpreterAuthType("Employee number"); + + observation.setTranscriptionistId(10000088L); + observation.setTranscriptionistVal("34344355455144"); + observation.setTranscriptionistFirstNm("Moff"); + observation.setTranscriptionistLastNm("Gideon"); + observation.setTranscriptionistIdAssignAuth("18D8181818"); + observation.setTranscriptionistAuthType("Employee number"); + + observation.setResultInterpreterId(10000022L); + observation.setLabTestTechnicianId(10000011L); + + observation.setSpecimenCollectorId(10000033L); + observation.setCopyToProviderId(10000044L); + observation.setAccessionNumber("20120601114"); + observation.setActivityFromTime("2021-01-28 16:06:03.000"); + observation.setDeviceInstanceId1("No Equipment"); + observation.setDeviceInstanceId2("NEW TOOLS"); + + return observation; + } +} diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationServiceTest.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationServiceTest.java deleted file mode 100644 index 356e65446..000000000 --- a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/service/ObservationServiceTest.java +++ /dev/null @@ -1,309 +0,0 @@ -package gov.cdc.nbs.report.pipeline.observation.service; - -import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData; -import static gov.cdc.nbs.report.pipeline.observation.service.ObservationService.toBatchId; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.*; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import gov.cdc.etldatapipeline.commonutil.NoDataException; -import gov.cdc.etldatapipeline.commonutil.metrics.CustomMetrics; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationKey; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReporting; -import gov.cdc.nbs.report.pipeline.observation.repository.ObservationRepository; -import gov.cdc.nbs.report.pipeline.observation.transformer.ProcessObservationDataUtil; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.util.NoSuchElementException; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; -import org.mockito.*; -import org.springframework.kafka.core.KafkaTemplate; - -class ObservationServiceTest { - - @InjectMocks private ObservationService observationService; - - @Mock private ObservationRepository observationRepository; - - @Mock private KafkaTemplate kafkaTemplate; - - @Captor private ArgumentCaptor topicCaptor; - - @Captor private ArgumentCaptor keyCaptor; - - @Captor private ArgumentCaptor messageCaptor; - - private final ObjectMapper objectMapper = new ObjectMapper(); - private AutoCloseable closeable; - - private final String inputTopicNameObservation = "Observation"; - private final String outputTopicNameObservation = "ObservationOutput"; - - private final String inputTopicNameActRelationship = "Act_relationship"; - - @BeforeEach - void setUp() { - closeable = MockitoAnnotations.openMocks(this); - ProcessObservationDataUtil transformer = new ProcessObservationDataUtil(kafkaTemplate); - transformer.setMaterialTopicName("materialTopic"); - observationService = - new ObservationService( - observationRepository, - kafkaTemplate, - transformer, - new CustomMetrics(new SimpleMeterRegistry())); - observationService.setObservationTopic(inputTopicNameObservation); - observationService.setActRelationshipTopic(inputTopicNameActRelationship); - observationService.setObservationTopicOutputReporting(outputTopicNameObservation); - observationService.setThreadPoolSize(1); - observationService.initMetrics(); - - transformer.setCodedTopicName("ObservationCoded"); - transformer.setReasonTopicName("ObservationReason"); - transformer.setTxtTopicName("ObservationTxt"); - } - - @AfterEach - void closeService() throws Exception { - closeable.close(); - } - - @Test - void testProcessMessage() throws JsonProcessingException { - // Mocked input data - Long observationUid = 123456789L; - String obsDomainCdSt = "Order"; - String payload = - "{\"payload\": {\"after\": {\"observation_uid\": \"" + observationUid + "\"}}}"; - - Observation observation = constructObservation(observationUid, obsDomainCdSt); - when(observationRepository.computeObservations(String.valueOf(observationUid))) - .thenReturn(Optional.of(observation)); - when(kafkaTemplate.send(anyString(), anyString(), anyString())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(kafkaTemplate.send(anyString(), anyString(), isNull())) - .thenReturn(CompletableFuture.completedFuture(null)); - - validateData(payload, observation, inputTopicNameObservation); - - verify(observationRepository).computeObservations(String.valueOf(observationUid)); - } - - @ParameterizedTest - @CsvSource({ - "d,LabReport,OBS", - "d,LabReport,OTHER", - "c,LabReport,OBS", - "c,LabReport,OTHER", - "d,OTHER,OBS", - "d,OTHER,OTHER" - }) - void testProcessActRelationship(String op, String typeCd, String targetClassCd) - throws JsonProcessingException { - Long sourceActUid = 123456789L; - String obsDomainCdSt = "Order"; - String payload = - "{\"payload\": {\"before\": {\"source_act_uid\": \"" - + sourceActUid - + "\", \"type_cd\": \"" - + typeCd - + "\", \"target_class_cd\": \"" - + targetClassCd - + "\"}," - + "\"after\": {\"source_act_uid\": \"123\"}," - + "\"op\": \"" - + op - + "\"}}"; - - if (typeCd.equals("OTHER") || !op.equals("d") || targetClassCd.equals("OTHER")) { - ConsumerRecord rec = getRecord(payload, inputTopicNameActRelationship); - - observationService.processMessage(rec); - verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); - } else { - Observation observation = constructObservation(sourceActUid, obsDomainCdSt); - when(observationRepository.computeObservations(String.valueOf(sourceActUid))) - .thenReturn(Optional.of(observation)); - when(kafkaTemplate.send(anyString(), anyString(), anyString())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(kafkaTemplate.send(anyString(), anyString(), isNull())) - .thenReturn(CompletableFuture.completedFuture(null)); - - validateData(payload, observation, inputTopicNameActRelationship); - - verify(observationRepository).computeObservations(String.valueOf(sourceActUid)); - } - } - - @Test - void testProcessActRelationshipNullPayload() { - ConsumerRecord rec = getRecord(null, inputTopicNameActRelationship); - - observationService.processMessage(rec); - - verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); - } - - @Test - void testProcessMessageUnknownTopic() { - ConsumerRecord rec = getRecord(null, "dummyTopicName"); - - observationService.processMessage(rec); - - verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); - } - - @ParameterizedTest - @CsvSource({ - "{\"payload\": {\"after\": {}}},Error", - "{\"payload\": {\"after\": {}}},Observation", - "{\"payload\": {\"after\": {\"source_act_uid\": \"123\"}, \"op\": \"d\"}}}, Act_relationship" - }) - void testProcessMessageException(String payload, String topic) { - - ConsumerRecord rec = getRecord(payload, topic); - - CompletableFuture future = observationService.processMessage(rec); - CompletionException ex = assertThrows(CompletionException.class, future::join); - assertEquals(NoSuchElementException.class, ex.getCause().getCause().getClass()); - } - - @Test - void testProcessMessageNoDataException() { - Long observationUid = 123456789L; - String payload = - "{\"payload\": {\"after\": {\"observation_uid\": \"" + observationUid + "\"}}}"; - ConsumerRecord rec = getRecord(payload, inputTopicNameObservation); - - when(observationRepository.computeObservations(String.valueOf(observationUid))) - .thenReturn(Optional.empty()); - CompletableFuture future = observationService.processMessage(rec); - CompletionException ex = assertThrows(CompletionException.class, future::join); - assertEquals(NoDataException.class, ex.getCause().getClass()); - } - - private void validateData(String payload, Observation observation, String inputTopic) - throws JsonProcessingException { - ConsumerRecord rec = getRecord(payload, inputTopic); - observationService.processMessage(rec); - - ObservationKey observationKey = new ObservationKey(); - observationKey.setObservationUid(observation.getObservationUid()); - - var reportingModel = - constructObservationReporting( - observation.getObservationUid(), observation.getObsDomainCdSt1()); - reportingModel.setBatchId(toBatchId.applyAsLong(rec)); - - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .untilAsserted( - () -> - verify(kafkaTemplate, times(2)) - .send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture())); - String actualTopic = topicCaptor.getValue(); - String actualKey = keyCaptor.getValue(); - String actualValue = messageCaptor.getValue(); - - var actualReporting = - objectMapper.readValue( - objectMapper.readTree(actualValue).path("payload").toString(), - ObservationReporting.class); - - var actualObservationKey = - objectMapper.readValue( - objectMapper.readTree(actualKey).path("payload").toString(), ObservationKey.class); - - assertEquals(outputTopicNameObservation, actualTopic); - assertEquals(observationKey, actualObservationKey); - assertEquals(reportingModel, actualReporting); - } - - private Observation constructObservation(Long observationUid, String obsDomainCdSt1) { - String filePathPrefix = "rawDataFiles/observation/"; - Observation observation = new Observation(); - observation.setObservationUid(observationUid); - observation.setActUid(observationUid); - observation.setClassCd("OBS"); - observation.setMoodCd("ENV"); - observation.setLocalId("OBS10003388MA01"); - observation.setActivityFromTime("2021-01-28 16:06:03.000"); - observation.setObsDomainCdSt1(obsDomainCdSt1); - observation.setPersonParticipations(readFileData(filePathPrefix + "PersonParticipations.json")); - observation.setOrganizationParticipations( - readFileData(filePathPrefix + "OrganizationParticipations.json")); - observation.setMaterialParticipations( - readFileData(filePathPrefix + "MaterialParticipations.json")); - observation.setFollowupObservations(readFileData(filePathPrefix + "FollowupObservations.json")); - observation.setParentObservations(readFileData(filePathPrefix + "ParentObservations.json")); - observation.setActIds(readFileData(filePathPrefix + "ActIds.json")); - return observation; - } - - private ObservationReporting constructObservationReporting( - Long observationUid, String obsDomainCdSt1) { - ObservationReporting observation = new ObservationReporting(); - observation.setObservationUid(observationUid); - observation.setObsDomainCdSt1(obsDomainCdSt1); - observation.setActUid(observationUid); - observation.setClassCd("OBS"); - observation.setMoodCd("ENV"); - observation.setLocalId("OBS10003388MA01"); - observation.setOrderingPersonId("10000055"); - observation.setPatientId(10000066L); - observation.setPerformingOrganizationId(null); // not null when obsDomainCdSt1=Result - observation.setAuthorOrganizationId(34567890L); // null when obsDomainCdSt1=Result - observation.setOrderingOrganizationId(23456789L); // null when obsDomainCdSt1=Result - observation.setHealthCareId(56789012L); // null when obsDomainCdSt1=Result - observation.setMorbHospReporterId(67890123L); // null when obsDomainCdSt1=Result - observation.setMorbHospId(78901234L); // null when obsDomainCdSt1=Result - observation.setMaterialId(10000005L); - observation.setResultObservationUid("56789012,56789013"); - observation.setFollowupObservationUid("56789014,56789015"); - observation.setReportObservationUid(123456788L); - observation.setReportRefrUid(123456790L); - observation.setReportSprtUid(123456788L); - - observation.setAssistantInterpreterId(10000077L); - observation.setAssistantInterpreterVal("22582"); - observation.setAssistantInterpreterFirstNm("Cara"); - observation.setAssistantInterpreterLastNm("Dune"); - observation.setAssistantInterpreterIdAssignAuth("22D7377772"); - observation.setAssistantInterpreterAuthType("Employee number"); - - observation.setTranscriptionistId(10000088L); - observation.setTranscriptionistVal("34344355455144"); - observation.setTranscriptionistFirstNm("Moff"); - observation.setTranscriptionistLastNm("Gideon"); - observation.setTranscriptionistIdAssignAuth("18D8181818"); - observation.setTranscriptionistAuthType("Employee number"); - - observation.setResultInterpreterId(10000022L); - observation.setLabTestTechnicianId(10000011L); - - observation.setSpecimenCollectorId(10000033L); - observation.setCopyToProviderId(10000044L); - observation.setAccessionNumber("20120601114"); - observation.setActivityFromTime("2021-01-28 16:06:03.000"); - observation.setDeviceInstanceId1("No Equipment"); - observation.setDeviceInstanceId2("NEW TOOLS"); - - return observation; - } - - private ConsumerRecord getRecord(String payload, String inputTopic) { - return new ConsumerRecord<>(inputTopic, 0, 11L, null, payload); - } -} diff --git a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationDataProcessTests.java b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParserTest.java similarity index 55% rename from reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationDataProcessTests.java rename to reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParserTest.java index ec9a6f265..8aea0d29b 100644 --- a/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/ObservationDataProcessTests.java +++ b/reporting-pipeline-service/src/test/java/gov/cdc/nbs/report/pipeline/observation/transformer/ObservationParserTest.java @@ -1,21 +1,24 @@ -package gov.cdc.nbs.report.pipeline.observation; +package gov.cdc.nbs.report.pipeline.observation.transformer; import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.*; import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.Observation; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationCoded; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationDate; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationEdx; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationMaterial; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationNumeric; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationReason; import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTransformed; -import gov.cdc.nbs.report.pipeline.observation.transformer.ProcessObservationDataUtil; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ObservationTxt; +import gov.cdc.nbs.report.pipeline.observation.model.dto.observation.ParsedObservation; import java.util.List; -import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -23,68 +26,26 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import org.slf4j.LoggerFactory; -import org.springframework.kafka.core.KafkaTemplate; import org.testcontainers.shaded.org.checkerframework.checker.nullness.qual.NonNull; -class ObservationDataProcessTests { - @Mock KafkaTemplate kafkaTemplate; - - @Captor private ArgumentCaptor topicCaptor; - - @Captor private ArgumentCaptor keyCaptor; - - @Captor private ArgumentCaptor messageCaptor; +class ObservationParserTest { private static final String FILE_PREFIX = "rawDataFiles/observation/"; - private static final String CODED_TOPIC = "codedTopic"; - private static final String DATE_TOPIC = "dateTopic"; - private static final String EDX_TOPIC = "edxTopic"; - private static final String MATERIAL_TOPIC = "materialTopic"; - private static final String NUMERIC_TOPIC = "numericTopic"; - private static final String REASON_TOPIC = "reasonTopic"; - private static final String TXT_TOPIC = "txtTopic"; - private static final Long BATCH_ID = 11L; - - ProcessObservationDataUtil transformer; - - private AutoCloseable closeable; private final ListAppender listAppender = new ListAppender<>(); - private final ObjectMapper objectMapper = new ObjectMapper(); @BeforeEach void setUp() { - closeable = MockitoAnnotations.openMocks(this); - transformer = new ProcessObservationDataUtil(kafkaTemplate); - - transformer.setCodedTopicName(CODED_TOPIC); - transformer.setEdxTopicName(EDX_TOPIC); - transformer.setDateTopicName(DATE_TOPIC); - transformer.setMaterialTopicName(MATERIAL_TOPIC); - transformer.setNumericTopicName(NUMERIC_TOPIC); - transformer.setReasonTopicName(REASON_TOPIC); - transformer.setTxtTopicName(TXT_TOPIC); - - Logger logger = (Logger) LoggerFactory.getLogger(ProcessObservationDataUtil.class); + Logger logger = (Logger) LoggerFactory.getLogger(ObservationParser.class); listAppender.start(); logger.addAppender(listAppender); - - when(kafkaTemplate.send(anyString(), anyString(), isNull())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(kafkaTemplate.send(anyString(), anyString(), anyString())) - .thenReturn(CompletableFuture.completedFuture(null)); } @AfterEach - void tearDown() throws Exception { - Logger logger = (Logger) LoggerFactory.getLogger(ProcessObservationDataUtil.class); + void tearDown() { + Logger logger = (Logger) LoggerFactory.getLogger(ObservationParser.class); logger.detachAppender(listAppender); - closeable.close(); } @Test @@ -100,16 +61,15 @@ void consolidatedDataTransformationTest() { readFileData(FILE_PREFIX + "MaterialParticipations.json")); observation.setFollowupObservations(readFileData(FILE_PREFIX + "FollowupObservations.json")); - ObservationTransformed observationTransformed = - transformer.transformObservationData(observation, BATCH_ID); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); - Long patId = observationTransformed.getPatientId(); - String ordererId = observationTransformed.getOrderingPersonId(); - Long authorOrgId = observationTransformed.getAuthorOrganizationId(); - Long ordererOrgId = observationTransformed.getOrderingOrganizationId(); - Long performerOrgId = observationTransformed.getPerformingOrganizationId(); - Long materialId = observationTransformed.getMaterialId(); - String resultObsUid = observationTransformed.getResultObservationUid(); + Long patId = parsedObservation.transformed().getPatientId(); + String ordererId = parsedObservation.transformed().getOrderingPersonId(); + Long authorOrgId = parsedObservation.transformed().getAuthorOrganizationId(); + Long ordererOrgId = parsedObservation.transformed().getOrderingOrganizationId(); + Long performerOrgId = parsedObservation.transformed().getPerformingOrganizationId(); + Long materialId = parsedObservation.transformed().getMaterialId(); + String resultObsUid = parsedObservation.transformed().getResultObservationUid(); Assertions.assertEquals("10000055", ordererId); Assertions.assertEquals(10000066L, patId); @@ -129,9 +89,9 @@ void testPersonParticipationTransformation() { final var expected = getObservationTransformed(); observation.setPersonParticipations(readFileData(FILE_PREFIX + "PersonParticipations.json")); - ObservationTransformed observationTransformed = - transformer.transformObservationData(observation, BATCH_ID); - Assertions.assertEquals(expected, observationTransformed); + + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); + Assertions.assertEquals(expected, parsedObservation.transformed()); } @Test @@ -151,9 +111,8 @@ void testMorbReportTransformation() { observation.setPersonParticipations( readFileData(FILE_PREFIX + "PersonParticipationsMorb.json")); - ObservationTransformed observationTransformed = - transformer.transformObservationData(observation, BATCH_ID); - Assertions.assertEquals(expected, observationTransformed); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); + Assertions.assertEquals(expected, parsedObservation.transformed()); } @Test @@ -165,11 +124,10 @@ void testOrganizationParticipationTransformation() { observation.setOrganizationParticipations( readFileData(FILE_PREFIX + "OrganizationParticipations.json")); - ObservationTransformed observationTransformed = - transformer.transformObservationData(observation, BATCH_ID); - Long authorOrgId = observationTransformed.getAuthorOrganizationId(); - Long ordererOrgId = observationTransformed.getOrderingOrganizationId(); - Long performerOrgId = observationTransformed.getPerformingOrganizationId(); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); + Long authorOrgId = parsedObservation.transformed().getAuthorOrganizationId(); + Long ordererOrgId = parsedObservation.transformed().getOrderingOrganizationId(); + Long performerOrgId = parsedObservation.transformed().getPerformingOrganizationId(); Assertions.assertNull(authorOrgId); Assertions.assertNull(ordererOrgId); @@ -177,7 +135,7 @@ void testOrganizationParticipationTransformation() { } @Test - void testObservationMaterialTransformation() throws JsonProcessingException { + void testObservationMaterialTransformation() { Observation observation = new Observation(); observation.setObservationUid(100000003L); observation.setObsDomainCdSt1("Order"); @@ -185,27 +143,11 @@ void testObservationMaterialTransformation() throws JsonProcessingException { readFileData(FILE_PREFIX + "MaterialParticipations.json")); ObservationMaterial material = constructObservationMaterial(100000003L); - ObservationTransformed observationTransformed = - transformer.transformObservationData(observation, BATCH_ID); - verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(MATERIAL_TOPIC, topicCaptor.getValue()); - assertEquals(10000005L, observationTransformed.getMaterialId()); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); - List logs = listAppender.list; - assertTrue( - logs.get(2) - .getFormattedMessage() - .contains("Observation Material data (uid=10000005) sent to " + MATERIAL_TOPIC)); - - var actualMaterial = - objectMapper.readValue( - objectMapper - .readTree(messageCaptor.getAllValues().getFirst()) - .path("payload") - .toString(), - ObservationMaterial.class); - - assertEquals(material, actualMaterial); + assertEquals(10000005L, parsedObservation.transformed().getMaterialId()); + + assertEquals(material, parsedObservation.materialEntries().get(0)); } @ParameterizedTest @@ -217,15 +159,14 @@ void testParentObservationsTransformation(String domainCd) { "[{\"parent_type_cd\":\"MorbFrmQ\",\"parent_uid\":234567888,\"parent_domain_cd_st_1\":\"R_Order\"}]"); observation.setObsDomainCdSt1(domainCd); - ObservationTransformed observationTransformed = - transformer.transformObservationData(observation, BATCH_ID); - assertEquals(234567888L, observationTransformed.getReportObservationUid()); - assertNull(observationTransformed.getReportRefrUid()); - assertNull(observationTransformed.getReportSprtUid()); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); + assertEquals(234567888L, parsedObservation.transformed().getReportObservationUid()); + assertNull(parsedObservation.transformed().getReportRefrUid()); + assertNull(parsedObservation.transformed().getReportSprtUid()); } @Test - void testObservationCodedTransformation() throws JsonProcessingException { + void testObservationCodedTransformation() { Observation observation = new Observation(); observation.setObservationUid(10001234L); observation.setObsCode(readFileData(FILE_PREFIX + "ObservationCoded.json")); @@ -240,25 +181,13 @@ void testObservationCodedTransformation() throws JsonProcessingException { coded.setOvcAltCdDescTxt("NORMAL"); coded.setBatchId(BATCH_ID); - transformer.transformObservationData(observation, BATCH_ID); - verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(CODED_TOPIC, topicCaptor.getValue()); - List logs = listAppender.list; - assertTrue( - logs.get(6) - .getFormattedMessage() - .contains("Observation Coded data (uid=10001234) sent to " + CODED_TOPIC)); - - var actualCoded = - objectMapper.readValue( - objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), - ObservationCoded.class); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); - assertEquals(coded, actualCoded); + assertEquals(coded, parsedObservation.codedEntries().get(0)); } @Test - void testObservationDateTransformation() throws JsonProcessingException { + void testObservationDateTransformation() { Observation observation = new Observation(); observation.setObservationUid(10001234L); observation.setObsDate(readFileData(FILE_PREFIX + "ObservationDate.json")); @@ -269,25 +198,13 @@ void testObservationDateTransformation() throws JsonProcessingException { obd.setOvdSeq(1); obd.setBatchId(BATCH_ID); - transformer.transformObservationData(observation, BATCH_ID); - verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(DATE_TOPIC, topicCaptor.getValue()); - List logs = listAppender.list; - assertTrue( - logs.get(7) - .getFormattedMessage() - .contains("Observation Date data (uid=10001234) sent to " + DATE_TOPIC)); - - var actualObd = - objectMapper.readValue( - objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), - ObservationDate.class); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); - assertEquals(obd, actualObd); + assertEquals(obd, parsedObservation.dateEntries().get(0)); } @Test - void testObservationEdxTransformation() throws JsonProcessingException { + void testObservationEdxTransformation() { Observation observation = new Observation(); observation.setActUid(10001234L); observation.setObservationUid(10001234L); @@ -298,29 +215,13 @@ void testObservationEdxTransformation() throws JsonProcessingException { edx.setEdxActUid(observation.getActUid()); edx.setEdxAddTime("2024-09-30T21:08:19.017"); - transformer.transformObservationData(observation, BATCH_ID); - verify(kafkaTemplate, times(2)) - .send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(EDX_TOPIC, topicCaptor.getValue()); - List logs = listAppender.list; - assertTrue( - logs.get(8) - .getFormattedMessage() - .contains("Observation Edx data (edx doc uid=10101) sent to " + EDX_TOPIC)); - - var actualEdx = - objectMapper.readValue( - objectMapper - .readTree(messageCaptor.getAllValues().getFirst()) - .path("payload") - .toString(), - ObservationEdx.class); - - assertEquals(edx, actualEdx); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); + + assertEquals(edx, parsedObservation.edxEntries().get(0)); } @Test - void testObservationNumericTransformation() throws JsonProcessingException { + void testObservationNumericTransformation() { Observation observation = new Observation(); observation.setObservationUid(10001234L); observation.setObsNum(readFileData(FILE_PREFIX + "ObservationNumeric.json")); @@ -337,25 +238,13 @@ void testObservationNumericTransformation() throws JsonProcessingException { numeric.setOvnSeq(1); numeric.setBatchId(BATCH_ID); - transformer.transformObservationData(observation, BATCH_ID); - verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(NUMERIC_TOPIC, topicCaptor.getValue()); - List logs = listAppender.list; - assertTrue( - logs.get(9) - .getFormattedMessage() - .contains("Observation Numeric data (uid=10001234) sent to " + NUMERIC_TOPIC)); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); - var actualNumeric = - objectMapper.readValue( - objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), - ObservationNumeric.class); - - assertEquals(numeric, actualNumeric); + assertEquals(numeric, parsedObservation.numericEntries().get(0)); } @Test - void testObservationReasonTransformation() throws JsonProcessingException { + void testObservationReasonTransformation() { Observation observation = new Observation(); observation.setObservationUid(10001234L); observation.setObsReason(readFileData(FILE_PREFIX + "ObservationReason.json")); @@ -366,25 +255,13 @@ void testObservationReasonTransformation() throws JsonProcessingException { reason.setReasonDescTxt("PRESENCE OF REASON"); reason.setBatchId(BATCH_ID); - transformer.transformObservationData(observation, BATCH_ID); - verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(REASON_TOPIC, topicCaptor.getValue()); - List logs = listAppender.list; - assertTrue( - logs.get(10) - .getFormattedMessage() - .contains("Observation Reason data (uid=10001234) sent to " + REASON_TOPIC)); - - var actualReason = - objectMapper.readValue( - objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), - ObservationReason.class); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); - assertEquals(reason, actualReason); + assertEquals(reason, parsedObservation.reasonEntries().get(0)); } @Test - void testObservationTxtTransformation() throws JsonProcessingException { + void testObservationTxtTransformation() { Observation observation = new Observation(); observation.setObservationUid(10001234L); observation.setObsTxt(readFileData(FILE_PREFIX + "ObservationTxt.json")); @@ -396,25 +273,9 @@ void testObservationTxtTransformation() throws JsonProcessingException { txt.setOvtValueTxt("RECOMMENDED IN SUCH INSTANCES."); txt.setBatchId(BATCH_ID); - transformer.transformObservationData(observation, BATCH_ID); - verify(kafkaTemplate, times(2)) - .send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(TXT_TOPIC, topicCaptor.getValue()); - List logs = listAppender.list; - assertTrue( - logs.get(11) - .getFormattedMessage() - .contains("Observation Txt data (uid=10001234) sent to " + TXT_TOPIC)); - - var actualTxt = - objectMapper.readValue( - objectMapper - .readTree(messageCaptor.getAllValues().getFirst()) - .path("payload") - .toString(), - ObservationTxt.class); - - assertEquals(txt, actualTxt); + ParsedObservation parsedObservation = ObservationParser.parse(observation, BATCH_ID); + + assertEquals(txt, parsedObservation.textEntries().get(0)); } @Test @@ -422,7 +283,7 @@ void testTransformNoObservationData() { Observation observation = new Observation(); observation.setObservationUid(10001234L); observation.setOrganizationParticipations("{\"act_uid\": 10000003}"); - transformer.transformObservationData(observation, BATCH_ID); + ObservationParser.parse(observation, BATCH_ID); List logs = listAppender.list; logs.forEach(le -> assertTrue(le.getFormattedMessage().matches("^\\w+ array is null."))); @@ -447,7 +308,7 @@ void testTransformObservationDataError() { observation.setObsReason(invalidJSON); observation.setObsTxt(invalidJSON); - transformer.transformObservationData(observation, BATCH_ID); + ObservationParser.parse(observation, BATCH_ID); List logs = listAppender.list; logs.forEach(le -> assertTrue(le.getFormattedMessage().contains(invalidJSON))); @@ -467,7 +328,7 @@ void testTransformObservationInvalidDomainError() { observation.setMaterialParticipations(dummyJSON); observation.setFollowupObservations(dummyJSON); - transformer.transformObservationData(observation, BATCH_ID); + ObservationParser.parse(observation, BATCH_ID); List logs = listAppender.list.subList(0, 4); logs.forEach( @@ -491,7 +352,7 @@ void testTransformObservationNullError(String payload) { observation.setFollowupObservations(payload); observation.setParentObservations(payload); - transformer.transformObservationData(observation, BATCH_ID); + ObservationParser.parse(observation, BATCH_ID); List logs = listAppender.list.subList(0, 4); logs.forEach(