From 1c3154a894490f46f05a77d5d43d7fd252d3bf03 Mon Sep 17 00:00:00 2001 From: ryan Date: Wed, 27 Aug 2025 12:10:45 -0700 Subject: [PATCH 1/5] Explicitly calling blob and clob free. Putting streams from blob and clob in try-with-resources blocks. Calling jOOQ fetchStream in try-with-resources. --- .../api/BinaryTimeSeriesValueController.java | 6 +-- .../java/cwms/cda/api/BlobController.java | 8 ++-- .../java/cwms/cda/api/ClobController.java | 9 ++-- .../cwms/cda/api/ForecastFileController.java | 5 ++- .../main/java/cwms/cda/data/dao/BlobDao.java | 12 ++++-- .../main/java/cwms/cda/data/dao/ClobDao.java | 14 ++++--- .../cda/data/dao/ForecastInstanceDao.java | 42 ++++++++++++------- .../cwms/cda/data/dao/LocationsDaoImpl.java | 39 +++++++++-------- .../binarytimeseries/TimeSeriesBinaryDao.java | 31 +++++++++----- .../RegularTimeSeriesTextDao.java | 24 +++++++---- 10 files changed, 115 insertions(+), 75 deletions(-) diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java index dd7828f04e..73e6fa0d2b 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java @@ -39,7 +39,6 @@ import javax.servlet.http.HttpServletResponse; import java.io.InputStream; -import java.util.logging.Logger; import static com.codahale.metrics.MetricRegistry.name; import static cwms.cda.api.Controllers.*; @@ -100,8 +99,9 @@ public void handle(Context ctx) { } else { long size = blob.length(); requestResultSize.update(size); - InputStream is = blob.getBinaryStream(); - ctx.seekableStream(is, mediaType, size); + try (InputStream is = blob.getBinaryStream()) { + ctx.seekableStream(is, mediaType, size); + } } }); } diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java index e026d922c2..f5b6f49f69 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java @@ -3,7 +3,6 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.flogger.FluentLogger; import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.BlobDao; import cwms.cda.data.dao.JooqDao; @@ -12,7 +11,6 @@ import cwms.cda.data.dto.CwmsDTOPaginated; import cwms.cda.formatters.ContentType; import cwms.cda.formatters.Formats; -import cwms.cda.formatters.FormattingException; import io.javalin.apibuilder.CrudHandler; import io.javalin.core.util.Header; import io.javalin.http.Context; @@ -39,7 +37,6 @@ * */ public class BlobController implements CrudHandler { - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final int DEFAULT_PAGE_SIZE = 20; public static final String TAG = "Blob"; @@ -152,8 +149,9 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) { } else { long size = blob.length(); requestResultSize.update(size); - InputStream is = blob.getBinaryStream(); - ctx.seekableStream(is, mediaType, size); + try (InputStream is = blob.getBinaryStream()) { + ctx.seekableStream(is, mediaType, size); + } } }; if (office.isPresent()) { diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java index 3c8eef6bf0..1788c394fa 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java @@ -3,7 +3,6 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.flogger.FluentLogger; import cwms.cda.api.errors.CdaError; import cwms.cda.data.dao.ClobDao; import cwms.cda.data.dao.JooqDao; @@ -12,7 +11,6 @@ import cwms.cda.data.dto.CwmsDTOPaginated; import cwms.cda.formatters.ContentType; import cwms.cda.formatters.Formats; -import cwms.cda.formatters.FormattingException; import io.javalin.apibuilder.CrudHandler; import io.javalin.core.util.Header; import io.javalin.http.Context; @@ -28,6 +26,8 @@ import org.jooq.DSLContext; import javax.servlet.http.HttpServletResponse; + +import java.io.InputStream; import java.util.Objects; import java.util.Optional; @@ -36,7 +36,6 @@ public class ClobController implements CrudHandler { - private static final FluentLogger log = FluentLogger.forEnclosingClass(); private static final int DEFAULT_PAGE_SIZE = 20; public static final String TAG = "Clob"; public static final String TEXT_PLAIN = "text/plain"; @@ -178,7 +177,9 @@ public void getOne(@NotNull Context ctx, @NotNull String clobId) { ctx.status(HttpServletResponse.SC_NOT_FOUND).json(new CdaError("Unable to find " + "clob based on given parameters")); } else { - ctx.seekableStream(c.getAsciiStream(), TEXT_PLAIN, c.length()); + try (InputStream is = c.getAsciiStream()) { + ctx.seekableStream(is, TEXT_PLAIN, c.length()); + } } }); } else { diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java index 403083040d..aa722fca1b 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java @@ -109,8 +109,9 @@ public void handle(Context ctx) { } else { long size = blob.length(); requestResultSize.update(size); - InputStream is = blob.getBinaryStream(); - ctx.seekableStream(is, mediaType, size); + try (InputStream is = blob.getBinaryStream()) { + ctx.seekableStream(is, mediaType, size); + } } }); } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java index c3ab3ecd47..4cff7167f5 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java @@ -104,10 +104,16 @@ public void getBlob(String id, BlobConsumer consumer) { }); } - private static void handleResultSet(ResultSet resultSet, BlobConsumer consumer) throws SQLException { + private static void handleResultSet(ResultSet resultSet, BlobConsumer consumer) throws SQLException, IOException { String mediaType = resultSet.getString("MEDIA_TYPE_ID"); java.sql.Blob blob = resultSet.getBlob("VALUE"); - consumer.accept(blob, mediaType); + try { + consumer.accept(blob, mediaType); + } finally { + if (blob != null) { + blob.free(); + } + } } @@ -190,6 +196,6 @@ public static byte[] readFully(@NotNull InputStream stream) throws IOException { @FunctionalInterface public interface BlobConsumer { - void accept(java.sql.Blob blob, String mediaType) throws SQLException; + void accept(java.sql.Blob blob, String mediaType) throws SQLException, IOException; } } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java index d76e33b3bb..ca2dcecdb0 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ClobDao.java @@ -217,7 +217,7 @@ public static String getBoolean(boolean failIfExists) { } public void delete(String officeId, String id) { - dsl.connection(c -> CWMS_TEXT_PACKAGE.call_DELETE_TEXT( + connection(dsl,c -> CWMS_TEXT_PACKAGE.call_DELETE_TEXT( getDslContext(c,officeId).configuration(), id, officeId) ); } @@ -232,7 +232,7 @@ public void update(Clob clob, boolean ignoreNulls) { // it throws - ORA-20244: NULL_ARGUMENT: Argument P_TEXT is not allowed to be null // Also note: when p_ignore_nulls == 'F' and the value is "" (empty string) // it throws - ORA-20244: NULL_ARGUMENT: Argument P_TEXT is not allowed to be null - dsl.connection(c -> + connection(dsl,c -> CWMS_TEXT_PACKAGE.call_UPDATE_TEXT( getDslContext(c,clob.getOfficeId()).configuration(), clob.getValue(), @@ -269,10 +269,12 @@ public void getClob(String clobId, String officeId, ClobConsumer clobConsumer) { try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { java.sql.Clob clob = resultSet.getClob("VALUE"); - if (clob != null) { + try { clobConsumer.accept(clob); - } else { - clobConsumer.accept(null); + } finally { + if (clob != null) { + clob.free(); + } } } else { throw new NotFoundException("Unable to find clob with id " + clobId + " in office " + officeId); @@ -296,6 +298,6 @@ public static String readFully(java.sql.Clob clob) throws IOException, SQLExcept @FunctionalInterface public interface ClobConsumer { - void accept(java.sql.Clob blob) throws SQLException; + void accept(java.sql.Clob blob) throws SQLException, IOException; } } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java index a933c1fabe..5d95aaf3f6 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java @@ -23,7 +23,6 @@ import java.sql.Struct; import java.sql.Timestamp; import java.time.Instant; -import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -203,18 +202,25 @@ private static ForecastInstance map(int byteLimit, ReplaceUtils.OperatorBuilder fileName = (String) attributes[0]; mediaType = (String) attributes[1]; Blob blob = (Blob) attributes[4]; - if (blob.length() > byteLimit) { - String param = "&%s=%s"; - String utf8 = "UTF-8"; - url = urlBuilder.build().apply(specId) + "?" - + format(param, Controllers.NAME, URLEncoder.encode(specId, utf8)) - + format(param, Controllers.FORECAST_DATE, URLEncoder.encode(forecastDate.toString(), utf8)) - + format(param, Controllers.ISSUE_DATE, URLEncoder.encode(issueDate.toString(), utf8)) - + format(param, Controllers.DESIGNATOR, URLEncoder.encode(designator, utf8)) - + format(param, Controllers.OFFICE, URLEncoder.encode(officeId, utf8)); - } else { - try (InputStream is = blob.getBinaryStream()) { - fileData = BlobDao.readFully(is); + try { + if (blob.length() > byteLimit) { + String param = "&%s=%s"; + String utf8 = "UTF-8"; + url = urlBuilder.build().apply(specId) + "?" + + format(param, Controllers.NAME, URLEncoder.encode(specId, utf8)) + + format(param, Controllers.FORECAST_DATE, URLEncoder.encode(forecastDate.toString(), utf8)) + + format(param, Controllers.ISSUE_DATE, URLEncoder.encode(issueDate.toString(), utf8)) + + format(param, Controllers.DESIGNATOR, URLEncoder.encode(designator, utf8)) + + format(param, Controllers.OFFICE, URLEncoder.encode(officeId, utf8)); + + } else { + try (InputStream is = blob.getBinaryStream()) { + fileData = BlobDao.readFully(is); + } + } + } finally { + if (blob != null) { + blob.free(); } } } @@ -314,8 +320,14 @@ public void getFileBlob(String office, String name, String designator, mediaType = "application/octet-stream"; } Blob blob = (Blob) attributes[4]; - consumer.accept(blob, mediaType); - return; + try { + consumer.accept(blob, mediaType); + return; + } finally { + if (blob != null) { + blob.free(); + } + } } } } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/LocationsDaoImpl.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/LocationsDaoImpl.java index 72a2256c8a..fbe3cb4458 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/LocationsDaoImpl.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/LocationsDaoImpl.java @@ -61,6 +61,7 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Stream; import org.geojson.Feature; import org.geojson.FeatureCollection; import org.geojson.Point; @@ -432,25 +433,27 @@ private Catalog getLocationCatalog(Catalog.CatalogPage catPage, int pageSize, Ca .leftOuterJoin(avLoc2).on(avLoc2.LOCATION_CODE.eq(limitCode)) .orderBy(avLoc2.DB_OFFICE_ID.asc(),limitId.asc(),avLoc2.ALIASED_ITEM.asc()); logger.log(Level.FINER, () -> query.getSQL(ParamType.INLINED)); - List entries = query + try (Stream recordStream = query .fetchSize(1000) - .fetchStream() - .map(r -> r.into(AV_LOC2.AV_LOC2)) - .collect(groupingBy(usace.cwms.db.jooq.codegen.tables.records.AV_LOC2::getLOCATION_CODE)) - .values() - .stream() - .map(l -> { - usace.cwms.db.jooq.codegen.tables.records.AV_LOC2 row = l.stream() - .filter(r -> r.getALIASED_ITEM() == null) - .findFirst() - .orElseThrow(() -> new DataAccessException("Could not find location for list of aliases: " + l)); - Set aliases = l.stream().filter(r -> r.getALIASED_ITEM() != null) - .map(this::buildLocationAlias).collect(toSet()); - return buildCatalogEntry(row, aliases); - }) - .collect(toList()); - - return new Catalog(cursorLocation, total, pageSize, entries, params); + .fetchStream()) { + List entries = recordStream + .map(r -> r.into(AV_LOC2.AV_LOC2)) + .collect(groupingBy(usace.cwms.db.jooq.codegen.tables.records.AV_LOC2::getLOCATION_CODE)) + .values() + .stream() + .map(l -> { + usace.cwms.db.jooq.codegen.tables.records.AV_LOC2 row = l.stream() + .filter(r -> r.getALIASED_ITEM() == null) + .findFirst() + .orElseThrow(() -> new DataAccessException("Could not find location for list of aliases: " + l)); + Set aliases = l.stream().filter(r -> r.getALIASED_ITEM() != null) + .map(this::buildLocationAlias).collect(toSet()); + return buildCatalogEntry(row, aliases); + }) + .collect(toList()); + + return new Catalog(cursorLocation, total, pageSize, entries, params); + } } private static Condition buildWhereCondition(CatalogRequestParameters params) { diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/binarytimeseries/TimeSeriesBinaryDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/binarytimeseries/TimeSeriesBinaryDao.java index d1ba641a06..43b3c949ea 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/binarytimeseries/TimeSeriesBinaryDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/binarytimeseries/TimeSeriesBinaryDao.java @@ -223,19 +223,28 @@ private BinaryTimeSeriesRow buildRow(long byteLimit, ReplaceUtils.OperatorBuilde .withQualityCode(0L) .withDestFlag(0); Blob b = rs.getBlob(VALUE); - if (b.length() > byteLimit) { - String binaryId = rs.getString(ID); - String url = urlBuilder.build().apply(dateTime.toString()) - //Hard-coding for now. Will be removed with schema update - + format("&%s=%s", Controllers.BLOB_ID, URLEncoder.encode(binaryId, "UTF-8")); - builder.withValueUrl(url); - } else { - try (InputStream is = b.getBinaryStream()) { - byte[] bytes = BlobDao.readFully(is); - builder.withBinaryValue(bytes); + try { + if(b != null) { + if (b.length() > byteLimit) { + String binaryId = rs.getString(ID); + String url = urlBuilder.build().apply(dateTime.toString()) + //Hard-coding for now. Will be removed with schema update + + format("&%s=%s", Controllers.BLOB_ID, URLEncoder.encode(binaryId, "UTF-8")); + builder.withValueUrl(url); + } else { + try (InputStream is = b.getBinaryStream()) { + byte[] bytes = BlobDao.readFully(is); + builder.withBinaryValue(bytes); + } + } + } + + return builder.build(); + } finally { + if (b != null) { + b.free(); } } - return builder.build(); } private void parameterizeRetrieveTsBinText(CallableStatement stmt, String tsId, String mask, diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/texttimeseries/RegularTimeSeriesTextDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/texttimeseries/RegularTimeSeriesTextDao.java index f6ea7f80e1..2e9883243a 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/texttimeseries/RegularTimeSeriesTextDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/texttimeseries/RegularTimeSeriesTextDao.java @@ -145,14 +145,22 @@ private RegularTextTimeSeriesRow buildRow(ResultSet rs, long characterLimit, .withFilename(dateTime.getEpochSecond() + ".txt") .withMediaType("text/plain"); Clob clob = rs.getClob(TEXT); - if (clob.length() > characterLimit) { - String textId = rs.getString(TEXT_ID); - String url = urlBuilder.build().apply(dateTime.toString()) - //Hard-coding for now. Will be removed with schema update - + format("&%s=%s", Controllers.CLOB_ID, URLEncoder.encode(textId, "UTF-8")); - builder.withValueUrl(url); - } else { - builder.withTextValue(ClobDao.readFully(clob)); + try { + if (clob != null) { + if (clob.length() > characterLimit) { + String textId = rs.getString(TEXT_ID); + String url = urlBuilder.build().apply(dateTime.toString()) + //Hard-coding for now. Will be removed with schema update + + format("&%s=%s", Controllers.CLOB_ID, URLEncoder.encode(textId, "UTF-8")); + builder.withValueUrl(url); + } else { + builder.withTextValue(ClobDao.readFully(clob)); + } + } + } finally { + if (clob != null) { + clob.free(); + } } return builder.build(); } From 263624c0b1f5b28f04425f242747e7b7535aea6a Mon Sep 17 00:00:00 2001 From: ryan Date: Wed, 27 Aug 2025 12:27:24 -0700 Subject: [PATCH 2/5] Fixing merge error --- .../main/java/cwms/cda/data/dao/ForecastInstanceDao.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java index c2be8df2a9..b54977808b 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/ForecastInstanceDao.java @@ -28,6 +28,7 @@ import java.sql.Struct; import java.sql.Timestamp; import java.time.Instant; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -220,9 +221,10 @@ private static ForecastInstance map(int byteLimit, ReplaceUtils.OperatorBuilder + format(param, Controllers.NAME, URLEncoder.encode(specId, utf8)) + format(param, Controllers.FORECAST_DATE, URLEncoder.encode(forecastDate.toString(), utf8)) + format(param, Controllers.ISSUE_DATE, URLEncoder.encode(issueDate.toString(), utf8)) - + format(param, Controllers.DESIGNATOR, URLEncoder.encode(designator, utf8)) - + format(param, Controllers.OFFICE, URLEncoder.encode(officeId, utf8)); - + + format(param, Controllers.OFFICE, URLEncoder.encode(officeId, utf8)); + if(designator != null) { + url += format(param, Controllers.DESIGNATOR, URLEncoder.encode(designator, utf8)); + } } else { try (InputStream is = blob.getBinaryStream()) { fileData = BlobDao.readFully(is); From f03118b07b3c1d1b46f6d699cf3c6b6d6b41786c Mon Sep 17 00:00:00 2001 From: ryan Date: Wed, 27 Aug 2025 12:37:49 -0700 Subject: [PATCH 3/5] Fixing merge error --- cwms-data-api/src/main/java/cwms/cda/api/BlobController.java | 1 + 1 file changed, 1 insertion(+) diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java index 465575de9b..acc2b32d4c 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java @@ -14,6 +14,7 @@ import cwms.cda.data.dto.CwmsDTOPaginated; import cwms.cda.formatters.ContentType; import cwms.cda.formatters.Formats; +import cwms.cda.formatters.FormattingException; import io.javalin.apibuilder.CrudHandler; import io.javalin.core.util.Header; import io.javalin.http.Context; From 8781bb7aa16650bea2dc383bf79e03d31d8555f4 Mon Sep 17 00:00:00 2001 From: ryan Date: Fri, 29 Aug 2025 09:16:51 -0700 Subject: [PATCH 4/5] Refactor blob and clob handling with a new seekableStream method that consumes the streams synchronously. --- .../api/BinaryTimeSeriesValueController.java | 2 +- .../java/cwms/cda/api/BlobController.java | 76 ++++++++------- .../java/cwms/cda/api/ClobController.java | 2 +- .../cwms/cda/api/ForecastFileController.java | 2 +- .../java/cwms/cda/api/RangeRequestUtil.java | 96 +++++++++++++++++++ .../api/TextTimeSeriesValueController.java | 5 +- .../main/java/cwms/cda/data/dao/BlobDao.java | 61 ++++++------ .../cwms/cda/api/BlobControllerTestIT.java | 16 ++-- 8 files changed, 181 insertions(+), 79 deletions(-) create mode 100644 cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java index 73e6fa0d2b..76d668c014 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BinaryTimeSeriesValueController.java @@ -100,7 +100,7 @@ public void handle(Context ctx) { long size = blob.length(); requestResultSize.update(size); try (InputStream is = blob.getBinaryStream()) { - ctx.seekableStream(is, mediaType, size); + RangeRequestUtil.seekableStream(ctx, is, mediaType, size); } } }); diff --git a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java index acc2b32d4c..f7e84bdb27 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/BlobController.java @@ -1,6 +1,7 @@ package cwms.cda.api; import static com.codahale.metrics.MetricRegistry.name; + import static cwms.cda.api.Controllers.*; import com.codahale.metrics.Histogram; @@ -28,12 +29,13 @@ import java.io.InputStream; import java.util.List; import java.util.Optional; + import javax.servlet.http.HttpServletResponse; + import org.jetbrains.annotations.NotNull; import org.jooq.DSLContext; - /** * */ @@ -62,32 +64,32 @@ protected DSLContext getDslContext(Context ctx) { } @OpenApi( - queryParams = { - @OpenApiParam(name = OFFICE, - description = "Specifies the owning office. If this field is not " - + "specified, matching information from all offices shall be " - + "returned."), - @OpenApiParam(name = PAGE, - description = "This end point can return a lot of data, this " - + "identifies where in the request you are. This is an opaque" - + " value, and can be obtained from the 'next-page' value in " - + "the response."), - @OpenApiParam(name = PAGE_SIZE, - type = Integer.class, - description = "How many entries per page returned. Default " - + DEFAULT_PAGE_SIZE + "."), - @OpenApiParam(name = LIKE, - description = "Posix regular expression " - + "describing the blob id's you want") - }, - responses = {@OpenApiResponse(status = STATUS_200, - description = "A list of blobs.", - content = { - @OpenApiContent(type = Formats.JSON, from = Blobs.class), - @OpenApiContent(type = Formats.JSONV2, from = Blobs.class), - }) - }, - tags = {TAG} + queryParams = { + @OpenApiParam(name = OFFICE, + description = "Specifies the owning office. If this field is not " + + "specified, matching information from all offices shall be " + + "returned."), + @OpenApiParam(name = PAGE, + description = "This end point can return a lot of data, this " + + "identifies where in the request you are. This is an opaque" + + " value, and can be obtained from the 'next-page' value in " + + "the response."), + @OpenApiParam(name = PAGE_SIZE, + type = Integer.class, + description = "How many entries per page returned. Default " + + DEFAULT_PAGE_SIZE + "."), + @OpenApiParam(name = LIKE, + description = "Posix regular expression " + + "describing the blob id's you want") + }, + responses = {@OpenApiResponse(status = STATUS_200, + description = "A list of blobs.", + content = { + @OpenApiContent(type = Formats.JSON, from = Blobs.class), + @OpenApiContent(type = Formats.JSONV2, from = Blobs.class), + }) + }, + tags = {TAG} ) @Override public void getAll(@NotNull Context ctx) { @@ -130,7 +132,7 @@ public void getAll(@NotNull Context ctx) { description = "Returns the binary value of the requested blob as a seekable stream with the " + "appropriate media type.", queryParams = { - @OpenApiParam(name = OFFICE, description = "Specifies the owning office."), + @OpenApiParam(name = OFFICE, description = "Specifies the owning office."), }, tags = {TAG} ) @@ -151,8 +153,8 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) { } else { long size = blob.length(); requestResultSize.update(size); - try (InputStream is = blob.getBinaryStream()) { - ctx.seekableStream(is, mediaType, size); + try (InputStream is = blob.getBinaryStream()) { // is OracleBlobInputStream + RangeRequestUtil.seekableStream(ctx, is, mediaType, size); } } }; @@ -169,12 +171,12 @@ public void getOne(@NotNull Context ctx, @NotNull String blobId) { description = "Create new Blob", requestBody = @OpenApiRequestBody( content = { - @OpenApiContent(from = Blob.class, type = Formats.JSONV2) + @OpenApiContent(from = Blob.class, type = Formats.JSONV2) }, required = true), queryParams = { - @OpenApiParam(name = FAIL_IF_EXISTS, type = Boolean.class, - description = "Create will fail if provided ID already exists. Default: true") + @OpenApiParam(name = FAIL_IF_EXISTS, type = Boolean.class, + description = "Create will fail if provided ID already exists. Default: true") }, method = HttpMethod.POST, tags = {TAG} @@ -200,8 +202,8 @@ public void create(@NotNull Context ctx) { }, requestBody = @OpenApiRequestBody( content = { - @OpenApiContent(from = Blob.class, type = Formats.JSONV2), - @OpenApiContent(from = Blob.class, type = Formats.JSON) + @OpenApiContent(from = Blob.class, type = Formats.JSONV2), + @OpenApiContent(from = Blob.class, type = Formats.JSON) }, required = true), method = HttpMethod.PATCH, @@ -240,10 +242,10 @@ public void update(@NotNull Context ctx, @NotNull String blobId) { @OpenApi( description = "Deletes requested blob", pathParams = { - @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"), + @OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"), }, queryParams = { - @OpenApiParam(name = OFFICE, required = true, description = "Specifies the " + @OpenApiParam(name = OFFICE, required = true, description = "Specifies the " + "owning office of the blob to be deleted"), }, method = HttpMethod.DELETE, diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java index 6d527bb459..45d228c6bf 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/ClobController.java @@ -178,7 +178,7 @@ public void getOne(@NotNull Context ctx, @NotNull String clobId) { + "clob based on given parameters")); } else { try (InputStream is = c.getAsciiStream()) { - ctx.seekableStream(is, TEXT_PLAIN, c.length()); + RangeRequestUtil.seekableStream(ctx, is, TEXT_PLAIN, c.length()); } } }); diff --git a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java index d140a0d67d..0fcd431c72 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/ForecastFileController.java @@ -110,7 +110,7 @@ public void handle(Context ctx) { long size = blob.length(); requestResultSize.update(size); try (InputStream is = blob.getBinaryStream()) { - ctx.seekableStream(is, mediaType, size); + RangeRequestUtil.seekableStream(ctx, is, mediaType, size); } } }); diff --git a/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java new file mode 100644 index 0000000000..c84c74afac --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/RangeRequestUtil.java @@ -0,0 +1,96 @@ +package cwms.cda.api; + +import io.javalin.core.util.Header; +import io.javalin.http.Context; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; + +public class RangeRequestUtil { + + private RangeRequestUtil() { + // utility class + } + + /** + * Javalin has a method very similar to this in its Context class. The issue is that Javalin decided to + * take the InputStream, wrap it in a CompletedFuture and then process the request asynchronously. This + * causes problems when the InputStream is tied to a database connection that gets closed before the + * async processing happens. This method doesn't do the async thing but tries to support the rest. + * @param ctx + * @param is + * @param mediaType + * @param totalBytes + * @throws IOException + */ + public static void seekableStream(Context ctx, InputStream is, String mediaType, long totalBytes) throws IOException { + long from = 0; + long to = totalBytes - 1; + if (ctx.header(Header.RANGE) == null) { + ctx.res.setContentType(mediaType); + // Javalin's version of this method doesn't set the content-length + // Not setting the content-length makes the servlet container use Transfer-Encoding=chunked. + // Chunked is a worse experience overall, seems like we should just set the length if we know it. + writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1)); + } else { + int chunkSize = 128000; + String rangeHeader = ctx.header(Header.RANGE); + String[] eqSplit = rangeHeader.split("=", 2); + String[] dashSplit = eqSplit[1].split("-", -1); // keep empty trailing part + + List requestedRange = Arrays.stream(dashSplit) + .filter(s -> !s.isEmpty()) + .collect(java.util.stream.Collectors.toList()); + + from = Long.parseLong(requestedRange.get(0)); + + if (from + chunkSize > totalBytes) { + // chunk bigger than file, write all + to = totalBytes - 1; + } else if (requestedRange.size() == 2) { + // chunk smaller than file, to/from specified + to = Long.parseLong(requestedRange.get(1)); + } else { + // chunk smaller than file, to/from not specified + to = from + chunkSize - 1; + } + + ctx.status(206); + + ctx.header(Header.ACCEPT_RANGES, "bytes"); + ctx.header(Header.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes); + + ctx.res.setContentType(mediaType); + ctx.header(Header.CONTENT_LENGTH, String.valueOf(Math.min(to - from + 1, totalBytes))); + writeRange(ctx.res.getOutputStream(), is, from, Math.min(to, totalBytes - 1)); + } + } + + + public static void writeRange(OutputStream out, InputStream in, long from, long to) throws IOException { + writeRange(out, in, from, to, new byte[8192]); + } + + public static void writeRange(OutputStream out, InputStream is, long from, long to, byte[] buffer) throws IOException { + long toSkip = from; + while (toSkip > 0) { + long skipped = is.skip(toSkip); + toSkip -= skipped; + } + + long bytesLeft = to - from + 1; + while (bytesLeft != 0L) { + int maxRead = (int) Math.min(buffer.length, bytesLeft); + int read = is.read(buffer, 0, maxRead); + if (read == -1) { + break; + } + out.write(buffer, 0, read); + bytesLeft -= read; + } + + } + +} diff --git a/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java b/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java index 45a3757539..72857b8bd5 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/TextTimeSeriesValueController.java @@ -99,8 +99,9 @@ public void handle(Context ctx) { } else { long size = clob.length(); requestResultSize.update(size); - InputStream is = clob.getAsciiStream(); - ctx.seekableStream(is, TEXT_PLAIN, size); + try(InputStream is = clob.getAsciiStream()){ + RangeRequestUtil.seekableStream(ctx, is, TEXT_PLAIN, size); + } } }); } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java index d6cec59683..ea1b12b93f 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/BlobDao.java @@ -71,14 +71,22 @@ public void getBlob(String id, String office, BlobConsumer consumer) { // what we want javalin to do with the stream as a consumer. // - dsl.connection(connection -> { + connection(dsl, connection -> { try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_WITH_OFFICE)) { preparedStatement.setString(1, office); preparedStatement.setString(2, id); try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { - handleResultSet(resultSet, consumer); + String mediaType = resultSet.getString("MEDIA_TYPE_ID"); + java.sql.Blob blob = resultSet.getBlob("VALUE"); + try { + consumer.accept(blob, mediaType); + } finally { + if (blob != null) { + blob.free(); + } + } } else { throw new NotFoundException("Unable to find blob with id " + id + " in office " + office); } @@ -89,13 +97,21 @@ public void getBlob(String id, String office, BlobConsumer consumer) { public void getBlob(String id, BlobConsumer consumer) { - dsl.connection(connection -> { + connection(dsl, connection -> { try (PreparedStatement preparedStatement = connection.prepareStatement(BLOB_QUERY)) { preparedStatement.setString(1, id); try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { - handleResultSet(resultSet, consumer); + String mediaType = resultSet.getString("MEDIA_TYPE_ID"); + java.sql.Blob blob = resultSet.getBlob("VALUE"); + try { + consumer.accept(blob, mediaType); + } finally { + if (blob != null) { + blob.free(); + } + } } else { throw new NotFoundException("Unable to find blob with id " + id); } @@ -104,25 +120,12 @@ public void getBlob(String id, BlobConsumer consumer) { }); } - private static void handleResultSet(ResultSet resultSet, BlobConsumer consumer) throws SQLException, IOException { - String mediaType = resultSet.getString("MEDIA_TYPE_ID"); - java.sql.Blob blob = resultSet.getBlob("VALUE"); - try { - consumer.accept(blob, mediaType); - } finally { - if (blob != null) { - blob.free(); - } - } - } - - public List getAll(String officeId, String like) { + public List getAll(String officeId, String like) { String queryStr = "SELECT AT_BLOB.ID, AT_BLOB.DESCRIPTION, CWMS_MEDIA_TYPE.MEDIA_TYPE_ID, CWMS_OFFICE.OFFICE_ID\n" + " FROM CWMS_20.AT_BLOB \n" + "join CWMS_20.CWMS_MEDIA_TYPE on AT_BLOB.MEDIA_TYPE_CODE = CWMS_MEDIA_TYPE.MEDIA_TYPE_CODE \n" + "join CWMS_20.CWMS_OFFICE on AT_BLOB.OFFICE_CODE = CWMS_OFFICE.OFFICE_CODE \n" - + " where REGEXP_LIKE (upper(AT_BLOB.ID), upper(?))" - ; + + " where REGEXP_LIKE (upper(AT_BLOB.ID), upper(?))"; ResultQuery query; if (officeId != null) { @@ -147,22 +150,22 @@ public void create(Blob blob, boolean failIfExists, boolean ignoreNulls) { String pIgnoreNulls = formatBool(ignoreNulls); connection(dsl, c -> - CWMS_TEXT_PACKAGE.call_STORE_BINARY( - getDslContext(c, blob.getOfficeId()).configuration(), - blob.getValue(), - blob.getId(), - blob.getMediaTypeId(), - blob.getDescription(), - pFailIfExists, - pIgnoreNulls, - blob.getOfficeId())); + CWMS_TEXT_PACKAGE.call_STORE_BINARY( + getDslContext(c, blob.getOfficeId()).configuration(), + blob.getValue(), + blob.getId(), + blob.getMediaTypeId(), + blob.getDescription(), + pFailIfExists, + pIgnoreNulls, + blob.getOfficeId())); } public void update(Blob blob, boolean ignoreNulls) { String pFailIfExists = formatBool(false); String pIgnoreNulls = formatBool(ignoreNulls); - if(blob == null){ + if (blob == null) { throw new NotFoundException("Null blob provided to update"); } diff --git a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java index ae3e6eaae1..7939290bbe 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/BlobControllerTestIT.java @@ -45,8 +45,8 @@ static void createExistingBlob() throws Exception .contentType(Formats.JSONV2) .body(serializedBlob) .header("Authorization",user.toHeaderValue()) - .queryParam("office",SPK) - .queryParam("fail-if-exists",false) + .queryParam(Controllers.OFFICE,SPK) + .queryParam(Controllers.FAIL_IF_EXISTS,false) .when() .redirects().follow(true) .redirects().max(3) @@ -117,12 +117,12 @@ void test_blob_range() { // We can now do Range requests! given() - .log().ifValidationFails(LogDetail.ALL,true) - .queryParam(Controllers.OFFICE, SPK) - .header("Range"," bytes=3-") - .when() - .get("/blobs/" + EXISTING_BLOB_ID) - .then() + .log().ifValidationFails(LogDetail.ALL, true) + .queryParam(Controllers.OFFICE, SPK) + .header("Range", " bytes=3-") + .when() + .get("/blobs/" + EXISTING_BLOB_ID) + .then() .log().ifValidationFails(LogDetail.ALL,true) .assertThat() .statusCode(is(HttpServletResponse.SC_PARTIAL_CONTENT)) From eb00286bac203f9c7bb74581259474588939df1f Mon Sep 17 00:00:00 2001 From: ryan Date: Fri, 29 Aug 2025 10:09:39 -0700 Subject: [PATCH 5/5] removeAbandonedOnBorrow and removeAbandonedOn aren't options I can find in: https://tomcat.apache.org/tomcat-9.0-doc/jdbc-pool.html I think they are meant to be removeAbandoned. --- cwms-data-api/src/test/resources/tomcat/conf/context.xml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cwms-data-api/src/test/resources/tomcat/conf/context.xml b/cwms-data-api/src/test/resources/tomcat/conf/context.xml index 1158f62665..b0f3478149 100644 --- a/cwms-data-api/src/test/resources/tomcat/conf/context.xml +++ b/cwms-data-api/src/test/resources/tomcat/conf/context.xml @@ -9,13 +9,12 @@ minIdle="${CDA_POOL_MIN_IDLE}" validationQuery="select 1 from dual" validationQueryTimeout="1" - removeAbandonedOnBorrow="true" + removeAbandoned="true" removeAbandonedTimeout="10" testOnBorrow="true" testOnConnect="true" testWhileIdle="true" timeBetweenEvictionRunsMillis="5000" - removeAbandonedOn="true" logValidationErrors="true" suspectTimeout="15" jdbcInterceptors="QueryTimeoutInterceptor(queryTimeout=600);StatementFinalizer(trace=true);StatementCache(callable=true);SlowQueryReport(threshold=5000);ResetAbandonedTimer"