View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.avro.AvroRemoteException;
20  import org.apache.avro.ipc.NettyTransceiver;
21  import org.apache.avro.ipc.Transceiver;
22  import org.apache.avro.ipc.specific.SpecificRequestor;
23  import org.apache.flume.source.avro.AvroFlumeEvent;
24  import org.apache.flume.source.avro.AvroSourceProtocol;
25  import org.apache.flume.source.avro.Status;
26  import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
27  import org.apache.logging.log4j.core.appender.ManagerFactory;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.nio.ByteBuffer;
32  import java.util.ArrayList;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  /**
38   * Manager for FlumeAvroAppenders.
39   */
40  public class FlumeAvroManager extends AbstractFlumeManager {
41  
42      /**
43        The default reconnection delay (500 milliseconds or .5 seconds).
44       */
45      public static final int DEFAULT_RECONNECTION_DELAY   = 500;
46  
47      private static final int DEFAULT_RECONNECTS = 3;
48  
49      private static ManagerFactory factory = new AvroManagerFactory();
50  
51      private AvroSourceProtocol client;
52  
53      private final Agent[] agents;
54  
55      private final int batchSize;
56  
57      private final EventList events = new EventList();
58  
59      private int current = 0;
60  
61      private Transceiver transceiver;
62  
63      /**
64       * Constructor
65       * @param name The unique name of this manager.
66       * @param agents An array of Agents.
67       * @param batchSize The number of evetns to include in a batch.
68       */
69      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize) {
70          super(name);
71          this.agents = agents;
72          this.batchSize = batchSize;
73          this.client = connect(agents);
74      }
75  
76      /**
77       * Returns a FlumeAvroManager.
78       * @param name The name of the manager.
79       * @param agents The agents to use.
80       * @param batchSize The number of events to include in a batch.
81       * @return A FlumeAvroManager.
82       */
83      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize) {
84          if (agents == null || agents.length == 0) {
85              throw new IllegalArgumentException("At least one agent is required");
86          }
87  
88          if (batchSize <= 0) {
89              batchSize = 1;
90          }
91  
92          final StringBuilder sb = new StringBuilder("FlumeAvro[");
93          boolean first = true;
94          for (final Agent agent : agents) {
95              if (!first) {
96                  sb.append(",");
97              }
98              sb.append(agent.getHost()).append(":").append(agent.getPort());
99              first = false;
100         }
101         sb.append("]");
102         return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
103     }
104 
105     /**
106      * Returns the agents.
107      * @return The agent array.
108      */
109     public Agent[] getAgents() {
110         return agents;
111     }
112 
113     /**
114      * Returns the index of the current agent.
115      * @return The index for the current agent.
116      */
117     public int getCurrent() {
118         return current;
119     }
120 
121     @Override
122     public synchronized void send(final FlumeEvent event, int delay, int retries)  {
123         if (delay == 0) {
124             delay = DEFAULT_RECONNECTION_DELAY;
125         }
126         if (retries == 0) {
127             retries = DEFAULT_RECONNECTS;
128         }
129         if (client == null) {
130             client = connect(agents);
131         }
132         String msg = "No Flume agents are available";
133         if (client != null) {
134             final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
135             avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
136             avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
137 
138             for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
139                 avroEvent.getHeaders().put(entry.getKey(), entry.getValue());
140             }
141 
142             final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
143             if (batch == null && batchSize > 1) {
144                 return;
145             }
146 
147             int i = 0;
148 
149             msg = "Error writing to " + getName();
150 
151             do {
152                 try {
153                     final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
154                     if (!status.equals(Status.OK)) {
155                         throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
156                             ":" + agents[current].getPort());
157                     }
158                     return;
159                 } catch (final Exception ex) {
160                     if (i == retries - 1) {
161                         msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
162                             agents[current].getPort();
163                         LOGGER.warn(msg, ex);
164                         break;
165                     }
166                     sleep(delay);
167                 }
168             } while (++i < retries);
169 
170             for (int index = 0; index < agents.length; ++index) {
171                 if (index == current) {
172                     continue;
173                 }
174                 final Agent agent = agents[index];
175                 i = 0;
176                 do {
177                     try {
178                         transceiver = null;
179                         final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
180                         final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
181                         if (!status.equals(Status.OK)) {
182                             if (i == retries - 1) {
183                                 final String warnMsg = "RPC communication failed to " + getName() + " at " +
184                                     agent.getHost() + ":" + agent.getPort();
185                                 LOGGER.warn(warnMsg);
186                             }
187                             continue;
188                         }
189                         client = c;
190                         current = i;
191                         return;
192                     } catch (final Exception ex) {
193                         if (i == retries - 1) {
194                             final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
195                                 agent.getPort();
196                             LOGGER.warn(warnMsg, ex);
197                             break;
198                         }
199                         sleep(delay);
200                     }
201                 } while (++i < retries);
202             }
203         }
204 
205         throw new AppenderRuntimeException(msg);
206 
207     }
208 
209     private void sleep(final int delay) {
210         try {
211             Thread.sleep(delay);
212         } catch (final InterruptedException ex) {
213             Thread.currentThread().interrupt();
214         }
215     }
216 
217     /**
218      * There is a very good chance that this will always return the first agent even if it isn't available.
219      * @param agents The list of agents to choose from
220      * @return The FlumeEventAvroServer.
221      */
222     private AvroSourceProtocol connect(final Agent[] agents) {
223         int i = 0;
224         for (final Agent agent : agents) {
225             final AvroSourceProtocol server = connect(agent.getHost(), agent.getPort());
226             if (server != null) {
227                 current = i;
228                 return server;
229             }
230             ++i;
231         }
232         LOGGER.error("Flume manager " + getName() + " was unable to connect to any agents");
233         return null;
234     }
235 
236     private AvroSourceProtocol connect(final String hostname, final int port) {
237         try {
238             if (transceiver == null) {
239                 transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
240             }
241         } catch (final IOException ioe) {
242             LOGGER.error("Unable to create transceiver", ioe);
243             return null;
244         }
245         try {
246             return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver);
247         } catch (final IOException ioe) {
248             LOGGER.error("Unable to create Avro client");
249             return null;
250         }
251     }
252 
253     @Override
254     protected void releaseSub() {
255         if (transceiver != null) {
256             try {
257                 transceiver.close();
258             } catch (final IOException ioe) {
259                 LOGGER.error("Attempt to clean up Avro transceiver failed", ioe);
260             }
261         }
262         client = null;
263     }
264 
265     /**
266      * Thread-safe List management of a batch.
267      */
268     private static class EventList extends ArrayList<AvroFlumeEvent> {
269 
270         /**
271          * Generated serial version ID.
272          */
273         private static final long serialVersionUID = -1599817377315957495L;
274 
275         public synchronized List<AvroFlumeEvent> addAndGet(final AvroFlumeEvent event, final int batchSize) {
276             super.add(event);
277             if (this.size() >= batchSize) {
278                 final List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>();
279                 events.addAll(this);
280                 clear();
281                 return events;
282             } else {
283                 return null;
284             }
285         }
286     }
287 
288     /**
289      * Factory data.
290      */
291     private static class FactoryData {
292         private final String name;
293         private final Agent[] agents;
294         private final int batchSize;
295 
296         /**
297          * Constructor.
298          * @param name The name of the Appender.
299          * @param agents The agents.
300          * @param batchSize The number of events to include in a batch.
301          */
302         public FactoryData(final String name, final Agent[] agents, final int batchSize) {
303             this.name = name;
304             this.agents = agents;
305             this.batchSize = batchSize;
306         }
307     }
308 
309     /**
310      * Avro Manager Factory.
311      */
312     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
313 
314         /**
315          * Create the FlumeAvroManager.
316          * @param name The name of the entity to manage.
317          * @param data The data required to create the entity.
318          * @return The FlumeAvroManager.
319          */
320         public FlumeAvroManager createManager(final String name, final FactoryData data) {
321             try {
322 
323                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
324             } catch (final Exception ex) {
325                 LOGGER.error("Could not create FlumeAvroManager", ex);
326             }
327             return null;
328         }
329     }
330 
331 }