View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe.support;
21  
22  import java.net.SocketAddress;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.concurrent.BlockingQueue;
26  import java.util.concurrent.LinkedBlockingQueue;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.mina.common.IoFilter.WriteRequest;
31  import org.apache.mina.common.IoFilterChain;
32  import org.apache.mina.common.IoHandler;
33  import org.apache.mina.common.IoService;
34  import org.apache.mina.common.IoServiceConfig;
35  import org.apache.mina.common.IoSession;
36  import org.apache.mina.common.IoSessionConfig;
37  import org.apache.mina.common.TransportType;
38  import org.apache.mina.common.support.BaseIoSession;
39  import org.apache.mina.common.support.BaseIoSessionConfig;
40  import org.apache.mina.common.support.IoServiceListenerSupport;
41  
42  /**
43   * A {@link IoSession} for in-VM transport (VM_PIPE).
44   *
45   * @author The Apache Directory Project (mina-dev@directory.apache.org)
46   * @version $Rev: 575603 $, $Date: 2007-09-14 19:04:45 +0900 (금, 14  9월 2007) $
47   */
48  public class VmPipeSessionImpl extends BaseIoSession {
49      private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {
50      };
51  
52      private final IoService service;
53  
54      private final IoServiceConfig serviceConfig;
55  
56      private final IoServiceListenerSupport serviceListeners;
57  
58      private final SocketAddress localAddress;
59  
60      private final SocketAddress remoteAddress;
61  
62      private final SocketAddress serviceAddress;
63  
64      private final IoHandler handler;
65  
66      private final VmPipeFilterChain filterChain;
67  
68      private final VmPipeSessionImpl remoteSession;
69  
70      private final Lock lock;
71  
72      final BlockingQueue<Object> pendingDataQueue;
73  
74      /*
75       * Constructor for client-side session.
76       */
77      public VmPipeSessionImpl( IoService service, IoServiceConfig serviceConfig,
78                                IoServiceListenerSupport serviceListeners,
79                                SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry ) {
80          this.service = service;
81          this.serviceConfig = serviceConfig;
82          this.serviceListeners = serviceListeners;
83          this.lock = new ReentrantLock();
84          this.localAddress = localAddress;
85          this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
86          this.handler = handler;
87          this.filterChain = new VmPipeFilterChain(this);
88          this.pendingDataQueue = new LinkedBlockingQueue<Object>();
89  
90          remoteSession = new VmPipeSessionImpl(this, remoteEntry);
91      }
92  
93      /*
94       * Constructor for server-side session.
95       */
96      private VmPipeSessionImpl(VmPipeSessionImpl remoteSession, VmPipe entry) {
97          this.service = entry.getAcceptor();
98          this.serviceConfig = entry.getConfig();
99          this.serviceListeners = entry.getListeners();
100         this.lock = remoteSession.lock;
101         this.localAddress = this.serviceAddress = remoteSession.remoteAddress;
102         this.remoteAddress = remoteSession.localAddress;
103         this.handler = entry.getHandler();
104         this.filterChain = new VmPipeFilterChain(this);
105         this.remoteSession = remoteSession;
106         this.pendingDataQueue = new LinkedBlockingQueue<Object>();
107     }
108 
109     public IoService getService() {
110         return service;
111     }
112 
113     IoServiceListenerSupport getServiceListeners() {
114         return serviceListeners;
115     }
116 
117     public IoServiceConfig getServiceConfig() {
118         return serviceConfig;
119     }
120 
121     public IoSessionConfig getConfig() {
122         return CONFIG;
123     }
124 
125     public IoFilterChain getFilterChain() {
126         return filterChain;
127     }
128 
129     public VmPipeSessionImpl getRemoteSession() {
130         return remoteSession;
131     }
132 
133     public IoHandler getHandler() {
134         return handler;
135     }
136 
137     @Override
138     protected void close0() {
139         filterChain.fireFilterClose(this);
140     }
141 
142     @Override
143     protected void write0(WriteRequest writeRequest) {
144         this.filterChain.fireFilterWrite(this, writeRequest);
145     }
146 
147     public TransportType getTransportType() {
148         return TransportType.VM_PIPE;
149     }
150 
151     public SocketAddress getRemoteAddress() {
152         return remoteAddress;
153     }
154 
155     public SocketAddress getLocalAddress() {
156         return localAddress;
157     }
158 
159     public SocketAddress getServiceAddress() {
160         return serviceAddress;
161     }
162 
163     @Override
164     protected void updateTrafficMask() {
165         if (getTrafficMask().isReadable() || getTrafficMask().isWritable()) {
166             List<Object> data = new ArrayList<Object>();
167 
168             pendingDataQueue.drainTo(data);
169 
170             for (Object aData : data) {
171                 if (aData instanceof WriteRequest) {
172                     // TODO Optimize unefficient data transfer.
173                     // Data will be returned to pendingDataQueue
174                     // if getTraffic().isWritable() is false.
175                     WriteRequest wr = (WriteRequest) aData;
176                     filterChain.doWrite(this, wr);
177                 } else {
178                     // TODO Optimize unefficient data transfer.
179                     // Data will be returned to pendingDataQueue
180                     // if getTraffic().isReadable() is false.
181                     filterChain.fireMessageReceived(this, aData);
182                 }
183             }
184         }
185     }
186 
187     Lock getLock() {
188         return lock;
189     }
190 }