001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.util.Properties;
020
021import org.apache.flume.Event;
022import org.apache.flume.api.RpcClient;
023import org.apache.flume.api.RpcClientFactory;
024import org.apache.logging.log4j.core.appender.AppenderLoggingException;
025import org.apache.logging.log4j.core.appender.ManagerFactory;
026
027/**
028 * Manager for FlumeAvroAppenders.
029 */
030public class FlumeAvroManager extends AbstractFlumeManager {
031
032    private static final int MAX_RECONNECTS = 3;
033    private static final int MINIMUM_TIMEOUT = 1000;
034
035    private static AvroManagerFactory factory = new AvroManagerFactory();
036
037    private final Agent[] agents;
038
039    private final int batchSize;
040
041    private final long delayNanos;
042    private final int delayMillis;
043
044    private final int retries;
045
046    private final int connectTimeoutMillis;
047
048    private final int requestTimeoutMillis;
049
050    private final int current = 0;
051
052    private RpcClient rpcClient = null;
053
054    private BatchEvent batchEvent = new BatchEvent();
055    private long nextSend = 0;
056
057    /**
058     * Constructor
059     * @param name The unique name of this manager.
060     * @param agents An array of Agents.
061     * @param batchSize The number of events to include in a batch.
062     * @param retries The number of times to retry connecting before giving up.
063     * @param connectTimeout The connection timeout in ms.
064     * @param requestTimeout The request timeout in ms.
065     *
066     */
067    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
068                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
069        super(name);
070        this.agents = agents;
071        this.batchSize = batchSize;
072        this.delayMillis = delayMillis;
073        this.delayNanos = delayMillis * 1000000;
074        this.retries = retries;
075        this.connectTimeoutMillis = connectTimeout;
076        this.requestTimeoutMillis = requestTimeout;
077        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
078    }
079
080    /**
081     * Returns a FlumeAvroManager.
082     * @param name The name of the manager.
083     * @param agents The agents to use.
084     * @param batchSize The number of events to include in a batch.
085     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
086     * @param retries The number of times to retry connecting before giving up.
087     * @param connectTimeoutMillis The connection timeout in ms.
088     * @param requestTimeoutMillis The request timeout in ms.
089     * @return A FlumeAvroManager.
090     */
091    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
092                                              final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
093        if (agents == null || agents.length == 0) {
094            throw new IllegalArgumentException("At least one agent is required");
095        }
096
097        if (batchSize <= 0) {
098            batchSize = 1;
099        }
100
101        final StringBuilder sb = new StringBuilder("FlumeAvro[");
102        boolean first = true;
103        for (final Agent agent : agents) {
104            if (!first) {
105                sb.append(',');
106            }
107            sb.append(agent.getHost()).append(':').append(agent.getPort());
108            first = false;
109        }
110        sb.append(']');
111        return getManager(sb.toString(), factory,
112                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
113    }
114
115    /**
116     * Returns the agents.
117     * @return The agent array.
118     */
119    public Agent[] getAgents() {
120        return agents;
121    }
122
123    /**
124     * Returns the index of the current agent.
125     * @return The index for the current agent.
126     */
127    public int getCurrent() {
128        return current;
129    }
130
131    public int getRetries() {
132        return retries;
133    }
134
135    public int getConnectTimeoutMillis() {
136        return connectTimeoutMillis;
137    }
138
139    public int getRequestTimeoutMillis() {
140        return requestTimeoutMillis;
141    }
142
143    public int getBatchSize() {
144        return batchSize;
145    }
146
147    public int getDelayMillis() {
148        return delayMillis;
149    }
150
151    public synchronized void send(final BatchEvent events) {
152        if (rpcClient == null) {
153            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
154        }
155
156        if (rpcClient != null) {
157            try {
158                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
159                rpcClient.appendBatch(events.getEvents());
160            } catch (final Exception ex) {
161                rpcClient.close();
162                rpcClient = null;
163                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
164                    agents[current].getPort();
165                LOGGER.warn(msg, ex);
166                throw new AppenderLoggingException("No Flume agents are available");
167            }
168        }  else {
169            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
170                agents[current].getPort();
171            LOGGER.warn(msg);
172            throw new AppenderLoggingException("No Flume agents are available");
173        }
174    }
175
176    @Override
177    public synchronized void send(final Event event)  {
178        if (batchSize == 1) {
179            if (rpcClient == null) {
180                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
181            }
182
183            if (rpcClient != null) {
184                try {
185                    rpcClient.append(event);
186                } catch (final Exception ex) {
187                    rpcClient.close();
188                    rpcClient = null;
189                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
190                            agents[current].getPort();
191                    LOGGER.warn(msg, ex);
192                    throw new AppenderLoggingException("No Flume agents are available");
193                }
194            } else {
195                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
196                        agents[current].getPort();
197                LOGGER.warn(msg);
198                throw new AppenderLoggingException("No Flume agents are available");
199            }
200        } else {
201            batchEvent.addEvent(event);
202            final int eventCount = batchEvent.getEvents().size();
203            if (eventCount == 1) {
204                nextSend = System.nanoTime() + delayNanos;
205            }
206            if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
207                send(batchEvent);
208                batchEvent = new BatchEvent();
209            }
210        }
211    }
212
213    /**
214     * There is a very good chance that this will always return the first agent even if it isn't available.
215     * @param agents The list of agents to choose from
216     * @return The FlumeEventAvroServer.
217     */
218    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
219        try {
220            final Properties props = new Properties();
221
222            props.put("client.type", "default_failover");
223
224            int agentCount = 1;
225            final StringBuilder sb = new StringBuilder();
226            for (final Agent agent : agents) {
227                if (sb.length() > 0) {
228                    sb.append(' ');
229                }
230                final String hostName = "host" + agentCount++;
231                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
232                sb.append(hostName);
233            }
234            props.put("hosts", sb.toString());
235            if (batchSize > 0) {
236                props.put("batch-size", Integer.toString(batchSize));
237            }
238            if (retries > 1) {
239                if (retries > MAX_RECONNECTS) {
240                    retries = MAX_RECONNECTS;
241                }
242                props.put("max-attempts", Integer.toString(retries * agents.length));
243            }
244            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
245                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
246            }
247            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
248                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
249            }
250            return RpcClientFactory.getInstance(props);
251        } catch (final Exception ex) {
252            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
253            return null;
254        }
255    }
256
257    @Override
258    protected void releaseSub() {
259        if (rpcClient != null) {
260            try {
261                synchronized(this) {
262                    try {
263                        if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
264                            send(batchEvent);
265                        }
266                    } catch (final Exception ex) {
267                        LOGGER.error("Error sending final batch: {}", ex.getMessage());
268                    }
269                }
270                rpcClient.close();
271            } catch (final Exception ex) {
272                LOGGER.error("Attempt to close RPC client failed", ex);
273            }
274        }
275        rpcClient = null;
276    }
277
278    /**
279     * Factory data.
280     */
281    private static class FactoryData {
282        private final String name;
283        private final Agent[] agents;
284        private final int batchSize;
285        private final int delayMillis;
286        private final int retries;
287        private final int conntectTimeoutMillis;
288        private final int requestTimeoutMillis;
289
290        /**
291         * Constructor.
292         * @param name The name of the Appender.
293         * @param agents The agents.
294         * @param batchSize The number of events to include in a batch.
295         */
296        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
297                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
298            this.name = name;
299            this.agents = agents;
300            this.batchSize = batchSize;
301            this.delayMillis = delayMillis;
302            this.retries = retries;
303            this.conntectTimeoutMillis = connectTimeoutMillis;
304            this.requestTimeoutMillis = requestTimeoutMillis;
305        }
306    }
307
308    /**
309     * Avro Manager Factory.
310     */
311    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
312
313        /**
314         * Create the FlumeAvroManager.
315         * @param name The name of the entity to manage.
316         * @param data The data required to create the entity.
317         * @return The FlumeAvroManager.
318         */
319        @Override
320        public FlumeAvroManager createManager(final String name, final FactoryData data) {
321            try {
322
323                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
324                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
325            } catch (final Exception ex) {
326                LOGGER.error("Could not create FlumeAvroManager", ex);
327            }
328            return null;
329        }
330    }
331
332}