Skip to content

Commit a660c4e

Browse files
ShawnMcKeemksahakyan
authored andcommitted
firefly: address PR 8044 review fixes
Signed-off-by: Shawn McKee <smckee@umich.edu> (cherry picked from commit db22839) Signed-off-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
1 parent 2eca064 commit a660c4e

5 files changed

Lines changed: 38 additions & 113 deletions

File tree

modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -177,23 +177,19 @@ public class DcacheResourceFactory
177177
private static final Logger SCITAGS_LOGGER =
178178
LoggerFactory.getLogger("org.dcache.scitags");
179179

180-
static Optional<Map.Entry<String, String>> findHeaderIgnoreCase(HttpServletRequest request,
180+
static Optional<String> findHeaderIgnoreCase(HttpServletRequest request,
181181
String expectedHeaderName) {
182182
Enumeration<String> headerNames = request.getHeaderNames();
183183
if (headerNames != null) {
184184
while (headerNames.hasMoreElements()) {
185185
String actualHeaderName = headerNames.nextElement();
186186
if (actualHeaderName.equalsIgnoreCase(expectedHeaderName)) {
187-
return Optional.of(Map.entry(actualHeaderName,
188-
request.getHeader(actualHeaderName)));
187+
return Optional.ofNullable(request.getHeader(actualHeaderName));
189188
}
190189
}
191190
}
192191

193-
String headerValue = request.getHeader(expectedHeaderName);
194-
return headerValue == null
195-
? Optional.empty()
196-
: Optional.of(Map.entry(expectedHeaderName, headerValue));
192+
return Optional.ofNullable(request.getHeader(expectedHeaderName));
197193
}
198194

199195
private static final XMLOutputFactory XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory();
@@ -1741,43 +1737,45 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject,
17411737
}
17421738

