1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.management;
21
22 import java.net.SocketAddress;
23 import java.util.Iterator;
24
25 import org.apache.mina.common.IoHandler;
26 import org.apache.mina.common.IoService;
27 import org.apache.mina.common.IoServiceConfig;
28 import org.apache.mina.common.IoServiceListener;
29 import org.apache.mina.common.IoSession;
30
31 import edu.emory.mathcs.backport.java.util.Queue;
32 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
33 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class StatCollector {
54
55
56
57 public static final String KEY = StatCollector.class.getName() + ".stat";
58
59
60
61
62 private static volatile int nextId = 0;
63
64 private final int id = nextId++;
65
66 private final Object calcLock = new Object();
67
68 private final IoService service;
69
70 private Worker worker;
71
72 private int pollingInterval = 5000;
73
74 private Queue polledSessions;
75
76
77 private AtomicLong totalProcessedSessions = new AtomicLong();
78
79 private float msgWrittenThroughput = 0f;
80
81 private float msgReadThroughput = 0f;
82
83 private float bytesWrittenThroughput = 0f;
84
85 private float bytesReadThroughput = 0f;
86
87 private final IoServiceListener serviceListener = new IoServiceListener() {
88 public void serviceActivated(IoService service,
89 SocketAddress serviceAddress, IoHandler handler,
90 IoServiceConfig config) {
91 }
92
93 public void serviceDeactivated(IoService service,
94 SocketAddress serviceAddress, IoHandler handler,
95 IoServiceConfig config) {
96 }
97
98 public void sessionCreated(IoSession session) {
99 addSession(session);
100 }
101
102 public void sessionDestroyed(IoSession session) {
103 removeSession(session);
104 }
105 };
106
107
108
109
110
111 public StatCollector(IoService service) {
112 this(service, 5000);
113 }
114
115
116
117
118
119
120 public StatCollector(IoService service, int pollingInterval) {
121 this.service = service;
122 this.pollingInterval = pollingInterval;
123 }
124
125
126
127
128
129 public void start() {
130 synchronized (this) {
131 if (worker != null && worker.isAlive())
132 throw new RuntimeException("Stat collecting already started");
133
134
135
136 polledSessions = new ConcurrentLinkedQueue();
137
138 for (Iterator iter = service.getManagedServiceAddresses()
139 .iterator(); iter.hasNext();) {
140 SocketAddress element = (SocketAddress) iter.next();
141
142 for (Iterator iter2 = service.getManagedSessions(element)
143 .iterator(); iter2.hasNext();) {
144 addSession((IoSession) iter2.next());
145
146 }
147 }
148
149
150 service.addListener(serviceListener);
151
152
153 worker = new Worker();
154 worker.start();
155
156 }
157
158 }
159
160
161
162
163
164 public void stop() {
165 synchronized (this) {
166 if (worker == null) {
167 return;
168 }
169
170 service.removeListener(serviceListener);
171
172
173 worker.stop = true;
174 worker.interrupt();
175 while (worker.isAlive()) {
176 try {
177 worker.join();
178 } catch (InterruptedException e) {
179
180 }
181 }
182
183 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) {
184 IoSession session = (IoSession) iter.next();
185 session.removeAttribute(KEY);
186 }
187 polledSessions.clear();
188
189 worker = null;
190 }
191 }
192
193
194
195
196
197 public boolean isRunning() {
198 synchronized (this) {
199 return worker != null && worker.stop != true;
200 }
201 }
202
203 private void addSession(IoSession session) {
204 IoSessionStat sessionStats = new IoSessionStat();
205 session.setAttribute(KEY, sessionStats);
206 totalProcessedSessions.incrementAndGet();
207 polledSessions.add(session);
208 }
209
210 private void removeSession(IoSession session) {
211
212 polledSessions.remove(session);
213
214
215
216 IoSessionStat sessStat = (IoSessionStat) session.getAttribute(KEY);
217
218
219 long currentTime = System.currentTimeMillis();
220 synchronized (calcLock) {
221 bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead)
222 / ((currentTime - sessStat.lastPollingTime) / 1000f);
223 bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite)
224 / ((currentTime - sessStat.lastPollingTime) / 1000f);
225 msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead)
226 / ((currentTime - sessStat.lastPollingTime) / 1000f);
227 msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite)
228 / ((currentTime - sessStat.lastPollingTime) / 1000f);
229 }
230
231 session.removeAttribute(KEY);
232 }
233
234
235
236
237
238 public long getTotalProcessedSessions() {
239 return totalProcessedSessions.get();
240 }
241
242 public float getBytesReadThroughput() {
243 return bytesReadThroughput;
244 }
245
246 public float getBytesWrittenThroughput() {
247 return bytesWrittenThroughput;
248 }
249
250 public float getMsgReadThroughput() {
251 return msgReadThroughput;
252 }
253
254 public float getMsgWrittenThroughput() {
255 return msgWrittenThroughput;
256 }
257
258 public long getSessionCount() {
259 return polledSessions.size();
260 }
261
262 private class Worker extends Thread {
263
264 boolean stop = false;
265
266 private Worker() {
267 super("StatCollectorWorker-" + id);
268 }
269
270 public void run() {
271 while (!stop) {
272 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) {
273 IoSession session = (IoSession) iter.next();
274 IoSessionStat sessStat = (IoSessionStat) session
275 .getAttribute(KEY);
276
277 sessStat.lastByteRead = session.getReadBytes();
278 sessStat.lastByteWrite = session.getWrittenBytes();
279 sessStat.lastMessageRead = session.getReadMessages();
280 sessStat.lastMessageWrite = session.getWrittenMessages();
281 }
282
283
284 try {
285 Thread.sleep(pollingInterval);
286 } catch (InterruptedException e) {
287 }
288
289 float tmpMsgWrittenThroughput = 0f;
290 float tmpMsgReadThroughput = 0f;
291 float tmpBytesWrittenThroughput = 0f;
292 float tmpBytesReadThroughput = 0f;
293
294 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) {
295
296
297 IoSession session = (IoSession) iter.next();
298 IoSessionStat sessStat = (IoSessionStat) session
299 .getAttribute(KEY);
300
301 sessStat.byteReadThroughput = (session.getReadBytes() - sessStat.lastByteRead)
302 / (pollingInterval / 1000f);
303 tmpBytesReadThroughput += sessStat.byteReadThroughput;
304
305 sessStat.byteWrittenThroughput = (session.getWrittenBytes() - sessStat.lastByteWrite)
306 / (pollingInterval / 1000f);
307 tmpBytesWrittenThroughput += sessStat.byteWrittenThroughput;
308
309 sessStat.messageReadThroughput = (session.getReadMessages() - sessStat.lastMessageRead)
310 / (pollingInterval / 1000f);
311 tmpMsgReadThroughput += sessStat.messageReadThroughput;
312
313 sessStat.messageWrittenThroughput = (session
314 .getWrittenMessages() - sessStat.lastMessageWrite)
315 / (pollingInterval / 1000f);
316 tmpMsgWrittenThroughput += sessStat.messageWrittenThroughput;
317
318 synchronized (calcLock) {
319 msgWrittenThroughput = tmpMsgWrittenThroughput;
320 msgReadThroughput = tmpMsgReadThroughput;
321 bytesWrittenThroughput = tmpBytesWrittenThroughput;
322 bytesReadThroughput = tmpBytesReadThroughput;
323 sessStat.lastPollingTime = System.currentTimeMillis();
324 }
325 }
326 }
327 }
328 }
329 }