Skip to content

Commit 9243096

Browse files
committed
door: Make DcacheResourceFactory zone aware, make write requests carry door's zone
1 parent 8c80adb commit 9243096

9 files changed

Lines changed: 129 additions & 9 deletions

File tree

modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
import org.dcache.auth.attributes.Restriction;
131131
import org.dcache.auth.attributes.Restrictions;
132132
import org.dcache.cells.CellStub;
133+
import org.dcache.cells.ZoneAware;
133134
import org.dcache.http.AuthenticationHandler;
134135
import org.dcache.http.PathMapper;
135136
import org.dcache.missingfiles.AlwaysFailMissingFileStrategy;
@@ -170,7 +171,7 @@
170171
*/
171172
public class DcacheResourceFactory
172173
extends AbstractCellComponent
173-
implements ResourceFactory, CellMessageReceiver, CellCommandListener, CellInfoProvider {
174+
implements ResourceFactory, CellMessageReceiver, CellCommandListener, CellInfoProvider, ZoneAware {
174175

175176
private static final Logger LOGGER =
176177
LoggerFactory.getLogger(DcacheResourceFactory.class);
@@ -251,6 +252,8 @@ static Optional<String> findHeaderIgnoreCase(HttpServletRequest request,
251252

252253
private ScheduledExecutorService _executor;
253254

255+
private Optional<String> _zone = Optional.empty();
256+
254257
private int _moverTimeout;
255258
private TimeUnit _moverTimeoutUnit;
256259
private long _killTimeout = 1500;
@@ -325,6 +328,11 @@ public void setRemoteTransferHandler(RemoteTransferHandler handler) {
325328
_remoteTransferHandler = requireNonNull(handler);
326329
}
327330

331+
@Override
332+
public void setZone(Optional<String> zone) {
333+
_zone = zone;
334+
}
335+
328336
/**
329337
* Returns the kill timeout in milliseconds.
330338
*/
@@ -762,6 +770,7 @@ public DcacheResource createFile(FsPath path, InputStream inputStream, Long leng
762770
checkUploadSize(length);
763771

764772
WriteTransfer transfer = new WriteTransfer(_pnfs, subject, restriction, path);
773+
transfer.setZone(_zone);
765774
_transfers.put((int) transfer.getId(), transfer);
766775
try {
767776
boolean success = false;
@@ -845,6 +854,7 @@ public String getWriteUrl(FsPath path, Long length)
845854
String uri = null;
846855
WriteTransfer transfer = new WriteTransfer(_pnfs, subject, restriction, path);
847856
transfer.setSSL(_redirectToHttps && ServletRequest.getRequest().isSecure());
857+
transfer.setZone(_zone);
848858
_transfers.put((int) transfer.getId(), transfer);
849859
try {
850860
transfer.createNameSpaceEntry();
@@ -1364,6 +1374,7 @@ private ReadTransfer beginRead(FsPath path, PnfsId pnfsid, boolean isProxyTransf
13641374
transfer.setIsChecksumNeeded(isDigestRequested());
13651375
transfer.setSSL(
13661376
!isProxyTransfer && _redirectToHttps && ServletRequest.getRequest().isSecure());
1377+
transfer.setZone(_zone);
13671378
_transfers.put((int) transfer.getId(), transfer);
13681379
try {
13691380
transfer.setProxyTransfer(isProxyTransfer);
@@ -1576,6 +1587,7 @@ private void initializeTransfer(HttpTransfer transfer, Subject subject)
15761587
));
15771588
transfer.setOverwriteAllowed(_isOverwriteAllowed);
15781589
transfer.setKafkaSender(_kafkaSender);
1590+
transfer.setZone(_zone);
15791591
}
15801592

15811593
private Set<FileAttribute> buildRequestedAttributes() {

modules/dcache/src/main/java/diskCacheV111/poolManager/PoolManagerV5.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,7 @@ public void run() {
703703
.getPoolSelector(fileAttributes,
704704
protocolInfo,
705705
_request.getLinkGroup(),
706+
_request.getZone(),
706707
_request.getExcludedHosts())
707708
.selectWritePool(_request.getPreallocated());
708709
LOGGER.info("{} write handler selected {} after {} ms", _pnfsId, pool.name(),

modules/dcache/src/main/java/diskCacheV111/poolManager/PoolMonitorV5.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,16 @@ public PoolSelector getPoolSelector(FileAttributes fileAttributes,
103103
ProtocolInfo protocolInfo,
104104
String linkGroup,
105105
Set<String> excludedHosts) {
106-
return new PnfsFileLocation(fileAttributes, protocolInfo, linkGroup, excludedHosts);
106+
return new PnfsFileLocation(fileAttributes, protocolInfo, linkGroup, Optional.empty(), excludedHosts);
107+
}
108+
109+
@Override
110+
public PoolSelector getPoolSelector(FileAttributes fileAttributes,
111+
ProtocolInfo protocolInfo,
112+
String linkGroup,
113+
Optional<String> zone,
114+
Set<String> excludedHosts) {
115+
return new PnfsFileLocation(fileAttributes, protocolInfo, linkGroup, zone, excludedHosts);
107116
}
108117

109118
public class PnfsFileLocation implements PoolSelector {
@@ -113,6 +122,7 @@ public class PnfsFileLocation implements PoolSelector {
113122
private final FileAttributes _fileAttributes;
114123
private final ProtocolInfo _protocolInfo;
115124
private final String _linkGroup;
125+
private final Optional<String> _zone;
116126
private final Set<String> _excludedHosts;
117127
private final Predicate<String> _locationFilter;
118128
private final Predicate<String> _excludeFilter;
@@ -121,9 +131,18 @@ public PnfsFileLocation(FileAttributes fileAttributes,
121131
ProtocolInfo protocolInfo,
122132
String linkGroup,
123133
Set<String> excludedHosts) {
134+
this(fileAttributes, protocolInfo, linkGroup, Optional.empty(), excludedHosts);
135+
}
136+
137+
public PnfsFileLocation(FileAttributes fileAttributes,
138+
ProtocolInfo protocolInfo,
139+
String linkGroup,
140+
Optional<String> zone,
141+
Set<String> excludedHosts) {
124142
_fileAttributes = fileAttributes;
125143
_protocolInfo = protocolInfo;
126144
_linkGroup = linkGroup;
145+
_zone = zone;
127146
_excludedHosts = excludedHosts == null ?
128147
Collections.EMPTY_SET : excludedHosts;
129148

@@ -240,6 +259,8 @@ public SelectedPool selectWritePool(long preallocated)
240259
.map(_costModule::getPoolInfo)
241260
.filter(Objects::nonNull)
242261
.collect(toList());
262+
pools = filterByZone(pools);
263+
243264
if (!pools.isEmpty()) {
244265
Partition partition = _partitionManager.getPartition(level.getTag());
245266
try {
@@ -322,6 +343,8 @@ public SelectedPool selectReadPool()
322343
LOGGER.debug("[read] Available pools at level {}: {}",
323344
prio, pools);
324345

346+
pools = filterByZone(pools);
347+
325348
/* If allowed, fallback to next link if current link doesn't point
326349
* to any pool holding the file.
327350
*/
@@ -374,6 +397,17 @@ public SelectedPool selectReadPool()
374397
"File is online, but not in read-allowed pool");
375398
}
376399

400+
private List<PoolInfo> filterByZone(List<PoolInfo> pools) {
401+
if (_zone.isPresent()) {
402+
List<PoolInfo> poolsZ =
403+
pools.stream()
404+
.filter(p -> Objects.equals(p.getTags().get("zone"), _zone.get()))
405+
.toList();
406+
pools = (poolsZ.isEmpty()) ? pools : poolsZ;
407+
}
408+
return pools;
409+
}
410+
377411
@Nullable
378412
private String getHostName() {
379413
if (_protocolInfo instanceof IpProtocolInfo) {

modules/dcache/src/main/java/diskCacheV111/poolManager/RequestContainerV5.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ private class PoolRequestHandler {
911911
private final StorageInfo _storageInfo;
912912
private final ProtocolInfo _protocolInfo;
913913
private final String _linkGroup;
914+
private final Optional<String> _zone;
914915
private final String _billingPath;
915916
private final String _transferPath;
916917
private final PoolSelector _poolSelector;
@@ -946,6 +947,8 @@ public PoolRequestHandler(PnfsId pnfsId, String poolGroup,
946947
_billingPath = request.getBillingPath();
947948
_transferPath = request.getTransferPath();
948949

950+
_zone = request.getZone();
951+
949952
_retryCounter = request.getContext().getRetryCounter();
950953
_stageCandidate = Optional.ofNullable(request.getContext().getPreviousStagePool());
951954

@@ -960,8 +963,13 @@ public PoolRequestHandler(PnfsId pnfsId, String poolGroup,
960963
Set<String> excluded = request.getExcludedHosts();
961964
_failOnExcluded = excluded != null && !excluded.isEmpty();
962965

963-
_poolSelector =
964-
_poolMonitor.getPoolSelector(_fileAttributes,
966+
_poolSelector = _zone.isPresent() ?
967+
_poolMonitor.getPoolSelector(_fileAttributes,
968+
_protocolInfo,
969+
_linkGroup,
970+
_zone,
971+
excluded)
972+
: _poolMonitor.getPoolSelector(_fileAttributes,
965973
_protocolInfo,
966974
_linkGroup,
967975
excluded);

modules/dcache/src/main/java/diskCacheV111/vehicles/PoolMgrSelectPoolMsg.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import diskCacheV111.poolManager.RequestContainerV5;
66
import java.util.EnumSet;
7+
import java.util.Optional;
78
import java.util.Set;
89
import javax.annotation.Nonnull;
910
import org.dcache.vehicles.FileAttributes;
@@ -16,6 +17,7 @@ public class PoolMgrSelectPoolMsg extends PoolMgrGetPoolMsg {
1617
private String _ioQueueName;
1718
private String _pnfsPath;
1819
private String _linkGroup;
20+
private String _zone;
1921
private Set<String> _excludedHosts;
2022
private final EnumSet<RequestContainerV5.RequestState> _allowedStates;
2123

@@ -75,6 +77,14 @@ public String getLinkGroup() {
7577
return _linkGroup;
7678
}
7779

80+
public void setZone(Optional<String> zone) {
81+
_zone = zone.orElse(null);
82+
}
83+
84+
public Optional<String> getZone() {
85+
return Optional.ofNullable(_zone);
86+
}
87+
7888
@Nonnull
7989
public EnumSet<RequestContainerV5.RequestState> getAllowedStates() {
8090
return _allowedStates;

modules/dcache/src/main/java/org/dcache/poolmanager/PoolMonitor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import diskCacheV111.util.FileLocality;
77
import diskCacheV111.vehicles.ProtocolInfo;
88
import java.util.Collection;
9+
import java.util.Optional;
910
import java.util.Set;
1011
import org.dcache.vehicles.FileAttributes;
1112

@@ -29,6 +30,15 @@ PoolSelector getPoolSelector(
2930
String linkGroup,
3031
Set<String> excludedHosts);
3132

33+
default PoolSelector getPoolSelector(
34+
FileAttributes fileAttributes,
35+
ProtocolInfo protocolInfo,
36+
String linkGroup,
37+
Optional<String> zone,
38+
Set<String> excludedHosts) {
39+
return getPoolSelector(fileAttributes, protocolInfo, linkGroup, excludedHosts);
40+
}
41+
3242
Collection<PoolCostInfo> queryPoolsByLinkName(String linkName);
3343

3444
FileLocality getFileLocality(FileAttributes attributes, String hostName);

modules/dcache/src/main/java/org/dcache/util/Transfer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,7 @@
6464
import java.net.InetAddress;
6565
import java.net.InetSocketAddress;
6666
import java.time.Duration;
67-
import java.util.Collections;
68-
import java.util.EnumSet;
69-
import java.util.List;
70-
import java.util.OptionalLong;
71-
import java.util.Set;
67+
import java.util.*;
7268
import java.util.concurrent.Executors;
7369
import java.util.concurrent.Future;
7470
import java.util.concurrent.ScheduledExecutorService;
@@ -145,6 +141,8 @@ public class Transfer implements Comparable<Transfer> {
145141
private List<InetSocketAddress> _clientAddresses;
146142
private String _ioQueue;
147143

144+
private Optional<String> _zone = Optional.empty();
145+
148146
private Set<String> _tried;
149147

150148
private long _allocated;
@@ -459,6 +457,14 @@ public synchronized Pool getPool() {
459457
return _pool;
460458
}
461459

460+
public synchronized void setZone(Optional<String> zone) {
461+
_zone = zone;
462+
}
463+
464+
public synchronized Optional<String> getZone() {
465+
return _zone;
466+
}
467+
462468
/**
463469
* Clear selected pool and enforce re-selection during retry procedure in {@link
464470
* #selectPoolAndStartMoverAsync(TransferRetryPolicy)}.
@@ -1007,6 +1013,7 @@ public ListenableFuture<Void> selectPoolAsync(long timeout) {
10071013
request.setTransferPath(getTransferPath());
10081014
request.setIoQueueName(getIoQueue());
10091015
request.setExcludedHosts(_tried);
1016+
request.setZone(_zone);
10101017

10111018
reply = _poolManager.sendAsync(request, timeout);
10121019
} else {
@@ -1035,6 +1042,7 @@ public ListenableFuture<Void> selectPoolAsync(long timeout) {
10351042
request.setTransferPath(getTransferPath());
10361043
request.setIoQueueName(getIoQueue());
10371044
request.setExcludedHosts(_tried);
1045+
request.setZone(_zone);
10381046

10391047
reply = Futures.transform(_poolManager.sendAsync(request, timeout),
10401048
(PoolMgrSelectReadPoolMsg msg) -> {

modules/dcache/src/test/java/org/dcache/tests/poolmanager/PoolMonitorTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Collections;
3030
import java.util.HashSet;
3131
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Optional;
3234
import java.util.Set;
3335
import org.dcache.cells.UniversalSpringCell;
3436
import org.dcache.pool.classic.IoQueueManager;
@@ -151,7 +153,37 @@ public void testGsonDeserialization() throws Exception {
151153
.disableHtmlEscaping().create().toJson(obj);
152154
}
153155

156+
@Test
157+
public void testWritePoolZonePreference() throws Exception {
158+
prepareCostModule(false, true);
159+
160+
FileAttributes attributes = FileAttributes.of().pnfsId(_pnfsId).build();
161+
StorageInfos.injectInto(_storageInfo, attributes);
162+
163+
PoolSelector selector = _poolMonitor.getPoolSelector(attributes, _protocolInfo,
164+
null, Optional.of("1"), Collections.EMPTY_SET);
165+
166+
assertEquals("pool1", selector.selectWritePool(0).name());
167+
}
168+
169+
@Test
170+
public void testReadPoolZonePreference() throws Exception {
171+
prepareCostModule(false, true);
172+
173+
FileAttributes attributes = FileAttributes.of().pnfsId(_pnfsId).locations(_pools).build();
174+
StorageInfos.injectInto(_storageInfo, attributes);
175+
176+
PoolSelector selector = _poolMonitor.getPoolSelector(attributes, _protocolInfo,
177+
null, Optional.of("1"), Collections.EMPTY_SET);
178+
179+
assertEquals("pool1", selector.selectReadPool().name());
180+
}
181+
154182
private void prepareCostModule(boolean linkPerPool) throws Exception {
183+
prepareCostModule(linkPerPool, false);
184+
}
185+
186+
private void prepareCostModule(boolean linkPerPool, boolean withZones) throws Exception {
155187
if (linkPerPool) {
156188
PoolMonitorHelper.prepareLinkPerPool(_selectionUnit, _access, _pools);
157189
} else {
@@ -179,6 +211,11 @@ private void prepareCostModule(boolean linkPerPool) throws Exception {
179211
pool1UpMessage.setHostName(_localhost);
180212
pool2UpMessage.setHostName(_localhost);
181213

214+
if (withZones) {
215+
pool1UpMessage.setTagMap(Map.of("zone", "1"));
216+
pool2UpMessage.setTagMap(Map.of("zone", "2"));
217+
}
218+
182219
CellMessage envelope1 = new CellMessage(new CellAddressCore("PoolManager"), null);
183220
envelope1.addSourceAddress(new CellAddressCore("pool1"));
184221
CellMessage envelope2 = new CellMessage(new CellAddressCore("PoolManager"), null);

packages/tar/src/main/container/staging-libs/.gitkeep

Whitespace-only changes.

0 commit comments

Comments
 (0)