001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.Event;
020    import org.apache.flume.api.RpcClient;
021    import org.apache.flume.api.RpcClientFactory;
022    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
023    import org.apache.logging.log4j.core.appender.ManagerFactory;
024    
025    import java.util.Properties;
026    
027    /**
028     * Manager for FlumeAvroAppenders.
029     */
030    public class FlumeAvroManager extends AbstractFlumeManager {
031    
032        private static final int MAX_RECONNECTS = 3;
033    
034        private static AvroManagerFactory factory = new AvroManagerFactory();
035    
036        private final Agent[] agents;
037    
038        private final int batchSize;
039    
040        private final int retries;
041    
042        private final int connectTimeout;
043    
044        private final int requestTimeout;
045    
046        private int current = 0;
047    
048        private RpcClient rpcClient = null;
049    
050        /**
051         * Constructor
052         * @param name The unique name of this manager.
053         * @param agents An array of Agents.
054         * @param batchSize The number of events to include in a batch.
055         * @param retries The number of times to retry connecting before giving up.
056         * @param connectTimeout The connection timeout in ms.
057         * @param requestTimeout The request timeout in ms.
058         *
059         */
060        protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
061                                   final int retries, final int connectTimeout, final int requestTimeout) {
062            super(name);
063            this.agents = agents;
064            this.batchSize = batchSize;
065            this.retries = retries;
066            this.connectTimeout = connectTimeout;
067            this.requestTimeout = requestTimeout;
068            this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
069        }
070    
071        /**
072         * Returns a FlumeAvroManager.
073         * @param name The name of the manager.
074         * @param agents The agents to use.
075         * @param batchSize The number of events to include in a batch.
076         * @return A FlumeAvroManager.
077         */
078        public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
079                                                  final int retries, final int connectTimeout, final int requestTimeout) {
080            if (agents == null || agents.length == 0) {
081                throw new IllegalArgumentException("At least one agent is required");
082            }
083    
084            if (batchSize <= 0) {
085                batchSize = 1;
086            }
087    
088            final StringBuilder sb = new StringBuilder("FlumeAvro[");
089            boolean first = true;
090            for (final Agent agent : agents) {
091                if (!first) {
092                    sb.append(",");
093                }
094                sb.append(agent.getHost()).append(":").append(agent.getPort());
095                first = false;
096            }
097            sb.append("]");
098            return getManager(sb.toString(), factory,
099                    new FactoryData(name, agents, batchSize, retries, connectTimeout, requestTimeout));
100        }
101    
102        /**
103         * Returns the agents.
104         * @return The agent array.
105         */
106        public Agent[] getAgents() {
107            return agents;
108        }
109    
110        /**
111         * Returns the index of the current agent.
112         * @return The index for the current agent.
113         */
114        public int getCurrent() {
115            return current;
116        }
117    
118        public int getRetries() {
119            return retries;
120        }
121    
122        public int getConnectTimeout() {
123            return connectTimeout;
124        }
125    
126        public int getRequestTimeout() {
127            return requestTimeout;
128        }
129    
130        public synchronized void send(final BatchEvent events) {
131            if (rpcClient == null) {
132                rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
133            }
134    
135            if (rpcClient != null) {
136                try {
137                    LOGGER.trace("Sending batch of {} events", events.getEvents().size());
138                    rpcClient.appendBatch(events.getEvents());
139                } catch (final Exception ex) {
140                    rpcClient.close();
141                    rpcClient = null;
142                    String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
143                        agents[current].getPort();
144                    LOGGER.warn(msg, ex);
145                    throw new AppenderRuntimeException("No Flume agents are available");
146                }
147            }
148        }
149    
150        @Override
151        public synchronized void send(final Event event)  {
152            if (rpcClient == null) {
153                rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
154            }
155    
156            if (rpcClient != null) {
157                try {
158                    rpcClient.append(event);
159                } catch (final Exception ex) {
160                    rpcClient.close();
161                    rpcClient = null;
162                    String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
163                        agents[current].getPort();
164                    LOGGER.warn(msg, ex);
165                    throw new AppenderRuntimeException("No Flume agents are available");
166                }
167            }
168        }
169    
170        /**
171         * There is a very good chance that this will always return the first agent even if it isn't available.
172         * @param agents The list of agents to choose from
173         * @return The FlumeEventAvroServer.
174         */
175    
176        private RpcClient connect(final Agent[] agents, int retries, int connectTimeout, int requestTimeout) {
177            try {
178                Properties props = new Properties();
179    
180                props.put("client.type", agents.length > 1 ? "default_failover" : "default");
181    
182                int count = 1;
183                StringBuilder sb = new StringBuilder();
184                for (Agent agent : agents) {
185                    if (sb.length() > 0) {
186                        sb.append(" ");
187                    }
188                    String hostName = "host" + count++;
189                    props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
190                    sb.append(hostName);
191                }
192                props.put("hosts", sb.toString());
193                if (batchSize > 0) {
194                    props.put("batch-size", Integer.toString(batchSize));
195                }
196                if (retries > 1) {
197                    if (retries > MAX_RECONNECTS) {
198                        retries = MAX_RECONNECTS;
199                    }
200                    props.put("max-attempts", Integer.toString(retries * agents.length));
201                }
202                if (requestTimeout >= 1000) {
203                    props.put("request-timeout", Integer.toString(requestTimeout));
204                }
205                if (connectTimeout >= 1000) {
206                    props.put("connect-timeout", Integer.toString(connectTimeout));
207                }
208                return RpcClientFactory.getInstance(props);
209            } catch (Exception ex) {
210                LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
211                return null;
212            }
213        }
214    
215        @Override
216        protected void releaseSub() {
217            if (rpcClient != null) {
218                try {
219                    rpcClient.close();
220                } catch (final Exception ex) {
221                    LOGGER.error("Attempt to close RPC client failed", ex);
222                }
223            }
224            rpcClient = null;
225        }
226    
227        /**
228         * Factory data.
229         */
230        private static class FactoryData {
231            private final String name;
232            private final Agent[] agents;
233            private final int batchSize;
234            private final int retries;
235            private final int conntectTimeout;
236            private final int requestTimeout;
237    
238            /**
239             * Constructor.
240             * @param name The name of the Appender.
241             * @param agents The agents.
242             * @param batchSize The number of events to include in a batch.
243             */
244            public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
245                               final int connectTimeout, final int requestTimeout) {
246                this.name = name;
247                this.agents = agents;
248                this.batchSize = batchSize;
249                this.retries = retries;
250                this.conntectTimeout = connectTimeout;
251                this.requestTimeout = requestTimeout;
252            }
253        }
254    
255        /**
256         * Avro Manager Factory.
257         */
258        private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
259    
260            /**
261             * Create the FlumeAvroManager.
262             * @param name The name of the entity to manage.
263             * @param data The data required to create the entity.
264             * @return The FlumeAvroManager.
265             */
266            @Override
267            public FlumeAvroManager createManager(final String name, final FactoryData data) {
268                try {
269    
270                    return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
271                        data.conntectTimeout, data.requestTimeout);
272                } catch (final Exception ex) {
273                    LOGGER.error("Could not create FlumeAvroManager", ex);
274                }
275                return null;
276            }
277        }
278    
279    }