001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.SourceRunner;
020    import org.apache.flume.node.NodeConfiguration;
021    import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
022    import org.apache.logging.log4j.core.appender.ManagerFactory;
023    import org.apache.logging.log4j.core.config.ConfigurationException;
024    import org.apache.logging.log4j.core.config.Property;
025    import org.apache.logging.log4j.core.helpers.NameUtil;
026    import org.apache.logging.log4j.util.PropertiesUtil;
027    
028    import java.util.Locale;
029    import java.util.Properties;
030    
031    /**
032     *
033     */
034    public class FlumeEmbeddedManager extends AbstractFlumeManager {
035    
036        /** Name for the Flume source */
037        protected static final String SOURCE_NAME = "log4j-source";
038    
039        private static ManagerFactory factory = new FlumeManagerFactory();
040    
041        private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
042    
043        private static final String IN_MEMORY = "InMemory";
044    
045        private final FlumeNode node;
046    
047        private NodeConfiguration conf;
048    
049        private final Log4jEventSource source;
050    
051        private final String shortName;
052    
053    
054        /**
055         * Constructor
056         * @param name The unique name of this manager.
057         * @param node The Flume Node.
058         */
059        protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
060            super(name);
061            this.node = node;
062            this.shortName = shortName;
063            final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
064            if (runner == null || runner.getSource() == null) {
065                throw new IllegalStateException("No Source has been created for Appender " + shortName);
066            }
067            source  = (Log4jEventSource) runner.getSource();
068        }
069    
070        /**
071         * Returns a FlumeEmbeddedManager.
072         * @param name The name of the manager.
073         * @param agents The agents to use.
074         * @param properties Properties for the embedded manager.
075         * @param batchSize The number of events to include in a batch.
076         * @param dataDir The directory where the Flume FileChannel should write to.
077         * @return A FlumeAvroManager.
078         */
079        public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
080                                                      int batchSize, final String dataDir) {
081    
082            if (batchSize <= 0) {
083                batchSize = 1;
084            }
085    
086            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
087                throw new IllegalArgumentException("Either an Agent or properties are required");
088            } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
089                throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
090            }
091    
092            final StringBuilder sb = new StringBuilder();
093            boolean first = true;
094    
095            if (agents != null && agents.length > 0) {
096                sb.append("FlumeEmbedded[");
097                for (final Agent agent : agents) {
098                    if (!first) {
099                        sb.append(",");
100                    }
101                    sb.append(agent.getHost()).append(":").append(agent.getPort());
102                    first = false;
103                }
104                sb.append("]");
105            } else {
106                String sep = "";
107                sb.append(name).append(":");
108                final StringBuilder props = new StringBuilder();
109                for (final Property prop : properties) {
110                    props.append(sep);
111                    props.append(prop.getName()).append("=").append(prop.getValue());
112                    sep = ",";
113                }
114                sb.append(NameUtil.md5(props.toString()));
115            }
116            return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
117                new FactoryData(name, agents, properties, batchSize, dataDir));
118        }
119    
120        @Override
121        public void send(final FlumeEvent event, final int delay, final int retries) {
122            source.send(event);
123        }
124    
125        @Override
126        protected void releaseSub() {
127            node.stop();
128        }
129    
130        /**
131         * Factory data.
132         */
133        private static class FactoryData {
134            private final Agent[] agents;
135            private final Property[] properties;
136            private final int batchSize;
137            private final String dataDir;
138            private final String name;
139    
140            /**
141             * Constructor.
142             * @param name The name of the Appender.
143             * @param agents The agents.
144             * @param properties The Flume configuration properties.
145             * @param batchSize The number of events to include in a batch.
146             * @param dataDir The directory where Flume should write to.
147             */
148            public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
149                               final String dataDir) {
150                this.name = name;
151                this.agents = agents;
152                this.batchSize = batchSize;
153                this.properties = properties;
154                this.dataDir = dataDir;
155            }
156        }
157    
158        /**
159         * Avro Manager Factory.
160         */
161        private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
162            private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
163    
164            /**
165             * Create the FlumeAvroManager.
166             * @param name The name of the entity to manage.
167             * @param data The data required to create the entity.
168             * @return The FlumeAvroManager.
169             */
170            public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
171                try {
172                    final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
173                    final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
174                        data.dataDir);
175                    final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
176                    final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
177    
178                    final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
179    
180                    node.start();
181    
182                    return new FlumeEmbeddedManager(name, data.name, node);
183                } catch (final Exception ex) {
184                    LOGGER.error("Could not create FlumeEmbeddedManager", ex);
185                }
186                return null;
187            }
188    
189            private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
190                                                final int batchSize, String dataDir) {
191                final Properties props = new Properties();
192    
193                if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
194                    LOGGER.error("No Flume configuration provided");
195                    throw new ConfigurationException("No Flume configuration provided");
196                }
197    
198                if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
199                    LOGGER.error("Agents and Flume configuration cannot both be specified");
200                    throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
201                }
202    
203                if (agents != null && agents.length > 0) {
204                    props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
205                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
206    
207                    if (dataDir != null && dataDir.length() > 0) {
208                        if (dataDir.equals(IN_MEMORY)) {
209                            props.put(name + ".channels", "primary");
210                            props.put(name + ".channels.primary.type", "memory");
211                        } else {
212                            props.put(name + ".channels", "primary");
213                            props.put(name + ".channels.primary.type", "file");
214    
215                            if (!dataDir.endsWith(FiLE_SEP)) {
216                                dataDir = dataDir + FiLE_SEP;
217                            }
218    
219                            props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
220                            props.put(name + ".channels.primary.dataDirs", dataDir + "data");
221                        }
222    
223                    } else {
224                        props.put(name + ".channels", "primary");
225                        props.put(name + ".channels.primary.type", "file");
226                    }
227    
228                    final StringBuilder sb = new StringBuilder();
229                    String leading = "";
230                    int priority = agents.length;
231                    for (int i = 0; i < agents.length; ++i) {
232                        sb.append(leading).append("agent").append(i);
233                        leading = " ";
234                        final String prefix = name + ".sinks.agent" + i;
235                        props.put(prefix + ".channel", "primary");
236                        props.put(prefix + ".type", "avro");
237                        props.put(prefix + ".hostname", agents[i].getHost());
238                        props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
239                        props.put(prefix + ".batch-size", Integer.toString(batchSize));
240                        props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
241                        --priority;
242                    }
243                    props.put(name + ".sinks", sb.toString());
244                    props.put(name + ".sinkgroups", "group1");
245                    props.put(name + ".sinkgroups.group1.sinks", sb.toString());
246                    props.put(name + ".sinkgroups.group1.processor.type", "failover");
247                    final String sourceChannels = "primary";
248                    props.put(name + ".channels", sourceChannels);
249                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
250                } else {
251                    String channels = null;
252                    String[] sinks = null;
253    
254                    props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
255                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
256    
257                    for (final Property property : properties) {
258                        final String key = property.getName();
259    
260                        if (key == null || key.length() == 0) {
261                            final String msg = "A property name must be provided";
262                            LOGGER.error(msg);
263                            throw new ConfigurationException(msg);
264                        }
265    
266                        final String upperKey = key.toUpperCase(Locale.ENGLISH);
267    
268                        if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
269                            final String msg =
270                                "Specification of the agent name is allowed in Flume Appender configuration: " + key;
271                            LOGGER.error(msg);
272                            throw new ConfigurationException(msg);
273                        }
274    
275                        if (upperKey.startsWith("SOURCES.")) {
276                            final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
277                            LOGGER.error(msg);
278                            throw new ConfigurationException(msg);
279                        }
280    
281                        final String value = property.getValue();
282                        if (value == null || value.length() == 0) {
283                            final String msg = "A value for property " + key + " must be provided";
284                            LOGGER.error(msg);
285                            throw new ConfigurationException(msg);
286                        }
287    
288                        if (upperKey.equals("CHANNELS")) {
289                            channels = value.trim();
290                        } else if (upperKey.equals("SINKS")) {
291                            sinks = value.trim().split(" ");
292                        }
293    
294                        props.put(name + '.' + key, value);
295                    }
296    
297                    String sourceChannels = channels;
298    
299                    if (channels == null) {
300                        sourceChannels = "primary";
301                        props.put(name + ".channels", sourceChannels);
302                    }
303    
304                    props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
305    
306                    if (sinks == null || sinks.length == 0) {
307                        final String msg = "At least one Sink must be specified";
308                        LOGGER.error(msg);
309                        throw new ConfigurationException(msg);
310                    }
311                }
312                return props;
313            }
314    
315        }
316    
317    }