1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.net.SocketAddress;
23 import java.net.SocketException;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28
29 import org.apache.mina.common.BroadcastIoSession;
30 import org.apache.mina.common.ByteBuffer;
31 import org.apache.mina.common.IoFilter.WriteRequest;
32 import org.apache.mina.common.IoFilterChain;
33 import org.apache.mina.common.IoHandler;
34 import org.apache.mina.common.IoService;
35 import org.apache.mina.common.IoServiceConfig;
36 import org.apache.mina.common.IoSession;
37 import org.apache.mina.common.IoSessionConfig;
38 import org.apache.mina.common.RuntimeIOException;
39 import org.apache.mina.common.TransportType;
40 import org.apache.mina.common.WriteFuture;
41 import org.apache.mina.common.support.BaseIoSession;
42 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
43 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
44
45
46
47
48
49
50
51 class DatagramSessionImpl extends BaseIoSession implements BroadcastIoSession {
52 private final IoService wrapperManager;
53
54 private final IoServiceConfig serviceConfig;
55
56 private final DatagramSessionConfig config = new SessionConfigImpl();
57
58 private final DatagramService managerDelegate;
59
60 private final DatagramFilterChain filterChain;
61
62 private final DatagramChannel ch;
63
64 private final Queue<WriteRequest> writeRequestQueue;
65
66 private final IoHandler handler;
67
68 private final SocketAddress localAddress;
69
70 private final SocketAddress serviceAddress;
71
72 private SocketAddress remoteAddress;
73
74 private SelectionKey key;
75
76 private int readBufferSize;
77
78
79
80
81 DatagramSessionImpl(IoService wrapperManager,
82 DatagramService managerDelegate, IoServiceConfig serviceConfig,
83 DatagramChannel ch, IoHandler defaultHandler,
84 SocketAddress serviceAddress, SocketAddress localAddress) {
85 this.wrapperManager = wrapperManager;
86 this.managerDelegate = managerDelegate;
87 this.filterChain = new DatagramFilterChain(this);
88 this.ch = ch;
89 this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
90 this.handler = defaultHandler;
91 this.remoteAddress = ch.socket().getRemoteSocketAddress();
92
93 this.serviceAddress = serviceAddress;
94 this.localAddress = localAddress;
95 this.serviceConfig = serviceConfig;
96
97
98 IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
99 if (sessionConfig instanceof DatagramSessionConfig) {
100 DatagramSessionConfig cfg = (DatagramSessionConfig) sessionConfig;
101 this.config.setBroadcast(cfg.isBroadcast());
102 this.config.setReceiveBufferSize(cfg.getReceiveBufferSize());
103 this.config.setReuseAddress(cfg.isReuseAddress());
104 this.config.setSendBufferSize(cfg.getSendBufferSize());
105
106 if (this.config.getTrafficClass() != cfg.getTrafficClass()) {
107 this.config.setTrafficClass(cfg.getTrafficClass());
108 }
109 }
110 }
111
112 public IoService getService() {
113 return wrapperManager;
114 }
115
116 public IoServiceConfig getServiceConfig() {
117 return serviceConfig;
118 }
119
120 public IoSessionConfig getConfig() {
121 return config;
122 }
123
124 DatagramService getManagerDelegate() {
125 return managerDelegate;
126 }
127
128 public IoFilterChain getFilterChain() {
129 return filterChain;
130 }
131
132 DatagramChannel getChannel() {
133 return ch;
134 }
135
136 SelectionKey getSelectionKey() {
137 return key;
138 }
139
140 void setSelectionKey(SelectionKey key) {
141 this.key = key;
142 }
143
144 public IoHandler getHandler() {
145 return handler;
146 }
147
148 @Override
149 protected void close0() {
150 IoServiceConfig config = getServiceConfig();
151 if (config instanceof DatagramServiceConfig) {
152 ((DatagramServiceConfig) config).getSessionRecycler().remove(this);
153 }
154 filterChain.fireFilterClose(this);
155 }
156
157 Queue<WriteRequest> getWriteRequestQueue() {
158 return writeRequestQueue;
159 }
160
161 @Override
162 public WriteFuture write(Object message, SocketAddress destination) {
163 if (!this.config.isBroadcast()) {
164 throw new IllegalStateException("Non-broadcast session");
165 }
166
167 return super.write(message, destination);
168 }
169
170 @Override
171 protected void write0(WriteRequest writeRequest) {
172 filterChain.fireFilterWrite(this, writeRequest);
173 }
174
175 public int getScheduledWriteRequests() {
176 return writeRequestQueue.size();
177 }
178
179 public int getScheduledWriteBytes() {
180 int byteSize = 0;
181
182 for (WriteRequest request : writeRequestQueue) {
183 byteSize += ((ByteBuffer) request.getMessage()).remaining();
184 }
185
186 return byteSize;
187 }
188
189 public TransportType getTransportType() {
190 return TransportType.DATAGRAM;
191 }
192
193 public SocketAddress getRemoteAddress() {
194 return remoteAddress;
195 }
196
197 void setRemoteAddress(SocketAddress remoteAddress) {
198 this.remoteAddress = remoteAddress;
199 }
200
201 public SocketAddress getLocalAddress() {
202 return localAddress;
203 }
204
205 public SocketAddress getServiceAddress() {
206 return serviceAddress;
207 }
208
209 @Override
210 protected void updateTrafficMask() {
211 managerDelegate.updateTrafficMask(this);
212 }
213
214 int getReadBufferSize() {
215 return readBufferSize;
216 }
217
218 private class SessionConfigImpl extends DatagramSessionConfigImpl implements
219 DatagramSessionConfig {
220 @Override
221 public int getReceiveBufferSize() {
222 try {
223 return ch.socket().getReceiveBufferSize();
224 } catch (SocketException e) {
225 throw new RuntimeIOException(e);
226 }
227 }
228
229 @Override
230 public void setReceiveBufferSize(int receiveBufferSize) {
231 if (DatagramSessionConfigImpl.isSetReceiveBufferSizeAvailable()) {
232 try {
233 ch.socket().setReceiveBufferSize(receiveBufferSize);
234
235 receiveBufferSize = ch.socket().getReceiveBufferSize();
236 DatagramSessionImpl.this.readBufferSize = receiveBufferSize;
237 } catch (SocketException e) {
238 throw new RuntimeIOException(e);
239 }
240 }
241 }
242
243 @Override
244 public boolean isBroadcast() {
245 try {
246 return ch.socket().getBroadcast();
247 } catch (SocketException e) {
248 throw new RuntimeIOException(e);
249 }
250 }
251
252 @Override
253 public void setBroadcast(boolean broadcast) {
254 try {
255 ch.socket().setBroadcast(broadcast);
256 } catch (SocketException e) {
257 throw new RuntimeIOException(e);
258 }
259 }
260
261 @Override
262 public int getSendBufferSize() {
263 try {
264 return ch.socket().getSendBufferSize();
265 } catch (SocketException e) {
266 throw new RuntimeIOException(e);
267 }
268 }
269
270 @Override
271 public void setSendBufferSize(int sendBufferSize) {
272 if (DatagramSessionConfigImpl.isSetSendBufferSizeAvailable()) {
273 try {
274 ch.socket().setSendBufferSize(sendBufferSize);
275 } catch (SocketException e) {
276 throw new RuntimeIOException(e);
277 }
278 }
279 }
280
281 @Override
282 public boolean isReuseAddress() {
283 try {
284 return ch.socket().getReuseAddress();
285 } catch (SocketException e) {
286 throw new RuntimeIOException(e);
287 }
288 }
289
290 @Override
291 public void setReuseAddress(boolean reuseAddress) {
292 try {
293 ch.socket().setReuseAddress(reuseAddress);
294 } catch (SocketException e) {
295 throw new RuntimeIOException(e);
296 }
297 }
298
299 @Override
300 public int getTrafficClass() {
301 if (DatagramSessionConfigImpl.isGetTrafficClassAvailable()) {
302 try {
303 return ch.socket().getTrafficClass();
304 } catch (SocketException e) {
305 throw new RuntimeIOException(e);
306 }
307 } else {
308 return 0;
309 }
310 }
311
312 @Override
313 public void setTrafficClass(int trafficClass) {
314 if (DatagramSessionConfigImpl.isSetTrafficClassAvailable()) {
315 try {
316 ch.socket().setTrafficClass(trafficClass);
317 } catch (SocketException e) {
318 throw new RuntimeIOException(e);
319 }
320 }
321 }
322 }
323 }