1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.proxy;
21
22 import java.io.UnsupportedEncodingException;
23 import java.util.LinkedList;
24 import java.util.Queue;
25
26 import org.apache.mina.core.buffer.IoBuffer;
27 import org.apache.mina.core.filterchain.IoFilter.NextFilter;
28 import org.apache.mina.core.future.DefaultWriteFuture;
29 import org.apache.mina.core.future.WriteFuture;
30 import org.apache.mina.core.session.IoSession;
31 import org.apache.mina.core.write.DefaultWriteRequest;
32 import org.apache.mina.core.write.WriteRequest;
33 import org.apache.mina.proxy.filter.ProxyFilter;
34 import org.apache.mina.proxy.filter.ProxyHandshakeIoBuffer;
35 import org.apache.mina.proxy.session.ProxyIoSession;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39
40
41
42
43
44
45
46
47
48
49 public abstract class AbstractProxyLogicHandler implements ProxyLogicHandler {
50
51 private final static Logger logger = LoggerFactory
52 .getLogger(AbstractProxyLogicHandler.class);
53
54
55
56
57 private ProxyIoSession proxyIoSession;
58
59
60
61
62 private Queue<Event> writeRequestQueue = null;
63
64
65
66
67 private boolean handshakeComplete = false;
68
69
70
71
72
73
74 public AbstractProxyLogicHandler(ProxyIoSession proxyIoSession) {
75 this.proxyIoSession = proxyIoSession;
76 }
77
78
79
80
81 protected ProxyFilter getProxyFilter() {
82 return proxyIoSession.getProxyFilter();
83 }
84
85
86
87
88 protected IoSession getSession() {
89 return proxyIoSession.getSession();
90 }
91
92
93
94
95 public ProxyIoSession getProxyIoSession() {
96 return proxyIoSession;
97 }
98
99
100
101
102
103
104
105 protected WriteFuture writeData(final NextFilter nextFilter,
106 final IoBuffer data) throws UnsupportedEncodingException {
107
108 ProxyHandshakeIoBuffer writeBuffer = new ProxyHandshakeIoBuffer(data);
109
110 logger.debug(" session write: {}", writeBuffer);
111
112 WriteFuture writeFuture = new DefaultWriteFuture(getSession());
113 getProxyFilter().writeData(nextFilter, getSession(),
114 new DefaultWriteRequest(writeBuffer, writeFuture), true);
115
116 return writeFuture;
117 }
118
119
120
121
122
123 public boolean isHandshakeComplete() {
124 synchronized (this) {
125 return handshakeComplete;
126 }
127 }
128
129
130
131
132 protected final void setHandshakeComplete() {
133 synchronized (this) {
134 handshakeComplete = true;
135 }
136
137 ProxyIoSession proxyIoSession = getProxyIoSession();
138 proxyIoSession.getConnector()
139 .fireConnected(proxyIoSession.getSession())
140 .awaitUninterruptibly();
141
142 logger.debug(" handshake completed");
143
144
145 try {
146 proxyIoSession.getEventQueue().flushPendingSessionEvents();
147 flushPendingWriteRequests();
148 } catch (Exception ex) {
149 logger.error("Unable to flush pending write requests", ex);
150 }
151 }
152
153
154
155
156 protected synchronized void flushPendingWriteRequests() throws Exception {
157 logger.debug(" flushPendingWriteRequests()");
158
159 if (writeRequestQueue == null) {
160 return;
161 }
162
163 Event scheduledWrite;
164 while ((scheduledWrite = writeRequestQueue.poll()) != null) {
165 logger.debug(" Flushing buffered write request: {}",
166 scheduledWrite.data);
167
168 getProxyFilter().filterWrite(scheduledWrite.nextFilter,
169 getSession(), (WriteRequest) scheduledWrite.data);
170 }
171
172
173 writeRequestQueue = null;
174 }
175
176
177
178
179 public synchronized void enqueueWriteRequest(final NextFilter nextFilter,
180 final WriteRequest writeRequest) {
181 if (writeRequestQueue == null) {
182 writeRequestQueue = new LinkedList<Event>();
183 }
184
185 writeRequestQueue.offer(new Event(nextFilter, writeRequest));
186 }
187
188
189
190
191 protected void closeSession(final String message, final Throwable t) {
192 if (t != null) {
193 logger.error(message, t);
194 proxyIoSession.setAuthenticationFailed(true);
195 } else {
196 logger.error(message);
197 }
198
199 getSession().close(true);
200 }
201
202
203
204
205
206
207 protected void closeSession(final String message) {
208 closeSession(message, null);
209 }
210
211
212
213
214 private final static class Event {
215 private final NextFilter nextFilter;
216
217 private final Object data;
218
219 Event(final NextFilter nextFilter, final Object data) {
220 this.nextFilter = nextFilter;
221 this.data = data;
222 }
223
224 public Object getData() {
225 return data;
226 }
227
228 public NextFilter getNextFilter() {
229 return nextFilter;
230 }
231 }
232 }