View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.management;
21  
22  import java.net.SocketAddress;
23  import java.util.Iterator;
24  
25  import org.apache.mina.common.IoHandler;
26  import org.apache.mina.common.IoService;
27  import org.apache.mina.common.IoServiceConfig;
28  import org.apache.mina.common.IoServiceListener;
29  import org.apache.mina.common.IoSession;
30  
31  import edu.emory.mathcs.backport.java.util.Queue;
32  import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
33  import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
34  
35  /**
36   * Collects statistics of an {@link IoService}. It's polling all the sessions of a given
37   * IoService. It's attaching a {@link IoSessionStat} object to all the sessions polled
38   * and filling the throughput values.
39   * 
40   * Usage :
41   * <pre>
42   * IoService service = ...
43   * StatCollector collector = new StatCollector( service );
44   * collector.start();
45   * </pre>
46   * 
47   * By default the {@link StatCollector} is polling the sessions every 5 seconds. You can 
48   * give a different polling time using a second constructor.
49   * 
50   * @author The Apache Directory Project (mina-dev@directory.apache.org)
51   * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (Fri, 13 Jul 2007) $
52   */
53  public class StatCollector {
54      /**
55       * The session attribute key for {@link IoSessionStat}.
56       */
57      public static final String KEY = StatCollector.class.getName() + ".stat";
58  
59      /**
60       * @noinspection StaticNonFinalField
61       */
62      private static volatile int nextId = 0;
63  
64      private final int id = nextId++;
65  
66      private final Object calcLock = new Object();
67  
68      private final IoService service;
69  
70      private Worker worker;
71  
72      private int pollingInterval = 5000;
73  
74      private Queue polledSessions;
75  
76      // resume of session stats, for simplifying acces to the statistics 
77      private AtomicLong totalProcessedSessions = new AtomicLong();
78  
79      private float msgWrittenThroughput = 0f;
80  
81      private float msgReadThroughput = 0f;
82  
83      private float bytesWrittenThroughput = 0f;
84  
85      private float bytesReadThroughput = 0f;
86  
87      private final IoServiceListener serviceListener = new IoServiceListener() {
88          public void serviceActivated(IoService service,
89                  SocketAddress serviceAddress, IoHandler handler,
90                  IoServiceConfig config) {
91          }
92  
93          public void serviceDeactivated(IoService service,
94                  SocketAddress serviceAddress, IoHandler handler,
95                  IoServiceConfig config) {
96          }
97  
98          public void sessionCreated(IoSession session) {
99              addSession(session);
100         }
101 
102         public void sessionDestroyed(IoSession session) {
103             removeSession(session);
104         }
105     };
106 
107     /**
108      * Create a stat collector for the given service with a default polling time of 5 seconds. 
109      * @param service the IoService to inspect
110      */
111     public StatCollector(IoService service) {
112         this(service, 5000);
113     }
114 
115     /**
116      * create a stat collector for the given given service
117      * @param service the IoService to inspect
118      * @param pollingInterval milliseconds
119      */
120     public StatCollector(IoService service, int pollingInterval) {
121         this.service = service;
122         this.pollingInterval = pollingInterval;
123     }
124 
125     /**
126      * Start collecting stats for the {@link IoSession} of the service.
127      * New sessions or destroyed will be automaticly added or removed.
128      */
129     public void start() {
130         synchronized (this) {
131             if (worker != null && worker.isAlive())
132                 throw new RuntimeException("Stat collecting already started");
133 
134             // add all current sessions
135 
136             polledSessions = new ConcurrentLinkedQueue();
137 
138             for (Iterator iter = service.getManagedServiceAddresses()
139                     .iterator(); iter.hasNext();) {
140                 SocketAddress element = (SocketAddress) iter.next();
141 
142                 for (Iterator iter2 = service.getManagedSessions(element)
143                         .iterator(); iter2.hasNext();) {
144                     addSession((IoSession) iter2.next());
145 
146                 }
147             }
148 
149             // listen for new ones
150             service.addListener(serviceListener);
151 
152             // start polling
153             worker = new Worker();
154             worker.start();
155 
156         }
157 
158     }
159 
160     /**
161      * Stop collecting stats. all the {@link IoSessionStat} object will be removed of the
162      * polled session attachements. 
163      */
164     public void stop() {
165         synchronized (this) {
166             if (worker == null) {
167                 return;
168             }
169 
170             service.removeListener(serviceListener);
171 
172             // stop worker
173             worker.stop = true;
174             worker.interrupt();
175             while (worker.isAlive()) {
176                 try {
177                     worker.join();
178                 } catch (InterruptedException e) {
179                     //ignore since this is shutdown time
180                 }
181             }
182 
183             for (Iterator iter = polledSessions.iterator(); iter.hasNext();) {
184                 IoSession session = (IoSession) iter.next();
185                 session.removeAttribute(KEY);
186             }
187             polledSessions.clear();
188 
189             worker = null;
190         }
191     }
192 
193     /**
194      * is the stat collector started and polling the {@link IoSession} of the {@link IoService}
195      * @return true if started
196      */
197     public boolean isRunning() {
198         synchronized (this) {
199             return worker != null && worker.stop != true;
200         }
201     }
202 
203     private void addSession(IoSession session) {
204         IoSessionStat sessionStats = new IoSessionStat();
205         session.setAttribute(KEY, sessionStats);
206         totalProcessedSessions.incrementAndGet();
207         polledSessions.add(session);
208     }
209 
210     private void removeSession(IoSession session) {
211         // remove the session from the list of polled sessions
212         polledSessions.remove(session);
213 
214         // add the bytes processed between last polling and session closing
215         // prevent non seen byte with non-connected protocols like HTTP and datagrams
216         IoSessionStat sessStat = (IoSessionStat) session.getAttribute(KEY);
217 
218         // computing with time between polling and closing
219         long currentTime = System.currentTimeMillis();
220         synchronized (calcLock) {
221             bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead)
222                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
223             bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite)
224                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
225             msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead)
226                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
227             msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite)
228                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
229         }
230 
231         session.removeAttribute(KEY);
232     }
233 
234     /**
235      * total number of sessions processed by the stat collector
236      * @return number of sessions
237      */
238     public long getTotalProcessedSessions() {
239         return totalProcessedSessions.get();
240     }
241 
242     public float getBytesReadThroughput() {
243         return bytesReadThroughput;
244     }
245 
246     public float getBytesWrittenThroughput() {
247         return bytesWrittenThroughput;
248     }
249 
250     public float getMsgReadThroughput() {
251         return msgReadThroughput;
252     }
253 
254     public float getMsgWrittenThroughput() {
255         return msgWrittenThroughput;
256     }
257 
258     public long getSessionCount() {
259         return polledSessions.size();
260     }
261 
262     private class Worker extends Thread {
263 
264         boolean stop = false;
265 
266         private Worker() {
267             super("StatCollectorWorker-" + id);
268         }
269 
270         public void run() {
271             while (!stop) {
272                 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) {
273                     IoSession session = (IoSession) iter.next();
274                     IoSessionStat sessStat = (IoSessionStat) session
275                             .getAttribute(KEY);
276 
277                     sessStat.lastByteRead = session.getReadBytes();
278                     sessStat.lastByteWrite = session.getWrittenBytes();
279                     sessStat.lastMessageRead = session.getReadMessages();
280                     sessStat.lastMessageWrite = session.getWrittenMessages();
281                 }
282 
283                 // wait polling time
284                 try {
285                     Thread.sleep(pollingInterval);
286                 } catch (InterruptedException e) {
287                 }
288 
289                 float tmpMsgWrittenThroughput = 0f;
290                 float tmpMsgReadThroughput = 0f;
291                 float tmpBytesWrittenThroughput = 0f;
292                 float tmpBytesReadThroughput = 0f;
293 
294                 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) {
295 
296                     // upadating individual session statistics
297                     IoSession session = (IoSession) iter.next();
298                     IoSessionStat sessStat = (IoSessionStat) session
299                             .getAttribute(KEY);
300 
301                     sessStat.byteReadThroughput = (session.getReadBytes() - sessStat.lastByteRead)
302                             / (pollingInterval / 1000f);
303                     tmpBytesReadThroughput += sessStat.byteReadThroughput;
304 
305                     sessStat.byteWrittenThroughput = (session.getWrittenBytes() - sessStat.lastByteWrite)
306                             / (pollingInterval / 1000f);
307                     tmpBytesWrittenThroughput += sessStat.byteWrittenThroughput;
308 
309                     sessStat.messageReadThroughput = (session.getReadMessages() - sessStat.lastMessageRead)
310                             / (pollingInterval / 1000f);
311                     tmpMsgReadThroughput += sessStat.messageReadThroughput;
312 
313                     sessStat.messageWrittenThroughput = (session
314                             .getWrittenMessages() - sessStat.lastMessageWrite)
315                             / (pollingInterval / 1000f);
316                     tmpMsgWrittenThroughput += sessStat.messageWrittenThroughput;
317 
318                     synchronized (calcLock) {
319                         msgWrittenThroughput = tmpMsgWrittenThroughput;
320                         msgReadThroughput = tmpMsgReadThroughput;
321                         bytesWrittenThroughput = tmpBytesWrittenThroughput;
322                         bytesReadThroughput = tmpBytesReadThroughput;
323                         sessStat.lastPollingTime = System.currentTimeMillis();
324                     }
325                 }
326             }
327         }
328     }
329 }