/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.util.Properties;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.flume.Event;
023import org.apache.flume.api.RpcClient;
024import org.apache.flume.api.RpcClientFactory;
025import org.apache.logging.log4j.core.appender.AppenderLoggingException;
026import org.apache.logging.log4j.core.appender.ManagerFactory;
027
028/**
029 * Manager for FlumeAvroAppenders.
030 */
031public class FlumeAvroManager extends AbstractFlumeManager {
032
033    private static final int MAX_RECONNECTS = 3;
034    private static final int MINIMUM_TIMEOUT = 1000;
035
036    private static AvroManagerFactory factory = new AvroManagerFactory();
037
038    private final Agent[] agents;
039
040    private final int batchSize;
041
042    private final long delayNanos;
043    private final int delayMillis;
044
045    private final int retries;
046
047    private final int connectTimeoutMillis;
048
049    private final int requestTimeoutMillis;
050
051    private final int current = 0;
052
053    private RpcClient rpcClient = null;
054
055    private BatchEvent batchEvent = new BatchEvent();
056    private long nextSend = 0;
057
058    /**
059     * Constructor
060     * @param name The unique name of this manager.
061     * @param agents An array of Agents.
062     * @param batchSize The number of events to include in a batch.
063     * @param retries The number of times to retry connecting before giving up.
064     * @param connectTimeout The connection timeout in ms.
065     * @param requestTimeout The request timeout in ms.
066     *
067     */
068    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
069                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
070        super(name);
071        this.agents = agents;
072        this.batchSize = batchSize;
073        this.delayMillis = delayMillis;
074        this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
075        this.retries = retries;
076        this.connectTimeoutMillis = connectTimeout;
077        this.requestTimeoutMillis = requestTimeout;
078        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
079    }
080
081    /**
082     * Returns a FlumeAvroManager.
083     * @param name The name of the manager.
084     * @param agents The agents to use.
085     * @param batchSize The number of events to include in a batch.
086     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
087     * @param retries The number of times to retry connecting before giving up.
088     * @param connectTimeoutMillis The connection timeout in ms.
089     * @param requestTimeoutMillis The request timeout in ms.
090     * @return A FlumeAvroManager.
091     */
092    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
093                                              final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
094        if (agents == null || agents.length == 0) {
095            throw new IllegalArgumentException("At least one agent is required");
096        }
097
098        if (batchSize <= 0) {
099            batchSize = 1;
100        }
101
102        final StringBuilder sb = new StringBuilder("FlumeAvro[");
103        boolean first = true;
104        for (final Agent agent : agents) {
105            if (!first) {
106                sb.append(',');
107            }
108            sb.append(agent.getHost()).append(':').append(agent.getPort());
109            first = false;
110        }
111        sb.append(']');
112        return getManager(sb.toString(), factory,
113                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
114    }
115
116    /**
117     * Returns the agents.
118     * @return The agent array.
119     */
120    public Agent[] getAgents() {
121        return agents;
122    }
123
124    /**
125     * Returns the index of the current agent.
126     * @return The index for the current agent.
127     */
128    public int getCurrent() {
129        return current;
130    }
131
132    public int getRetries() {
133        return retries;
134    }
135
136    public int getConnectTimeoutMillis() {
137        return connectTimeoutMillis;
138    }
139
140    public int getRequestTimeoutMillis() {
141        return requestTimeoutMillis;
142    }
143
144    public int getBatchSize() {
145        return batchSize;
146    }
147
148    public int getDelayMillis() {
149        return delayMillis;
150    }
151
152    public synchronized void send(final BatchEvent events) {
153        if (rpcClient == null) {
154            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
155        }
156
157        if (rpcClient != null) {
158            try {
159                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
160                rpcClient.appendBatch(events.getEvents());
161            } catch (final Exception ex) {
162                rpcClient.close();
163                rpcClient = null;
164                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
165                    agents[current].getPort();
166                LOGGER.warn(msg, ex);
167                throw new AppenderLoggingException("No Flume agents are available");
168            }
169        }  else {
170            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
171                agents[current].getPort();
172            LOGGER.warn(msg);
173            throw new AppenderLoggingException("No Flume agents are available");
174        }
175    }
176
177    @Override
178    public synchronized void send(final Event event)  {
179        if (batchSize == 1) {
180            if (rpcClient == null) {
181                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
182            }
183
184            if (rpcClient != null) {
185                try {
186                    rpcClient.append(event);
187                } catch (final Exception ex) {
188                    rpcClient.close();
189                    rpcClient = null;
190                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
191                            agents[current].getPort();
192                    LOGGER.warn(msg, ex);
193                    throw new AppenderLoggingException("No Flume agents are available");
194                }
195            } else {
196                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
197                        agents[current].getPort();
198                LOGGER.warn(msg);
199                throw new AppenderLoggingException("No Flume agents are available");
200            }
201        } else {
202            batchEvent.addEvent(event);
203            final int eventCount = batchEvent.getEvents().size();
204            if (eventCount == 1) {
205                nextSend = System.nanoTime() + delayNanos;
206            }
207            if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
208                send(batchEvent);
209                batchEvent = new BatchEvent();
210            }
211        }
212    }
213
214    /**
215     * There is a very good chance that this will always return the first agent even if it isn't available.
216     * @param agents The list of agents to choose from
217     * @return The FlumeEventAvroServer.
218     */
219    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
220        try {
221            final Properties props = new Properties();
222
223            props.put("client.type", "default_failover");
224
225            int agentCount = 1;
226            final StringBuilder sb = new StringBuilder();
227            for (final Agent agent : agents) {
228                if (sb.length() > 0) {
229                    sb.append(' ');
230                }
231                final String hostName = "host" + agentCount++;
232                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
233                sb.append(hostName);
234            }
235            props.put("hosts", sb.toString());
236            if (batchSize > 0) {
237                props.put("batch-size", Integer.toString(batchSize));
238            }
239            if (retries > 1) {
240                if (retries > MAX_RECONNECTS) {
241                    retries = MAX_RECONNECTS;
242                }
243                props.put("max-attempts", Integer.toString(retries * agents.length));
244            }
245            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
246                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
247            }
248            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
249                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
250            }
251            return RpcClientFactory.getInstance(props);
252        } catch (final Exception ex) {
253            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
254            return null;
255        }
256    }
257
258    @Override
259    protected void releaseSub() {
260        if (rpcClient != null) {
261            try {
262                synchronized(this) {
263                    try {
264                        if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
265                            send(batchEvent);
266                        }
267                    } catch (final Exception ex) {
268                        LOGGER.error("Error sending final batch: {}", ex.getMessage());
269                    }
270                }
271                rpcClient.close();
272            } catch (final Exception ex) {
273                LOGGER.error("Attempt to close RPC client failed", ex);
274            }
275        }
276        rpcClient = null;
277    }
278
279    /**
280     * Factory data.
281     */
282    private static class FactoryData {
283        private final String name;
284        private final Agent[] agents;
285        private final int batchSize;
286        private final int delayMillis;
287        private final int retries;
288        private final int conntectTimeoutMillis;
289        private final int requestTimeoutMillis;
290
291        /**
292         * Constructor.
293         * @param name The name of the Appender.
294         * @param agents The agents.
295         * @param batchSize The number of events to include in a batch.
296         */
297        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
298                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
299            this.name = name;
300            this.agents = agents;
301            this.batchSize = batchSize;
302            this.delayMillis = delayMillis;
303            this.retries = retries;
304            this.conntectTimeoutMillis = connectTimeoutMillis;
305            this.requestTimeoutMillis = requestTimeoutMillis;
306        }
307    }
308
309    /**
310     * Avro Manager Factory.
311     */
312    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
313
314        /**
315         * Create the FlumeAvroManager.
316         * @param name The name of the entity to manage.
317         * @param data The data required to create the entity.
318         * @return The FlumeAvroManager.
319         */
320        @Override
321        public FlumeAvroManager createManager(final String name, final FactoryData data) {
322            try {
323
324                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
325                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
326            } catch (final Exception ex) {
327                LOGGER.error("Could not create FlumeAvroManager", ex);
328            }
329            return null;
330        }
331    }
332
333}