Frames | No Frames |
1: /* PipedInputStream.java -- Read portion of piped streams. 2: Copyright (C) 1998, 1999, 2000, 2001, 2003, 2005 Free Software Foundation, Inc. 3: 4: This file is part of GNU Classpath. 5: 6: GNU Classpath is free software; you can redistribute it and/or modify 7: it under the terms of the GNU General Public License as published by 8: the Free Software Foundation; either version 2, or (at your option) 9: any later version. 10: 11: GNU Classpath is distributed in the hope that it will be useful, but 12: WITHOUT ANY WARRANTY; without even the implied warranty of 13: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14: General Public License for more details. 15: 16: You should have received a copy of the GNU General Public License 17: along with GNU Classpath; see the file COPYING. If not, write to the 18: Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 19: 02110-1301 USA. 20: 21: Linking this library statically or dynamically with other modules is 22: making a combined work based on this library. Thus, the terms and 23: conditions of the GNU General Public License cover the whole 24: combination. 25: 26: As a special exception, the copyright holders of this library give you 27: permission to link this library with independent modules to produce an 28: executable, regardless of the license terms of these independent 29: modules, and to copy and distribute the resulting executable under 30: terms of your choice, provided that you also meet, for each linked 31: independent module, the terms and conditions of the license of that 32: module. An independent module is a module which is not derived from 33: or based on this library. If you modify this library, you may extend 34: this exception to your version of the library, but you are not 35: obligated to do so. If you do not wish to do so, delete this 36: exception statement from your version. */ 37: 38: package java.io; 39: 40: // NOTE: This implementation is very similar to that of PipedReader. If you 41: // fix a bug in here, chances are you should make a similar change to the 42: // PipedReader code. 43: 44: /** 45: * An input stream that reads its bytes from an output stream 46: * to which it is connected. 47: * <p> 48: * Data is read and written to an internal buffer. It is highly recommended 49: * that the <code>PipedInputStream</code> and connected 50: * <code>PipedOutputStream</code> 51: * be part of different threads. If they are not, the read and write 52: * operations could deadlock their thread. 53: * 54: * @specnote The JDK implementation appears to have some undocumented 55: * functionality where it keeps track of what thread is writing 56: * to pipe and throws an IOException if that thread susequently 57: * dies. This behaviour seems dubious and unreliable - we don't 58: * implement it. 59: * 60: * @author Aaron M. Renn (arenn@urbanophile.com) 61: */ 62: public class PipedInputStream extends InputStream 63: { 64: /** PipedOutputStream to which this is connected. Null only if this 65: * InputStream hasn't been connected yet. */ 66: PipedOutputStream source; 67: 68: /** Set to true if close() has been called on this InputStream. */ 69: boolean closed; 70: 71: 72: /** 73: * The size of the internal buffer used for input/output. 74: */ 75: /* The "Constant Field Values" Javadoc of the Sun J2SE 1.4 76: * specifies 1024. 77: */ 78: protected static final int PIPE_SIZE = 1024; 79: 80: 81: /** 82: * This is the internal circular buffer used for storing bytes written 83: * to the pipe and from which bytes are read by this stream 84: */ 85: protected byte[] buffer = null; 86: 87: /** 88: * The index into buffer where the next byte from the connected 89: * <code>PipedOutputStream</code> will be written. If this variable is 90: * equal to <code>out</code>, then the buffer is full. If set to < 0, 91: * the buffer is empty. 92: */ 93: protected int in = -1; 94: 95: /** 96: * This index into the buffer where bytes will be read from. 97: */ 98: protected int out = 0; 99: 100: /** Buffer used to implement single-argument read/receive */ 101: private byte[] read_buf = new byte[1]; 102: 103: /** 104: * Creates a new <code>PipedInputStream</code> that is not connected to a 105: * <code>PipedOutputStream</code>. It must be connected before bytes can 106: * be read from this stream. 107: */ 108: public PipedInputStream() 109: { 110: this(PIPE_SIZE); 111: } 112: 113: /** 114: * Creates a new <code>PipedInputStream</code> of the given size that is not 115: * connected to a <code>PipedOutputStream</code>. 116: * It must be connected before bytes can be read from this stream. 117: * 118: * @since 1.6 119: * @since IllegalArgumentException If pipeSize <= 0. 120: */ 121: public PipedInputStream(int pipeSize) throws IllegalArgumentException 122: { 123: if (pipeSize <= 0) 124: throw new IllegalArgumentException("pipeSize must be > 0"); 125: 126: this.buffer = new byte[pipeSize]; 127: } 128: 129: /** 130: * This constructor creates a new <code>PipedInputStream</code> and connects 131: * it to the passed in <code>PipedOutputStream</code>. The stream is then 132: * ready for reading. 133: * 134: * @param source The <code>PipedOutputStream</code> to connect this 135: * stream to 136: * 137: * @exception IOException If <code>source</code> is already connected. 138: */ 139: public PipedInputStream(PipedOutputStream source) throws IOException 140: { 141: this(); 142: connect(source); 143: } 144: 145: /** 146: * This constructor creates a new <code>PipedInputStream</code> of the given 147: * size and connects it to the passed in <code>PipedOutputStream</code>. 148: * The stream is then ready for reading. 149: * 150: * @param source The <code>PipedOutputStream</code> to connect this 151: * stream to 152: * 153: * @since 1.6 154: * @exception IOException If <code>source</code> is already connected. 155: */ 156: public PipedInputStream(PipedOutputStream source, int pipeSize) 157: throws IOException 158: { 159: this(pipeSize); 160: connect(source); 161: } 162: 163: /** 164: * This method connects this stream to the passed in 165: * <code>PipedOutputStream</code>. 166: * This stream is then ready for reading. If this stream is already 167: * connected or has been previously closed, then an exception is thrown 168: * 169: * @param source The <code>PipedOutputStream</code> to connect this stream to 170: * 171: * @exception IOException If this PipedInputStream or <code>source</code> 172: * has been connected already. 173: */ 174: public void connect(PipedOutputStream source) throws IOException 175: { 176: // The JDK (1.3) does not appear to check for a previously closed 177: // connection here. 178: 179: if (this.source != null || source.sink != null) 180: throw new IOException ("Already connected"); 181: 182: source.sink = this; 183: this.source = source; 184: } 185: 186: /** 187: * This method receives a byte of input from the source PipedOutputStream. 188: * If the internal circular buffer is full, this method blocks. 189: * 190: * @param val The byte to write to this stream 191: * 192: * @exception IOException if error occurs 193: * @specnote Weird. This method must be some sort of accident. 194: */ 195: protected synchronized void receive(int val) throws IOException 196: { 197: read_buf[0] = (byte) (val & 0xff); 198: receive (read_buf, 0, 1); 199: } 200: 201: /** 202: * This method is used by the connected <code>PipedOutputStream</code> to 203: * write bytes into the buffer. 204: * 205: * @param buf The array containing bytes to write to this stream 206: * @param offset The offset into the array to start writing from 207: * @param len The number of bytes to write. 208: * 209: * @exception IOException If an error occurs 210: * @specnote This code should be in PipedOutputStream.write, but we 211: * put it here in order to support that bizarre recieve(int) 212: * method. 213: */ 214: synchronized void receive(byte[] buf, int offset, int len) 215: throws IOException 216: { 217: if (closed) 218: throw new IOException ("Pipe closed"); 219: 220: int bufpos = offset; 221: int copylen; 222: 223: while (len > 0) 224: { 225: try 226: { 227: while (in == out) 228: { 229: // The pipe is full. Wake up any readers and wait for them. 230: notifyAll(); 231: wait(); 232: // The pipe could have been closed while we were waiting. 233: if (closed) 234: throw new IOException ("Pipe closed"); 235: } 236: } 237: catch (InterruptedException ix) 238: { 239: throw new InterruptedIOException (); 240: } 241: 242: if (in < 0) // The pipe is empty. 243: in = 0; 244: 245: // Figure out how many bytes from buf can be copied without 246: // overrunning out or going past the length of the buffer. 247: if (in < out) 248: copylen = Math.min (len, out - in); 249: else 250: copylen = Math.min (len, buffer.length - in); 251: 252: // Copy bytes until the pipe is filled, wrapping if necessary. 253: System.arraycopy(buf, bufpos, buffer, in, copylen); 254: len -= copylen; 255: bufpos += copylen; 256: in += copylen; 257: if (in == buffer.length) 258: in = 0; 259: } 260: // Notify readers that new data is in the pipe. 261: notifyAll(); 262: } 263: 264: /** 265: * This method reads one byte from the stream. 266: * -1 is returned to indicated that no bytes can be read 267: * because the end of the stream was reached. If the stream is already 268: * closed, a -1 will again be returned to indicate the end of the stream. 269: * 270: * <p>This method will block if no byte is available to be read.</p> 271: * 272: * @return the value of the read byte value, or -1 of the end of the stream 273: * was reached 274: * 275: * @throws IOException if an error occured 276: */ 277: public int read() throws IOException 278: { 279: // Method operates by calling the multibyte overloaded read method 280: // Note that read_buf is an internal instance variable. I allocate it 281: // there to avoid constant reallocation overhead for applications that 282: // call this method in a loop at the cost of some unneeded overhead 283: // if this method is never called. 284: 285: int r = read(read_buf, 0, 1); 286: return r != -1 ? (read_buf[0] & 0xff) : -1; 287: } 288: 289: /** 290: * This method reads bytes from the stream into a caller supplied buffer. 291: * It starts storing bytes at position <code>offset</code> into the 292: * buffer and 293: * reads a maximum of <code>len</code> bytes. Note that this method 294: * can actually 295: * read fewer than <code>len</code> bytes. The actual number of bytes 296: * read is 297: * returned. A -1 is returned to indicated that no bytes can be read 298: * because the end of the stream was reached - ie close() was called on the 299: * connected PipedOutputStream. 300: * <p> 301: * This method will block if no bytes are available to be read. 302: * 303: * @param buf The buffer into which bytes will be stored 304: * @param offset The index into the buffer at which to start writing. 305: * @param len The maximum number of bytes to read. 306: * 307: * @exception IOException If <code>close()</code> was called on this Piped 308: * InputStream. 309: */ 310: public synchronized int read(byte[] buf, int offset, int len) 311: throws IOException 312: { 313: if (source == null) 314: throw new IOException ("Not connected"); 315: if (closed) 316: throw new IOException ("Pipe closed"); 317: 318: // Don't block if nothing was requested. 319: if (len == 0) 320: return 0; 321: 322: // If the buffer is empty, wait until there is something in the pipe 323: // to read. 324: try 325: { 326: while (in < 0) 327: { 328: if (source.closed) 329: return -1; 330: wait(); 331: } 332: } 333: catch (InterruptedException ix) 334: { 335: throw new InterruptedIOException(); 336: } 337: 338: int total = 0; 339: int copylen; 340: 341: while (true) 342: { 343: // Figure out how many bytes from the pipe can be copied without 344: // overrunning in or going past the length of buf. 345: if (out < in) 346: copylen = Math.min (len, in - out); 347: else 348: copylen = Math.min (len, buffer.length - out); 349: 350: System.arraycopy (buffer, out, buf, offset, copylen); 351: offset += copylen; 352: len -= copylen; 353: out += copylen; 354: total += copylen; 355: 356: if (out == buffer.length) 357: out = 0; 358: 359: if (out == in) 360: { 361: // Pipe is now empty. 362: in = -1; 363: out = 0; 364: } 365: 366: // If output buffer is filled or the pipe is empty, we're done. 367: if (len == 0 || in == -1) 368: { 369: // Notify any waiting outputstream that there is now space 370: // to write. 371: notifyAll(); 372: return total; 373: } 374: } 375: } 376: 377: /** 378: * This method returns the number of bytes that can be read from this stream 379: * before blocking could occur. This is the number of bytes that are 380: * currently unread in the internal circular buffer. Note that once this 381: * many additional bytes are read, the stream may block on a subsequent 382: * read, but it not guaranteed to block. 383: * 384: * @return The number of bytes that can be read before blocking might occur 385: * 386: * @exception IOException If an error occurs 387: */ 388: public synchronized int available() throws IOException 389: { 390: // The JDK 1.3 implementation does not appear to check for the closed or 391: // unconnected stream conditions here. 392: 393: if (in < 0) 394: return 0; 395: else if (out < in) 396: return in - out; 397: else 398: return (buffer.length - out) + in; 399: } 400: 401: /** 402: * This methods closes the stream so that no more data can be read 403: * from it. 404: * 405: * @exception IOException If an error occurs 406: */ 407: public synchronized void close() throws IOException 408: { 409: closed = true; 410: // Wake any thread which may be in receive() waiting to write data. 411: notifyAll(); 412: } 413: }