001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Producer;
021    import org.apache.camel.impl.DefaultProducer;
022    import org.apache.camel.util.ExchangeHelper;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    
026    import java.io.File;
027    import java.io.FileOutputStream;
028    import java.io.IOException;
029    import java.io.InputStream;
030    import java.io.RandomAccessFile;
031    import java.nio.ByteBuffer;
032    import java.nio.channels.FileChannel;
033    
034    /**
035     * A {@link Producer} implementation for File
036     *
037     * @version $Revision: 523016 $
038     */
039    public class FileProducer extends DefaultProducer {
040        private static final transient Log LOG = LogFactory.getLog(FileProducer.class);
041        private final FileEndpoint endpoint;
042    
043        public FileProducer(FileEndpoint endpoint) {
044            super(endpoint);
045            this.endpoint = endpoint;
046        }
047    
048        public FileEndpoint getEndpoint() {
049            return (FileEndpoint) super.getEndpoint();
050        }
051    
052        /**
053         * @param exchange
054         * @see org.apache.camel.Processor#process(Exchange)
055         */
056        public void process(Exchange exchange) throws Exception {
057            process(endpoint.toExchangeType(exchange));
058        }
059    
060        public void process(FileExchange exchange) throws Exception {
061            InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
062            File file = createFileName(exchange);
063            buildDirectory(file);
064            if (LOG.isDebugEnabled()) {
065                LOG.debug("About to write to: " + file + " from exchange: " + exchange);
066            }
067            FileChannel fc = null;
068            try {
069                if (getEndpoint().isAppend()) {
070                    fc = new RandomAccessFile(file, "rw").getChannel();
071                    fc.position(fc.size());
072                }
073                else {
074                    fc = new FileOutputStream(file).getChannel();
075                }
076                int size = getEndpoint().getBufferSize();
077                byte[] buffer = new byte[size];
078                ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
079                while (true) {
080                    int count = in.read(buffer);
081                    if (count <= 0) {
082                        break;
083                    }
084                    else if (count < size) {
085                        byteBuffer = ByteBuffer.wrap(buffer, 0, count);
086                        fc.write(byteBuffer);
087                        break;
088                    }
089                    else {
090                        fc.write(byteBuffer);
091                    }
092                }
093            }
094            finally {
095                if (in != null) {
096                    try {
097                        in.close();
098                    }
099                    catch (IOException e) {
100                        LOG.warn("Failed to close input: " + e, e);
101                    }
102                }
103                if (fc != null) {
104                    try {
105                        fc.close();
106                    }
107                    catch (IOException e) {
108                        LOG.warn("Failed to close output: " + e, e);
109                    }
110                }
111            }
112            /*
113            ByteBuffer payload = exchange.getIn().getBody(ByteBuffer.class);
114            if (payload == null) {
115                InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
116                payload = ExchangeHelper.convertToMandatoryType(exchange, ByteBuffer.class, in);
117            }
118            payload.flip();
119            File file = createFileName(exchange);
120            buildDirectory(file);
121            if (LOG.isDebugEnabled()) {
122                LOG.debug("Creating file: " + file);
123            }
124            FileChannel fc = null;
125            try {
126                if (getEndpoint().isAppend()) {
127                    fc = new RandomAccessFile(file, "rw").getChannel();
128                    fc.position(fc.size());
129                }
130                else {
131                    fc = new FileOutputStream(file).getChannel();
132                }
133                fc.write(payload);
134            }
135            catch (Throwable e) {
136                LOG.error("Failed to write to File: " + file, e);
137            }
138            finally {
139                if (fc != null) {
140                    fc.close();
141                }
142            }
143            */
144        }
145    
146        protected File createFileName(FileExchange exchange) {
147            String fileName = exchange.getIn().getMessageId();
148    
149            File endpointFile = endpoint.getFile();
150            String name = exchange.getIn().getHeader(FileComponent.HEADER_FILE_NAME, String.class);
151            if (name != null) {
152                File answer = new File(endpointFile, name);
153                if (answer.isDirectory()) {
154                    return new File(answer, fileName);
155                }
156                else {
157                    return answer;
158                }
159            }
160            if (endpointFile != null && endpointFile.isDirectory()) {
161                return new File(endpointFile, fileName);
162            }
163            else {
164                return new File(fileName);
165            }
166        }
167    
168        private void buildDirectory(File file) {
169            String dirName = file.getAbsolutePath();
170            int index = dirName.lastIndexOf(File.separatorChar);
171            if (index > 0) {
172                dirName = dirName.substring(0, index);
173                File dir = new File(dirName);
174                dir.mkdirs();
175            }
176        }
177    }