1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.serial;
21
22 import gnu.io.SerialPort;
23 import gnu.io.SerialPortEvent;
24 import gnu.io.SerialPortEventListener;
25 import gnu.io.UnsupportedCommOperationException;
26
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.util.TooManyListenersException;
31
32 import org.apache.mina.core.session.AbstractIoSession;
33 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
34 import org.apache.mina.core.service.DefaultTransportMetadata;
35 import org.apache.mina.util.ExceptionMonitor;
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.filterchain.IoFilterChain;
38 import org.apache.mina.core.service.IoHandler;
39 import org.apache.mina.core.service.IoProcessor;
40 import org.apache.mina.core.service.IoService;
41 import org.apache.mina.core.service.IoServiceListenerSupport;
42 import org.apache.mina.core.service.TransportMetadata;
43 import org.apache.mina.core.write.WriteRequest;
44
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48
49
50
51
52
53 class SerialSessionImpl extends AbstractIoSession implements
54 SerialSession, SerialPortEventListener {
55
56 static final TransportMetadata METADATA =
57 new DefaultTransportMetadata(
58 "rxtx", "serial", false, true, SerialAddress.class,
59 SerialSessionConfig.class, IoBuffer.class);
60
61 private final SerialSessionConfig config = new DefaultSerialSessionConfig();
62 private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
63 private final IoHandler ioHandler;
64 private final IoFilterChain filterChain;
65 private final SerialConnector service;
66 private final IoServiceListenerSupport serviceListeners;
67 private final SerialAddress address;
68 private final SerialPort port;
69 private final Logger log;
70
71 private InputStream inputStream;
72 private OutputStream outputStream;
73
74 SerialSessionImpl(
75 SerialConnector service, IoServiceListenerSupport serviceListeners,
76 SerialAddress address, SerialPort port) {
77 this.service = service;
78 this.serviceListeners = serviceListeners;
79 ioHandler = service.getHandler();
80 filterChain = new DefaultIoFilterChain(this);
81 this.port = port;
82 this.address = address;
83
84 log = LoggerFactory.getLogger(SerialSessionImpl.class);
85 }
86
87 public SerialSessionConfig getConfig() {
88 return config;
89 }
90
91 public IoFilterChain getFilterChain() {
92 return filterChain;
93 }
94
95 public IoHandler getHandler() {
96 return ioHandler;
97 }
98
99 public TransportMetadata getTransportMetadata() {
100 return METADATA;
101 }
102
103 public SerialAddress getLocalAddress() {
104 return null;
105 }
106
107 public SerialAddress getRemoteAddress() {
108 return address;
109 }
110
111 @Override
112 public SerialAddress getServiceAddress() {
113 return (SerialAddress) super.getServiceAddress();
114 }
115
116 public IoService getService() {
117 return service;
118 }
119
120 public void setDTR(boolean dtr) {
121 port.setDTR(dtr);
122 }
123
124 public boolean isDTR() {
125 return port.isDTR();
126 }
127
128 public void setRTS(boolean rts) {
129 port.setRTS(rts);
130 }
131
132 public boolean isRTS() {
133 return port.isRTS();
134 }
135
136
137
138
139
140
141
142 void start() throws IOException, TooManyListenersException {
143 inputStream = port.getInputStream();
144 outputStream = port.getOutputStream();
145 ReadWorker w = new ReadWorker();
146 w.start();
147 port.addEventListener(this);
148 service.getIdleStatusChecker0().addSession(this);
149 try {
150 getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
151 serviceListeners.fireSessionCreated(this);
152 } catch (Throwable e) {
153 getFilterChain().fireExceptionCaught(e);
154 processor.remove(this);
155 }
156 }
157
158 private final Object writeMonitor = new Object();
159 private WriteWorker writeWorker;
160
161 private class WriteWorker extends Thread {
162 @Override
163 public void run() {
164 while (isConnected() && !isClosing()) {
165 flushWrites();
166
167
168 synchronized (writeMonitor) {
169 try {
170 writeMonitor.wait();
171 } catch (InterruptedException e) {
172 log.error("InterruptedException", e);
173 }
174 }
175 }
176 }
177 }
178
179 private void flushWrites() {
180 for (; ;) {
181 WriteRequest req = getCurrentWriteRequest();
182 if (req == null) {
183 req = getWriteRequestQueue().poll(this);
184 if (req == null) {
185 break;
186 }
187 }
188
189 IoBuffer buf = (IoBuffer) req.getMessage();
190 if (buf.remaining() == 0) {
191 setCurrentWriteRequest(null);
192 buf.reset();
193 this.getFilterChain().fireMessageSent(req);
194 continue;
195 }
196
197 int writtenBytes = buf.remaining();
198 try {
199 outputStream.write(buf.array(), buf.position(), writtenBytes);
200 buf.position(buf.position() + writtenBytes);
201
202
203 increaseWrittenBytes(writtenBytes, System.currentTimeMillis());
204
205 setCurrentWriteRequest(null);
206 buf.reset();
207
208
209 getFilterChain().fireMessageSent(req);
210 } catch (IOException e) {
211 this.getFilterChain().fireExceptionCaught(e);
212 }
213
214 }
215 }
216
217 private final Object readReadyMonitor = new Object();
218
219 private class ReadWorker extends Thread {
220 @Override
221 public void run() {
222 while (isConnected() && !isClosing()) {
223 synchronized (readReadyMonitor) {
224 try {
225 readReadyMonitor.wait();
226 } catch (InterruptedException e) {
227 log.error("InterruptedException", e);
228 }
229 if (isClosing() || !isConnected()) {
230 break;
231 }
232 int dataSize;
233 try {
234 dataSize = inputStream.available();
235 byte[] data = new byte[dataSize];
236 int readBytes = inputStream.read(data);
237
238 if (readBytes > 0) {
239 IoBuffer buf = IoBuffer
240 .wrap(data, 0, readBytes);
241 buf.put(data, 0, readBytes);
242 buf.flip();
243 getFilterChain().fireMessageReceived(
244 buf);
245 }
246 } catch (IOException e) {
247 getFilterChain().fireExceptionCaught(
248 e);
249 }
250 }
251 }
252 }
253 }
254
255 public void serialEvent(SerialPortEvent evt) {
256 if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
257 synchronized (readReadyMonitor) {
258 readReadyMonitor.notifyAll();
259 }
260 }
261 }
262
263 @Override
264 public IoProcessor<SerialSessionImpl> getProcessor() {
265 return processor;
266 }
267
268 private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
269 public void add(SerialSessionImpl session) {
270
271 }
272
273 public void flush(SerialSessionImpl session) {
274 if (writeWorker == null) {
275 writeWorker = new WriteWorker();
276 writeWorker.start();
277 } else {
278 synchronized (writeMonitor) {
279 writeMonitor.notifyAll();
280 }
281 }
282 }
283
284 public void remove(SerialSessionImpl session) {
285 try {
286 inputStream.close();
287 } catch (IOException e) {
288 ExceptionMonitor.getInstance().exceptionCaught(e);
289 }
290 try {
291 outputStream.close();
292 } catch (IOException e) {
293 ExceptionMonitor.getInstance().exceptionCaught(e);
294 }
295
296 try {
297 port.setFlowControlMode(SerialPort.FLOWCONTROL_NONE);
298 } catch (UnsupportedCommOperationException e) {
299 ExceptionMonitor.getInstance().exceptionCaught(e);
300 }
301
302 port.close();
303 flush(session);
304 synchronized (readReadyMonitor) {
305 readReadyMonitor.notifyAll();
306 }
307
308 serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
309 }
310
311 public void updateTrafficControl(SerialSessionImpl session) {
312 throw new UnsupportedOperationException();
313 }
314
315 public void dispose() {
316
317 }
318
319 public boolean isDisposed() {
320 return false;
321 }
322
323 public boolean isDisposing() {
324 return false;
325 }
326 }
327 }