99import org .zstack .core .db .SimpleQuery ;
1010import org .zstack .core .db .SimpleQuery .Op ;
1111import org .zstack .core .errorcode .ErrorFacade ;
12- import org .zstack .core .thread .SyncTask ;
12+ import org .zstack .core .thread .ChainTask ;
13+ import org .zstack .core .thread .SyncTaskChain ;
1314import org .zstack .core .thread .ThreadFacade ;
1415import org .zstack .header .AbstractService ;
1516import org .zstack .header .allocator .*;
1617import org .zstack .header .cluster .ReportHostCapacityMessage ;
18+ import org .zstack .header .core .Completion ;
19+ import org .zstack .header .core .NopeCompletion ;
1720import org .zstack .header .core .ReturnValueCompletion ;
1821import org .zstack .header .core .workflow .Flow ;
1922import org .zstack .header .core .workflow .FlowRollback ;
@@ -302,6 +305,40 @@ private void handle(ReportHostCapacityMessage msg) {
302305 }
303306
304307 private void handle (final AllocateHostMsg msg ) {
308+ if (HostAllocatorGlobalConfig .HOST_ALLOCATOR_ALLOW_CONCURRENT .value (Boolean .class )) {
309+ doHandleAllocateHost (msg , new NopeCompletion (null ));
310+ return ;
311+ }
312+
313+ thdf .chainSubmit (new ChainTask (msg ) {
314+ @ Override
315+ public String getSyncSignature () {
316+ return "host-allocator" ;
317+ }
318+
319+ @ Override
320+ public void run (SyncTaskChain chain ) {
321+ doHandleAllocateHost (msg , new Completion (chain ) {
322+ @ Override
323+ public void success () {
324+ chain .next ();
325+ }
326+
327+ @ Override
328+ public void fail (ErrorCode errorCode ) {
329+ chain .next ();
330+ }
331+ });
332+ }
333+
334+ @ Override
335+ public String getName () {
336+ return "allocate-host-for-vm-" + msg .getVmInstance ().getUuid ();
337+ }
338+ });
339+ }
340+
341+ private void doHandleAllocateHost (final AllocateHostMsg msg , Completion completion ) {
305342 HostAllocatorSpec spec = HostAllocatorSpec .fromAllocationMsg (msg );
306343 spec .setBackupStoragePrimaryStorageMetrics (backupStoragePrimaryStorageMetrics );
307344
@@ -333,19 +370,23 @@ public void success(List<HostInventory> hosts) {
333370 if (hosts .isEmpty ()){
334371 reply .setHosts (new ArrayList <>());
335372 bus .reply (msg , reply );
373+ completion .success ();
336374 return ;
337375 }
376+
338377 sortors .dryRunSort (spec , hosts , new ReturnValueCompletion <List <HostInventory >>(msg ) {
339378 @ Override
340379 public void success (List <HostInventory > returnValue ) {
341380 reply .setHosts (returnValue );
342381 bus .reply (msg , reply );
382+ completion .success ();
343383 }
344384
345385 @ Override
346386 public void fail (ErrorCode errorCode ) {
347387 reply .setError (errorCode );
348388 bus .reply (msg , reply );
389+ completion .fail (errorCode );
349390 }
350391 });
351392 }
@@ -354,50 +395,27 @@ public void fail(ErrorCode errorCode) {
354395 public void fail (ErrorCode errorCode ) {
355396 reply .setError (errorCode );
356397 bus .reply (msg , reply );
398+ completion .fail (errorCode );
357399 }
358400 });
359401 } else {
360402 final AllocateHostReply reply = new AllocateHostReply ();
361403 strategy .allocate (spec , new ReturnValueCompletion <List <HostInventory >>(msg ) {
362404 @ Override
363405 public void success (List <HostInventory > hosts ) {
364- // flow control for host reservation
365- Integer n = HostAllocatorGlobalConfig .HOST_ALLOCATOR_CONCURRENT_LEVEL .value (Integer .class );
366- final int syncLevel = (n == null || n < 0 ) ? 0 : n ;
367-
368- thdf .syncSubmit (new SyncTask <Void >() {
369- @ Override
370- public String getSyncSignature () {
371- return "host-reserve-flow-control" ;
372- }
373-
406+ sortors .sort (spec , hosts , new ReturnValueCompletion <HostInventory >(msg ) {
374407 @ Override
375- public int getSyncLevel () {
376- return syncLevel ;
377- }
378-
379- @ Override
380- public String getName () {
381- return "reserve-host" ;
408+ public void success (HostInventory returnValue ) {
409+ reply .setHost (returnValue );
410+ bus .reply (msg , reply );
411+ completion .success ();
382412 }
383413
384414 @ Override
385- public Void call () throws Exception {
386- sortors .sort (spec , hosts , new ReturnValueCompletion <HostInventory >(msg ) {
387- @ Override
388- public void success (HostInventory returnValue ) {
389- reply .setHost (returnValue );
390- bus .reply (msg , reply );
391- }
392-
393- @ Override
394- public void fail (ErrorCode errorCode ) {
395- reply .setError (errorCode );
396- bus .reply (msg , reply );
397- }
398- });
399-
400- return null ;
415+ public void fail (ErrorCode errorCode ) {
416+ reply .setError (errorCode );
417+ bus .reply (msg , reply );
418+ completion .fail (errorCode );
401419 }
402420 });
403421 }
@@ -406,6 +424,7 @@ public void fail(ErrorCode errorCode) {
406424 public void fail (ErrorCode errorCode ) {
407425 reply .setError (errorCode );
408426 bus .reply (msg , reply );
427+ completion .fail (errorCode );
409428 }
410429 });
411430 }
0 commit comments