diff -r ff664e329084 src/org/python/core/io/StreamIO.java --- a/src/org/python/core/io/StreamIO.java Sat Jun 09 01:44:28 2012 +0100 +++ b/src/org/python/core/io/StreamIO.java Thu Jun 28 11:55:46 2012 -0700 @@ -16,6 +16,8 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; + import org.python.core.Py; import org.python.modules.posix.PosixModule; @@ -95,7 +97,7 @@ * close() (defaults to True) */ public StreamIO(InputStream inputStream, boolean closefd) { - this(Channels.newChannel(inputStream), closefd); + this(newChannel(inputStream), closefd); this.inputStream = inputStream; } @@ -107,7 +109,7 @@ * close() (defaults to True) */ public StreamIO(OutputStream outputStream, boolean closefd) { - this(Channels.newChannel(outputStream), closefd); + this(Channels.newChannel(outputStream), closefd); // XXX: mer this.outputStream = outputStream; } @@ -270,4 +272,88 @@ public Channel getChannel() { return readable() ? readChannel : writeChannel; } + + private static ReadableByteChannel newChannel(InputStream in) { + return new ReadableByteChannelImpl(in); + //return Channels.newChannel(in); + } + + + private static class ReadableByteChannelImpl + extends AbstractInterruptibleChannel // Not really interruptible + implements ReadableByteChannel { + + private InputStream in; + private boolean open = true; + private Object readLock = new Object(); + + ReadableByteChannelImpl(InputStream in) { + this.in = in; + } + + public int read(ByteBuffer dst) throws IOException { + int len = dst.remaining(); + int totalRead = 0; + int bytesRead = 0; + int TRANSFER_SIZE = 8192; + byte buf[] = new byte[0]; + synchronized (readLock) { + while (totalRead < len) { + int bytesToRead = Math.min((len - totalRead), + TRANSFER_SIZE); + if (buf.length < bytesToRead) + buf = new byte[bytesToRead]; + //if ((totalRead > 0) && !(in.available() > 0)) + // break; // block at most once + try { + begin(); + bytesRead = in.read(buf, 0, bytesToRead); + } finally { + end(bytesRead > 0); + } + if (bytesRead < 0) + break; + else + totalRead += bytesRead; + dst.put(buf, 0, bytesRead); + } + if ((bytesRead < 0) && (totalRead == 0)) + return -1; + + return totalRead; + } + } + + /* + public int read(ByteBuffer dst) throws IOException { + int read; + + if (dst.hasArray()) { + // array() can throw RuntimeExceptions but really shouldn't when + // hasArray() is true + read = in.read(dst.array(), dst.arrayOffset(), dst.remaining()); + System.out.println("dst.position() " + dst.position() + "read: " + read); + if (read > 0) { + dst.position(dst.position() + read); + } + return read; + } + + int len = dst.remaining(); + byte[] buf = new byte[len]; + + read = in.read(buf, 0, len); + dst.put(buf, 0, read); + return read; + } + */ + + protected void implCloseChannel() throws IOException { + if (open) { + in.close(); + open = false; + } + } + } + }