Skip to content

Commit a2a9559

Browse files
committed
Treat some corner cases
1 parent f5e0159 commit a2a9559

File tree

3 files changed

+109
-27
lines changed

3 files changed

+109
-27
lines changed

engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import java.util.Collections;
2323
import java.util.Comparator;
2424
import java.util.HashMap;
25+
import java.util.HashSet;
2526
import java.util.LinkedHashMap;
2627
import java.util.LinkedList;
2728
import java.util.List;
2829
import java.util.Map;
30+
import java.util.Set;
2931

3032
import javax.inject.Inject;
3133

@@ -206,12 +208,22 @@ public int compare(DataObject o1, DataObject o2) {
206208

207209
protected List<DataObject> getAllReadyTemplates(DataStore srcDataStore, Map<DataObject, Pair<List<TemplateInfo>, Long>> childTemplates, List<TemplateDataStoreVO> templates) {
208210
List<TemplateInfo> files = new LinkedList<>();
211+
Set<Long> idsForMigration = new HashSet<>();
212+
209213
for (TemplateDataStoreVO template : templates) {
210-
VMTemplateVO templateVO = templateDao.findById(template.getTemplateId());
211-
if (shouldMigrateTemplate(template, templateVO)) {
212-
files.add(templateFactory.getTemplate(template.getTemplateId(), srcDataStore));
214+
long templateId = template.getTemplateId();
215+
if (idsForMigration.contains(templateId)) {
216+
logger.warn("Template store reference [{}] is duplicated; not considering it for migration.", template);
217+
continue;
218+
}
219+
VMTemplateVO templateVO = templateDao.findById(templateId);
220+
if (!shouldMigrateTemplate(template, templateVO)) {
221+
continue;
213222
}
223+
files.add(templateFactory.getTemplate(template.getTemplateId(), srcDataStore));
224+
idsForMigration.add(templateId);
214225
}
226+
215227
for (TemplateInfo template: files) {
216228
List<VMTemplateVO> children = templateDao.listByParentTemplatetId(template.getId());
217229
List<TemplateInfo> temps = new ArrayList<>();
@@ -221,6 +233,7 @@ protected List<DataObject> getAllReadyTemplates(DataStore srcDataStore, Map<Data
221233
}
222234
childTemplates.put(template, new Pair<>(temps, getTotalChainSize(temps)));
223235
}
236+
224237
return (List<DataObject>) (List<?>) files;
225238
}
226239

@@ -263,16 +276,37 @@ protected boolean shouldMigrateTemplate(TemplateDataStoreVO template, VMTemplate
263276
*/
264277
protected List<DataObject> getAllReadySnapshotsAndChains(DataStore srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains, List<SnapshotDataStoreVO> snapshots) {
265278
List<SnapshotInfo> files = new LinkedList<>();
279+
Set<Long> idsForMigration = new HashSet<>();
280+
266281
for (SnapshotDataStoreVO snapshot : snapshots) {
267-
SnapshotVO snapshotVO = snapshotDao.findById(snapshot.getSnapshotId());
268-
if (snapshot.getState() == ObjectInDataStoreStateMachine.State.Ready &&
269-
snapshotVO != null && snapshotVO.getHypervisorType() != Hypervisor.HypervisorType.Simulator
270-
&& snapshot.getParentSnapshotId() == 0 ) {
271-
SnapshotInfo snap = snapshotFactory.getSnapshot(snapshotVO.getSnapshotId(), snapshot.getDataStoreId(), snapshot.getRole());
272-
if (snap != null) {
273-
files.add(snap);
274-
}
282+
long snapshotId = snapshot.getSnapshotId();
283+
if (idsForMigration.contains(snapshotId)) {
284+
logger.warn("Snapshot store reference [{}] is duplicated; not considering it for migration.", snapshot);
285+
continue;
286+
}
287+
if (snapshot.getState() != ObjectInDataStoreStateMachine.State.Ready) {
288+
logger.warn("Not migrating snapshot [{}] because its state is not ready.", snapshot);
289+
continue;
290+
}
291+
SnapshotVO snapshotVO = snapshotDao.findById(snapshotId);
292+
if (snapshotVO == null) {
293+
logger.debug("Not migrating snapshot [{}] because we could not find its database entry.", snapshot);
294+
continue;
295+
}
296+
if (snapshotVO.getHypervisorType() == Hypervisor.HypervisorType.Simulator) {
297+
logger.debug("Not migrating snapshot [{}] because its hypervisor type is simulator.", snapshot);
298+
continue;
275299
}
300+
if (snapshot.getParentSnapshotId() != 0) {
301+
continue; // The child snapshot will be migrated in the for loop below.
302+
}
303+
SnapshotInfo snap = snapshotFactory.getSnapshot(snapshotVO.getSnapshotId(), snapshot.getDataStoreId(), snapshot.getRole());
304+
if (snap == null) {
305+
logger.debug("Not migrating snapshot [{}] because we could not get its information.", snapshot);
306+
continue;
307+
}
308+
files.add(snap);
309+
idsForMigration.add(snapshotId);
276310
}
277311

278312
for (SnapshotInfo parent : files) {
@@ -285,7 +319,7 @@ protected List<DataObject> getAllReadySnapshotsAndChains(DataStore srcDataStore,
285319
chain.addAll(children);
286320
}
287321
}
288-
snapshotChains.put(parent, new Pair<List<SnapshotInfo>, Long>(chain, getTotalChainSize(chain)));
322+
snapshotChains.put(parent, new Pair<>(chain, getTotalChainSize(chain)));
289323
}
290324

291325
return (List<DataObject>) (List<?>) files;
@@ -306,14 +340,31 @@ protected Long getTotalChainSize(List<? extends DataObject> chain) {
306340

307341
protected List<DataObject> getAllReadyVolumes(DataStore srcDataStore, List<VolumeDataStoreVO> volumes) {
308342
List<DataObject> files = new LinkedList<>();
343+
Set<Long> idsForMigration = new HashSet<>();
344+
309345
for (VolumeDataStoreVO volume : volumes) {
310-
if (volume.getState() == ObjectInDataStoreStateMachine.State.Ready) {
311-
VolumeInfo volumeInfo = volumeFactory.getVolume(volume.getVolumeId(), srcDataStore);
312-
if (volumeInfo != null && volumeInfo.getHypervisorType() != Hypervisor.HypervisorType.Simulator) {
313-
files.add(volumeInfo);
314-
}
346+
long volumeId = volume.getVolumeId();
347+
if (idsForMigration.contains(volumeId)) {
348+
logger.warn("Volume store reference [{}] is duplicated; not considering it for migration.", volume);
349+
continue;
315350
}
351+
if (volume.getState() != ObjectInDataStoreStateMachine.State.Ready) {
352+
logger.debug("Not migrating volume [{}] because its state is not ready.", volume);
353+
continue;
354+
}
355+
VolumeInfo volumeInfo = volumeFactory.getVolume(volume.getVolumeId(), srcDataStore);
356+
if (volumeInfo == null) {
357+
logger.debug("Not migrating volume [{}] because we could not get its information.", volume);
358+
continue;
359+
}
360+
if (volumeInfo.getHypervisorType() == Hypervisor.HypervisorType.Simulator) {
361+
logger.debug("Not migrating volume [{}] because its hypervisor type is simulator.", volume);
362+
continue;
363+
}
364+
files.add(volumeInfo);
365+
idsForMigration.add(volumeId);
316366
}
367+
317368
return files;
318369
}
319370

engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/StorageOrchestrator.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,12 @@
7373
import com.cloud.utils.Pair;
7474
import com.cloud.utils.component.ManagerBase;
7575
import com.cloud.utils.exception.CloudRuntimeException;
76+
import org.apache.logging.log4j.ThreadContext;
7677

7778
public class StorageOrchestrator extends ManagerBase implements StorageOrchestrationService, Configurable {
7879

80+
private static final String LOGCONTEXTID = "logcontextid";
81+
7982
@Inject
8083
SnapshotDataStoreDao snapshotDataStoreDao;
8184
@Inject
@@ -111,6 +114,7 @@ public class StorageOrchestrator extends ManagerBase implements StorageOrchestra
111114
Integer numConcurrentCopyTasksPerSSVM = 2;
112115

113116
private final Map<Long, ThreadPoolExecutor> zoneExecutorMap = new HashMap<>();
117+
private final Map<Long, Integer> zonePendingWorkCountMap = new HashMap<>();
114118

115119
@Override
116120
public String getConfigComponentName() {
@@ -356,13 +360,20 @@ protected Map<Long, Pair<Long, Long>> migrateAway(
356360
return storageCapacities;
357361
}
358362

359-
protected synchronized <T> Future<T> submit(Long zoneId, Callable<T> task) {
360-
if (!zoneExecutorMap.containsKey(zoneId)) {
361-
zoneExecutorMap.put(zoneId, new ThreadPoolExecutor(numConcurrentCopyTasksPerSSVM, numConcurrentCopyTasksPerSSVM,
362-
30, TimeUnit.MINUTES, new MigrateBlockingQueue<>(numConcurrentCopyTasksPerSSVM)));
363+
protected <T> Future<T> submit(Long zoneId, Callable<T> task) {
364+
ThreadPoolExecutor executor;
365+
synchronized (this) {
366+
if (!zoneExecutorMap.containsKey(zoneId)) {
367+
zoneExecutorMap.put(zoneId, new ThreadPoolExecutor(numConcurrentCopyTasksPerSSVM, numConcurrentCopyTasksPerSSVM,
368+
30, TimeUnit.MINUTES, new MigrateBlockingQueue<>(numConcurrentCopyTasksPerSSVM)));
369+
zonePendingWorkCountMap.put(zoneId, 0);
370+
}
371+
zonePendingWorkCountMap.merge(zoneId, 1, Integer::sum);
372+
scaleExecutorIfNecessary(zoneId);
373+
executor = zoneExecutorMap.get(zoneId);
363374
}
364-
scaleExecutorIfNecessary(zoneId);
365-
return zoneExecutorMap.get(zoneId).submit(task);
375+
return executor.submit(task);
376+
366377
}
367378

368379
protected void scaleExecutorIfNecessary(Long zoneId) {
@@ -383,14 +394,15 @@ protected synchronized void tryCleaningUpExecutor(Long zoneId) {
383394
return;
384395
}
385396

386-
ThreadPoolExecutor executor = zoneExecutorMap.get(zoneId);
387-
int activeTasks = executor.getActiveCount();
388-
if (activeTasks > 1) {
389-
logger.debug("Not cleaning executor of zone [{}] yet, as there are [{}] active tasks.", zoneId, activeTasks);
397+
zonePendingWorkCountMap.merge(zoneId, -1, Integer::sum);
398+
Integer pendingWorkCount = zonePendingWorkCountMap.get(zoneId);
399+
if (pendingWorkCount > 0) {
400+
logger.debug("Not cleaning executor of zone [{}] yet, as there is [{}] pending work.", zoneId, pendingWorkCount);
390401
return;
391402
}
392403

393404
logger.debug("Cleaning executor of zone [{}].", zoneId);
405+
ThreadPoolExecutor executor = zoneExecutorMap.get(zoneId);
394406
zoneExecutorMap.remove(zoneId);
395407
executor.shutdown();
396408
}
@@ -568,10 +580,13 @@ private class MigrateDataTask implements Callable<DataObjectResult> {
568580
private DataStore destDataStore;
569581
private Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain;
570582
private Map<DataObject, Pair<List<TemplateInfo>, Long>> templateChain;
583+
private String logid;
584+
571585
public MigrateDataTask(DataObject file, DataStore srcDataStore, DataStore destDataStore) {
572586
this.file = file;
573587
this.srcDataStore = srcDataStore;
574588
this.destDataStore = destDataStore;
589+
this.logid = ThreadContext.get(LOGCONTEXTID);
575590
}
576591

577592
public void setSnapshotChains(Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain) {
@@ -593,6 +608,7 @@ public DataObject getFile() {
593608

594609
@Override
595610
public DataObjectResult call() {
611+
ThreadContext.put(LOGCONTEXTID, logid);
596612
DataObjectResult result;
597613
AsyncCallFuture<DataObjectResult> future = secStgSrv.migrateData(file, srcDataStore, destDataStore, snapshotChain, templateChain);
598614
try {
@@ -603,21 +619,25 @@ public DataObjectResult call() {
603619
result.setResult(e.toString());
604620
}
605621
tryCleaningUpExecutor(srcDataStore.getScope().getScopeId());
622+
ThreadContext.clearAll();
606623
return result;
607624
}
608625
}
609626

610627
private class CopyTemplateTask implements Callable<TemplateApiResult> {
611628
private TemplateInfo sourceTmpl;
612629
private DataStore destStore;
630+
private String logid;
613631

614632
public CopyTemplateTask(TemplateInfo sourceTmpl, DataStore destStore) {
615633
this.sourceTmpl = sourceTmpl;
616634
this.destStore = destStore;
635+
this.logid = ThreadContext.get(LOGCONTEXTID);
617636
}
618637

619638
@Override
620639
public TemplateApiResult call() {
640+
ThreadContext.put(LOGCONTEXTID, logid);
621641
TemplateApiResult result;
622642
AsyncCallFuture<TemplateApiResult> future = templateService.copyTemplateToImageStore(sourceTmpl, destStore);
623643
try {
@@ -629,6 +649,7 @@ public TemplateApiResult call() {
629649
result.setResult(e.getMessage());
630650
}
631651
tryCleaningUpExecutor(destStore.getScope().getScopeId());
652+
ThreadContext.clearAll();
632653
return result;
633654
}
634655
}

engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateServiceImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.cloudstack.framework.messagebus.MessageBus;
5959
import org.apache.cloudstack.framework.messagebus.PublishScope;
6060
import org.apache.cloudstack.storage.command.CommandResult;
61+
import org.apache.cloudstack.storage.command.CopyCmdAnswer;
6162
import org.apache.cloudstack.storage.command.DeleteCommand;
6263
import org.apache.cloudstack.storage.datastore.DataObjectManager;
6364
import org.apache.cloudstack.storage.datastore.ObjectInDataStoreManager;
@@ -648,6 +649,15 @@ public AsyncCallFuture<TemplateApiResult> copyTemplateToImageStore(DataObject so
648649
TemplateOpContext<TemplateApiResult> context = new TemplateOpContext<>(null, destTmpl, future);
649650
AsyncCallbackDispatcher<TemplateServiceImpl, CopyCommandResult> caller = AsyncCallbackDispatcher.create(this);
650651
caller.setCallback(caller.getTarget().copyTemplateToImageStoreCallback(null, null)).setContext(context);
652+
653+
if (source.getDataStore().getId() == destStore.getId()) {
654+
logger.debug("Destination image store [{}] is the same as the origin; returning success to normalize the metadata.");
655+
CopyCmdAnswer answer = new CopyCmdAnswer(source.getTO());
656+
CopyCommandResult result = new CopyCommandResult("", answer);
657+
caller.complete(result);
658+
return future;
659+
}
660+
651661
_motionSrv.copyAsync(sourceTmpl, destTmpl, caller);
652662
return future;
653663
}

0 commit comments

Comments
 (0)