View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.Event;
20  import org.apache.flume.api.RpcClient;
21  import org.apache.flume.api.RpcClientFactory;
22  import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
23  import org.apache.logging.log4j.core.appender.ManagerFactory;
24  
25  import java.util.Properties;
26  
27  /**
28   * Manager for FlumeAvroAppenders.
29   */
30  public class FlumeAvroManager extends AbstractFlumeManager {
31  
32      private static final int MAX_RECONNECTS = 3;
33  
34      private static ManagerFactory factory = new AvroManagerFactory();
35  
36      private final Agent[] agents;
37  
38      private final int batchSize;
39  
40      private final int retries;
41  
42      private final int connectTimeout;
43  
44      private final int requestTimeout;
45  
46      private int current = 0;
47  
48      private RpcClient rpcClient = null;
49  
50      /**
51       * Constructor
52       * @param name The unique name of this manager.
53       * @param agents An array of Agents.
54       * @param batchSize The number of events to include in a batch.
55       * @param retries The number of times to retry connecting before giving up.
56       * @param connectTimeout The connection timeout in ms.
57       * @param requestTimeout The request timeout in ms.
58       *
59       */
60      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
61                                 final int retries, final int connectTimeout, final int requestTimeout) {
62          super(name);
63          this.agents = agents;
64          this.batchSize = batchSize;
65          this.retries = retries;
66          this.connectTimeout = connectTimeout;
67          this.requestTimeout = requestTimeout;
68          this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
69      }
70  
71      /**
72       * Returns a FlumeAvroManager.
73       * @param name The name of the manager.
74       * @param agents The agents to use.
75       * @param batchSize The number of events to include in a batch.
76       * @return A FlumeAvroManager.
77       */
78      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
79                                                final int retries, final int connectTimeout, final int requestTimeout) {
80          if (agents == null || agents.length == 0) {
81              throw new IllegalArgumentException("At least one agent is required");
82          }
83  
84          if (batchSize <= 0) {
85              batchSize = 1;
86          }
87  
88          final StringBuilder sb = new StringBuilder("FlumeAvro[");
89          boolean first = true;
90          for (final Agent agent : agents) {
91              if (!first) {
92                  sb.append(",");
93              }
94              sb.append(agent.getHost()).append(":").append(agent.getPort());
95              first = false;
96          }
97          sb.append("]");
98          return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
99              connectTimeout, requestTimeout));
100     }
101 
102     /**
103      * Returns the agents.
104      * @return The agent array.
105      */
106     public Agent[] getAgents() {
107         return agents;
108     }
109 
110     /**
111      * Returns the index of the current agent.
112      * @return The index for the current agent.
113      */
114     public int getCurrent() {
115         return current;
116     }
117 
118     public int getRetries() {
119         return retries;
120     }
121 
122     public int getConnectTimeout() {
123         return connectTimeout;
124     }
125 
126     public int getRequestTimeout() {
127         return requestTimeout;
128     }
129 
130     public synchronized void send(final BatchEvent events) {
131         if (rpcClient == null) {
132             rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
133         }
134 
135         if (rpcClient != null) {
136             try {
137                 rpcClient.appendBatch(events.getEvents());
138             } catch (final Exception ex) {
139                 rpcClient.close();
140                 rpcClient = null;
141                 String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
142                     agents[current].getPort();
143                 LOGGER.warn(msg, ex);
144                 throw new AppenderRuntimeException("No Flume agents are available");
145             }
146         }
147     }
148 
149     @Override
150     public synchronized void send(final Event event)  {
151         if (rpcClient == null) {
152             rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
153         }
154 
155         if (rpcClient != null) {
156             try {
157                 rpcClient.append(event);
158             } catch (final Exception ex) {
159                 rpcClient.close();
160                 rpcClient = null;
161                 String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
162                     agents[current].getPort();
163                 LOGGER.warn(msg, ex);
164                 throw new AppenderRuntimeException("No Flume agents are available");
165             }
166         }
167     }
168 
169     /**
170      * There is a very good chance that this will always return the first agent even if it isn't available.
171      * @param agents The list of agents to choose from
172      * @return The FlumeEventAvroServer.
173      */
174 
175     private RpcClient connect(final Agent[] agents, int retries, int connectTimeout, int requestTimeout) {
176         Properties props = new Properties();
177 
178         props.put("client.type", agents.length > 1 ? "default_failover" : "default");
179 
180         int count = 1;
181         StringBuilder sb = new StringBuilder();
182         for (Agent agent : agents) {
183             if (sb.length() > 0) {
184                 sb.append(" ");
185             }
186             String hostName = "host" + count++;
187             props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
188             sb.append(hostName);
189         }
190         props.put("hosts", sb.toString());
191         if (batchSize > 0) {
192             props.put("batch-size", Integer.toString(batchSize));
193         }
194         if (retries > 1) {
195             if (retries > MAX_RECONNECTS) {
196                 retries = MAX_RECONNECTS;
197             }
198             props.put("max-attempts", Integer.toString(retries * agents.length));
199         }
200         if (requestTimeout >= 1000) {
201             props.put("request-timeout", Integer.toString(requestTimeout));
202         }
203         if (connectTimeout >= 1000) {
204             props.put("connect-timeout", Integer.toString(connectTimeout));
205         }
206         return RpcClientFactory.getInstance(props);
207     }
208 
209     @Override
210     protected void releaseSub() {
211         if (rpcClient != null) {
212             try {
213                 rpcClient.close();
214             } catch (final Exception ex) {
215                 LOGGER.error("Attempt to close RPC client failed", ex);
216             }
217         }
218         rpcClient = null;
219     }
220 
221     /**
222      * Factory data.
223      */
224     private static class FactoryData {
225         private final String name;
226         private final Agent[] agents;
227         private final int batchSize;
228         private final int retries;
229         private final int conntectTimeout;
230         private final int requestTimeout;
231 
232         /**
233          * Constructor.
234          * @param name The name of the Appender.
235          * @param agents The agents.
236          * @param batchSize The number of events to include in a batch.
237          */
238         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
239                            final int connectTimeout, final int requestTimeout) {
240             this.name = name;
241             this.agents = agents;
242             this.batchSize = batchSize;
243             this.retries = retries;
244             this.conntectTimeout = connectTimeout;
245             this.requestTimeout = requestTimeout;
246         }
247     }
248 
249     /**
250      * Avro Manager Factory.
251      */
252     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
253 
254         /**
255          * Create the FlumeAvroManager.
256          * @param name The name of the entity to manage.
257          * @param data The data required to create the entity.
258          * @return The FlumeAvroManager.
259          */
260         public FlumeAvroManager createManager(final String name, final FactoryData data) {
261             try {
262 
263                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
264                     data.conntectTimeout, data.requestTimeout);
265             } catch (final Exception ex) {
266                 LOGGER.error("Could not create FlumeAvroManager", ex);
267             }
268             return null;
269         }
270     }
271 
272 }