diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index 09deab79bdc1..8e68ad73dc6e 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -7501,6 +7501,12 @@ public String toString() { .setDefaultValue("60sec") .setScope(Scope.MASTER) .build(); + public static final PropertyKey JOB_MASTER_JOB_TRACE_RETENTION_TIME = + durationBuilder(Name.JOB_MASTER_JOB_TRACE_RETENTION_TIME) + .setDescription("The length of time the client can trace the submitted job.") + .setDefaultValue("1d") + .setScope(Scope.MASTER) + .build(); public static final PropertyKey JOB_MASTER_JOB_CAPACITY = longBuilder(Name.JOB_MASTER_JOB_CAPACITY) .setDescription("The total possible number of available job statuses in the job master. " @@ -9369,6 +9375,8 @@ public static final class Name { "alluxio.job.master.finished.job.purge.count"; public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME = "alluxio.job.master.finished.job.retention.time"; + public static final String JOB_MASTER_JOB_TRACE_RETENTION_TIME = + "alluxio.job.master.job.trace.retention.time"; public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity"; public static final String JOB_MASTER_MASTER_HEARTBEAT_INTERVAL = "alluxio.job.master.master.heartbeat.interval"; diff --git a/job/common/src/main/java/alluxio/job/wire/Status.java b/job/common/src/main/java/alluxio/job/wire/Status.java index 1041b482ac6c..74826b99c56b 100644 --- a/job/common/src/main/java/alluxio/job/wire/Status.java +++ b/job/common/src/main/java/alluxio/job/wire/Status.java @@ -27,6 +27,13 @@ public boolean isFinished() { return this.equals(CANCELED) || this.equals(FAILED) || this.equals(COMPLETED); } + /** + * @return whether this status represents a Completed state + */ + public boolean isCompleted() { + return this.equals(COMPLETED); + } + /** * @return proto representation of the status */ diff --git a/job/server/src/main/java/alluxio/master/job/JobMaster.java b/job/server/src/main/java/alluxio/master/job/JobMaster.java index 7e0de7c731bb..32e6d5638762 100644 --- a/job/server/src/main/java/alluxio/master/job/JobMaster.java +++ b/job/server/src/main/java/alluxio/master/job/JobMaster.java @@ -211,7 +211,7 @@ public JobMaster(MasterContext masterContext, FileSystem filesystem, mWorkerHealth = new ConcurrentHashMap<>(); mCmdJobTracker = new CmdJobTracker( - fsContext, this); + fsContext, this, mPlanTracker); MetricsSystem.registerGaugeIfAbsent( MetricKey.MASTER_JOB_COUNT.getName(), diff --git a/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java b/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java index 7b939a3093e3..7b3cd5cdd777 100644 --- a/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java +++ b/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java @@ -35,13 +35,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -86,7 +86,7 @@ public class PlanTracker { private final SortedSet mFailed; /** A FIFO queue used to track jobs which have status {@link Status#isFinished()} as true. */ - private final LinkedBlockingQueue mFinished; + private final LinkedList mFinished; private final WorkflowTracker mWorkflowTracker; @@ -114,7 +114,7 @@ public PlanTracker(long capacity, long retentionMs, } return Long.signum(right.getId() - left.getId()); })); - mFinished = new LinkedBlockingQueue<>(); + mFinished = new LinkedList<>(); mWorkflowTracker = workflowTracker; } @@ -300,6 +300,20 @@ public Set findJobs(String name, List statusList) { .map(Map.Entry::getKey).collect(Collectors.toSet()); } + /** + * Remove expired jobs in PlanTracker. + * @param jobIds the list of removed jobId + */ + public void removeJobs(List jobIds) { + mWorkflowTracker.cleanup(jobIds); + for (Long jobId : jobIds) { + PlanInfo removedPlanInfo = mCoordinators.get(jobId).getPlanInfo(); + mCoordinators.remove(jobId); + mFailed.remove(removedPlanInfo); + mFinished.remove(removedPlanInfo); + } + } + private void checkActiveSetReplicaJobs(JobConfig jobConfig) throws JobDoesNotExistException { if (jobConfig instanceof SetReplicaConfig) { Set> activeJobs = mCoordinators.values().stream() diff --git a/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java b/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java index 2b7e6246d47f..bd7d65a6203d 100644 --- a/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java +++ b/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java @@ -13,8 +13,11 @@ import alluxio.AlluxioURI; import alluxio.client.file.FileSystemContext; +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; import alluxio.exception.ExceptionMessage; import alluxio.exception.JobDoesNotExistException; +import alluxio.grpc.OperationType; import alluxio.job.CmdConfig; import alluxio.job.cmd.load.LoadCliConfig; import alluxio.job.cmd.migrate.MigrateCliConfig; @@ -24,11 +27,13 @@ import alluxio.job.wire.Status; import alluxio.master.job.JobMaster; import alluxio.master.job.common.CmdInfo; +import alluxio.master.job.plan.PlanTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -36,6 +41,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.concurrent.ThreadSafe; @@ -43,7 +51,7 @@ * CmdJobTracker to schedule a Cmd job to run. */ @ThreadSafe -public class CmdJobTracker { +public class CmdJobTracker implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(CmdJobTracker.class); private final Map mInfoMap = new ConcurrentHashMap<>(0, 0.95f, Math.max(8, 2 * Runtime.getRuntime().availableProcessors())); @@ -52,18 +60,29 @@ public class CmdJobTracker { private final PersistRunner mPersistRunner; protected FileSystemContext mFsContext; public static final String DELIMITER = ","; + private final ScheduledExecutorService mScheduleCleanExecutor; + private final Long mTraceRetentionTime; + + private final PlanTracker mPlanTracker; /** * Create a new instance of {@link CmdJobTracker}. * @param fsContext filesystem context * @param jobMaster the job master + * @param planTracker the planTracker */ public CmdJobTracker(FileSystemContext fsContext, - JobMaster jobMaster) { + JobMaster jobMaster, PlanTracker planTracker) { mFsContext = fsContext; mDistLoadCliRunner = new DistLoadCliRunner(mFsContext, jobMaster); mMigrateCliRunner = new MigrateCliRunner(mFsContext, jobMaster); mPersistRunner = new PersistRunner(mFsContext, jobMaster); + mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor(); + mScheduleCleanExecutor.scheduleAtFixedRate(this:: + cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS); + mTraceRetentionTime = Configuration.getMs( + PropertyKey.JOB_MASTER_JOB_TRACE_RETENTION_TIME); + mPlanTracker = planTracker; } /** @@ -72,15 +91,25 @@ public CmdJobTracker(FileSystemContext fsContext, * @param distLoadCliRunner DistributedLoad runner * @param migrateCliRunner DistributedCopy runner * @param persistRunner Persist runner + * @param retentionTime job retention time + * @param planTracker the planTracker */ public CmdJobTracker(FileSystemContext fsContext, DistLoadCliRunner distLoadCliRunner, MigrateCliRunner migrateCliRunner, - PersistRunner persistRunner) { + PersistRunner persistRunner, + Long retentionTime, + PlanTracker planTracker + ) { mFsContext = fsContext; mDistLoadCliRunner = distLoadCliRunner; mMigrateCliRunner = migrateCliRunner; mPersistRunner = persistRunner; + mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor(); + mScheduleCleanExecutor.scheduleAtFixedRate(this:: + cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS); + mTraceRetentionTime = retentionTime; + mPlanTracker = planTracker; } /** @@ -134,7 +163,7 @@ private void runDistributedCommand(CmdConfig cmdConfig, long jobControlId) /** * Get status information for a CMD. - * @param jobControlId + * @param jobControlId jobControlId to trace a CMD * @return the Command level status */ public Status getCmdStatus(long jobControlId) throws JobDoesNotExistException { @@ -270,4 +299,37 @@ public CmdStatusBlock getCmdStatusBlock(long jobControlId) .collect(Collectors.toList()); return new CmdStatusBlock(cmdInfo.getJobControlId(), blockList, cmdInfo.getOperationType()); } + + private void cleanExpiredJobInfos() { + long currentTime = System.currentTimeMillis(); + for (Map.Entry x : mInfoMap.entrySet()) { + CmdInfo cmdInfo = x.getValue(); + List cleanedJobsId = new ArrayList<>(); + if (OperationType.DIST_LOAD.equals(cmdInfo.getOperationType()) + && currentTime - cmdInfo.getJobSubmissionTime() > mTraceRetentionTime) { + try { + Status jobStatus = getCmdStatus(cmdInfo.getJobControlId()); + if (jobStatus.isFinished()) { + for (CmdRunAttempt runAttempt : cmdInfo.getCmdRunAttempt()) { + cleanedJobsId.add(runAttempt.getJobId()); + } + mPlanTracker.removeJobs(cleanedJobsId); + mInfoMap.remove(cmdInfo.getJobControlId()); + LOG.info("JobControlId:{} has been cleaned in CmdJobTracker," + + " client will not trace the job anymore.The filePaths in CmdInfo are:{}", + cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath())); + } + } catch (JobDoesNotExistException e) { + LOG.warn("JobControlId:{} can not find in CmdJobTracker when clean expired Job" + + "with unexpected exception.The filePaths in CmdInfo are:{}", + cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath())); + } + } + } + } + + @Override + public void close() throws Exception { + mScheduleCleanExecutor.shutdown(); + } } diff --git a/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java b/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java index 09a7d20b43c4..3e54efa75ce7 100644 --- a/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java +++ b/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java @@ -40,6 +40,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -145,6 +146,18 @@ public void testGetCoordinator() throws Exception { ((Queue) AlluxioMockUtil.getInternalState(mTracker, "mFinished")).size()); } + @Test + public void removeExpiredJobsInfo() throws Exception { + List jobIdList = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + jobIdList.add(addJob(100)); + } + mTracker.removeJobs(jobIdList); + for (Long jobId : jobIdList) { + assertNull("job id should not exist", mTracker.getCoordinator(jobId)); + } + } + @Test public void testDuplicateSetReplicaJobs() throws Exception { long jobId = mJobIdGenerator.getNewJobId(); diff --git a/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java b/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java index dfc0575fa685..ed0a4cabe5c0 100644 --- a/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java +++ b/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java @@ -26,7 +26,10 @@ import alluxio.job.wire.JobSource; import alluxio.job.wire.SimpleJobStatusBlock; import alluxio.job.wire.Status; +import alluxio.master.job.JobMaster; import alluxio.master.job.common.CmdInfo; +import alluxio.master.job.plan.PlanTracker; +import alluxio.master.job.workflow.WorkflowTracker; import com.beust.jcommander.internal.Lists; import org.junit.Assert; @@ -46,6 +49,7 @@ public final class CmdJobTrackerTest { private static final int REPEATED_ATTEMPT_COUNT = 5; private static final int ONE_ATTEMPT = 1; + private static final long CAPACITY = 100; private CmdJobTracker mCmdJobTracker; private FileSystem mFs; @@ -55,10 +59,14 @@ public final class CmdJobTrackerTest { private MigrateCliRunner mMigrateCliRunner; private DistLoadCliRunner mDistLoadRunner; private PersistRunner mPersistRunner; - + private PlanTracker mPlanTracker; private LoadCliConfig mLoad; private MigrateCliConfig mMigrate; private List mSearchingCriteria = Lists.newArrayList(); + private WorkflowTracker mWorkflowTracker; + private JobMaster mMockJobMaster; + + private Long mRetentionTime; @Rule public ExpectedException mException = ExpectedException.none(); @@ -66,14 +74,17 @@ public final class CmdJobTrackerTest { @Before public void before() throws Exception { mFs = mock(FileSystem.class); + mRetentionTime = 1000L; FileSystemContext fsCtx = mock(FileSystemContext.class); mMigrateCliRunner = mock(MigrateCliRunner.class); mDistLoadRunner = mock(DistLoadCliRunner.class); mPersistRunner = mock(PersistRunner.class); - + mPlanTracker = mock(PlanTracker.class); + mMockJobMaster = mock(JobMaster.class); + mWorkflowTracker = new WorkflowTracker(mMockJobMaster); mCmdJobTracker = new CmdJobTracker(fsCtx, - mDistLoadRunner, mMigrateCliRunner, mPersistRunner); + mDistLoadRunner, mMigrateCliRunner, mPersistRunner, mRetentionTime, mPlanTracker); mLoad = new LoadCliConfig("/path/to/load", 3, 1, Collections.EMPTY_SET, Collections.EMPTY_SET, Collections.EMPTY_SET, Collections.EMPTY_SET, true); @@ -97,6 +108,33 @@ public void runDistLoadBatchCompleteTest() throws Exception { Assert.assertEquals(s, Status.COMPLETED); } + @Test + public void runCleanExpiredJobsTest() throws Exception { + generateLoadCommandForStatus(Status.CANCELED); + generateLoadCommandForStatus(Status.RUNNING); + generateLoadCommandForStatus(Status.FAILED); + generateLoadCommandForStatus(Status.COMPLETED); + generateLoadCommandForStatus(Status.CREATED); + Thread.sleep(70000L); + // the expired job has been cleaned in mInfoMap + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.CANCELED); + Set cancelCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(0, cancelCmdIds.size()); + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.COMPLETED); + Set completedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(0, completedCmdIds.size()); + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.FAILED); + Set failedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(0, failedCmdIds.size()); + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.RUNNING); + Set runningCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(2, runningCmdIds.size()); + } + @Test public void runDistLoadBatchFailTest() throws Exception { CmdInfo cmdInfo = new CmdInfo(mLoadJobId, OperationType.DIST_LOAD, @@ -316,7 +354,7 @@ public void testGetCmdStatusBlock() throws Exception { // Below are all help functions. private void prepareDistLoadTest( - CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception { + CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception { AlluxioURI filePath = new AlluxioURI(loadCliConfig.getFilePath()); int replication = loadCliConfig.getReplication(); Set workerSet = loadCliConfig.getWorkerSet(); @@ -325,7 +363,7 @@ private void prepareDistLoadTest( Set excludedLocalityIds = loadCliConfig.getExcludedLocalityIds(); boolean directCache = loadCliConfig.getDirectCache(); int batch = loadCliConfig.getBatchSize(); - + // Mock the behavior of runDistLoad when(mDistLoadRunner.runDistLoad(batch, filePath, replication, workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache, loadId)) .thenReturn(cmdInfo);