@@ -369,7 +369,8 @@ public Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request)
369369 }
370370
371371 if (noResourcesAvailable ) {
372- log .warn ("Not all scheduling constraints had enough workers available to fulfill the request {}" , request );
372+ log .warn ("Not all scheduling constraints had enough workers for jobId={}, cluster={}" ,
373+ request .getJobId (), request .getClusterID ());
373374 return Optional .empty ();
374375 } else {
375376 // Return best fit only if there are enough available TEs for all scheduling constraints
@@ -393,28 +394,42 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findBestFitFor(TaskExec
393394 return Optional .empty ();
394395 }
395396
396- Stream <TaskExecutorHolder > availableTEs = this .executorsByGroup .get (bestFitTeGroupKey .get ())
397+ NavigableSet <TaskExecutorHolder > groupHolders = this .executorsByGroup .get (bestFitTeGroupKey .get ());
398+
399+ List <TaskExecutorHolder > availableTEList = groupHolders
397400 .descendingSet ()
398401 .stream ()
399402 .filter (teHolder -> {
400403 if (!this .taskExecutorStateMap .containsKey (teHolder .getId ())) {
404+                // TODO: turn these to debug level
405+ log .info ("findBestFitFor: TE {} excluded - not in stateMap" , teHolder .getId ());
401406 return false ;
402407 }
403408 if (currentBestFit .contains (teHolder .getId ())) {
409+ log .info ("findBestFitFor: TE {} excluded - already in bestFit" , teHolder .getId ());
404410 return false ;
405411 }
406412 TaskExecutorState st = this .taskExecutorStateMap .get (teHolder .getId ());
407- return st .isAvailable () &&
408- // when a TE is returned from here to be used for scheduling, its state remain active until
409- // the scheduler trigger another message to update (lock) the state. However when large number
410- // of the requests are active at the same time on same sku, the gap between here and the message
411- // to lock the state can be large so another schedule request message can be in between and
412- // got the same set of TEs. To avoid this, a lease is added to each TE state to temporarily
413- // lock the TE to be used again. Since this is only lock between actor messages and lease
414- // duration can be short.
415- st .getLastSchedulerLeasedDuration ().compareTo (this .schedulerLeaseExpirationDuration ) > 0 &&
416- st .getRegistration () != null ;
417- });
413+ if (!st .isAvailable ()) {
414+ log .info ("findBestFitFor: TE {} excluded - not available" , teHolder .getId ());
415+ return false ;
416+ }
417+ if (st .getLastSchedulerLeasedDuration ().compareTo (this .schedulerLeaseExpirationDuration ) <= 0 ) {
418+ log .info ("findBestFitFor: TE {} excluded - lease not expired ({}ms)" , teHolder .getId (), st .getLastSchedulerLeasedDuration ().toMillis ());
419+ return false ;
420+ }
421+ if (st .getRegistration () == null ) {
422+ log .info ("findBestFitFor: TE {} excluded - no registration" , teHolder .getId ());
423+ return false ;
424+ }
425+ return true ;
426+ })
427+ .collect (Collectors .toList ());
428+
429+ log .info ("findBestFitFor: group={}, holdersInGroup={}, availableAfterFilter={}, requested={}" ,
430+ bestFitTeGroupKey .get (), groupHolders .size (), availableTEList .size (), numWorkers );
431+
432+ Stream <TaskExecutorHolder > availableTEs = availableTEList .stream ();
418433
419434 if (availableTaskExecutorMutatorHook != null ) {
420435 availableTEs = availableTaskExecutorMutatorHook .mutate (availableTEs , request , schedulingConstraints );
@@ -642,6 +657,7 @@ private Map<String, Integer> mapReservationsToSku(
642657
643658 Map <String , Integer > reservationCountBySku = new HashMap <>();
644659
660+ log .info ("mapReservationsToSku called with {} reservations" , pendingReservations == null ? 0 : pendingReservations .size ());
645661 if (pendingReservations == null || pendingReservations .isEmpty ()) {
646662 return reservationCountBySku ;
647663 }
@@ -783,14 +799,12 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findTaskExecutorsFor(Ta
783799 if (taskExecutors .isPresent () && taskExecutors .get ().size () == allocationRequests .size ()) {
784800 return taskExecutors ;
785801 } else {
786- log .warn ("Not enough available TEs found for scheduling constraints {}, request: {}" , schedulingConstraints , request );
787- if (taskExecutors .isPresent ()) {
788- log .debug ("Found {} Task Executors: {} for request: {} with constraints: {}" ,
789- taskExecutors .get ().size (), taskExecutors .get (), request , schedulingConstraints );
790- } else {
791- log .warn ("No suitable Task Executors found for request: {} with constraints: {}" ,
792- request , schedulingConstraints );
793- }
802+ log .warn ("Not enough available TEs for constraints {} (found={}, needed={}, jobId={}, workerId={})" ,
803+ schedulingConstraints ,
804+ taskExecutors .isPresent () ? taskExecutors .get ().size () : 0 ,
805+ allocationRequests .size (),
806+ request .getJobId (),
807+ allocationRequests .stream ().map (a -> a .getWorkerId ().toString ()).collect (Collectors .joining ("," )));
794808
795809 // If there are not enough workers with the given spec then add the request the pending ones
796810 if (!isJobIdAlreadyPending && request .getAllocationRequests ().size () > 2 ) {
0 commit comments