001    /****************************************************************
002     * Licensed to the Apache Software Foundation (ASF) under one   *
003     * or more contributor license agreements.  See the NOTICE file *
004     * distributed with this work for additional information        *
005     * regarding copyright ownership.  The ASF licenses this file   *
006     * to you under the Apache License, Version 2.0 (the            *
007     * "License"); you may not use this file except in compliance   *
008     * with the License.  You may obtain a copy of the License at   *
009     *                                                              *
010     *   http://www.apache.org/licenses/LICENSE-2.0                 *
011     *                                                              *
012     * Unless required by applicable law or agreed to in writing,   *
013     * software distributed under the License is distributed on an  *
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
015     * KIND, either express or implied.  See the License for the    *
016     * specific language governing permissions and limitations      *
017     * under the License.                                           *
018     ****************************************************************/
019    
020    package org.apache.james.jspf.executor;
021    
022    import org.apache.james.jspf.core.DNSLookupContinuation;
023    import org.apache.james.jspf.core.DNSResponse;
024    import org.apache.james.jspf.core.Logger;
025    import org.apache.james.jspf.core.SPFChecker;
026    import org.apache.james.jspf.core.SPFCheckerExceptionCatcher;
027    import org.apache.james.jspf.core.SPFSession;
028    import org.apache.james.jspf.core.exceptions.SPFResultException;
029    import org.apache.james.jspf.core.exceptions.TimeoutException;
030    
031    import java.util.Collections;
032    import java.util.HashMap;
033    import java.util.LinkedList;
034    import java.util.Map;
035    
036    /**
037     * Async implementation of SPFExecutor
038     *
039     */
040    public class StagedMultipleSPFExecutor implements SPFExecutor, Runnable {
041    
042        private static final String ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION = "StagedMultipleSPFExecutor.continuation";
043    
044        private static class ResponseQueueImpl extends LinkedList<IResponse> implements IResponseQueue {
045    
046            private static final long serialVersionUID = 5714025260393791651L;
047            
048            private int waitingThreads = 0;
049    
050            /**
051             * @see org.apache.james.jspf.executor.IResponseQueue#insertResponse(org.apache.james.jspf.executor.IResponse)
052             */
053            public synchronized void insertResponse(IResponse r) {
054                addLast(r);
055                notify();
056            }
057    
058            /**
059             * @see org.apache.james.jspf.executor.IResponseQueue#removeResponse()
060             */
061            public synchronized IResponse removeResponse() {
062                if ( (size() - waitingThreads <= 0) ) {
063                    try { waitingThreads++; wait();}
064                    catch (InterruptedException e)  {Thread.interrupted();}
065                    waitingThreads--;
066                }
067                return (IResponse)removeFirst();        }
068    
069        }
070    
071        // Use short as id because the id header is limited to 16 bit
072        // From RFC1035 4.1.1. Header section format :
073        // 
074        // ID              A 16 bit identifier assigned by the program that
075        //                 generates any kind of query.  This identifier is copied
076        //                 the corresponding reply and can be used by the requester
077        //                 to match up replies to outstanding queries.
078        //
079        private static short id;
080        
081        private synchronized int nextId() {
082            return id++;
083        }
084        
085        private Logger log;
086        private DNSAsynchLookupService dnsProbe;
087        private Thread worker;
088        private Map<Integer,SPFSession> sessions;
089        private Map<Integer,FutureSPFResult>results;
090        private ResponseQueueImpl responseQueue;
091    
092        public StagedMultipleSPFExecutor(Logger log, DNSAsynchLookupService service) {
093            this.log = log;
094            this.dnsProbe = service;
095    
096            this.responseQueue = new ResponseQueueImpl();
097    
098            this.sessions = Collections.synchronizedMap(new HashMap<Integer,SPFSession>());
099            this.results = Collections.synchronizedMap(new HashMap<Integer,FutureSPFResult>());
100    
101            this.worker = new Thread(this);
102            this.worker.setDaemon(true);
103            this.worker.setName("SPFExecutor");
104            this.worker.start();
105        }
106    
107        /**
108         * Execute the non-blocking part of the processing and returns.
109         * If the working queue is full (50 pending responses) this method will not return
110         * until the queue is again not full.
111         * 
112         * @see org.apache.james.jspf.executor.SPFExecutor#execute(org.apache.james.jspf.core.SPFSession, org.apache.james.jspf.executor.FutureSPFResult)
113         */
114        public void execute(SPFSession session, FutureSPFResult result) {
115            execute(session, result, true);
116        }
117            
118        public void execute(SPFSession session, FutureSPFResult result, boolean throttle) {
119            SPFChecker checker;
120            while ((checker = session.popChecker()) != null) {
121                // only execute checkers we added (better recursivity)
122                log.debug("Executing checker: " + checker);
123                try {
124                    DNSLookupContinuation cont = checker.checkSPF(session);
125                    // if the checker returns a continuation we return it
126                    if (cont != null) {
127                        invokeAsynchService(session, result, cont, throttle);
128                        return;
129                    }
130                } catch (Exception e) {
131                    while (e != null) {
132                        while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
133                            checker = session.popChecker();
134                        }
135                        try {
136                            ((SPFCheckerExceptionCatcher) checker).onException(e, session);
137                            e = null;
138                        } catch (SPFResultException ex) {
139                            e = ex;
140                        } finally {
141                            checker = null;
142                        }
143                    }
144                }
145            }
146            result.setSPFResult(session);
147        }
148    
149        /**
150         * throttle should be true only when the caller thread is the client and not the worker thread.
151         * We could even remove the throttle parameter and check the currentThread.
152         * This way the worker is never "blocked" while outside callers will be blocked if our
153         * queue is too big (so this is not fully "asynchronous").
154         */
155        private synchronized void invokeAsynchService(SPFSession session,
156                FutureSPFResult result, DNSLookupContinuation cont, boolean throttle) {
157            while (throttle && results.size() > 50) {
158                try {
159                    this.wait(100);
160                } catch (InterruptedException e) {
161                }
162            }
163            int nextId = nextId();
164            sessions.put(new Integer(nextId), session);
165            results.put(new Integer(nextId), result);
166            session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
167            dnsProbe.getRecordsAsynch(cont.getRequest(), nextId, responseQueue);
168        }
169    
170        public void run() {
171    
172            while (true) {
173                
174                IResponse resp = responseQueue.removeResponse();
175                
176                Integer respId = (Integer)resp.getId();
177                SPFSession session = sessions.remove(respId);
178                FutureSPFResult result = results.remove(respId);
179                
180                DNSLookupContinuation cont = (DNSLookupContinuation) session.getAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION);
181                
182                DNSResponse response;
183                if (resp.getException() != null) {
184                    response = new DNSResponse((TimeoutException) resp.getException());
185                } else {
186                    response = new DNSResponse(resp.getValue());
187                }
188                
189                
190                try {
191                    cont = cont.getListener().onDNSResponse(response, session);
192                    
193                    if (cont != null) {
194                        invokeAsynchService(session, result, cont, false);
195                    } else {
196                        execute(session, result, false);
197                    }
198    
199                } catch (Exception e) {
200                    SPFChecker checker = null;
201                    while (e != null) {
202                        while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
203                            checker = session.popChecker();
204                        }
205                        try {
206                            ((SPFCheckerExceptionCatcher) checker).onException(e, session);
207                            e = null;
208                        } catch (SPFResultException ex) {
209                            e = ex;
210                        } finally {
211                            checker = null;
212                        }
213                    }
214                    execute(session, result, false);
215                }
216            }
217        }
218    
219    }