001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.Event;
020    import org.apache.flume.SourceRunner;
021    import org.apache.flume.node.NodeConfiguration;
022    import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
023    import org.apache.logging.log4j.core.appender.ManagerFactory;
024    import org.apache.logging.log4j.core.config.ConfigurationException;
025    import org.apache.logging.log4j.core.config.Property;
026    import org.apache.logging.log4j.core.helpers.NameUtil;
027    import org.apache.logging.log4j.util.PropertiesUtil;
028    
029    import java.util.Locale;
030    import java.util.Properties;
031    
032    /**
033     *
034     */
035    public class FlumeEmbeddedManager extends AbstractFlumeManager {
036    
037        /** Name for the Flume source */
038        protected static final String SOURCE_NAME = "log4j-source";
039    
040        private static FlumeManagerFactory factory = new FlumeManagerFactory();
041    
042        private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
043    
044        private static final String IN_MEMORY = "InMemory";
045    
046        private final FlumeNode node;
047    
048        private NodeConfiguration conf;
049    
050        private final Log4jEventSource source;
051    
052        private final String shortName;
053    
054    
055        /**
056         * Constructor
057         * @param name The unique name of this manager.
058         * @param node The Flume Node.
059         */
060        protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
061            super(name);
062            this.node = node;
063            this.shortName = shortName;
064            final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
065            if (runner == null || runner.getSource() == null) {
066                throw new IllegalStateException("No Source has been created for Appender " + shortName);
067            }
068            source  = (Log4jEventSource) runner.getSource();
069        }
070    
071        /**
072         * Returns a FlumeEmbeddedManager.
073         * @param name The name of the manager.
074         * @param agents The agents to use.
075         * @param properties Properties for the embedded manager.
076         * @param batchSize The number of events to include in a batch.
077         * @param dataDir The directory where the Flume FileChannel should write to.
078         * @return A FlumeAvroManager.
079         */
080        public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
081                                                      int batchSize, final String dataDir) {
082    
083            if (batchSize <= 0) {
084                batchSize = 1;
085            }
086    
087            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
088                throw new IllegalArgumentException("Either an Agent or properties are required");
089            } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
090                throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
091            }
092    
093            final StringBuilder sb = new StringBuilder();
094            boolean first = true;
095    
096            if (agents != null && agents.length > 0) {
097                sb.append("FlumeEmbedded[");
098                for (final Agent agent : agents) {
099                    if (!first) {
100                        sb.append(",");
101                    }
102                    sb.append(agent.getHost()).append(":").append(agent.getPort());
103                    first = false;
104                }
105                sb.append("]");
106            } else {
107                String sep = "";
108                sb.append(name).append(":");
109                final StringBuilder props = new StringBuilder();
110                for (final Property prop : properties) {
111                    props.append(sep);
112                    props.append(prop.getName()).append("=").append(prop.getValue());
113                    sep = ",";
114                }
115                sb.append(NameUtil.md5(props.toString()));
116            }
117            return getManager(sb.toString(), factory,
118                    new FactoryData(name, agents, properties, batchSize, dataDir));
119        }
120    
121        @Override
122        public void send(final Event event) {
123            source.send(event);
124        }
125    
126        @Override
127        protected void releaseSub() {
128            node.stop();
129        }
130    
131        /**
132         * Factory data.
133         */
134        private static class FactoryData {
135            private final Agent[] agents;
136            private final Property[] properties;
137            private final int batchSize;
138            private final String dataDir;
139            private final String name;
140    
141            /**
142             * Constructor.
143             * @param name The name of the Appender.
144             * @param agents The agents.
145             * @param properties The Flume configuration properties.
146             * @param batchSize The number of events to include in a batch.
147             * @param dataDir The directory where Flume should write to.
148             */
149            public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
150                               final String dataDir) {
151                this.name = name;
152                this.agents = agents;
153                this.batchSize = batchSize;
154                this.properties = properties;
155                this.dataDir = dataDir;
156            }
157        }
158    
159        /**
160         * Avro Manager Factory.
161         */
162        private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
163            private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
164    
165            /**
166             * Create the FlumeAvroManager.
167             * @param name The name of the entity to manage.
168             * @param data The data required to create the entity.
169             * @return The FlumeAvroManager.
170             */
171            @Override
172            public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
173                try {
174                    final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
175                    final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
176                        data.dataDir);
177                    final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
178                    final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
179    
180                    final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
181    
182                    node.start();
183    
184                    return new FlumeEmbeddedManager(name, data.name, node);
185                } catch (final Exception ex) {
186                    LOGGER.error("Could not create FlumeEmbeddedManager", ex);
187                }
188                return null;
189            }
190    
191            private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
192                                                final int batchSize, String dataDir) {
193                final Properties props = new Properties();
194    
195                if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
196                    LOGGER.error("No Flume configuration provided");
197                    throw new ConfigurationException("No Flume configuration provided");
198                }
199    
200                if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
201                    LOGGER.error("Agents and Flume configuration cannot both be specified");
202                    throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
203                }
204    
205                if (agents != null && agents.length > 0) {
206                    props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
207                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
208    
209                    if (dataDir != null && dataDir.length() > 0) {
210                        if (dataDir.equals(IN_MEMORY)) {
211                            props.put(name + ".channels", "primary");
212                            props.put(name + ".channels.primary.type", "memory");
213                        } else {
214                            props.put(name + ".channels", "primary");
215                            props.put(name + ".channels.primary.type", "file");
216    
217                            if (!dataDir.endsWith(FiLE_SEP)) {
218                                dataDir = dataDir + FiLE_SEP;
219                            }
220    
221                            props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
222                            props.put(name + ".channels.primary.dataDirs", dataDir + "data");
223                        }
224    
225                    } else {
226                        props.put(name + ".channels", "primary");
227                        props.put(name + ".channels.primary.type", "file");
228                    }
229    
230                    final StringBuilder sb = new StringBuilder();
231                    String leading = "";
232                    int priority = agents.length;
233                    for (int i = 0; i < agents.length; ++i) {
234                        sb.append(leading).append("agent").append(i);
235                        leading = " ";
236                        final String prefix = name + ".sinks.agent" + i;
237                        props.put(prefix + ".channel", "primary");
238                        props.put(prefix + ".type", "avro");
239                        props.put(prefix + ".hostname", agents[i].getHost());
240                        props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
241                        props.put(prefix + ".batch-size", Integer.toString(batchSize));
242                        props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
243                        --priority;
244                    }
245                    props.put(name + ".sinks", sb.toString());
246                    props.put(name + ".sinkgroups", "group1");
247                    props.put(name + ".sinkgroups.group1.sinks", sb.toString());
248                    props.put(name + ".sinkgroups.group1.processor.type", "failover");
249                    final String sourceChannels = "primary";
250                    props.put(name + ".channels", sourceChannels);
251                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
252                } else {
253                    String channels = null;
254                    String[] sinks = null;
255    
256                    props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
257                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
258    
259                    for (final Property property : properties) {
260                        final String key = property.getName();
261    
262                        if (key == null || key.length() == 0) {
263                            final String msg = "A property name must be provided";
264                            LOGGER.error(msg);
265                            throw new ConfigurationException(msg);
266                        }
267    
268                        final String upperKey = key.toUpperCase(Locale.ENGLISH);
269    
270                        if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
271                            final String msg =
272                                "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
273                            LOGGER.error(msg);
274                            throw new ConfigurationException(msg);
275                        }
276    
277                        // Prohibit setting the sources as they are set above but allow interceptors to be set
278                        if (upperKey.startsWith("SOURCES.") && !upperKey.startsWith("SOURCES.LOG4J-SOURCE.INTERCEPTORS")) {
279                            final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
280                            LOGGER.error(msg);
281                            throw new ConfigurationException(msg);
282                        }
283    
284                        final String value = property.getValue();
285                        if (value == null || value.length() == 0) {
286                            final String msg = "A value for property " + key + " must be provided";
287                            LOGGER.error(msg);
288                            throw new ConfigurationException(msg);
289                        }
290    
291                        if (upperKey.equals("CHANNELS")) {
292                            channels = value.trim();
293                        } else if (upperKey.equals("SINKS")) {
294                            sinks = value.trim().split(" ");
295                        }
296    
297                        props.put(name + '.' + key, value);
298                    }
299    
300                    String sourceChannels = channels;
301    
302                    if (channels == null) {
303                        sourceChannels = "primary";
304                        props.put(name + ".channels", sourceChannels);
305                    }
306    
307                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
308    
309                    if (sinks == null || sinks.length == 0) {
310                        final String msg = "At least one Sink must be specified";
311                        LOGGER.error(msg);
312                        throw new ConfigurationException(msg);
313                    }
314                }
315                return props;
316            }
317    
318        }
319    
320    }