1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.handler.support;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 import org.apache.mina.common.ByteBuffer;
26 import org.apache.mina.common.IoHandler;
27 import org.apache.mina.common.IoSession;
28
29
30
31
32
33
34
35
36 public class IoSessionInputStream extends InputStream {
37 private final Object mutex = new Object();
38
39 private final ByteBuffer buf;
40
41 private volatile boolean closed;
42
43 private volatile boolean released;
44
45 private IOException exception;
46
47 public IoSessionInputStream() {
48 buf = ByteBuffer.allocate(16);
49 buf.setAutoExpand(true);
50 buf.limit(0);
51 }
52
53 public int available() {
54 if (released) {
55 return 0;
56 } else {
57 synchronized (mutex) {
58 return buf.remaining();
59 }
60 }
61 }
62
63 public void close() {
64 if (closed) {
65 return;
66 }
67
68 synchronized (mutex) {
69 closed = true;
70 releaseBuffer();
71
72 mutex.notifyAll();
73 }
74 }
75
76 public int read() throws IOException {
77 synchronized (mutex) {
78 if (!waitForData()) {
79 return -1;
80 }
81
82 return buf.get() & 0xff;
83 }
84 }
85
86 public int read(byte[] b, int off, int len) throws IOException {
87 synchronized (mutex) {
88 if (!waitForData()) {
89 return -1;
90 }
91
92 int readBytes;
93
94 if (len > buf.remaining()) {
95 readBytes = buf.remaining();
96 } else {
97 readBytes = len;
98 }
99
100 buf.get(b, off, readBytes);
101
102 return readBytes;
103 }
104 }
105
106 private boolean waitForData() throws IOException {
107 if (released) {
108 return false;
109 }
110
111 synchronized (mutex) {
112 while (!released && buf.remaining() == 0 && exception == null) {
113 try {
114 mutex.wait();
115 } catch (InterruptedException e) {
116 IOException ioe = new IOException(
117 "Interrupted while waiting for more data");
118 ioe.initCause(e);
119 throw ioe;
120 }
121 }
122 }
123
124 if (exception != null) {
125 releaseBuffer();
126 throw exception;
127 }
128
129 if (closed && buf.remaining() == 0) {
130 releaseBuffer();
131
132 return false;
133 }
134
135 return true;
136 }
137
138 private void releaseBuffer() {
139 if (released) {
140 return;
141 }
142
143 released = true;
144 buf.release();
145 }
146
147 public void write(ByteBuffer src) {
148 synchronized (mutex) {
149 if (closed) {
150 return;
151 }
152
153 if (buf.hasRemaining()) {
154 this.buf.compact();
155 this.buf.put(src);
156 this.buf.flip();
157 } else {
158 this.buf.clear();
159 this.buf.put(src);
160 this.buf.flip();
161 mutex.notifyAll();
162 }
163 }
164 }
165
166 public void throwException(IOException e) {
167 synchronized (mutex) {
168 if (exception == null) {
169 exception = e;
170
171 mutex.notifyAll();
172 }
173 }
174 }
175 }