001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.Event;
020    import org.apache.flume.api.RpcClient;
021    import org.apache.flume.api.RpcClientFactory;
022    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
023    import org.apache.logging.log4j.core.appender.ManagerFactory;
024    
025    import java.util.Properties;
026    
027    /**
028     * Manager for FlumeAvroAppenders.
029     */
030    public class FlumeAvroManager extends AbstractFlumeManager {
031    
032        private static final int MAX_RECONNECTS = 3;
033    
034        private static ManagerFactory factory = new AvroManagerFactory();
035    
036        private final Agent[] agents;
037    
038        private final int batchSize;
039    
040        private final int retries;
041    
042        private final int connectTimeout;
043    
044        private final int requestTimeout;
045    
046        private int current = 0;
047    
048        private RpcClient rpcClient = null;
049    
050        /**
051         * Constructor
052         * @param name The unique name of this manager.
053         * @param agents An array of Agents.
054         * @param batchSize The number of events to include in a batch.
055         * @param retries The number of times to retry connecting before giving up.
056         * @param connectTimeout The connection timeout in ms.
057         * @param requestTimeout The request timeout in ms.
058         *
059         */
060        protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
061                                   final int retries, final int connectTimeout, final int requestTimeout) {
062            super(name);
063            this.agents = agents;
064            this.batchSize = batchSize;
065            this.retries = retries;
066            this.connectTimeout = connectTimeout;
067            this.requestTimeout = requestTimeout;
068            this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
069        }
070    
071        /**
072         * Returns a FlumeAvroManager.
073         * @param name The name of the manager.
074         * @param agents The agents to use.
075         * @param batchSize The number of events to include in a batch.
076         * @return A FlumeAvroManager.
077         */
078        public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
079                                                  final int retries, final int connectTimeout, final int requestTimeout) {
080            if (agents == null || agents.length == 0) {
081                throw new IllegalArgumentException("At least one agent is required");
082            }
083    
084            if (batchSize <= 0) {
085                batchSize = 1;
086            }
087    
088            final StringBuilder sb = new StringBuilder("FlumeAvro[");
089            boolean first = true;
090            for (final Agent agent : agents) {
091                if (!first) {
092                    sb.append(",");
093                }
094                sb.append(agent.getHost()).append(":").append(agent.getPort());
095                first = false;
096            }
097            sb.append("]");
098            return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
099                connectTimeout, requestTimeout));
100        }
101    
102        /**
103         * Returns the agents.
104         * @return The agent array.
105         */
106        public Agent[] getAgents() {
107            return agents;
108        }
109    
110        /**
111         * Returns the index of the current agent.
112         * @return The index for the current agent.
113         */
114        public int getCurrent() {
115            return current;
116        }
117    
118        public int getRetries() {
119            return retries;
120        }
121    
122        public int getConnectTimeout() {
123            return connectTimeout;
124        }
125    
126        public int getRequestTimeout() {
127            return requestTimeout;
128        }
129    
130        public synchronized void send(final BatchEvent events) {
131            if (rpcClient == null) {
132                rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
133            }
134    
135            if (rpcClient != null) {
136                try {
137                    rpcClient.appendBatch(events.getEvents());
138                } catch (final Exception ex) {
139                    rpcClient.close();
140                    rpcClient = null;
141                    String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
142                        agents[current].getPort();
143                    LOGGER.warn(msg, ex);
144                    throw new AppenderRuntimeException("No Flume agents are available");
145                }
146            }
147        }
148    
149        @Override
150        public synchronized void send(final Event event)  {
151            if (rpcClient == null) {
152                rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
153            }
154    
155            if (rpcClient != null) {
156                try {
157                    rpcClient.append(event);
158                } catch (final Exception ex) {
159                    rpcClient.close();
160                    rpcClient = null;
161                    String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
162                        agents[current].getPort();
163                    LOGGER.warn(msg, ex);
164                    throw new AppenderRuntimeException("No Flume agents are available");
165                }
166            }
167        }
168    
169        /**
170         * There is a very good chance that this will always return the first agent even if it isn't available.
171         * @param agents The list of agents to choose from
172         * @return The FlumeEventAvroServer.
173         */
174    
175        private RpcClient connect(final Agent[] agents, int retries, int connectTimeout, int requestTimeout) {
176            Properties props = new Properties();
177    
178            props.put("client.type", agents.length > 1 ? "default_failover" : "default");
179    
180            int count = 1;
181            StringBuilder sb = new StringBuilder();
182            for (Agent agent : agents) {
183                if (sb.length() > 0) {
184                    sb.append(" ");
185                }
186                String hostName = "host" + count++;
187                props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
188                sb.append(hostName);
189            }
190            props.put("hosts", sb.toString());
191            if (batchSize > 0) {
192                props.put("batch-size", Integer.toString(batchSize));
193            }
194            if (retries > 1) {
195                if (retries > MAX_RECONNECTS) {
196                    retries = MAX_RECONNECTS;
197                }
198                props.put("max-attempts", Integer.toString(retries * agents.length));
199            }
200            if (requestTimeout >= 1000) {
201                props.put("request-timeout", Integer.toString(requestTimeout));
202            }
203            if (connectTimeout >= 1000) {
204                props.put("connect-timeout", Integer.toString(connectTimeout));
205            }
206            return RpcClientFactory.getInstance(props);
207        }
208    
209        @Override
210        protected void releaseSub() {
211            if (rpcClient != null) {
212                try {
213                    rpcClient.close();
214                } catch (final Exception ex) {
215                    LOGGER.error("Attempt to close RPC client failed", ex);
216                }
217            }
218            rpcClient = null;
219        }
220    
221        /**
222         * Factory data.
223         */
224        private static class FactoryData {
225            private final String name;
226            private final Agent[] agents;
227            private final int batchSize;
228            private final int retries;
229            private final int conntectTimeout;
230            private final int requestTimeout;
231    
232            /**
233             * Constructor.
234             * @param name The name of the Appender.
235             * @param agents The agents.
236             * @param batchSize The number of events to include in a batch.
237             */
238            public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
239                               final int connectTimeout, final int requestTimeout) {
240                this.name = name;
241                this.agents = agents;
242                this.batchSize = batchSize;
243                this.retries = retries;
244                this.conntectTimeout = connectTimeout;
245                this.requestTimeout = requestTimeout;
246            }
247        }
248    
249        /**
250         * Avro Manager Factory.
251         */
252        private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
253    
254            /**
255             * Create the FlumeAvroManager.
256             * @param name The name of the entity to manage.
257             * @param data The data required to create the entity.
258             * @return The FlumeAvroManager.
259             */
260            public FlumeAvroManager createManager(final String name, final FactoryData data) {
261                try {
262    
263                    return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
264                        data.conntectTimeout, data.requestTimeout);
265                } catch (final Exception ex) {
266                    LOGGER.error("Could not create FlumeAvroManager", ex);
267                }
268                return null;
269            }
270        }
271    
272    }