Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

public class Cache {

private static final String APP_CODE = "prebid-Java";
private static final String APPLICATION = "optable-targeting";
private static final String APPLICATION = "prebid-Java";
private static final String APP_CODE = "optable-targeting";

private final PbcStorageService cacheService;
private final JacksonMapper mapper;
Expand Down
61 changes: 51 additions & 10 deletions src/main/java/org/prebid/server/cache/BasicPbcStorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import org.prebid.server.exception.PreBidException;
import org.prebid.server.json.DecodeException;
import org.prebid.server.json.JacksonMapper;
import org.prebid.server.metric.MetricName;
import org.prebid.server.metric.Metrics;
import org.prebid.server.util.HttpUtil;
import org.prebid.server.vertx.httpclient.HttpClient;

import java.net.URL;
import java.time.Clock;
import java.util.Objects;

public class BasicPbcStorageService implements PbcStorageService {
Expand All @@ -27,18 +30,24 @@ public class BasicPbcStorageService implements PbcStorageService {
private final String apiKey;
private final int callTimeoutMs;
private final JacksonMapper mapper;
private final Clock clock;
private final Metrics metrics;

public BasicPbcStorageService(HttpClient httpClient,
URL endpointUrl,
String apiKey,
int callTimeoutMs,
JacksonMapper mapper) {
JacksonMapper mapper,
Clock clock,
Metrics metrics) {

this.httpClient = Objects.requireNonNull(httpClient);
this.endpointUrl = Objects.requireNonNull(endpointUrl);
this.apiKey = Objects.requireNonNull(apiKey);
this.callTimeoutMs = callTimeoutMs;
this.mapper = Objects.requireNonNull(mapper);
this.clock = Objects.requireNonNull(clock);
this.metrics = metrics;
Comment thread
AntoxaAntoxic marked this conversation as resolved.
Outdated
}

@Override
Expand All @@ -55,20 +64,28 @@ public Future<Void> storeEntry(String key,
return Future.failedFuture(e);
}

final String valueToStore = prepareValueForStoring(value, type);
final ModuleCacheRequest moduleCacheRequest =
ModuleCacheRequest.of(
constructEntryKey(key, appCode),
type,
prepareValueForStoring(value, type),
valueToStore,
application,
ttlseconds);

updateCreativeMetrics(valueToStore, type, ttlseconds, appCode);

final long startTime = clock.millis();
return httpClient.post(
endpointUrl.toString(),
securedCallHeaders(),
mapper.encodeToString(moduleCacheRequest),
callTimeoutMs)
.compose(response -> processStoreResponse(response.getStatusCode(), response.getBody()));
.compose(response -> processStoreResponse(
response.getStatusCode(),
response.getBody(),
startTime,
appCode));

}

Expand Down Expand Up @@ -99,6 +116,19 @@ private static void validateStoreData(String key,
}
}

private void updateCreativeMetrics(String value, StorageDataType type, Integer ttlseconds, String appCode) {
final MetricName metricName = switch (type) {
case XML -> MetricName.xml;
case JSON -> MetricName.json;
case TEXT -> MetricName.text;
};

metrics.updateModuleStorageCacheEntrySize(appCode, value.length(), metricName);
if (ttlseconds != null) {
metrics.updateModuleStorageCacheEntryTtl(appCode, ttlseconds, metricName);
}
}

private static String prepareValueForStoring(String value, StorageDataType type) {
return type == StorageDataType.TEXT
? new String(Base64.encodeBase64(value.getBytes()))
Expand All @@ -114,31 +144,35 @@ private String constructEntryKey(String key, String moduleCode) {
return MODULE_KEY_PREFIX + MODULE_KEY_DELIMETER + moduleCode + MODULE_KEY_DELIMETER + key;
}

private Future<Void> processStoreResponse(int statusCode, String responseBody) {
private Future<Void> processStoreResponse(int statusCode, String responseBody, long startTime, String appCode) {
if (statusCode != 204) {
metrics.updateModuleStorageCacheWriteRequestTime(appCode, clock.millis() - startTime, MetricName.err);
throw new PreBidException("HTTP status code: '%s', body: '%s' "
.formatted(statusCode, responseBody));
}

metrics.updateModuleStorageCacheWriteRequestTime(appCode, clock.millis() - startTime, MetricName.ok);
return Future.succeededFuture();
}

@Override
public Future<ModuleCacheResponse> retrieveEntry(String key,
String appCode,
String application) {

public Future<ModuleCacheResponse> retrieveEntry(String key, String appCode, String application) {
try {
validateRetrieveData(key, application, appCode);
} catch (PreBidException e) {
return Future.failedFuture(e);
}

final long startTime = clock.millis();
return httpClient.get(
getRetrieveEndpoint(key, appCode, application),
securedCallHeaders(),
callTimeoutMs)
.map(response -> toModuleCacheResponse(response.getStatusCode(), response.getBody()));
.map(response -> toModuleCacheResponse(
response.getStatusCode(),
response.getBody(),
startTime,
appCode));

}

Expand All @@ -165,11 +199,18 @@ private String getRetrieveEndpoint(String key,
+ "&a=" + application;
}

private ModuleCacheResponse toModuleCacheResponse(int statusCode, String responseBody) {
private ModuleCacheResponse toModuleCacheResponse(int statusCode,
String responseBody,
long startTime,
String application) {

if (statusCode != 200) {
metrics.updateModuleStorageCacheReadRequestTime(application, clock.millis() - startTime, MetricName.err);
throw new PreBidException("HTTP status code " + statusCode);
}

metrics.updateModuleStorageCacheReadRequestTime(application, clock.millis() - startTime, MetricName.ok);

final ModuleCacheResponse moduleCacheResponse;
try {
moduleCacheResponse = mapper.decodeValue(responseBody, ModuleCacheResponse.class);
Expand Down
118 changes: 98 additions & 20 deletions src/main/java/org/prebid/server/cache/CoreCacheService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.prebid.server.auction.model.AuctionContext;
import org.prebid.server.auction.model.BidInfo;
import org.prebid.server.auction.model.CachedDebugLog;
Expand All @@ -22,6 +23,7 @@
import org.prebid.server.cache.model.DebugHttpCall;
import org.prebid.server.cache.proto.request.bid.BidCacheRequest;
import org.prebid.server.cache.proto.request.bid.BidPutObject;
import org.prebid.server.cache.proto.response.CacheErrorResponse;
import org.prebid.server.cache.proto.response.bid.BidCacheResponse;
import org.prebid.server.cache.proto.response.bid.CacheObject;
import org.prebid.server.cache.utils.CacheServiceUtil;
Expand All @@ -45,6 +47,7 @@
import org.prebid.server.vertx.httpclient.HttpClient;
import org.prebid.server.vertx.httpclient.model.HttpClientResponse;

import java.net.URISyntaxException;
import java.net.URL;
import java.time.Clock;
import java.util.ArrayList;
Expand All @@ -55,6 +58,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -66,6 +70,8 @@ public class CoreCacheService {
private static final String BID_WURL_ATTRIBUTE = "wurl";
private static final String TRACE_INFO_SEPARATOR = "-";
private static final int MAX_DATACENTER_REGION_LENGTH = 4;
private static final String UUID_QUERY_PARAMETER = "uuid";
private static final String CH_QUERY_PARAMETER = "ch";

private final HttpClient httpClient;
private final URL externalEndpointUrl;
Expand Down Expand Up @@ -186,18 +192,20 @@ private Future<BidCacheResponse> makeRequest(BidCacheRequest bidCacheRequest,
cacheHeaders,
mapper.encodeToString(bidCacheRequest),
remainingTimeout)
.map(response -> toBidCacheResponse(
.map(response -> processVtrackWriteCacheResponse(
response.getStatusCode(), response.getBody(), bidCount, accountId, startTime))
.recover(exception -> failResponse(exception, accountId, startTime));
.recover(exception -> failVtrackCacheWriteResponse(exception, accountId, startTime));
}

private Future<BidCacheResponse> failResponse(Throwable exception, String accountId, long startTime) {
metrics.updateCacheRequestFailedTime(accountId, clock.millis() - startTime);
private BidCacheResponse processVtrackWriteCacheResponse(int statusCode,
String responseBody,
int bidCount,
String accountId,
long startTime) {

logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

return Future.failedFuture(exception);
final BidCacheResponse bidCacheResponse = toBidCacheResponse(statusCode, responseBody, bidCount);
metrics.updateVtrackCacheWriteRequestTime(accountId, clock.millis() - startTime, MetricName.ok);
return bidCacheResponse;
}

public Future<BidCacheResponse> cachePutObjects(List<BidPutObject> bidPutObjects,
Expand All @@ -210,7 +218,10 @@ public Future<BidCacheResponse> cachePutObjects(List<BidPutObject> bidPutObjects
final List<CachedCreative> cachedCreatives =
updatePutObjects(bidPutObjects, isEventsEnabled, biddersAllowingVastUpdate, accountId, integration);

updateCreativeMetrics(accountId, cachedCreatives);
updateCreativeMetrics(
cachedCreatives,
(ttl, type) -> metrics.updateVtrackCacheCreativeTtl(accountId, ttl, type),
(size, type) -> metrics.updateVtrackCacheCreativeSize(accountId, size, type));

return makeRequest(toBidCacheRequest(cachedCreatives), cachedCreatives.size(), timeout, accountId);
}
Expand Down Expand Up @@ -309,7 +320,10 @@ private Future<CacheServiceResult> doCacheOpenrtb(List<CacheBid> bids,

final BidCacheRequest bidCacheRequest = toBidCacheRequest(cachedCreatives);

updateCreativeMetrics(accountId, cachedCreatives);
updateCreativeMetrics(
cachedCreatives,
(ttl, type) -> metrics.updateCacheCreativeTtl(accountId, ttl, type),
(size, type) -> metrics.updateCacheCreativeSize(accountId, size, type));

final String url = ObjectUtils.firstNonNull(internalEndpointUrl, externalEndpointUrl).toString();
final String body = mapper.encodeToString(bidCacheRequest);
Expand Down Expand Up @@ -343,8 +357,8 @@ private CacheServiceResult processResponseOpenrtb(HttpClientResponse response,
externalEndpointUrl.toString(), httpRequest, httpResponse, startTime);
final BidCacheResponse bidCacheResponse;
try {
bidCacheResponse = toBidCacheResponse(
responseStatusCode, response.getBody(), bidCount, accountId, startTime);
bidCacheResponse = toBidCacheResponse(responseStatusCode, response.getBody(), bidCount);
metrics.updateAuctionCacheRequestTime(accountId, clock.millis() - startTime, MetricName.ok);
} catch (PreBidException e) {
return CacheServiceResult.of(httpCall, e, Collections.emptyMap());
}
Expand All @@ -361,7 +375,7 @@ private CacheServiceResult failResponseOpenrtb(Throwable exception,
logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

metrics.updateCacheRequestFailedTime(accountId, clock.millis() - startTime);
metrics.updateAuctionCacheRequestTime(accountId, clock.millis() - startTime, MetricName.err);

final DebugHttpCall httpCall = makeDebugHttpCall(externalEndpointUrl.toString(), request, null, startTime);
return CacheServiceResult.of(httpCall, exception, Collections.emptyMap());
Expand Down Expand Up @@ -460,9 +474,7 @@ private String generateWinUrl(String bidId,

private BidCacheResponse toBidCacheResponse(int statusCode,
String responseBody,
int bidCount,
String accountId,
long startTime) {
int bidCount) {

if (statusCode != 200) {
throw new PreBidException("HTTP status code " + statusCode);
Expand All @@ -480,7 +492,6 @@ private BidCacheResponse toBidCacheResponse(int statusCode,
throw new PreBidException("The number of response cache objects doesn't match with bids");
}

metrics.updateCacheRequestSuccessTime(accountId, clock.millis() - startTime);
return bidCacheResponse;
}

Expand Down Expand Up @@ -538,17 +549,20 @@ private static String resolveVideoBidUuid(String uuid, String hbCacheId) {
return hbCacheId != null && uuid.endsWith(hbCacheId) ? hbCacheId : uuid;
}

private void updateCreativeMetrics(String accountId, List<CachedCreative> cachedCreatives) {
private void updateCreativeMetrics(List<CachedCreative> cachedCreatives,
BiConsumer<Integer, MetricName> updateCreativeTtlMetric,
BiConsumer<Integer, MetricName> updateCreativeSiseMetric) {

for (CachedCreative cachedCreative : cachedCreatives) {
final BidPutObject payload = cachedCreative.getPayload();
final MetricName creativeType = resolveCreativeTypeName(payload);
final Integer creativeTtl = ObjectUtils.defaultIfNull(payload.getTtlseconds(), payload.getExpiry());

if (creativeTtl != null) {
metrics.updateCacheCreativeTtl(accountId, creativeTtl, creativeType);
updateCreativeTtlMetric.accept(creativeTtl, creativeType);
}

metrics.updateCacheCreativeSize(accountId, cachedCreative.getSize(), creativeType);
updateCreativeSiseMetric.accept(cachedCreative.getSize(), creativeType);
}
}

Expand Down Expand Up @@ -627,4 +641,68 @@ private static String normalizeDatacenterRegion(String datacenterRegion) {
? trimmedDatacenterRegion.substring(0, MAX_DATACENTER_REGION_LENGTH)
: trimmedDatacenterRegion;
}

public Future<HttpClientResponse> getCachedObject(String key, String ch, Timeout timeout) {
final long remainingTimeout = timeout.remaining();
if (remainingTimeout <= 0) {
return Future.failedFuture(new TimeoutException("Timeout has been exceeded"));
}

final URL endpointUrl = ObjectUtils.firstNonNull(internalEndpointUrl, externalEndpointUrl);
final String url;
try {
final URIBuilder uriBuilder = new URIBuilder(endpointUrl.toString());
uriBuilder.addParameter(UUID_QUERY_PARAMETER, key);
if (StringUtils.isNotBlank(ch)) {
uriBuilder.addParameter(CH_QUERY_PARAMETER, ch);
}
url = uriBuilder.build().toString();
} catch (URISyntaxException e) {
return Future.failedFuture(new IllegalArgumentException("Configured cache url is malformed", e));
}

final long startTime = clock.millis();
return httpClient.get(url, cacheHeaders, remainingTimeout)
.map(response -> processVtrackReadResponse(response, startTime))
.recover(exception -> failVtrackCacheReadResponse(exception, startTime));
}

private HttpClientResponse processVtrackReadResponse(HttpClientResponse response, long startTime) {
final int statusCode = response.getStatusCode();
final String body = response.getBody();

if (statusCode == 200) {
metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.ok);
return response;
}

try {
final CacheErrorResponse errorResponse = mapper.decodeValue(body, CacheErrorResponse.class);
metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.err);
return HttpClientResponse.of(statusCode, response.getHeaders(), errorResponse.getMessage());
} catch (DecodeException e) {
throw new PreBidException("Cannot parse response: " + body, e);
}
}

private <T> Future<T> failVtrackCacheWriteResponse(Throwable exception, String accountId, long startTime) {
if (exception instanceof PreBidException) {
metrics.updateVtrackCacheWriteRequestTime(accountId, clock.millis() - startTime, MetricName.err);
}
return failResponse(exception);
}

private <T> Future<T> failVtrackCacheReadResponse(Throwable exception, long startTime) {
if (exception instanceof PreBidException) {
metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.err);
}
return failResponse(exception);
}

private static <T> Future<T> failResponse(Throwable exception) {
logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

return Future.failedFuture(exception);
}
}
Loading