17431739
private String readTransferTag(HttpServletRequest request) {
1740+
String door = getCellName() + '@' + getCellDomainName();
1741+
17441742
// SciTag takes precedence because it is checked first.
17451743
for (String header : SCITAG_HEADERS) {
1746-
var matchedHeader = findHeaderIgnoreCase(request, header);
1747-
if (matchedHeader.isPresent()) {
1748-
String transferTag = matchedHeader.get().getValue();
1749-
if (transferTag != null && !transferTag.isBlank()) {
1750-
return logSciTagsRequest(request,
1751-
matchedHeader.get().getKey() + "-header",
1752-
transferTag.trim());
1753-
}
1744+
var transferTag = findHeaderIgnoreCase(request, header)
1745+
.map(String::trim)
1746+
.filter(tag -> !tag.isEmpty());
1747+
if (transferTag.isPresent()) {
1748+
logSciTagsRequest(request, door, header + "-header", transferTag.get());
1749+
return transferTag.get();
17541750
}
17551751
}
17561752

1757-
String flowFromQuery = request.getParameter("scitag.flow");
1758-
if (flowFromQuery != null && !flowFromQuery.isBlank()) {
1759-
return logSciTagsRequest(request, "scitag.flow-query", flowFromQuery.trim());
1753+
var flowFromQuery = Optional.ofNullable(request.getParameter("scitag.flow"))
1754+
.map(String::trim)
1755+
.filter(tag -> !tag.isEmpty());
1756+
if (flowFromQuery.isPresent()) {
1757+
logSciTagsRequest(request, door, "scitag.flow-query", flowFromQuery.get());
1758+
return flowFromQuery.get();
17601759
}
17611760

1762-
logSciTagsRequest(request, "none", "");
1761+
logSciTagsRequest(request, door, "none", "");
17631762
return "";
17641763
}
17651764

1766-
private String logSciTagsRequest(HttpServletRequest request, String tagSource,
1767-
String transferTag) {
1765+
private static void logSciTagsRequest(HttpServletRequest request, String door,
1766+
String tagSource, String transferTag) {
17681767
if (SCITAGS_LOGGER.isDebugEnabled()) {
17691768
SCITAGS_LOGGER.debug(
17701769
"scitags event=request protocol={} door={} remote={} method={} alias={} local={} tagSource={} transferTag={}",
17711770
request.isSecure() ? PROTOCOL_INFO_SSL_NAME : PROTOCOL_INFO_NAME,
1772-
getCellName() + '@' + getCellDomainName(),
1771+
door,
17731772
request.getRemoteAddr(),
17741773
request.getMethod(),
17751774
request.getServerName(),
17761775
request.getLocalAddr(),
17771776
tagSource,
17781777
transferTag.isEmpty() ? "-" : transferTag);
17791778
}
1780-
return transferTag;
17811779
}
17821780

17831781
protected ProtocolInfo createProtocolInfo(InetSocketAddress address) {

modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.util.Collections;
99
import java.util.List;
10-
import java.util.Map;
1110
import java.util.Optional;
1211
import javax.servlet.http.HttpServletRequest;
1312
import org.junit.Test;
@@ -20,12 +19,11 @@ public void shouldFindSciTagHeaderIgnoringCase() {
2019
given(request.getHeaderNames()).willReturn(Collections.enumeration(List.of("scitag")));
2120
given(request.getHeader("scitag")).willReturn("313");
2221

23-
Optional<Map.Entry<String, String>> header =
22+
Optional<String> header =
2423
DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag");
2524

2625
assertThat(header.isPresent(), is(true));
27-
assertThat(header.get().getKey(), is("scitag"));
28-
assertThat(header.get().getValue(), is("313"));
26+
assertThat(header.get(), is("313"));
2927
}
3028

3129
@Test
@@ -35,12 +33,11 @@ public void shouldFindTransferHeaderSciTagIgnoringCase() {
3533
Collections.enumeration(List.of("transferheaderscitag")));
3634
given(request.getHeader("transferheaderscitag")).willReturn("777");
3735

38-
Optional<Map.Entry<String, String>> header =
36+
Optional<String> header =
3937
DcacheResourceFactory.findHeaderIgnoreCase(request, "TransferHeaderSciTag");
4038

4139
assertThat(header.isPresent(), is(true));
42-
assertThat(header.get().getKey(), is("transferheaderscitag"));
43-
assertThat(header.get().getValue(), is("777"));
40+
assertThat(header.get(), is("777"));
4441
}
4542

4643
@Test
@@ -49,11 +46,10 @@ public void shouldFallbackToServletHeaderLookup() {
4946
given(request.getHeaderNames()).willReturn(null);
5047
given(request.getHeader("SciTag")).willReturn("313");
5148

52-
Optional<Map.Entry<String, String>> header =
49+
Optional<String> header =
5350
DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag");
5451

5552
assertThat(header.isPresent(), is(true));
56-
assertThat(header.get().getKey(), is("SciTag"));
57-
assertThat(header.get().getValue(), is("313"));
53+
assertThat(header.get(), is("313"));
5854
}
59-
}
55+
}

modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,18 @@
2020
import com.google.common.base.Throwables;
2121
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2222
import diskCacheV111.util.CacheException;
23-
import diskCacheV111.vehicles.IpProtocolInfo;
2423
import diskCacheV111.vehicles.PoolIoFileMessage;
2524
import diskCacheV111.vehicles.ProtocolInfo;
2625
import dmg.cells.nucleus.AbstractCellComponent;
2726
import dmg.cells.nucleus.CellInfoProvider;
2827
import dmg.cells.nucleus.CellPath;
2928
import java.io.IOException;
3029
import java.io.InterruptedIOException;
31-
import java.net.InetSocketAddress;
3230
import java.io.SyncFailedException;
3331
import java.lang.reflect.InvocationTargetException;
3432
import java.nio.channels.ClosedChannelException;
3533
import java.nio.channels.CompletionHandler;
3634
import java.nio.file.StandardOpenOption;
37-
import java.util.Optional;
3835
import java.util.concurrent.ExecutorService;
3936
import java.util.concurrent.Executors;
4037
import org.dcache.pool.movers.ChecksumMover;
@@ -199,7 +196,6 @@ private void runMoverForRead(RepositoryChannel fileIoChannel) throws Exception {
199196
_mover.getMover()
200197
.runIO(_mover.getFileAttributes(), fileIoChannel, _mover.getProtocolInfo(),
201198
_mover.getIoMode());
202-
reportTransferStart();
203199
}
204200

205201
private void tryToSync(RepositoryChannel channel) throws IOException {
@@ -219,68 +215,11 @@ private void runMoverForWrite(RepositoryChannel fileIoChannel) throws Exception
219215
_mover.getMover()
220216
.runIO(_mover.getFileAttributes(), fileIoChannel, _mover.getProtocolInfo(),
221217
_mover.getIoMode());
222-
reportTransferStart();
223218
} finally {
224219
tryToSync(fileIoChannel);
225220
}
226221
}
227222

