001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.util.HashMap;
020import java.util.Locale;
021import java.util.Map;
022
023import org.apache.flume.Event;
024import org.apache.flume.EventDeliveryException;
025import org.apache.flume.agent.embedded.EmbeddedAgent;
026import org.apache.logging.log4j.LoggingException;
027import org.apache.logging.log4j.core.appender.ManagerFactory;
028import org.apache.logging.log4j.core.config.ConfigurationException;
029import org.apache.logging.log4j.core.config.Property;
030import org.apache.logging.log4j.core.helpers.NameUtil;
031import org.apache.logging.log4j.core.helpers.Strings;
032import org.apache.logging.log4j.util.PropertiesUtil;
033
034/**
035 *
036 */
037public class FlumeEmbeddedManager extends AbstractFlumeManager {
038
039    private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
040
041    private static final String IN_MEMORY = "InMemory";
042
043    private static FlumeManagerFactory factory = new FlumeManagerFactory();
044
045    private EmbeddedAgent agent;
046
047    private final String shortName;
048
049
050    /**
051     * Constructor
052     * @param name The unique name of this manager.
053     * @param shortName The short version of the agent name.
054     * @param agent The embedded agent.
055     */
056    protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
057        super(name);
058        this.agent = agent;
059        this.shortName = shortName;
060    }
061
062    /**
063     * Returns a FlumeEmbeddedManager.
064     * @param name The name of the manager.
065     * @param agents The agents to use.
066     * @param properties Properties for the embedded manager.
067     * @param batchSize The number of events to include in a batch.
068     * @param dataDir The directory where the Flume FileChannel should write to.
069     * @return A FlumeAvroManager.
070     */
071    public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
072                                                  int batchSize, final String dataDir) {
073
074        if (batchSize <= 0) {
075            batchSize = 1;
076        }
077
078        if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
079            throw new IllegalArgumentException("Either an Agent or properties are required");
080        } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
081            throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
082        }
083
084        final StringBuilder sb = new StringBuilder();
085        boolean first = true;
086
087        if (agents != null && agents.length > 0) {
088            sb.append(name).append("[");
089            for (final Agent agent : agents) {
090                if (!first) {
091                    sb.append("_");
092                }
093                sb.append(agent.getHost()).append("-").append(agent.getPort());
094                first = false;
095            }
096            sb.append("]");
097        } else {
098            String sep = "";
099            sb.append(name).append("-");
100            final StringBuilder props = new StringBuilder();
101            for (final Property prop : properties) {
102                props.append(sep);
103                props.append(prop.getName()).append("=").append(prop.getValue());
104                sep = "_";
105            }
106            sb.append(NameUtil.md5(props.toString()));
107        }
108        return getManager(sb.toString(), factory,
109                new FactoryData(name, agents, properties, batchSize, dataDir));
110    }
111
112    @Override
113    public void send(final Event event) {
114        try {
115            agent.put(event);
116        } catch (EventDeliveryException ex) {
117            throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
118        }
119    }
120
121    @Override
122    protected void releaseSub() {
123        agent.stop();
124    }
125
126    /**
127     * Factory data.
128     */
129    private static class FactoryData {
130        private final Agent[] agents;
131        private final Property[] properties;
132        private final int batchSize;
133        private final String dataDir;
134        private final String name;
135
136        /**
137         * Constructor.
138         * @param name The name of the Appender.
139         * @param agents The agents.
140         * @param properties The Flume configuration properties.
141         * @param batchSize The number of events to include in a batch.
142         * @param dataDir The directory where Flume should write to.
143         */
144        public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
145                           final String dataDir) {
146            this.name = name;
147            this.agents = agents;
148            this.batchSize = batchSize;
149            this.properties = properties;
150            this.dataDir = dataDir;
151        }
152    }
153
154    /**
155     * Avro Manager Factory.
156     */
157    private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
158
159        /**
160         * Create the FlumeAvroManager.
161         * @param name The name of the entity to manage.
162         * @param data The data required to create the entity.
163         * @return The FlumeAvroManager.
164         */
165        @Override
166        public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
167            try {
168                final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
169                    data.batchSize, data.dataDir);
170                EmbeddedAgent agent = new EmbeddedAgent(name);
171                agent.configure(props);
172                agent.start();
173                LOGGER.debug("Created Agent " + name);
174                return new FlumeEmbeddedManager(name, data.name, agent);
175            } catch (final Exception ex) {
176                LOGGER.error("Could not create FlumeEmbeddedManager", ex);
177            }
178            return null;
179        }
180
181        private Map<String, String> createProperties(final String name, final Agent[] agents,
182                                                     final Property[] properties, final int batchSize, String dataDir) {
183            final Map<String, String> props = new HashMap<String, String>();
184
185            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
186                LOGGER.error("No Flume configuration provided");
187                throw new ConfigurationException("No Flume configuration provided");
188            }
189
190            if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
191                LOGGER.error("Agents and Flume configuration cannot both be specified");
192                throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
193            }
194
195            if (agents != null && agents.length > 0) {
196
197                if (dataDir != null && dataDir.length() > 0) {
198                    if (dataDir.equals(IN_MEMORY)) {
199                        props.put("channel.type", "memory");
200                    } else {
201                        props.put("channel.type", "file");
202
203                        if (!dataDir.endsWith(FILE_SEP)) {
204                            dataDir = dataDir + FILE_SEP;
205                        }
206
207                        props.put("channel.checkpointDir", dataDir + "checkpoint");
208                        props.put("channel.dataDirs", dataDir + "data");
209                    }
210
211                } else {
212                    props.put("channel.type", "file");
213                }
214
215                final StringBuilder sb = new StringBuilder();
216                String leading = "";
217                int priority = agents.length;
218                for (int i = 0; i < priority; ++i) {
219                    sb.append(leading).append("agent").append(i);
220                    leading = " ";
221                    final String prefix = "agent" + i;
222                    props.put(prefix + ".type", "avro");
223                    props.put(prefix + ".hostname", agents[i].getHost());
224                    props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
225                    props.put(prefix + ".batch-size", Integer.toString(batchSize));
226                    props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
227                }
228                props.put("sinks", sb.toString());
229                props.put("processor.type", "failover");
230            } else {
231                String[] sinks = null;
232
233                for (final Property property : properties) {
234                    final String key = property.getName();
235
236                    if (Strings.isEmpty(key)) {
237                        final String msg = "A property name must be provided";
238                        LOGGER.error(msg);
239                        throw new ConfigurationException(msg);
240                    }
241
242                    final String upperKey = key.toUpperCase(Locale.ENGLISH);
243
244                    if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
245                        final String msg =
246                            "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
247                        LOGGER.error(msg);
248                        throw new ConfigurationException(msg);
249                    }
250
251                    final String value = property.getValue();
252                    if (Strings.isEmpty(value)) {
253                        final String msg = "A value for property " + key + " must be provided";
254                        LOGGER.error(msg);
255                        throw new ConfigurationException(msg);
256                    }
257
258                    if (upperKey.equals("SINKS")) {
259                        sinks = value.trim().split(" ");
260                    }
261
262                    props.put(key, value);
263                }
264
265                if (sinks == null || sinks.length == 0) {
266                    final String msg = "At least one Sink must be specified";
267                    LOGGER.error(msg);
268                    throw new ConfigurationException(msg);
269                }
270            }
271            return props;
272        }
273
274    }
275
276}