diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java index 2de99bfa5..0c0431c44 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java @@ -369,7 +369,8 @@ public Optional findBestFit(TaskExecutorBatchAssignmentRequest request) } if (noResourcesAvailable) { - log.warn("Not all scheduling constraints had enough workers available to fulfill the request {}", request); + log.warn("Not all scheduling constraints had enough workers for jobId={}, cluster={}", + request.getJobId(), request.getClusterID()); return Optional.empty(); } else { // Return best fit only if there are enough available TEs for all scheduling constraints @@ -393,28 +394,41 @@ private Optional> findBestFitFor(TaskExec return Optional.empty(); } - Stream availableTEs = this.executorsByGroup.get(bestFitTeGroupKey.get()) + NavigableSet groupHolders = this.executorsByGroup.get(bestFitTeGroupKey.get()); + + List availableTEList = groupHolders .descendingSet() .stream() .filter(teHolder -> { if (!this.taskExecutorStateMap.containsKey(teHolder.getId())) { + log.debug("findBestFitFor: TE {} excluded - not in stateMap", teHolder.getId()); return false; } if (currentBestFit.contains(teHolder.getId())) { + log.debug("findBestFitFor: TE {} excluded - already in bestFit", teHolder.getId()); return false; } TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId()); - return st.isAvailable() && - // when a TE is returned from here to be used for scheduling, its state remain active until - // the scheduler trigger another message to update (lock) the state. 
However when large number - // of the requests are active at the same time on same sku, the gap between here and the message - // to lock the state can be large so another schedule request message can be in between and - // got the same set of TEs. To avoid this, a lease is added to each TE state to temporarily - // lock the TE to be used again. Since this is only lock between actor messages and lease - // duration can be short. - st.getLastSchedulerLeasedDuration().compareTo(this.schedulerLeaseExpirationDuration) > 0 && - st.getRegistration() != null; - }); + if (!st.isAvailable()) { + log.debug("findBestFitFor: TE {} excluded - not available", teHolder.getId()); + return false; + } + if (st.getLastSchedulerLeasedDuration().compareTo(this.schedulerLeaseExpirationDuration) <= 0) { + log.debug("findBestFitFor: TE {} excluded - lease not expired ({}ms)", teHolder.getId(), st.getLastSchedulerLeasedDuration().toMillis()); + return false; + } + if (st.getRegistration() == null) { + log.debug("findBestFitFor: TE {} excluded - no registration", teHolder.getId()); + return false; + } + return true; + }) + .collect(Collectors.toList()); + + log.info("findBestFitFor: group={}, holdersInGroup={}, availableAfterFilter={}, requested={}", + bestFitTeGroupKey.get(), groupHolders.size(), availableTEList.size(), numWorkers); + + Stream availableTEs = availableTEList.stream(); if(availableTaskExecutorMutatorHook != null) { availableTEs = availableTaskExecutorMutatorHook.mutate(availableTEs, request, schedulingConstraints); @@ -642,6 +656,7 @@ private Map mapReservationsToSku( Map reservationCountBySku = new HashMap<>(); + log.info("mapReservationsToSku called with {} reservations", pendingReservations == null ? 
0 : pendingReservations.size()); if (pendingReservations == null || pendingReservations.isEmpty()) { return reservationCountBySku; } @@ -783,14 +798,12 @@ private Optional> findTaskExecutorsFor(Ta if (taskExecutors.isPresent() && taskExecutors.get().size() == allocationRequests.size()) { return taskExecutors; } else { - log.warn("Not enough available TEs found for scheduling constraints {}, request: {}", schedulingConstraints, request); - if (taskExecutors.isPresent()) { - log.debug("Found {} Task Executors: {} for request: {} with constraints: {}", - taskExecutors.get().size(), taskExecutors.get(), request, schedulingConstraints); - } else { - log.warn("No suitable Task Executors found for request: {} with constraints: {}", - request, schedulingConstraints); - } + log.warn("Not enough available TEs for constraints {} (found={}, needed={}, jobId={}, workerIds={})", + schedulingConstraints, + taskExecutors.isPresent() ? taskExecutors.get().size() : 0, + allocationRequests.size(), + request.getJobId(), + allocationRequests.stream().map(a -> a.getWorkerId().toString()).collect(Collectors.joining(","))); // If there are not enough workers with the given spec then add the request the pending ones if (!isJobIdAlreadyPending && request.getAllocationRequests().size() > 2) {