228-
private void reportTransferStart() {
229-
if (_transferLifeCycle == null) {
230-
SCITAGS_LOGGER.debug(
231-
"scitags lifecycle=start skip reason=no-transfer-lifecycle protocol={} pnfsid={} transferTag={}",
232-
protocolName(),
233-
_mover.getFileAttributes().getPnfsId(),
234-
transferTag());
235-
return;
236-
}
237-
238-
if (!(_mover.getProtocolInfo() instanceof IpProtocolInfo ipProtocolInfo)) {
239-
SCITAGS_LOGGER.debug(
240-
"scitags lifecycle=start skip reason=non-ip-protocol protocol={} pnfsid={} transferTag={}",
241-
protocolName(),
242-
_mover.getFileAttributes().getPnfsId(),
243-
transferTag());
244-
return;
245-
}
246-
247-
InetSocketAddress remoteEndpoint = ipProtocolInfo.getSocketAddress();
248-
if (remoteEndpoint == null) {
249-
SCITAGS_LOGGER.debug(
250-
"scitags lifecycle=start skip reason=no-remote-endpoint protocol={} pnfsid={} transferTag={}",
251-
protocolName(),
252-
_mover.getFileAttributes().getPnfsId(),
253-
transferTag());
254-
return;
255-
}
256-
257-
Optional<InetSocketAddress> localEndpoint = _mover.getLocalEndpoint();
258-
if (localEndpoint.isEmpty()) {
259-
SCITAGS_LOGGER.debug(
260-
"scitags lifecycle=start skip reason=no-local-endpoint protocol={} pnfsid={} remote={} transferTag={}",
261-
protocolName(),
262-
_mover.getFileAttributes().getPnfsId(),
263-
formatEndpoint(remoteEndpoint),
264-
transferTag());
265-
return;
266-
}
267-
268-
InetSocketAddress local = localEndpoint.get();
269-
SCITAGS_LOGGER.debug(
270-
"scitags lifecycle=start invoke protocol={} pnfsid={} remote={} local={} transferTag={}",
271-
protocolName(),
272-
_mover.getFileAttributes().getPnfsId(),
273-
formatEndpoint(remoteEndpoint),
274-
formatEndpoint(local),
275-
transferTag());
276-
277-
_transferLifeCycle.onStart(
278-
remoteEndpoint,
279-
local,
280-
_mover.getProtocolInfo(),
281-
_mover.getSubject());
282-
}
283-
284223
private String protocolName() {
285224
return _mover.getProtocolInfo().getProtocol().toLowerCase();
286225
}
@@ -290,18 +229,6 @@ private String transferTag() {
290229
return transferTag == null || transferTag.isEmpty() ? "-" : transferTag;
291230
}
292231

293-
private String formatEndpoint(InetSocketAddress endpoint) {
294-
if (endpoint == null) {
295-
return "-";
296-
}
297-
298-
if (endpoint.getAddress() != null) {
299-
return endpoint.getAddress().getHostAddress() + ":" + endpoint.getPort();
300-
}
301-
302-
return endpoint.getHostString() + ":" + endpoint.getPort();
303-
}
304-
305232
private String formatError(Throwable t) {
306233
String message = t.getMessage();
307234
return message == null || message.isEmpty()

modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage
165165
}
166166

167167
int activityId = getActivity(protocolInfo);
168-
long transferDurationMillis = Math.max(1L, mover.getConnectionTime());
169-
Instant finishedAt = Instant.now();
170-
Instant startedAt = finishedAt.minusMillis(transferDurationMillis);
168+
long transferDurationMillis = Math.max(0L, mover.getConnectionTime());
169+
Instant startedAt = Instant.ofEpochMilli(mover.getTimestamp());
170+
Instant finishedAt = startedAt.plusMillis(transferDurationMillis);
171171

172172
var data = new FlowMarkerBuilder()
173173
.withStartedAt(startedAt)
@@ -185,8 +185,8 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage
185185
}
186186
var firefly = data.build("end");
187187

188-
InetSocketAddress fireflyDestination = toFireflyDestination.apply(src);
189-
sendToMultipleDestinations(fireflyDestination, firefly);
188+
InetSocketAddress fireflyDestination = toFireflyDestination.apply(src);
189+
sendToMultipleDestinations(fireflyDestination, firefly);
190190
logMarkerEvent("end", src, dst, protocolInfo, optionalExpId.getAsInt(), activityId,
191191
mover.getBytesRead(), mover.getBytesWritten(), fireflyDestination);
192192
}

modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ public void shouldSetEndMarkerStartTimeFromTransferDuration() throws Exception {
9999
mover.setBytesRead(1024);
100100
mover.setBytesWritten(0);
101101

102+
Instant expectedStart = Instant.ofEpochMilli(mover.getTimestamp());
103+
Instant expectedEnd = expectedStart.plusMillis(transferDuration);
104+
102105
lifecycle.onEnd(
103106
new InetSocketAddress("203.0.113.20", 42000),
104107
new InetSocketAddress("198.51.100.55", 2880),
@@ -115,7 +118,8 @@ public void shouldSetEndMarkerStartTimeFromTransferDuration() throws Exception {
115118
Instant startTime = Instant.parse(lifecyclePayload.getString("start-time"));
116119
Instant endTime = Instant.parse(lifecyclePayload.getString("end-time"));
117120

118-
assertTrue(endTime.isAfter(startTime));
121+
assertEquals(expectedStart, startTime);
122+
assertEquals(expectedEnd, endTime);
119123
assertEquals(transferDuration, Duration.between(startTime, endTime).toMillis());
120124
}
121125
}

0 commit comments

Comments
 (0)