001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file.strategy; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.io.RandomAccessFile; 022 import java.nio.channels.Channel; 023 import java.nio.channels.FileChannel; 024 import java.nio.channels.FileLock; 025 026 import org.apache.camel.Exchange; 027 import org.apache.camel.component.file.GenericFile; 028 import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; 029 import org.apache.camel.component.file.GenericFileOperations; 030 import org.apache.camel.util.ExchangeHelper; 031 import org.apache.camel.util.ObjectHelper; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 /** 036 * Acquires exclusive read lock to the given file. Will wait until the lock is granted. 037 * After granting the read lock it is released, we just want to make sure that when we start 038 * consuming the file its not currently in progress of being written by third party. 039 */ 040 public class FileLockExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> { 041 private static final transient Log LOG = LogFactory.getLog(FileLockExclusiveReadLockStrategy.class); 042 private long timeout; 043 044 public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 045 File target = new File(file.getAbsoluteFilePath()); 046 047 if (LOG.isTraceEnabled()) { 048 LOG.trace("Waiting for exclusive read lock to file: " + target); 049 } 050 051 try { 052 // try to acquire rw lock on the file before we can consume it 053 FileChannel channel = new RandomAccessFile(target, "rw").getChannel(); 054 055 long start = System.currentTimeMillis(); 056 boolean exclusive = false; 057 058 while (!exclusive) { 059 // timeout check 060 if (timeout > 0) { 061 long delta = System.currentTimeMillis() - start; 062 if (delta > timeout) { 063 LOG.debug("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target); 064 // we could not get the lock within the timeout period, so return false 065 return false; 066 } 067 } 068 069 // get the lock using either try lock or not depending on if we are using timeout or not 070 FileLock lock = null; 071 try { 072 lock = timeout > 0 ? channel.tryLock() : channel.lock(); 073 } catch (IllegalStateException ex) { 074 // Also catch the OverlappingFileLockException here. Do nothing here 075 } 076 if (lock != null) { 077 if (LOG.isTraceEnabled()) { 078 LOG.trace("Acquired exclusive read lock: " + lock + " to file: " + target); 079 } 080 081 // store lock so we can release it later 082 exchange.setProperty("CamelFileLock", lock); 083 exchange.setProperty("CamelFileLockName", target.getName()); 084 085 exclusive = true; 086 } else { 087 boolean interrupted = sleep(); 088 if (interrupted) { 089 // we were interrputed while sleeping, we are likely being shutdown so return false 090 return false; 091 } 092 } 093 } 094 } catch (IOException e) { 095 // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file 096 // such as AntiVirus or MS Office that has special locks for it's supported files 097 if (timeout == 0) { 098 // if not using timeout, then we cant retry, so rethrow 099 throw e; 100 } 101 if (LOG.isDebugEnabled()) { 102 LOG.debug("Cannot acquire read lock. Will try again.", e); 103 } 104 boolean interrupted = sleep(); 105 if (interrupted) { 106 // we were interrputed while sleeping, we are likely being shutdown so return false 107 return false; 108 } 109 } 110 111 return true; 112 } 113 114 public void releaseExclusiveReadLock(GenericFileOperations<File> operations, 115 GenericFile<File> file, Exchange exchange) throws Exception { 116 FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLock", FileLock.class); 117 String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLockName", String.class); 118 Channel channel = lock.channel(); 119 try { 120 lock.release(); 121 } finally { 122 // must close channel first 123 ObjectHelper.close(channel, "while acquiring exclusive read lock for file: " + lockFileName, LOG); 124 } 125 } 126 127 private boolean sleep() { 128 LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis."); 129 try { 130 Thread.sleep(1000); 131 return true; 132 } catch (InterruptedException e) { 133 LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out"); 134 return false; 135 } 136 } 137 138 public long getTimeout() { 139 return timeout; 140 } 141 142 /** 143 * Sets an optional timeout period. 144 * <p/> 145 * If the readlock could not be granted within the timeperiod then the wait is stopped and the 146 * acquireReadLock returns <tt>false</tt>. 147 * 148 * @param timeout period in millis 149 */ 150 public void setTimeout(long timeout) { 151 this.timeout = timeout; 152 } 153 154 }