Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Handle exceptions from within the SDK while processing work
Browse files Browse the repository at this point in the history
The DataflowWorkerHarness, when processing work, did not handle
general exceptions being thrown, which caused all of the processing
threads to die and left a zombie worker.
----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=104664789
  • Loading branch information
lukecwik authored and davorbonaci committed Oct 5, 2015
1 parent ca1ebdc commit 8e1de20
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,7 @@ public Boolean call() {
boolean success = true;
try {
do { // We loop getting and processing work.
try {
LOG.debug("Thread starting getAndPerformWork.");
success = worker.getAndPerformWork();
LOG.debug("{} processing one WorkItem.", success ? "Finished" : "Failed");
} catch (IOException e) { // If there is a problem getting work.
success = false;
}
success = doWork();
if (success) {
backOff.reset();
}
Expand All @@ -158,10 +152,27 @@ public Boolean call() {
LOG.error("Already tried several attempts at working on tasks. Aborting.", e);
} catch (InterruptedException e) {
LOG.error("Interrupted during thread execution or sleep.", e);
} catch (Throwable t) {
LOG.error("Thread {} died.", Thread.currentThread().getId(), t);
}
return false;
}

/**
 * Runs a single {@code worker.getAndPerformWork()} attempt and reports its outcome.
 *
 * <p>Any {@link Exception} raised during the attempt is logged and converted into a
 * {@code false} result instead of being propagated to the caller.
 *
 * @return the value returned by {@code worker.getAndPerformWork()}, or {@code false} if an
 *     {@link IOException} or any other {@link Exception} was thrown during the attempt
 */
private boolean doWork() {
  boolean workSucceeded;
  try {
    LOG.debug("Thread starting getAndPerformWork.");
    workSucceeded = worker.getAndPerformWork();
    final String outcome = workSucceeded ? "Finished" : "Failed";
    LOG.debug("{} processing one WorkItem.", outcome);
  } catch (IOException ioFailure) {
    // A problem occurred while getting work; treat the attempt as failed.
    LOG.debug("There was a problem getting work.", ioFailure);
    workSucceeded = false;
  } catch (Exception sdkFailure) {
    // Anything else reaching this point is attributed to a bug within the SDK.
    LOG.error("There was an unhandled error caused by the Dataflow SDK.", sdkFailure);
    workSucceeded = false;
  }
  return workSucceeded;
}

private final DataflowWorker worker;
private final Sleeper sleeper;
private final BackOff backOff;
Expand All @@ -183,6 +194,7 @@ static void processWork(DataflowWorkerHarnessOptions pipelineOptions,
LOG.debug("Waiting for {} worker threads", numThreads);
// We wait forever unless there is a big problem.
executor.invokeAll(tasks);
LOG.error("All threads died.");
}

static DataflowWorker create(DataflowWorkerHarnessOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ public void setUp() throws Exception {
pipelineOptions.setGcpCredential(new TestCredential());
}

@Test
public void testThatWeRetryIfTaskExecutionFailAgainAndAgain() throws Exception {
public void runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain() throws Exception {
final int numWorkers = Math.max(Runtime.getRuntime().availableProcessors(), 1);
when(mockDataflowWorker.getAndPerformWork()).thenReturn(false);
final AtomicInteger sleepCount = new AtomicInteger(0);
final AtomicInteger illegalIntervalCount = new AtomicInteger(0);
DataflowWorkerHarness.processWork(
Expand All @@ -124,6 +122,24 @@ public void sleep(long millis) throws InterruptedException {
assertEquals(0, illegalIntervalCount.get());
}

/**
 * Stubs the mock worker so that {@code getAndPerformWork()} always returns {@code false},
 * then runs the shared retry scenario in
 * {@code runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain()}.
 */
@Test
public void testThatWeRetryIfTaskExecutionFailAgainAndAgain() throws Exception {
// Every work attempt reports failure through its return value.
when(mockDataflowWorker.getAndPerformWork()).thenReturn(false);
runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain();
}

/**
 * Stubs the mock worker so that {@code getAndPerformWork()} always throws an
 * {@link IOException}, then runs the shared retry scenario in
 * {@code runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain()}.
 */
@Test
public void testThatWeRetryIfTaskExecutionFailAgainAndAgainByIOException() throws Exception {
// Every work attempt fails by throwing a checked IOException.
when(mockDataflowWorker.getAndPerformWork()).thenThrow(new IOException());
runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain();
}

/**
 * Stubs the mock worker so that {@code getAndPerformWork()} always throws a
 * {@link RuntimeException}, then runs the shared retry scenario in
 * {@code runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain()}.
 */
@Test
public void testThatWeRetryIfTaskExecutionFailAgainAndAgainByUnknownException() throws Exception {
// Every work attempt fails by throwing an unchecked exception.
when(mockDataflowWorker.getAndPerformWork()).thenThrow(new RuntimeException());
runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain();
}

@Test
public void testNumberOfWorkerHarnessThreadsIsHonored() throws Exception {
final int expectedNumberOfThreads = 5;
Expand Down

0 comments on commit 8e1de20

Please sign in to comment.