001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.idempotent;
018    
019    import java.io.File;
020    import java.io.FileOutputStream;
021    import java.io.IOException;
022    import java.util.Map;
023    import java.util.Scanner;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    
026    import org.apache.camel.spi.IdempotentRepository;
027    import org.apache.camel.util.LRUCache;
028    import org.apache.camel.util.ObjectHelper;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
034     * <p/>
035     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
036     * memory leak.
037     *
038     * @version $Revision: 736555 $
039     */
040    public class FileIdempotentRepository implements IdempotentRepository<String> {
041        private static final transient Log LOG = LogFactory.getLog(FileIdempotentRepository.class);
042        private static final String STORE_DELIMITER = "\n";
043        private Map<String, Object> cache;
044        private File fileStore;
045        private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
046        private AtomicBoolean init = new AtomicBoolean();
047    
048        public FileIdempotentRepository() {
049            // default use a 1st level cache 
050            this.cache = new LRUCache<String, Object>(1000);
051        }
052    
053        public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
054            this.fileStore = fileStore;
055            this.cache = set;
056        }
057    
058        /**
059         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
060         * as 1st level cache with a default of 1000 entries in the cache.
061         *
062         * @param fileStore  the file store
063         */
064        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
065            return fileIdempotentRepository(fileStore, 1000);
066        }
067    
068        /**
069         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
070         * as 1st level cache.
071         *
072         * @param fileStore  the file store
073         * @param cacheSize  the cache size
074         */
075        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
076            return fileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
077        }
078    
079        /**
080         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
081         * as 1st level cache.
082         *
083         * @param fileStore  the file store
084         * @param cacheSize  the cache size
085         * @param maxFileStoreSize  the max size in bytes for the filestore file 
086         */
087        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
088            FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
089            repository.setMaxFileStoreSize(maxFileStoreSize);
090            return repository;
091        }
092    
093        /**
094         * Creates a new file based repository using the given {@link java.util.Map}
095         * as 1st level cache.
096         * <p/>
097         * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
098         * memory leak.
099         *
100         * @param store  the file store
101         * @param cache  the cache to use as 1st level cache
102         */
103        public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
104            return new FileIdempotentRepository(store, cache);
105        }
106    
107        public boolean add(String messageId) {
108            synchronized (cache) {
109                // init store if not loaded before
110                if (init.compareAndSet(false, true)) {
111                    loadStore();
112                }
113    
114                if (cache.containsKey(messageId)) {
115                    return false;
116                } else {
117                    cache.put(messageId, messageId);
118                    if (fileStore.length() < maxFileStoreSize) {
119                        // just append to store
120                        appendToStore(messageId);
121                    } else {
122                        // trunk store and flush the cache
123                        trunkStore();
124                    }
125    
126                    return true;
127                }
128            }
129        }
130    
131        public boolean contains(String key) {
132            synchronized (cache) {
133                // init store if not loaded before
134                if (init.compareAndSet(false, true)) {
135                    loadStore();
136                }
137                return cache.containsKey(key);
138            }
139        }
140    
141        public File getFileStore() {
142            return fileStore;
143        }
144    
145        public void setFileStore(File fileStore) {
146            this.fileStore = fileStore;
147        }
148    
149        public Map<String, Object> getCache() {
150            return cache;
151        }
152    
153        public void setCache(Map<String, Object> cache) {
154            this.cache = cache;
155        }
156    
157        public long getMaxFileStoreSize() {
158            return maxFileStoreSize;
159        }
160    
161        /**
162         * Sets the maximum filesize for the file store in bytes.
163         * <p/>
164         * The default is 1mb.
165         */
166        public void setMaxFileStoreSize(long maxFileStoreSize) {
167            this.maxFileStoreSize = maxFileStoreSize;
168        }
169    
170        /**
171         * Sets the cache size
172         */
173        public void setCacheSize(int size) {
174            if (cache != null) {
175                cache.clear();
176            }
177            cache = new LRUCache<String, Object>(size);
178        }
179    
180        /**
181         * Appends the given message id to the file store
182         *
183         * @param messageId  the message id
184         */
185        protected void appendToStore(final String messageId) {
186            if (LOG.isDebugEnabled()) {
187                LOG.debug("Appending " + messageId + " to idempotent filestore: " + fileStore);
188            }
189            FileOutputStream fos = null;
190            try {
191                // create store if missing
192                if (!fileStore.exists()) {
193                    fileStore.createNewFile();
194                }
195                // append to store
196                fos = new FileOutputStream(fileStore, true);
197                fos.write(messageId.getBytes());
198                fos.write(STORE_DELIMITER.getBytes());
199            } catch (IOException e) {
200                throw ObjectHelper.wrapRuntimeCamelException(e);
201            } finally {
202                ObjectHelper.close(fos, "Appending to file idempotent repository", LOG);
203            }
204        }
205    
206        /**
207         * Trunks the file store when the max store size is hit by rewriting the 1st level cache
208         * to the file store.
209         */
210        protected void trunkStore() {
211            if (LOG.isInfoEnabled()) {
212                LOG.info("Trunking idempotent filestore: " + fileStore);
213            }
214            FileOutputStream fos = null;
215            try {
216                fos = new FileOutputStream(fileStore);
217                for (String key : cache.keySet()) {
218                    fos.write(key.getBytes());
219                    fos.write(STORE_DELIMITER.getBytes());
220                }
221            } catch (IOException e) {
222                throw ObjectHelper.wrapRuntimeCamelException(e);
223            } finally {
224                ObjectHelper.close(fos, "Trunking file idempotent repository", LOG);
225            }
226        }
227    
228        /**
229         * Loads the given file store into the 1st level cache
230         */
231        protected void loadStore() {
232            if (LOG.isTraceEnabled()) {
233                LOG.trace("Loading to 1st level cache from idempotent filestore: " + fileStore);
234            }
235    
236            if (!fileStore.exists()) {
237                return;
238            }
239    
240            cache.clear();
241            Scanner scanner = null;
242            try {
243                scanner = new Scanner(fileStore);
244                scanner.useDelimiter(STORE_DELIMITER);
245                while (scanner.hasNextLine()) {
246                    String line = scanner.nextLine();
247                    cache.put(line, line);
248                }
249            } catch (IOException e) {
250                throw ObjectHelper.wrapRuntimeCamelException(e);
251            } finally {
252                if (scanner != null) {
253                    scanner.close();
254                }
255            }
256    
257            if (LOG.isDebugEnabled()) {
258                LOG.debug("Loaded " + cache.size() + " to the 1st level cache from idempotent filestore: " + fileStore);
259            }
260        }
261    
262    }