1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33
34 import org.apache.mina.core.RuntimeIoException;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.future.IoFuture;
37 import org.apache.mina.core.service.AbstractIoAcceptor;
38 import org.apache.mina.core.service.IoAcceptor;
39 import org.apache.mina.core.service.IoProcessor;
40 import org.apache.mina.core.session.AbstractIoSession;
41 import org.apache.mina.core.session.ExpiringSessionRecycler;
42 import org.apache.mina.core.session.IoSession;
43 import org.apache.mina.core.session.IoSessionConfig;
44 import org.apache.mina.core.session.IoSessionRecycler;
45 import org.apache.mina.core.write.WriteRequest;
46 import org.apache.mina.core.write.WriteRequestQueue;
47 import org.apache.mina.util.ExceptionMonitor;
48
49
50
51
52
53
54
55
56
57 public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
58 extends AbstractIoAcceptor {
59
60 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
61
62 private final Object lock = new Object();
63 private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
64 private final Queue<AcceptorOperationFuture> registerQueue =
65 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
66 private final Queue<AcceptorOperationFuture> cancelQueue =
67 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
68 private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
69 private final Map<SocketAddress, H> boundHandles =
70 Collections.synchronizedMap(new HashMap<SocketAddress, H>());
71
72 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
73
74 private final ServiceOperationFuture disposalFuture =
75 new ServiceOperationFuture();
76 private volatile boolean selectable;
77
78
79 private Acceptor acceptor;
80
81 private long lastIdleCheckTime;
82
83
84
85
86 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
87 this(sessionConfig, null);
88 }
89
90
91
92
93 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
94 super(sessionConfig, executor);
95
96 try {
97 init();
98 selectable = true;
99 } catch (RuntimeException e){
100 throw e;
101 } catch (Exception e) {
102 throw new RuntimeIoException("Failed to initialize.", e);
103 } finally {
104 if (!selectable) {
105 try {
106 destroy();
107 } catch (Exception e) {
108 ExceptionMonitor.getInstance().exceptionCaught(e);
109 }
110 }
111 }
112 }
113
114 protected abstract void init() throws Exception;
115 protected abstract void destroy() throws Exception;
116 protected abstract int select() throws Exception;
117 protected abstract int select(int timeout) throws Exception;
118 protected abstract void wakeup();
119 protected abstract Iterator<H> selectedHandles();
120 protected abstract H open(SocketAddress localAddress) throws Exception;
121 protected abstract void close(H handle) throws Exception;
122 protected abstract SocketAddress localAddress(H handle) throws Exception;
123 protected abstract boolean isReadable(H handle);
124 protected abstract boolean isWritable(H handle);
125 protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
126 protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
127 protected abstract T newSession(IoProcessor<T> processor, H handle, SocketAddress remoteAddress) throws Exception;
128 protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;
129
130
131
132
133 @Override
134 protected IoFuture dispose0() throws Exception {
135 unbind();
136 if (!disposalFuture.isDone()) {
137 startupAcceptor();
138 wakeup();
139 }
140 return disposalFuture;
141 }
142
143
144
145
146 @Override
147 protected final Set<SocketAddress> bindInternal(
148 List<? extends SocketAddress> localAddresses) throws Exception {
149
150
151 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
152
153
154
155 registerQueue.add(request);
156
157
158
159 startupAcceptor();
160
161
162
163
164 wakeup();
165
166
167 request.awaitUninterruptibly();
168
169 if (request.getException() != null) {
170 throw request.getException();
171 }
172
173
174
175
176 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
177
178 for (H handle:boundHandles.values()) {
179 newLocalAddresses.add(localAddress(handle));
180 }
181
182 return newLocalAddresses;
183 }
184
185
186
187
188 @Override
189 protected final void unbind0(
190 List<? extends SocketAddress> localAddresses) throws Exception {
191 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
192
193 cancelQueue.add(request);
194 startupAcceptor();
195 wakeup();
196
197 request.awaitUninterruptibly();
198
199 if (request.getException() != null) {
200 throw request.getException();
201 }
202 }
203
204
205
206
207 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
208 if (isDisposing()) {
209 throw new IllegalStateException("Already disposed.");
210 }
211
212 if (remoteAddress == null) {
213 throw new NullPointerException("remoteAddress");
214 }
215
216 synchronized (bindLock) {
217 if (!isActive()) {
218 throw new IllegalStateException(
219 "Can't create a session from a unbound service.");
220 }
221
222 try {
223 return newSessionWithoutLock(remoteAddress, localAddress);
224 } catch (RuntimeException e) {
225 throw e;
226 } catch (Error e) {
227 throw e;
228 } catch (Exception e) {
229 throw new RuntimeIoException("Failed to create a session.", e);
230 }
231 }
232 }
233
234 private IoSession newSessionWithoutLock(
235 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
236 H handle = boundHandles.get(localAddress);
237 if (handle == null) {
238 throw new IllegalArgumentException("Unknown local address: " + localAddress);
239 }
240
241 IoSession session;
242 IoSessionRecycler sessionRecycler = getSessionRecycler();
243 synchronized (sessionRecycler) {
244 session = sessionRecycler.recycle(localAddress, remoteAddress);
245 if (session != null) {
246 return session;
247 }
248
249
250 T newSession = newSession(processor, handle, remoteAddress);
251 getSessionRecycler().put(newSession);
252 session = newSession;
253 }
254
255 initSession(session, null, null);
256
257 try {
258 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
259 getListeners().fireSessionCreated(session);
260 } catch (Throwable t) {
261 ExceptionMonitor.getInstance().exceptionCaught(t);
262 }
263
264 return session;
265 }
266
267 public final IoSessionRecycler getSessionRecycler() {
268 return sessionRecycler;
269 }
270
271 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
272 synchronized (bindLock) {
273 if (isActive()) {
274 throw new IllegalStateException(
275 "sessionRecycler can't be set while the acceptor is bound.");
276 }
277
278 if (sessionRecycler == null) {
279 sessionRecycler = DEFAULT_RECYCLER;
280 }
281 this.sessionRecycler = sessionRecycler;
282 }
283 }
284
285 private class ConnectionlessAcceptorProcessor implements IoProcessor<T> {
286
287 public void add(T session) {
288 }
289
290 public void flush(T session) {
291 if (scheduleFlush(session)) {
292 wakeup();
293 }
294 }
295
296 public void remove(T session) {
297 getSessionRecycler().remove(session);
298 getListeners().fireSessionDestroyed(session);
299 }
300
301 public void updateTrafficControl(T session) {
302 throw new UnsupportedOperationException();
303 }
304
305 public void dispose() {
306 }
307
308 public boolean isDisposed() {
309 return false;
310 }
311
312 public boolean isDisposing() {
313 return false;
314 }
315 }
316
317
318
319
320 private void startupAcceptor() {
321 if (!selectable) {
322 registerQueue.clear();
323 cancelQueue.clear();
324 flushingSessions.clear();
325 }
326
327 synchronized (lock) {
328 if (acceptor == null) {
329 acceptor = new Acceptor();
330 executeWorker(acceptor);
331 }
332 }
333 }
334
335 private boolean scheduleFlush(T session) {
336 if (session.setScheduledForFlush(true)) {
337 flushingSessions.add(session);
338 return true;
339 } else {
340 return false;
341 }
342 }
343
344
345
346
347
348
349 private class Acceptor implements Runnable {
350 public void run() {
351 int nHandles = 0;
352 lastIdleCheckTime = System.currentTimeMillis();
353
354 while (selectable) {
355 try {
356 int selected = select();
357
358 nHandles += registerHandles();
359
360 if (selected > 0) {
361 processReadySessions(selectedHandles());
362 }
363
364 long currentTime = System.currentTimeMillis();
365 flushSessions(currentTime);
366 nHandles -= unregisterHandles();
367
368 notifyIdleSessions(currentTime);
369
370 if (nHandles == 0) {
371 synchronized (lock) {
372 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
373 acceptor = null;
374 break;
375 }
376 }
377 }
378 } catch (Exception e) {
379 ExceptionMonitor.getInstance().exceptionCaught(e);
380
381 try {
382 Thread.sleep(1000);
383 } catch (InterruptedException e1) {
384 }
385 }
386 }
387
388 if (selectable && isDisposing()) {
389 selectable = false;
390 try {
391 destroy();
392 } catch (Exception e) {
393 ExceptionMonitor.getInstance().exceptionCaught(e);
394 } finally {
395 disposalFuture.setValue(true);
396 }
397 }
398 }
399 }
400
401 @SuppressWarnings("unchecked")
402 private void processReadySessions(Iterator<H> handles) {
403 while (handles.hasNext()) {
404 H h = handles.next();
405 handles.remove();
406 try {
407 if (isReadable(h)) {
408 readHandle(h);
409 }
410
411 if (isWritable(h)) {
412 for (IoSession session : getManagedSessions().values()) {
413 scheduleFlush((T) session);
414 }
415 }
416 } catch (Throwable t) {
417 ExceptionMonitor.getInstance().exceptionCaught(t);
418 }
419 }
420 }
421
422 private void readHandle(H handle) throws Exception {
423 IoBuffer readBuf = IoBuffer.allocate(
424 getSessionConfig().getReadBufferSize());
425
426 SocketAddress remoteAddress = receive(handle, readBuf);
427 if (remoteAddress != null) {
428 IoSession session = newSessionWithoutLock(
429 remoteAddress, localAddress(handle));
430
431 readBuf.flip();
432
433 IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
434 newBuf.put(readBuf);
435 newBuf.flip();
436
437 session.getFilterChain().fireMessageReceived(newBuf);
438 }
439 }
440
441 private void flushSessions(long currentTime) {
442 for (; ;) {
443 T session = flushingSessions.poll();
444 if (session == null) {
445 break;
446 }
447
448 session.setScheduledForFlush(false);
449
450 try {
451 boolean flushedAll = flush(session, currentTime);
452 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
453 !session.isScheduledForFlush()) {
454 scheduleFlush(session);
455 }
456 } catch (Exception e) {
457 session.getFilterChain().fireExceptionCaught(e);
458 }
459 }
460 }
461
462 private boolean flush(T session, long currentTime) throws Exception {
463
464 setInterestedInWrite(session, false);
465
466 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
467 final int maxWrittenBytes =
468 session.getConfig().getMaxReadBufferSize() +
469 (session.getConfig().getMaxReadBufferSize() >>> 1);
470
471 int writtenBytes = 0;
472 try {
473 for (; ;) {
474 WriteRequest req = session.getCurrentWriteRequest();
475 if (req == null) {
476 req = writeRequestQueue.poll(session);
477 if (req == null) {
478 break;
479 }
480 session.setCurrentWriteRequest(req);
481 }
482
483 IoBuffer buf = (IoBuffer) req.getMessage();
484 if (buf.remaining() == 0) {
485
486 session.setCurrentWriteRequest(null);
487 buf.reset();
488 session.getFilterChain().fireMessageSent(req);
489 continue;
490 }
491
492 SocketAddress destination = req.getDestination();
493 if (destination == null) {
494 destination = session.getRemoteAddress();
495 }
496
497 int localWrittenBytes = send(session, buf, destination);
498 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
499
500 setInterestedInWrite(session, true);
501 return false;
502 } else {
503 setInterestedInWrite(session, false);
504
505
506 session.setCurrentWriteRequest(null);
507 writtenBytes += localWrittenBytes;
508 buf.reset();
509 session.getFilterChain().fireMessageSent(req);
510 }
511 }
512 } finally {
513 session.increaseWrittenBytes(writtenBytes, currentTime);
514 }
515
516 return true;
517 }
518
519 private int registerHandles() {
520 for (;;) {
521 AcceptorOperationFuture req = registerQueue.poll();
522 if (req == null) {
523 break;
524 }
525
526 Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
527 List<SocketAddress> localAddresses = req.getLocalAddresses();
528 try {
529 for (SocketAddress a: localAddresses) {
530 H handle = open(a);
531 newHandles.put(localAddress(handle), handle);
532 }
533 boundHandles.putAll(newHandles);
534
535 getListeners().fireServiceActivated();
536 req.setDone();
537 return newHandles.size();
538 } catch (Exception e) {
539 req.setException(e);
540 } finally {
541
542 if (req.getException() != null) {
543 for (H handle: newHandles.values()) {
544 try {
545 close(handle);
546 } catch (Exception e) {
547 ExceptionMonitor.getInstance().exceptionCaught(e);
548 }
549 }
550 wakeup();
551 }
552 }
553 }
554
555 return 0;
556 }
557
558 private int unregisterHandles() {
559 int nHandles = 0;
560 for (;;) {
561 AcceptorOperationFuture request = cancelQueue.poll();
562 if (request == null) {
563 break;
564 }
565
566
567 for (SocketAddress a: request.getLocalAddresses()) {
568 H handle = boundHandles.remove(a);
569 if (handle == null) {
570 continue;
571 }
572
573 try {
574 close(handle);
575 wakeup();
576 } catch (Throwable e) {
577 ExceptionMonitor.getInstance().exceptionCaught(e);
578 } finally {
579 nHandles ++;
580 }
581 }
582
583 request.setDone();
584 }
585
586 return nHandles;
587 }
588
589 private void notifyIdleSessions(long currentTime) {
590
591 if (currentTime - lastIdleCheckTime >= 1000) {
592 lastIdleCheckTime = currentTime;
593 AbstractIoSession.notifyIdleness(
594 getListeners().getManagedSessions().values().iterator(),
595 currentTime);
596 }
597 }
598 }