/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor.idempotent;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
 * <p/>
 * Message ids are kept in a 1st level cache (a {@link java.util.Map}) and mirrored to a file
 * store, one id per line. The file store is loaded into the cache lazily, on the first call to
 * {@link #add(String)} or {@link #contains(String)}.
 * <p/>
 * Ids are written to the store verbatim using the platform default charset, so an id that
 * itself contains a line break is read back as several ids when the store is reloaded.
 * <p/>
 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
 * memory leak.
 *
 * @version $Revision: 736555 $
 */
public class FileIdempotentRepository implements IdempotentRepository<String> {
    private static final Log LOG = LogFactory.getLog(FileIdempotentRepository.class);
    private static final String STORE_DELIMITER = "\n";
    private Map<String, Object> cache;
    private File fileStore;
    private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
    // flipped to true once the file store has been loaded into the cache
    private AtomicBoolean init = new AtomicBoolean();

    /**
     * Creates a repository with a {@link org.apache.camel.util.LRUCache} of 1000 entries as
     * 1st level cache and no file store; {@link #setFileStore(File)} must be called before
     * the repository is used.
     */
    public FileIdempotentRepository() {
        // default use a 1st level cache
        this.cache = new LRUCache<String, Object>(1000);
    }

    /**
     * Creates a repository backed by the given file store and 1st level cache.
     *
     * @param fileStore  the file store
     * @param set        the map to use as 1st level cache
     */
    public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
        this.fileStore = fileStore;
        this.cache = set;
    }

    /**
     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
     * as 1st level cache with a default of 1000 entries in the cache.
     *
     * @param fileStore  the file store
     */
    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
        return fileIdempotentRepository(fileStore, 1000);
    }

    /**
     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
     * as 1st level cache.
     *
     * @param fileStore  the file store
     * @param cacheSize  the cache size
     */
    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
        return fileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
    }

    /**
     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
     * as 1st level cache.
     *
     * @param fileStore  the file store
     * @param cacheSize  the cache size
     * @param maxFileStoreSize  the max size in bytes for the filestore file
     */
    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
        FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
        repository.setMaxFileStoreSize(maxFileStoreSize);
        return repository;
    }

    /**
     * Creates a new file based repository using the given {@link java.util.Map}
     * as 1st level cache.
     * <p/>
     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
     * memory leak.
     *
     * @param store  the file store
     * @param cache  the cache to use as 1st level cache
     */
    public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
        return new FileIdempotentRepository(store, cache);
    }

    /**
     * Adds the message id to the repository.
     * <p/>
     * A new id is put in the 1st level cache and then either appended to the file store or,
     * when the store file has reached the max file store size, the store is rewritten from
     * the cache.
     *
     * @param messageId  the message id
     * @return <tt>true</tt> if the id was added, <tt>false</tt> if the cache already contained it
     */
    public boolean add(String messageId) {
        synchronized (cache) {
            ensureStoreLoaded();

            if (cache.containsKey(messageId)) {
                return false;
            }

            cache.put(messageId, messageId);
            if (fileStore.length() < maxFileStoreSize) {
                // just append to store
                appendToStore(messageId);
            } else {
                // trunk store and flush the cache
                trunkStore();
            }
            return true;
        }
    }

    /**
     * Tests whether the 1st level cache contains the given key, loading the file store
     * first if that has not been done yet.
     *
     * @param key  the message id to look up
     * @return <tt>true</tt> if the cache contains the key
     */
    public boolean contains(String key) {
        synchronized (cache) {
            ensureStoreLoaded();
            return cache.containsKey(key);
        }
    }

    public File getFileStore() {
        return fileStore;
    }

    public void setFileStore(File fileStore) {
        this.fileStore = fileStore;
    }

    public Map<String, Object> getCache() {
        return cache;
    }

    public void setCache(Map<String, Object> cache) {
        this.cache = cache;
    }

    public long getMaxFileStoreSize() {
        return maxFileStoreSize;
    }

    /**
     * Sets the maximum filesize for the file store in bytes.
     * <p/>
     * The default is 1mb.
     */
    public void setMaxFileStoreSize(long maxFileStoreSize) {
        this.maxFileStoreSize = maxFileStoreSize;
    }

    /**
     * Sets the cache size by replacing the 1st level cache with a new, empty
     * {@link org.apache.camel.util.LRUCache} of the given size. The previous cache,
     * if any, is cleared.
     */
    public void setCacheSize(int size) {
        if (cache != null) {
            cache.clear();
        }
        cache = new LRUCache<String, Object>(size);
    }

    /**
     * Appends the given message id to the file store
     *
     * @param messageId  the message id
     */
    protected void appendToStore(final String messageId) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Appending " + messageId + " to idempotent filestore: " + fileStore);
        }
        FileOutputStream fos = null;
        try {
            createParentDirectories();
            // opening in append mode creates the store file if it is missing
            fos = new FileOutputStream(fileStore, true);
            fos.write(messageId.getBytes());
            fos.write(STORE_DELIMITER.getBytes());
        } catch (IOException e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        } finally {
            ObjectHelper.close(fos, "Appending to file idempotent repository", LOG);
        }
    }

    /**
     * Trunks the file store when the max store size is hit by rewriting the 1st level cache
     * to the file store.
     */
    protected void trunkStore() {
        if (LOG.isInfoEnabled()) {
            LOG.info("Trunking idempotent filestore: " + fileStore);
        }
        FileOutputStream fos = null;
        try {
            createParentDirectories();
            // not opened in append mode, so the existing content is replaced
            fos = new FileOutputStream(fileStore);
            for (String key : cache.keySet()) {
                fos.write(key.getBytes());
                fos.write(STORE_DELIMITER.getBytes());
            }
        } catch (IOException e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        } finally {
            ObjectHelper.close(fos, "Trunking file idempotent repository", LOG);
        }
    }

    /**
     * Loads the given file store into the 1st level cache
     * <p/>
     * Does nothing if the store file does not exist. Otherwise the cache is cleared and
     * every line of the store is put into it as a message id.
     */
    protected void loadStore() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Loading to 1st level cache from idempotent filestore: " + fileStore);
        }

        if (!fileStore.exists()) {
            return;
        }

        cache.clear();
        Scanner scanner = null;
        try {
            // nextLine() splits on line separators regardless of the scanner delimiter
            scanner = new Scanner(fileStore);
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                cache.put(line, line);
            }
            // Scanner treats a read error as end of input; surface it instead of
            // silently continuing with a partially loaded cache
            IOException readFailure = scanner.ioException();
            if (readFailure != null) {
                throw readFailure;
            }
        } catch (IOException e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        } finally {
            if (scanner != null) {
                scanner.close();
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Loaded " + cache.size() + " to the 1st level cache from idempotent filestore: " + fileStore);
        }
    }

    /**
     * Loads the file store into the 1st level cache if that has not happened yet.
     * If loading fails the repository stays marked as not loaded, so the next call retries.
     */
    private void ensureStoreLoaded() {
        if (fileStore == null) {
            throw new IllegalStateException("fileStore must be configured before the idempotent repository is used");
        }
        if (init.compareAndSet(false, true)) {
            try {
                loadStore();
            } catch (RuntimeException e) {
                init.set(false);
                throw e;
            }
        }
    }

    /**
     * Creates the parent directories of the file store if they are missing.
     */
    private void createParentDirectories() throws IOException {
        File parent = fileStore.getParentFile();
        // re-check isDirectory() as mkdirs() also returns false when the directory
        // was created concurrently by someone else
        if (parent != null && !parent.exists() && !parent.mkdirs() && !parent.isDirectory()) {
            throw new IOException("Cannot create directory " + parent + " for idempotent filestore: " + fileStore);
        }
    }

}