@@ -369,7 +369,8 @@ public Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request)
369369 }
370370
371371 if (noResourcesAvailable ) {
372- log .warn ("Not all scheduling constraints had enough workers available to fulfill the request {}" , request );
372+ log .warn ("Not all scheduling constraints had enough workers for jobId={}, cluster={}" ,
373+ request .getJobId (), request .getClusterID ());
373374 return Optional .empty ();
374375 } else {
375376 // Return best fit only if there are enough available TEs for all scheduling constraints
@@ -393,28 +394,41 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findBestFitFor(TaskExec
393394 return Optional .empty ();
394395 }
395396
396- Stream <TaskExecutorHolder > availableTEs = this .executorsByGroup .get (bestFitTeGroupKey .get ())
397+ NavigableSet <TaskExecutorHolder > groupHolders = this .executorsByGroup .get (bestFitTeGroupKey .get ());
398+
399+ List <TaskExecutorHolder > availableTEList = groupHolders
397400 .descendingSet ()
398401 .stream ()
399402 .filter (teHolder -> {
400403 if (!this .taskExecutorStateMap .containsKey (teHolder .getId ())) {
404+ log .debug ("findBestFitFor: TE {} excluded - not in stateMap" , teHolder .getId ());
401405 return false ;
402406 }
403407 if (currentBestFit .contains (teHolder .getId ())) {
408+ log .debug ("findBestFitFor: TE {} excluded - already in bestFit" , teHolder .getId ());
404409 return false ;
405410 }
406411 TaskExecutorState st = this .taskExecutorStateMap .get (teHolder .getId ());
407- return st .isAvailable () &&
408- // when a TE is returned from here to be used for scheduling, its state remain active until
409- // the scheduler trigger another message to update (lock) the state. However when large number
410- // of the requests are active at the same time on same sku, the gap between here and the message
411- // to lock the state can be large so another schedule request message can be in between and
412- // got the same set of TEs. To avoid this, a lease is added to each TE state to temporarily
413- // lock the TE to be used again. Since this is only lock between actor messages and lease
414- // duration can be short.
415- st .getLastSchedulerLeasedDuration ().compareTo (this .schedulerLeaseExpirationDuration ) > 0 &&
416- st .getRegistration () != null ;
417- });
412+ if (!st .isAvailable ()) {
413+ log .debug ("findBestFitFor: TE {} excluded - not available" , teHolder .getId ());
414+ return false ;
415+ }
416+ if (st .getLastSchedulerLeasedDuration ().compareTo (this .schedulerLeaseExpirationDuration ) <= 0 ) {
417+ log .debug ("findBestFitFor: TE {} excluded - lease not expired ({}ms)" , teHolder .getId (), st .getLastSchedulerLeasedDuration ().toMillis ());
418+ return false ;
419+ }
420+ if (st .getRegistration () == null ) {
421+ log .debug ("findBestFitFor: TE {} excluded - no registration" , teHolder .getId ());
422+ return false ;
423+ }
424+ return true ;
425+ })
426+ .collect (Collectors .toList ());
427+
428+ log .info ("findBestFitFor: group={}, holdersInGroup={}, availableAfterFilter={}, requested={}" ,
429+ bestFitTeGroupKey .get (), groupHolders .size (), availableTEList .size (), numWorkers );
430+
431+ Stream <TaskExecutorHolder > availableTEs = availableTEList .stream ();
418432
419433 if (availableTaskExecutorMutatorHook != null ) {
420434 availableTEs = availableTaskExecutorMutatorHook .mutate (availableTEs , request , schedulingConstraints );
@@ -642,6 +656,7 @@ private Map<String, Integer> mapReservationsToSku(
642656
643657 Map <String , Integer > reservationCountBySku = new HashMap <>();
644658
659+ log .info ("mapReservationsToSku called with {} reservations" , pendingReservations == null ? 0 : pendingReservations .size ());
645660 if (pendingReservations == null || pendingReservations .isEmpty ()) {
646661 return reservationCountBySku ;
647662 }
@@ -783,14 +798,12 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findTaskExecutorsFor(Ta
783798 if (taskExecutors .isPresent () && taskExecutors .get ().size () == allocationRequests .size ()) {
784799 return taskExecutors ;
785800 } else {
786- log .warn ("Not enough available TEs found for scheduling constraints {}, request: {}" , schedulingConstraints , request );
787- if (taskExecutors .isPresent ()) {
788- log .debug ("Found {} Task Executors: {} for request: {} with constraints: {}" ,
789- taskExecutors .get ().size (), taskExecutors .get (), request , schedulingConstraints );
790- } else {
791- log .warn ("No suitable Task Executors found for request: {} with constraints: {}" ,
792- request , schedulingConstraints );
793- }
801+ log .warn ("Not enough available TEs for constraints {} (found={}, needed={}, jobId={}, workerId={})" ,
802+ schedulingConstraints ,
803+ taskExecutors .isPresent () ? taskExecutors .get ().size () : 0 ,
804+ allocationRequests .size (),
805+ request .getJobId (),
806+ allocationRequests .stream ().map (a -> a .getWorkerId ().toString ()).collect (Collectors .joining ("," )));
794807
795808 // If there are not enough workers with the given spec then add the request to the pending ones
796809 if (!isJobIdAlreadyPending && request .getAllocationRequests ().size () > 2 ) {
0 commit comments