diff --git a/pom.xml b/pom.xml index 57fec7d2..4a7290c7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.archive ia-web-commons - 1.1.0 + 1.1.1-SNAPSHOT jar ia-web-commons diff --git a/src/main/java/org/archive/io/RecordingInputStream.java b/src/main/java/org/archive/io/RecordingInputStream.java index b46905ed..36f539a9 100644 --- a/src/main/java/org/archive/io/RecordingInputStream.java +++ b/src/main/java/org/archive/io/RecordingInputStream.java @@ -74,8 +74,10 @@ public RecordingInputStream(int bufferSize, String backingFilename) } public void open(InputStream wrappedStream) throws IOException { - logger.fine(Thread.currentThread().getName() + " opening " + - wrappedStream + ", " + Thread.currentThread().getName()); + if (logger.isLoggable(Level.FINE)) { + logger.fine("wrapping " + wrappedStream + " in thread " + + Thread.currentThread().getName()); + } if(isOpen()) { // error; should not be opening/wrapping in an unclosed // stream remains open @@ -135,11 +137,11 @@ public int read(byte[] b) throws IOException { public void close() throws IOException { if (logger.isLoggable(Level.FINE)) { - logger.fine(Thread.currentThread().getName() + " closing " + - this.in + ", " + Thread.currentThread().getName()); + logger.fine("closing " + this.in + " in thread " + + Thread.currentThread().getName()); } IOUtils.closeQuietly(this.in); - this.in = null; + this.in = null; IOUtils.closeQuietly(this.recordingOutputStream); } @@ -159,20 +161,77 @@ public long readFully() throws IOException { return this.recordingOutputStream.getSize(); } + public void readToEndOfContent(long contentLength) + throws IOException, InterruptedException { + // Check we're open before proceeding. + if (!isOpen()) { + // TODO: should this be a noisier exception-raising error? 
+ return; + } + + long totalBytes = recordingOutputStream.position - recordingOutputStream.getMessageBodyBegin(); + long bytesRead = -1L; + long maxToRead = -1; + while (contentLength <= 0 || totalBytes < contentLength) { + try { + // read no more than soft max + maxToRead = (contentLength <= 0) + ? drainBuffer.length + : Math.min(drainBuffer.length, contentLength - totalBytes); + // nor more than hard max + maxToRead = Math.min(maxToRead, recordingOutputStream.getRemainingLength()); + // but always at least 1 (to trigger hard max exception) XXX wtf is this? + maxToRead = Math.max(maxToRead, 1); + + bytesRead = read(drainBuffer,0,(int)maxToRead); + if (bytesRead == -1) { + break; + } + totalBytes += bytesRead; + + if (Thread.interrupted()) { + throw new InterruptedException("Interrupted during IO"); + } + } catch (SocketTimeoutException e) { + // A socket timeout is just a transient problem, meaning + // nothing was available in the configured timeout period, + // but something else might become available later. + // Take this opportunity to check the overall + // timeout (below). One reason for this timeout is + // servers that keep up the connection, 'keep-alive', even + // though we asked them to not keep the connection open. + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "socket timeout", e); + } + // check for interrupt + if (Thread.interrupted()) { + throw new InterruptedException("Interrupted during IO"); + } + // check for overall timeout + recordingOutputStream.checkLimits(); + } catch (SocketException se) { + throw se; + } catch (NullPointerException e) { + // [ 896757 ] NPEs in Andy's Th-Fri Crawl. + // A crawl was showing NPE's in this part of the code but can + // not reproduce. Adding this rethrowing catch block w/ + // diagnostics to help should we come across the problem in the + // future. 
+ throw new NullPointerException("Stream " + this.in + ", " + + e.getMessage() + " " + Thread.currentThread().getName()); + } + } + } + /** * Read all of a stream (Or read until we timeout or have read to the max). * @param softMaxLength Maximum length to read; if zero or < 0, then no * limit. If met, return normally. - * @param hardMaxLength Maximum length to read; if zero or < 0, then no - * limit. If exceeded, throw RecorderLengthExceededException - * @param timeout Timeout in milliseconds for total read; if zero or - * negative, timeout is Long.MAX_VALUE. If exceeded, throw - * RecorderTimeoutException - * @param maxBytesPerMs How many bytes per millisecond. * @throws IOException failed read. * @throws RecorderLengthExceededException * @throws RecorderTimeoutException * @throws InterruptedException + * @deprecated */ public void readFullyOrUntil(long softMaxLength) throws IOException, RecorderLengthExceededException, @@ -349,6 +408,13 @@ public int getRecordedBufferLength() { return recordingOutputStream.getBufferLength(); } + /** + * See doc on {@link RecordingOutputStream#chopAtMessageBodyBegin()} + */ + public void chopAtMessageBodyBegin() { + recordingOutputStream.chopAtMessageBodyBegin(); + } + public void clearForReuse() throws IOException { recordingOutputStream.clearForReuse(); } diff --git a/src/main/java/org/archive/io/RecordingOutputStream.java b/src/main/java/org/archive/io/RecordingOutputStream.java index 4d0713da..95e444cc 100644 --- a/src/main/java/org/archive/io/RecordingOutputStream.java +++ b/src/main/java/org/archive/io/RecordingOutputStream.java @@ -84,7 +84,7 @@ public class RecordingOutputStream extends OutputStream { private byte[] buffer; /** current virtual position in the recording */ - private long position; + long position; /** flag to disable recording */ private boolean recording; @@ -132,6 +132,29 @@ public class RecordingOutputStream extends OutputStream { */ protected long messageBodyBeginMark; + /** + * While 
messageBodyBeginMark is not set, the last two bytes seen. + * + * <p>
+ * This class does automatic detection of http message body begin (i.e. end + * of http headers). Unfortunately httpcomponents did not want to add + * functionality to help us with this, see + * https://issues.apache.org/jira/browse/HTTPCORE-325 + * + * <p>
+ * It works like this: while messageBodyBeginMark is not set, we remember + * the last two bytes seen, and look at each byte we write. If the + * lastTwoBytes+currentByte is "\n\r\n", or lastTwoBytes[1]+currentByte is + * "\n\n" then we call markMessageBodyBegin() at the position after + * currentByte. + * + * <p>
+ * An assumption here is that protocols other than http don't have headers, + * and for those protocols the user of this class will call + * markMessageBodyBegin() at position 0 before writing anything. + */ + protected int[] lastTwoBytes = new int[] {-1, -1}; + /** * Stream to record. */ @@ -204,6 +227,20 @@ public void write(int b) throws IOException { if (this.out != null) { this.out.write(b); } + + // see comment on int[] lastTwoBytes + if (messageBodyBeginMark < 0l) { + // looking for "\n\n" or "\n\r\n" + if (b == '\n' + && (lastTwoBytes[1] == '\n' + || (lastTwoBytes[0] == '\n' && lastTwoBytes[1] == '\r'))) { + markMessageBodyBegin(); + } else { + lastTwoBytes[0] = lastTwoBytes[1]; + lastTwoBytes[1] = b; + } + } + checkLimits(); } @@ -220,6 +257,14 @@ public void write(byte[] b, int off, int len) throws IOException { off += consumeRange; len -= consumeRange; } + + // see comment on int[] lastTwoBytes + while (messageBodyBeginMark < 0 && len > 0) { + write(b[off]); + off++; + len--; + } + if(recording) { record(b, off, len); } @@ -251,7 +296,7 @@ protected void checkLimits() throws RecorderIOException { throw new RecorderTimeoutException(); } // need to throttle reading to hit max configured rate? - if(position/duration > maxRateBytesPerMs) { + if(position/duration >= maxRateBytesPerMs) { long desiredDuration = position / maxRateBytesPerMs; try { Thread.sleep(desiredDuration-duration); @@ -557,6 +602,18 @@ public long getRemainingLength() { return maxLength - position; } + /** + * Forget about anything past the point where the content-body starts. This + * is needed to support FetchHTTP's shouldFetchBody setting. 
See also the + * docs on {@link #lastTwoBytes} + */ + public void chopAtMessageBodyBegin() { + if (messageBodyBeginMark >= 0) { + this.size = messageBodyBeginMark; + this.position = messageBodyBeginMark; + } + } + public void clearForReuse() throws IOException { this.out = null; this.position = 0; diff --git a/src/main/java/org/archive/io/ReplayInputStream.java b/src/main/java/org/archive/io/ReplayInputStream.java index fccf5fd3..35ea8175 100644 --- a/src/main/java/org/archive/io/ReplayInputStream.java +++ b/src/main/java/org/archive/io/ReplayInputStream.java @@ -192,11 +192,15 @@ public int read(byte[] b, int off, int len) throws IOException { } public void readFullyTo(OutputStream os) throws IOException { + readFullyTo(this, os); + } + + public static void readFullyTo(InputStream in, OutputStream os) throws IOException { byte[] buf = new byte[4096]; - int c = read(buf); + int c = in.read(buf); while (c != -1) { os.write(buf,0,c); - c = read(buf); + c = in.read(buf); } } @@ -218,12 +222,7 @@ public void readHeaderTo(OutputStream os) throws IOException { */ public void readContentTo(OutputStream os) throws IOException { setToResponseBodyStart(); - byte[] buf = new byte[4096]; - int c = read(buf); - while (c != -1) { - os.write(buf,0,c); - c = read(buf); - } + readFullyTo(os); } /**