1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common.support;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.CopyOnWriteArraySet;
30 import java.util.concurrent.CountDownLatch;
31
32 import org.apache.mina.common.IoAcceptorConfig;
33 import org.apache.mina.common.IoConnector;
34 import org.apache.mina.common.IoFuture;
35 import org.apache.mina.common.IoFutureListener;
36 import org.apache.mina.common.IoHandler;
37 import org.apache.mina.common.IoService;
38 import org.apache.mina.common.IoServiceConfig;
39 import org.apache.mina.common.IoServiceListener;
40 import org.apache.mina.common.IoSession;
41 import org.apache.mina.common.RuntimeIOException;
42 import org.apache.mina.util.IdentityHashSet;
43
44
45
46
47
48
49
50
51 public class IoServiceListenerSupport {
52
53
54
55 private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
56
57
58
59
60 private final Set<SocketAddress> managedServiceAddresses = new CopyOnWriteArraySet<SocketAddress>();
61
62
63
64
65 private final ConcurrentMap<SocketAddress, Set<IoSession>> managedSessions = new ConcurrentHashMap<SocketAddress, Set<IoSession>>();
66
67
68
69
70 public IoServiceListenerSupport() {
71 }
72
73
74
75
76 public void add(IoServiceListener listener) {
77 listeners.add(listener);
78 }
79
80
81
82
83 public void remove(IoServiceListener listener) {
84 listeners.remove(listener);
85 }
86
87 public Set<SocketAddress> getManagedServiceAddresses() {
88 return Collections.unmodifiableSet(managedServiceAddresses);
89 }
90
91 public boolean isManaged(SocketAddress serviceAddress) {
92 return managedServiceAddresses.contains(serviceAddress);
93 }
94
95 public Set<IoSession> getManagedSessions(SocketAddress serviceAddress) {
96 Set<IoSession> sessions = managedSessions.get(serviceAddress);
97
98 if (null == sessions) {
99 return Collections.emptySet();
100 }
101
102 synchronized (sessions) {
103 return new IdentityHashSet<IoSession>(sessions);
104 }
105 }
106
107
108
109
110
111 public void fireServiceActivated(IoService service,
112 SocketAddress serviceAddress, IoHandler handler,
113 IoServiceConfig config) {
114 if (!managedServiceAddresses.add(serviceAddress)) {
115 return;
116 }
117
118 for (IoServiceListener listener : listeners) {
119 listener.serviceActivated(service, serviceAddress, handler, config);
120 }
121 }
122
123
124
125
126
127 public synchronized void fireServiceDeactivated(IoService service,
128 SocketAddress serviceAddress, IoHandler handler,
129 IoServiceConfig config) {
130 if (!managedServiceAddresses.remove(serviceAddress)) {
131 return;
132 }
133
134 try {
135 for (IoServiceListener listener : listeners) {
136 listener.serviceDeactivated(service, serviceAddress, handler,
137 config);
138 }
139 } finally {
140 disconnectSessions(serviceAddress, config);
141 }
142 }
143
144
145
146
147 public void fireSessionCreated(IoSession session) {
148 SocketAddress serviceAddress = session.getServiceAddress();
149
150
151 Set<IoSession> s = new IdentityHashSet<IoSession>();
152 Set<IoSession> sessions = managedSessions.putIfAbsent(serviceAddress,
153 Collections.synchronizedSet(s));
154 boolean firstSession;
155
156 if (null == sessions) {
157 sessions = s;
158 firstSession = true;
159 } else {
160 firstSession = false;
161 }
162
163
164 if (!sessions.add(session)) {
165 return;
166 }
167
168
169 if (session.getService() instanceof IoConnector && firstSession) {
170 fireServiceActivated(session.getService(), session
171 .getServiceAddress(), session.getHandler(), session
172 .getServiceConfig());
173 }
174
175
176 session.getFilterChain().fireSessionCreated(session);
177 session.getFilterChain().fireSessionOpened(session);
178
179
180 for (IoServiceListener listener : listeners) {
181 listener.sessionCreated(session);
182 }
183 }
184
185
186
187
188 public void fireSessionDestroyed(IoSession session) {
189 SocketAddress serviceAddress = session.getServiceAddress();
190
191
192 Set<IoSession> sessions = managedSessions.get(serviceAddress);
193
194 if (sessions == null) {
195 return;
196 }
197
198 sessions.remove(session);
199
200 boolean lastSession = false;
201
202
203 if (sessions.isEmpty()) {
204 lastSession = managedSessions.remove(serviceAddress, sessions);
205 }
206
207
208 session.getFilterChain().fireSessionClosed(session);
209
210
211 try {
212 for (IoServiceListener listener : listeners) {
213 listener.sessionDestroyed(session);
214 }
215 } finally {
216
217
218 if (session.getService() instanceof IoConnector && lastSession) {
219 fireServiceDeactivated(session.getService(), session
220 .getServiceAddress(), session.getHandler(), session
221 .getServiceConfig());
222 }
223 }
224 }
225
226 private void disconnectSessions(SocketAddress serviceAddress,
227 IoServiceConfig config) {
228 if (!(config instanceof IoAcceptorConfig)) {
229 return;
230 }
231
232 if (!((IoAcceptorConfig) config).isDisconnectOnUnbind()) {
233 return;
234 }
235
236 Set<IoSession> sessions = getManagedSessions(serviceAddress);
237
238 if (sessions.isEmpty()) {
239 return;
240 }
241
242 final CountDownLatch latch = new CountDownLatch(sessions.size());
243
244 for (IoSession session : sessions) {
245 session.close().addListener(new IoFutureListener() {
246 public void operationComplete(IoFuture future) {
247 latch.countDown();
248 }
249 });
250 }
251
252 try {
253 latch.await();
254 } catch (InterruptedException e) {
255 throw new RuntimeIOException(e);
256 }
257 }
258 }