Skip to content

Commit

Permalink
Merge pull request #2 from thaerkh/master
Browse files Browse the repository at this point in the history
Add CombineManifestTextInputFormat class implementation.
  • Loading branch information
david-matheson authored Sep 3, 2019
2 parents c0ed57b + e4a088a commit 68b9f67
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 41 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
<version>4.11</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.4</version>
</dependency>

</dependencies>

<build>
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/nicknack/CombineManifestTextInputFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package nicknack;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.JobContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
* Convert manifest file inputs into FileStatus objects and combines splits based on maxSplitSize.
*/
public class CombineManifestTextInputFormat extends CombineTextInputFormat {

/**
* @return List of FileStatus objects from manifest file.
*/
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
ManifestTextInputFormat manifestInput = new ManifestTextInputFormat();
return Arrays.asList(manifestInput.listStatus(new JobConf(job.getConfiguration())));
}
}
81 changes: 40 additions & 41 deletions src/main/java/nicknack/ManifestTextInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.commons.logging.LogFactory;
Expand All @@ -18,7 +17,7 @@

/**
* Manifest files are inputs with a list of paths to use as the real input.
*
*
* Paths may be directories, globs, or files and will be expanded appropriately.
* Unlike most InputFormats, this class will silently ignore missing and
* unmatched paths in the manifest file.
Expand All @@ -35,17 +34,16 @@ public class ManifestTextInputFormat extends KeyValueTextInputFormat {
*/
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
FileStatus[] manifests = super.listStatus(job);
ArrayList<FileStatus> allFileStatuses = new ArrayList<FileStatus>();
ArrayList<Path> allPaths = new ArrayList<Path>();
for (FileStatus manifest : manifests) {
allPaths.addAll(manifestPaths(manifest.getPath(), job));
}
for (Path path : allPaths) {
allFileStatuses.addAll(expandPath(path, job));
}
log.info("Total input paths from manifest : " + allFileStatuses.size());
return allFileStatuses.toArray(new FileStatus[0]);
FileStatus[] manifests = super.listStatus(job);
ArrayList<FileStatus> allFileStatuses = new ArrayList<FileStatus>();
for (FileStatus manifest : manifests) {
allFileStatuses.addAll(expandManifest(manifest.getPath(), job));
if (allFileStatuses.size() % 10007 == 0) {
log.info("Processed " + allFileStatuses.size() + " input paths so far.");
}
}
log.info("Total input paths from manifest : " + allFileStatuses.size());
return allFileStatuses.toArray(new FileStatus[0]);
}

/**
Expand All @@ -56,20 +54,21 @@ protected FileStatus[] listStatus(JobConf job) throws IOException {
* the JobConf object for this job
* @return an ArrayList of Path objects, one for each line in the given manifest file
*/
private ArrayList<Path> manifestPaths(Path manifest, JobConf job) throws IOException {
FileSystem fs = manifest.getFileSystem(job);
FSDataInputStream stream = fs.open(manifest);
BufferedReader buf = new BufferedReader(new InputStreamReader(stream));
ArrayList<Path> paths = new ArrayList<Path>();
String line = buf.readLine();
while (line != null) {
Path p = new Path(line);
paths.add(p);
line = buf.readLine();
}
return paths;
private ArrayList<FileStatus> expandManifest(Path manifest, JobConf job) throws IOException {
FileSystem fs = manifest.getFileSystem(job);
FSDataInputStream stream = fs.open(manifest);
BufferedReader buf = new BufferedReader(new InputStreamReader(stream));
ArrayList<FileStatus> fileStatuses = new ArrayList<FileStatus>();

String line = buf.readLine();
while (line != null) {
Path p = new Path(line);
fileStatuses.addAll(expandPath(p, job));
line = buf.readLine();
}
return fileStatuses;
}

/**
* Expand a path to FileStatuses of:
* - the contents if a directory
Expand All @@ -82,20 +81,20 @@ private ArrayList<Path> manifestPaths(Path manifest, JobConf job) throws IOExcep
* @return an ArrayList of FileStatus objects, expanded from the line from the manifest
*/
private ArrayList<FileStatus> expandPath(Path path, JobConf job) throws IOException {
FileSystem fs = path.getFileSystem(job);
FileStatus[] matches = fs.globStatus(path);
if (matches == null) {
return new ArrayList<FileStatus>();
}
ArrayList<FileStatus> expanded = new ArrayList<FileStatus>();
for (FileStatus match : matches) {
if (match.isDir()) {
expanded.addAll(Arrays.asList(fs.listStatus(match.getPath())));
} else {
expanded.add(match);
}
}
return expanded;
FileSystem fs = path.getFileSystem(job);
FileStatus[] matches = fs.globStatus(path);
if (matches == null) {
return new ArrayList<FileStatus>();
}
ArrayList<FileStatus> expanded = new ArrayList<FileStatus>();
for (FileStatus match : matches) {
if (match.isDir()) {
expanded.addAll(Arrays.asList(fs.listStatus(match.getPath())));
} else {
expanded.add(match);
}
}
return expanded;
}

}
51 changes: 51 additions & 0 deletions src/test/java/nicknack/CombineManifestTextInputFormatTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package nicknack;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class CombineManifestTextInputFormatTest {
private static String[] FILES = {"file-a.txt", "file-b.txt"};
private JobContext jobContext;

private String stripPath(FileStatus status) {
String pathStr = status.getPath().toString();
return pathStr.substring(pathStr.lastIndexOf('/') + 1);
}

@Before
public void setup() {
URL url = this.getClass().getResource("/manifest.txt");

File testManifest = new File(url.getFile());
Configuration conf = new Configuration();
conf.set("mapreduce.input.fileinputformat.inputdir", testManifest.toString());

jobContext = mock(JobContext.class);
when(jobContext.getConfiguration()).thenReturn(conf);
}

@Test
public void testManifestFile() throws IOException {
CombineManifestTextInputFormat inputFormat = new CombineManifestTextInputFormat();
List<FileStatus> fileStatuses = inputFormat.listStatus(jobContext);
ArrayList<String> actual = new ArrayList<String>();
for (FileStatus status : fileStatuses) {
actual.add(stripPath(status));
}
assertArrayEquals(FILES, actual.toArray(new String[0]));
}
}

0 comments on commit 68b9f67

Please sign in to comment.