1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.net.SocketAddress;
23 import java.net.SocketException;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28
29 import org.apache.mina.common.BroadcastIoSession;
30 import org.apache.mina.common.ByteBuffer;
31 import org.apache.mina.common.IoFilter.WriteRequest;
32 import org.apache.mina.common.IoFilterChain;
33 import org.apache.mina.common.IoHandler;
34 import org.apache.mina.common.IoService;
35 import org.apache.mina.common.IoServiceConfig;
36 import org.apache.mina.common.IoSession;
37 import org.apache.mina.common.IoSessionConfig;
38 import org.apache.mina.common.RuntimeIOException;
39 import org.apache.mina.common.TransportType;
40 import org.apache.mina.common.WriteFuture;
41 import org.apache.mina.common.support.BaseIoSession;
42 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
43 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
44
45
46
47
48
49
50
51 class DatagramSessionImpl extends BaseIoSession implements BroadcastIoSession {
52 private final IoService wrapperManager;
53
54 private final IoServiceConfig serviceConfig;
55
56 private final DatagramSessionConfig config = new SessionConfigImpl();
57
58 private final DatagramService managerDelegate;
59
60 private final DatagramFilterChain filterChain;
61
62 private final DatagramChannel ch;
63
64 private final Queue<WriteRequest> writeRequestQueue;
65
66 private final IoHandler handler;
67
68 private final SocketAddress localAddress;
69
70 private final SocketAddress serviceAddress;
71
72 private SocketAddress remoteAddress;
73
74 private SelectionKey key;
75
76 private int readBufferSize;
77
78
79
80
81 DatagramSessionImpl(IoService wrapperManager,
82 DatagramService managerDelegate, IoServiceConfig serviceConfig,
83 DatagramChannel ch, IoHandler defaultHandler,
84 SocketAddress serviceAddress, SocketAddress localAddress) {
85 this.wrapperManager = wrapperManager;
86 this.managerDelegate = managerDelegate;
87 this.filterChain = new DatagramFilterChain(this);
88 this.ch = ch;
89 this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
90 this.handler = defaultHandler;
91 this.remoteAddress = ch.socket().getRemoteSocketAddress();
92
93 this.serviceAddress = serviceAddress;
94 this.localAddress = localAddress;
95 this.serviceConfig = serviceConfig;
96
97
98 IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
99 if (sessionConfig instanceof DatagramSessionConfig) {
100 DatagramSessionConfig cfg = (DatagramSessionConfig) sessionConfig;
101 this.config.setBroadcast(cfg.isBroadcast());
102 this.config.setReceiveBufferSize(cfg.getReceiveBufferSize());
103 this.config.setReuseAddress(cfg.isReuseAddress());
104 this.config.setSendBufferSize(cfg.getSendBufferSize());
105
106 if (this.config.getTrafficClass() != cfg.getTrafficClass()) {
107 this.config.setTrafficClass(cfg.getTrafficClass());
108 }
109 }
110 }
111
112 public IoService getService() {
113 return wrapperManager;
114 }
115
116 public IoServiceConfig getServiceConfig() {
117 return serviceConfig;
118 }
119
120 public IoSessionConfig getConfig() {
121 return config;
122 }
123
124 DatagramService getManagerDelegate() {
125 return managerDelegate;
126 }
127
128 public IoFilterChain getFilterChain() {
129 return filterChain;
130 }
131
132 DatagramChannel getChannel() {
133 return ch;
134 }
135
136 SelectionKey getSelectionKey() {
137 return key;
138 }
139
140 void setSelectionKey(SelectionKey key) {
141 this.key = key;
142 }
143
144 public IoHandler getHandler() {
145 return handler;
146 }
147
148 @Override
149 protected void close0() {
150 IoServiceConfig config = getServiceConfig();
151 if (config instanceof DatagramServiceConfig) {
152 ((DatagramServiceConfig) config).getSessionRecycler().remove(this);
153 }
154 filterChain.fireFilterClose(this);
155 }
156
157 Queue<WriteRequest> getWriteRequestQueue() {
158 return writeRequestQueue;
159 }
160
161 @Override
162 public WriteFuture write(Object message, SocketAddress destination) {
163 if (!this.config.isBroadcast()) {
164 throw new IllegalStateException("Non-broadcast session");
165 }
166
167 return super.write(message, destination);
168 }
169
170 @Override
171 protected void write0(WriteRequest writeRequest) {
172 filterChain.fireFilterWrite(this, writeRequest);
173 }
174
175 public int getScheduledWriteRequests() {
176 int size = 0;
177 synchronized (writeRequestQueue) {
178 for (WriteRequest request : writeRequestQueue) {
179 Object message = request.getMessage();
180 if (message instanceof ByteBuffer) {
181 if (((ByteBuffer) message).hasRemaining()) {
182 size ++;
183 }
184 } else {
185 size ++;
186 }
187 }
188 }
189
190 return size;
191 }
192
193 public int getScheduledWriteBytes() {
194 int byteSize = 0;
195
196 for (WriteRequest request : writeRequestQueue) {
197 byteSize += ((ByteBuffer) request.getMessage()).remaining();
198 }
199
200 return byteSize;
201 }
202
203 public TransportType getTransportType() {
204 return TransportType.DATAGRAM;
205 }
206
207 public SocketAddress getRemoteAddress() {
208 return remoteAddress;
209 }
210
211 void setRemoteAddress(SocketAddress remoteAddress) {
212 this.remoteAddress = remoteAddress;
213 }
214
215 public SocketAddress getLocalAddress() {
216 return localAddress;
217 }
218
219 public SocketAddress getServiceAddress() {
220 return serviceAddress;
221 }
222
223 @Override
224 protected void updateTrafficMask() {
225 managerDelegate.updateTrafficMask(this);
226 }
227
228 int getReadBufferSize() {
229 return readBufferSize;
230 }
231
232 private class SessionConfigImpl extends DatagramSessionConfigImpl implements
233 DatagramSessionConfig {
234 @Override
235 public int getReceiveBufferSize() {
236 try {
237 return ch.socket().getReceiveBufferSize();
238 } catch (SocketException e) {
239 throw new RuntimeIOException(e);
240 }
241 }
242
243 @Override
244 public void setReceiveBufferSize(int receiveBufferSize) {
245 if (DatagramSessionConfigImpl.isSetReceiveBufferSizeAvailable()) {
246 try {
247 ch.socket().setReceiveBufferSize(receiveBufferSize);
248
249 receiveBufferSize = ch.socket().getReceiveBufferSize();
250 DatagramSessionImpl.this.readBufferSize = receiveBufferSize;
251 } catch (SocketException e) {
252 throw new RuntimeIOException(e);
253 }
254 }
255 }
256
257 @Override
258 public boolean isBroadcast() {
259 try {
260 return ch.socket().getBroadcast();
261 } catch (SocketException e) {
262 throw new RuntimeIOException(e);
263 }
264 }
265
266 @Override
267 public void setBroadcast(boolean broadcast) {
268 try {
269 ch.socket().setBroadcast(broadcast);
270 } catch (SocketException e) {
271 throw new RuntimeIOException(e);
272 }
273 }
274
275 @Override
276 public int getSendBufferSize() {
277 try {
278 return ch.socket().getSendBufferSize();
279 } catch (SocketException e) {
280 throw new RuntimeIOException(e);
281 }
282 }
283
284 @Override
285 public void setSendBufferSize(int sendBufferSize) {
286 if (DatagramSessionConfigImpl.isSetSendBufferSizeAvailable()) {
287 try {
288 ch.socket().setSendBufferSize(sendBufferSize);
289 } catch (SocketException e) {
290 throw new RuntimeIOException(e);
291 }
292 }
293 }
294
295 @Override
296 public boolean isReuseAddress() {
297 try {
298 return ch.socket().getReuseAddress();
299 } catch (SocketException e) {
300 throw new RuntimeIOException(e);
301 }
302 }
303
304 @Override
305 public void setReuseAddress(boolean reuseAddress) {
306 try {
307 ch.socket().setReuseAddress(reuseAddress);
308 } catch (SocketException e) {
309 throw new RuntimeIOException(e);
310 }
311 }
312
313 @Override
314 public int getTrafficClass() {
315 if (DatagramSessionConfigImpl.isGetTrafficClassAvailable()) {
316 try {
317 return ch.socket().getTrafficClass();
318 } catch (SocketException e) {
319 throw new RuntimeIOException(e);
320 }
321 } else {
322 return 0;
323 }
324 }
325
326 @Override
327 public void setTrafficClass(int trafficClass) {
328 if (DatagramSessionConfigImpl.isSetTrafficClassAvailable()) {
329 try {
330 ch.socket().setTrafficClass(trafficClass);
331 } catch (SocketException e) {
332 throw new RuntimeIOException(e);
333 }
334 }
335 }
336 }
337 }