001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.SourceRunner;
020    import org.apache.flume.lifecycle.LifecycleController;
021    import org.apache.flume.lifecycle.LifecycleState;
022    import org.apache.flume.node.NodeConfiguration;
023    import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
024    import org.apache.logging.log4j.core.appender.ManagerFactory;
025    import org.apache.logging.log4j.core.config.ConfigurationException;
026    import org.apache.logging.log4j.core.config.Property;
027    
028    import java.security.MessageDigest;
029    import java.util.Locale;
030    import java.util.Properties;
031    
032    /**
033     *
034     */
035    public class FlumeEmbeddedManager extends AbstractFlumeManager {
036    
037        private static ManagerFactory factory = new FlumeManagerFactory();
038    
039        private final FlumeNode node;
040    
041        private NodeConfiguration conf;
042    
043        protected static final String SOURCE_NAME = "log4j-source";
044    
045        private static final String LINE_SEP = System.getProperty("file.separator");
046    
047        private final Log4jEventSource source;
048    
049        private final String shortName;
050    
051    
052        /**
053         * Constructor
054         * @param name The unique name of this manager.
055         * @param node The Flume Node.
056         */
057        protected FlumeEmbeddedManager(String name, String shortName, FlumeNode node) {
058            super(name);
059            this.node = node;
060            this.shortName = shortName;
061            SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
062            if (runner == null || runner.getSource() == null) {
063                throw new IllegalStateException("No Source has been created for Appender " + shortName);
064            }
065            source  = (Log4jEventSource) runner.getSource();
066        }
067    
068        /**
069         * Returns a FlumeEmbeddedManager.
070         * @param agents The agents to use.
071         * @param batchSize The number of events to include in a batch.
072         * @return A FlumeAvroManager.
073         */
074        public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize,
075                                                      String dataDir) {
076    
077            if (batchSize <= 0) {
078                batchSize = 1;
079            }
080    
081            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
082                throw new IllegalArgumentException("Either an Agent or properties are required");
083            } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
084                throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
085            }
086    
087            StringBuilder sb = new StringBuilder();
088            boolean first = true;
089    
090            if (agents != null && agents.length > 0) {
091                sb.append("FlumeEmbedded[");
092                for (Agent agent : agents) {
093                    if (!first) {
094                        sb.append(",");
095                    }
096                    sb.append(agent.getHost()).append(":").append(agent.getPort());
097                    first = false;
098                }
099                sb.append("]");
100            } else {
101                String sep = "";
102                sb.append(name).append(":");
103                StringBuilder props = new StringBuilder();
104                for (Property prop : properties) {
105                    props.append(sep);
106                    props.append(prop.getName()).append("=").append(prop.getValue());
107                    sep = ",";
108                }
109                try {
110                    MessageDigest digest = MessageDigest.getInstance("MD5");
111                    digest.update(sb.toString().getBytes());
112                    byte[] bytes = digest.digest();
113                    StringBuilder md5 = new StringBuilder();
114                    for (byte b : bytes) {
115                        String hex = Integer.toHexString(0xff & b);
116                        if (hex.length() == 1) {
117                            md5.append('0');
118                        }
119                        md5.append(hex);
120                    }
121                    sb.append(md5.toString());
122                } catch (Exception ex) {
123                    sb.append(props);
124                }
125            }
126            return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
127                new FactoryData(name, agents, properties, batchSize, dataDir));
128        }
129    
130        @Override
131        public void send(FlumeEvent event, int delay, int retries) {
132            source.send(event);
133        }
134    
135        @Override
136        protected void releaseSub() {
137            node.stop();
138        }
139    
140        /**
141         * Factory data.
142         */
143        private static class FactoryData {
144            private Agent[] agents;
145            private Property[] properties;
146            private int batchSize;
147            private String dataDir;
148            private String name;
149    
150            /**
151             * Constructor.
152             * @param name The name of the Appender.
153             * @param agents The agents.
154             * @param properties The Flume configuration properties.
155             * @param batchSize The number of events to include in a batch.
156             * @param dataDir The directory where Flume should write to.
157             */
158            public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize, String dataDir) {
159                this.name = name;
160                this.agents = agents;
161                this.batchSize = batchSize;
162                this.properties = properties;
163                this.dataDir = dataDir;
164            }
165        }
166    
167        /**
168         * Avro Manager Factory.
169         */
170        private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
171            private static final String sourceType = Log4jEventSource.class.getName();
172    
173            /**
174             * Create the FlumeAvroManager.
175             * @param name The name of the entity to manage.
176             * @param data The data required to create the entity.
177             * @return The FlumeAvroManager.
178             */
179            public FlumeEmbeddedManager createManager(String name, FactoryData data) {
180                try {
181                    DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
182                    Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
183                        data.dataDir);
184                    FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
185                    NodeConfiguration conf = builder.load(data.name, props, nodeManager);
186    
187                    FlumeNode node = new FlumeNode(nodeManager, conf);
188    
189                    node.start();
190                    LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
191    
192                    return new FlumeEmbeddedManager(name, data.name, node);
193                } catch (Exception ex) {
194                    LOGGER.error("Could not create FlumeEmbeddedManager", ex);
195                }
196                return null;
197            }
198    
199            private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize,
200                                                String dataDir) {
201                Properties props = new Properties();
202    
203                if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
204                    LOGGER.error("No Flume configuration provided");
205                    throw new ConfigurationException("No Flume configuration provided");
206                }
207    
208                if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
209                    LOGGER.error("Agents and Flume configuration cannot both be specified");
210                    throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
211                }
212    
213                if (agents != null && agents.length > 0) {
214                    props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
215                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
216                    props.put(name + ".channels", "file");
217                    props.put(name + ".channels.file.type", "file");
218                    if (dataDir != null && dataDir.length() > 0) {
219                        if (!dataDir.endsWith(LINE_SEP)) {
220                            dataDir = dataDir + LINE_SEP;
221                        }
222    
223                        props.put(name + ".channels.file.checkpointDir", dataDir + "checkpoint");
224                        props.put(name + ".channels.file.dataDirs", dataDir + "data");
225                    }
226    
227                    StringBuilder sb = new StringBuilder();
228                    String leading = "";
229                    int priority = agents.length;
230                    for (int i=0; i < agents.length; ++i) {
231                        sb.append(leading).append("agent").append(i);
232                        leading = " ";
233                        String prefix = name + ".sinks.agent" + i;
234                        props.put(prefix + ".channel", "file");
235                        props.put(prefix + ".type", "avro");
236                        props.put(prefix + ".hostname", agents[i].getHost());
237                        props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
238                        props.put(prefix + ".batch-size", Integer.toString(batchSize));
239                        props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
240                        --priority;
241                    }
242                    props.put(name + ".sinks", sb.toString());
243                    props.put(name + ".sinkgroups", "group1");
244                    props.put(name + ".sinkgroups.group1.sinks", sb.toString());
245                    props.put(name + ".sinkgroups.group1.processor.type", "failover");
246                    String sourceChannels = "file";
247                    props.put(name + ".channels", sourceChannels);
248                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
249                } else {
250                    String channels = null;
251                    String[] sinks = null;
252    
253                    props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
254                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
255    
256                    for (Property property : properties) {
257                        String key = property.getName();
258    
259                        if (key == null || key.length() == 0) {
260                            String msg = "A property name must be provided";
261                            LOGGER.error(msg);
262                            throw new ConfigurationException(msg);
263                        }
264    
265                        String upperKey = key.toUpperCase(Locale.ENGLISH);
266    
267                        if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
268                            String msg = "Specification of the agent name is allowed in Flume Appender configuration: " + key;
269                            LOGGER.error(msg);
270                            throw new ConfigurationException(msg);
271                        }
272    
273                        if (upperKey.startsWith("SOURCES.")) {
274                            String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
275                            LOGGER.error(msg);
276                            throw new ConfigurationException(msg);
277                        }
278    
279                        String value = property.getValue();
280                        if (value == null || value.length() == 0) {
281                            String msg = "A value for property " + key + " must be provided";
282                            LOGGER.error(msg);
283                            throw new ConfigurationException(msg);
284                        }
285    
286                        if (upperKey.equals("CHANNELS")) {
287                            channels = value.trim();
288                        } else if (upperKey.equals("SINKS")) {
289                            sinks = value.trim().split(" ");
290                        }
291    
292                        props.put(name + '.' + key, value);
293                    }
294    
295                    String sourceChannels = channels;
296    
297                    if (channels == null) {
298                        sourceChannels = "file";
299                        props.put(name + ".channels", sourceChannels);
300                    }
301    
302                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
303    
304                    if (sinks == null || sinks.length == 0) {
305                        String msg = "At least one Sink must be specified";
306                        LOGGER.error(msg);
307                        throw new ConfigurationException(msg);
308                    }
309                }
310                return props;
311            }
312    
313        }
314    
315    }