|
| 1 | +package org.opencds.cqf.fhir.cr.hapi.common; |
| 2 | + |
| 3 | +import ca.uhn.fhir.context.FhirContext; |
| 4 | +import ca.uhn.fhir.context.FhirVersionEnum; |
| 5 | +import ca.uhn.fhir.parser.IParser; |
| 6 | +import ca.uhn.fhir.parser.path.EncodeContextPath; |
| 7 | +import ca.uhn.fhir.parser.path.EncodeContextPathElement; |
| 8 | +import ca.uhn.fhir.repository.IRepository; |
| 9 | +import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; |
| 10 | +import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; |
| 11 | +import com.fasterxml.jackson.annotation.JsonInclude.Include; |
| 12 | +import com.fasterxml.jackson.core.JsonGenerator; |
| 13 | +import com.fasterxml.jackson.core.JsonProcessingException; |
| 14 | +import com.fasterxml.jackson.core.Version; |
| 15 | +import com.fasterxml.jackson.databind.ObjectMapper; |
| 16 | +import com.fasterxml.jackson.databind.SerializationFeature; |
| 17 | +import com.fasterxml.jackson.databind.SerializerProvider; |
| 18 | +import com.fasterxml.jackson.databind.module.SimpleModule; |
| 19 | +import com.fasterxml.jackson.databind.ser.std.StdSerializer; |
| 20 | +import java.io.IOException; |
| 21 | +import java.nio.charset.StandardCharsets; |
| 22 | +import java.util.Arrays; |
| 23 | +import java.util.List; |
| 24 | +import java.util.Optional; |
| 25 | +import java.util.concurrent.ExecutionException; |
| 26 | +import java.util.concurrent.ExecutorService; |
| 27 | +import java.util.concurrent.Executors; |
| 28 | +import java.util.concurrent.Future; |
| 29 | +import java.util.concurrent.TimeUnit; |
| 30 | +import java.util.stream.Collectors; |
| 31 | +import org.hl7.fhir.instance.model.api.IBase; |
| 32 | +import org.hl7.fhir.instance.model.api.IBaseBundle; |
| 33 | +import org.hl7.fhir.instance.model.api.IBaseResource; |
| 34 | +import org.hl7.fhir.instance.model.api.IPrimitiveType; |
| 35 | +import org.hl7.fhir.r4.model.Binary; |
| 36 | +import org.hl7.fhir.r4.model.Bundle; |
| 37 | +import org.hl7.fhir.r4.model.Library; |
| 38 | +import org.hl7.fhir.r4.model.MetadataResource; |
| 39 | +import org.hl7.fhir.r4.model.Parameters; |
| 40 | +import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent; |
| 41 | +import org.hl7.fhir.r4.model.PlanDefinition; |
| 42 | +import org.hl7.fhir.r4.model.Resource; |
| 43 | +import org.hl7.fhir.r4.model.ValueSet; |
| 44 | +import org.opencds.cqf.fhir.cr.common.ArtifactDiffProcessor.DiffCache; |
| 45 | +import org.opencds.cqf.fhir.cr.common.ArtifactDiffProcessor.DiffCache.DiffCacheResource; |
| 46 | +import org.opencds.cqf.fhir.cr.common.ICreateChangelogProcessor; |
| 47 | +import org.opencds.cqf.fhir.cr.common.PackageProcessor; |
| 48 | +import org.opencds.cqf.fhir.cr.crmi.KnowledgeArtifactProcessor; |
| 49 | +import org.opencds.cqf.fhir.cr.crmi.changelog.ChangeLog; |
| 50 | +import org.opencds.cqf.fhir.cr.crmi.changelog.Page; |
| 51 | +import org.opencds.cqf.fhir.utility.Canonicals; |
| 52 | +import org.opencds.cqf.fhir.utility.adapter.IAdapterFactory; |
| 53 | +import org.opencds.cqf.fhir.utility.model.FhirModelResolverCache; |
| 54 | + |
| 55 | +@SuppressWarnings("UnstableApiUsage") |
| 56 | +public class HapiCreateChangelogProcessor implements ICreateChangelogProcessor { |
| 57 | + |
| 58 | + private final FhirVersionEnum fhirVersion; |
| 59 | + private final PackageProcessor packageProcessor; |
| 60 | + |
| 61 | + private final HapiArtifactDiffProcessor hapiArtifactDiffProcessor; |
| 62 | + |
| 63 | + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); |
| 64 | + |
| 65 | + public HapiCreateChangelogProcessor(IRepository repository) { |
| 66 | + this.fhirVersion = repository.fhirContext().getVersion().getVersion(); |
| 67 | + this.packageProcessor = new PackageProcessor(repository); |
| 68 | + this.hapiArtifactDiffProcessor = new HapiArtifactDiffProcessor(repository); |
| 69 | + } |
| 70 | + |
| 71 | + static { |
| 72 | + Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
| 73 | + try { |
| 74 | + EXECUTOR_SERVICE.shutdown(); |
| 75 | + if (!EXECUTOR_SERVICE.awaitTermination(30, TimeUnit.SECONDS)) { |
| 76 | + EXECUTOR_SERVICE.shutdownNow(); |
| 77 | + } |
| 78 | + } catch (InterruptedException e) { |
| 79 | + EXECUTOR_SERVICE.shutdownNow(); |
| 80 | + Thread.currentThread().interrupt(); |
| 81 | + } |
| 82 | + })); |
| 83 | + } |
| 84 | + |
| 85 | + @Override |
| 86 | + public IBaseResource createChangelog( |
| 87 | + IBaseResource source, IBaseResource target, IBaseResource terminologyEndpoint) { |
| 88 | + |
| 89 | + // 1) Use package to get a pair of bundles |
| 90 | + List<Future<IBaseBundle>> packages; |
| 91 | + Bundle sourceBundle; |
| 92 | + Bundle targetBundle; |
| 93 | + Parameters params = new Parameters(); |
| 94 | + params.addParameter().setName("terminologyEndpoint").setResource((Resource) terminologyEndpoint); |
| 95 | + try { |
| 96 | + packages = EXECUTOR_SERVICE.invokeAll(Arrays.asList( |
| 97 | + () -> packageProcessor.packageResource(source, params), |
| 98 | + () -> packageProcessor.packageResource(target, params))); |
| 99 | + sourceBundle = (Bundle) packages.get(0).get(); |
| 100 | + targetBundle = (Bundle) packages.get(1).get(); |
| 101 | + } catch (InterruptedException e) { |
| 102 | + Thread.currentThread().interrupt(); |
| 103 | + throw new InternalErrorException(e.getMessage()); |
| 104 | + } catch (ExecutionException e) { |
| 105 | + throw new InternalErrorException(e.getMessage()); |
| 106 | + } |
| 107 | + |
| 108 | + // 2) Fill the cache with the bundle contents |
| 109 | + var cache = populateCache(source, sourceBundle, target, targetBundle); |
| 110 | + |
| 111 | + // 3) Use cached resources to create diff and changelog |
| 112 | + var targetResource = cache.getTargetResourceForUrl(((MetadataResource) target).getUrl()); |
| 113 | + var sourceResource = cache.getSourceResourceForUrl(((MetadataResource) source).getUrl()); |
| 114 | + if (targetResource.isPresent() && sourceResource.isPresent()) { |
| 115 | + var targetAdapter = IAdapterFactory.forFhirVersion(FhirVersionEnum.R4) |
| 116 | + .createKnowledgeArtifactAdapter(targetResource.get().resource); |
| 117 | + var diffParameters = hapiArtifactDiffProcessor.getArtifactDiff( |
| 118 | + sourceResource.get().resource, |
| 119 | + targetResource.get().resource, |
| 120 | + true, |
| 121 | + true, |
| 122 | + cache, |
| 123 | + terminologyEndpoint); |
| 124 | + var manifestUrl = targetAdapter.getUrl(); |
| 125 | + var changelog = new ChangeLog(manifestUrl); |
| 126 | + processChanges(((Parameters) diffParameters).getParameter(), changelog, cache, manifestUrl); |
| 127 | + |
| 128 | + // 4) Handle the Conditions and Priorities which are in RelatedArtifact changes |
| 129 | + changelog.handleRelatedArtifacts(); |
| 130 | + |
| 131 | + // 5) Generate the output JSON |
| 132 | + var bin = new Binary(); |
| 133 | + var mapper = createSerializer(); |
| 134 | + try { |
| 135 | + bin.setContent(mapper.writeValueAsString(changelog).getBytes(StandardCharsets.UTF_8)); |
| 136 | + } catch (JsonProcessingException e) { |
| 137 | + throw new UnprocessableEntityException(e.getMessage()); |
| 138 | + } |
| 139 | + |
| 140 | + return bin; |
| 141 | + } |
| 142 | + |
| 143 | + throw new UnprocessableEntityException("Could not find source or target resource in cached package responses"); |
| 144 | + } |
| 145 | + |
| 146 | + private DiffCache populateCache( |
| 147 | + IBaseResource source, Bundle sourceBundle, IBaseResource target, Bundle targetBundle) { |
| 148 | + var cache = new DiffCache(); |
| 149 | + for (final var entry : sourceBundle.getEntry()) { |
| 150 | + if (entry.hasResource() && entry.getResource() instanceof MetadataResource metadataResource) { |
| 151 | + cache.addSource(metadataResource.getUrl() + "|" + metadataResource.getVersion(), metadataResource); |
| 152 | + if (metadataResource.getIdPart().equals(source.getIdElement().getIdPart())) { |
| 153 | + cache.addSource(metadataResource.getUrl(), metadataResource); |
| 154 | + } |
| 155 | + } |
| 156 | + } |
| 157 | + for (final var entry : targetBundle.getEntry()) { |
| 158 | + if (entry.hasResource() && entry.getResource() instanceof MetadataResource metadataResource) { |
| 159 | + cache.addTarget(metadataResource.getUrl() + "|" + metadataResource.getVersion(), metadataResource); |
| 160 | + if (metadataResource.getIdPart().equals(target.getIdElement().getIdPart())) { |
| 161 | + cache.addTarget(metadataResource.getUrl(), metadataResource); |
| 162 | + } |
| 163 | + } |
| 164 | + } |
| 165 | + return cache; |
| 166 | + } |
| 167 | + |
| 168 | + private ObjectMapper createSerializer() { |
| 169 | + var mapper = new ObjectMapper() |
| 170 | + .setDefaultPropertyInclusion(Include.NON_NULL) |
| 171 | + .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); |
| 172 | + SimpleModule module = new SimpleModule("IBaseSerializer", new Version(1, 0, 0, null, null, null)); |
| 173 | + module.addSerializer(IBase.class, new IBaseSerializer(FhirContext.forVersion(this.fhirVersion))); |
| 174 | + mapper.registerModule(module); |
| 175 | + return mapper; |
| 176 | + } |
| 177 | + |
| 178 | + private void processChanges( |
| 179 | + List<Parameters.ParametersParameterComponent> changes, ChangeLog changelog, DiffCache cache, String url) { |
| 180 | + // 1) Get the source and target resources so we can pull additional info as necessary |
| 181 | + var resourceType = Canonicals.getResourceType(url); |
| 182 | + // Check if the resource pair was already processed |
| 183 | + var wasPageAlreadyProcessed = changelog.getPage(url).isPresent(); |
| 184 | + if (!wasPageAlreadyProcessed |
| 185 | + && (cache.getSourceResourceForUrl(url).isPresent() |
| 186 | + || cache.getTargetResourceForUrl(url).isPresent())) { |
| 187 | + final Optional<DiffCacheResource> sourceCacheResource = cache.getSourceResourceForUrl(url); |
| 188 | + final Optional<DiffCacheResource> targetCacheResource = cache.getTargetResourceForUrl(url); |
| 189 | + if (resourceType != null) { |
| 190 | + MetadataResource sourceResource = sourceCacheResource |
| 191 | + .map(diffCacheResource -> diffCacheResource.resource) |
| 192 | + .orElse(null); |
| 193 | + MetadataResource targetResource = targetCacheResource |
| 194 | + .map(diffCacheResource -> diffCacheResource.resource) |
| 195 | + .orElse(null); |
| 196 | + // don't generate changeLog pages for non-grouper ValueSets |
| 197 | + if (resourceType.equals("ValueSet") |
| 198 | + && ((sourceResource != null && !KnowledgeArtifactProcessor.isGrouper(sourceResource)) |
| 199 | + || (targetResource != null && !KnowledgeArtifactProcessor.isGrouper(targetResource)))) { |
| 200 | + return; |
| 201 | + } |
| 202 | + // 2) Generate a page for each resource pair based on ResourceType |
| 203 | + var page = changelog.getPage(url).orElseGet(() -> switch (resourceType) { |
| 204 | + case "ValueSet" -> changelog.addPage((ValueSet) sourceResource, (ValueSet) targetResource, cache); |
| 205 | + case "Library" -> changelog.addPage((Library) sourceResource, (Library) targetResource); |
| 206 | + case "PlanDefinition" -> |
| 207 | + changelog.addPage((PlanDefinition) sourceResource, (PlanDefinition) targetResource); |
| 208 | + default -> changelog.addPage(sourceResource, targetResource, url); |
| 209 | + }); |
| 210 | + // 3) Process each change |
| 211 | + for (var change : changes) { |
| 212 | + processChange(changelog, cache, change, sourceResource, page); |
| 213 | + } |
| 214 | + } |
| 215 | + } |
| 216 | + } |
| 217 | + |
| 218 | + private void processChange( |
| 219 | + ChangeLog changelog, |
| 220 | + DiffCache cache, |
| 221 | + ParametersParameterComponent change, |
| 222 | + MetadataResource sourceResource, |
| 223 | + Page<?> page) { |
| 224 | + if (change.hasName() |
| 225 | + && !change.getName().equals("operation") |
| 226 | + && change.hasResource() |
| 227 | + && change.getResource() instanceof Parameters parameters) { |
| 228 | + // Nested Parameters objects get recursively processed |
| 229 | + processChanges(parameters.getParameter(), changelog, cache, change.getName()); |
| 230 | + } else if (change.getName().equals("operation")) { |
| 231 | + // 1) For each operation get the relevant parameters |
| 232 | + var type = getStringParameter(change, "type") |
| 233 | + .orElseThrow(() -> new UnprocessableEntityException( |
| 234 | + "Type must be provided when adding an operation to the ChangeLog")); |
| 235 | + var newValue = getParameter(change, "value"); |
| 236 | + var path = getPathParameterNoBase(change); |
| 237 | + var originalValue = getParameter(change, "previousValue").map(o -> (Object) o); |
| 238 | + // try to extract the original value from the |
| 239 | + // source object if not present in the Diff |
| 240 | + // Parameters object |
| 241 | + try { |
| 242 | + if (originalValue.isEmpty() && !type.equals("insert") && sourceResource != null && path.isPresent()) { |
| 243 | + originalValue = Optional.of(FhirModelResolverCache.resolverForVersion(fhirVersion) |
| 244 | + .resolvePath(sourceResource, path.get())); |
| 245 | + } |
| 246 | + } catch (Exception e) { |
| 247 | + throw new InternalErrorException("Could not process path: " + path + ": " + e.getMessage()); |
| 248 | + } |
| 249 | + |
| 250 | + // 2) Add a new operation to the ChangeLog |
| 251 | + page.addOperation(type, path.orElse(null), newValue.orElse(null), originalValue.orElse(null)); |
| 252 | + } |
| 253 | + } |
| 254 | + |
| 255 | + private Optional<String> getPathParameterNoBase(Parameters.ParametersParameterComponent change) { |
| 256 | + return getStringParameter(change, "path").map(p -> { |
| 257 | + var e = new EncodeContextPath(p); |
| 258 | + return removeBase(e); |
| 259 | + }); |
| 260 | + } |
| 261 | + |
| 262 | + private String removeBase(EncodeContextPath path) { |
| 263 | + return path.getPath().subList(1, path.getPath().size()).stream() |
| 264 | + .map(EncodeContextPathElement::toString) |
| 265 | + .collect(Collectors.joining(".")); |
| 266 | + } |
| 267 | + |
| 268 | + private Optional<String> getStringParameter(Parameters.ParametersParameterComponent part, String name) { |
| 269 | + return part.getPart().stream() |
| 270 | + .filter(p -> p.getName().equalsIgnoreCase(name)) |
| 271 | + .filter(p -> p.getValue() instanceof IPrimitiveType) |
| 272 | + .map(p -> (IPrimitiveType<?>) p.getValue()) |
| 273 | + .map(s -> (String) s.getValue()) |
| 274 | + .findAny(); |
| 275 | + } |
| 276 | + |
| 277 | + private Optional<IBase> getParameter(Parameters.ParametersParameterComponent part, String name) { |
| 278 | + return part.getPart().stream() |
| 279 | + .filter(p -> p.getName().equalsIgnoreCase(name)) |
| 280 | + .filter(ParametersParameterComponent::hasValue) |
| 281 | + .map(p -> (IBase) p.getValue()) |
| 282 | + .findAny(); |
| 283 | + } |
| 284 | + |
| 285 | + public static class IBaseSerializer extends StdSerializer<IBase> { |
| 286 | + private final transient IParser parser; |
| 287 | + |
| 288 | + public IBaseSerializer(FhirContext fhirCtx) { |
| 289 | + super(IBase.class); |
| 290 | + parser = fhirCtx.newJsonParser().setPrettyPrint(true); |
| 291 | + } |
| 292 | + |
| 293 | + @Override |
| 294 | + public void serialize(IBase resource, JsonGenerator jsonGenerator, SerializerProvider provider) |
| 295 | + throws IOException { |
| 296 | + String resourceJson = parser.encodeToString(resource); |
| 297 | + jsonGenerator.writeRawValue(resourceJson); |
| 298 | + } |
| 299 | + } |
| 300 | +} |
0 commit comments