1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.avro.AvroRemoteException;
20 import org.apache.avro.ipc.NettyTransceiver;
21 import org.apache.avro.ipc.Transceiver;
22 import org.apache.avro.ipc.specific.SpecificRequestor;
23 import org.apache.flume.source.avro.AvroFlumeEvent;
24 import org.apache.flume.source.avro.AvroSourceProtocol;
25 import org.apache.flume.source.avro.Status;
26 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
27 import org.apache.logging.log4j.core.appender.ManagerFactory;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36
37
38
39
40 public class FlumeAvroManager extends AbstractFlumeManager {
41
42
43
44
45 public static final int DEFAULT_RECONNECTION_DELAY = 500;
46
47 private static final int DEFAULT_RECONNECTS = 3;
48
49 private static ManagerFactory factory = new AvroManagerFactory();
50
51 private AvroSourceProtocol client;
52
53 private final Agent[] agents;
54
55 private final int batchSize;
56
57 private final EventList events = new EventList();
58
59 private int current = 0;
60
61 private Transceiver transceiver;
62
63
64
65
66
67
68
69 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize) {
70 super(name);
71 this.agents = agents;
72 this.batchSize = batchSize;
73 this.client = connect(agents);
74 }
75
76
77
78
79
80
81
82
83 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize) {
84 if (agents == null || agents.length == 0) {
85 throw new IllegalArgumentException("At least one agent is required");
86 }
87
88 if (batchSize <= 0) {
89 batchSize = 1;
90 }
91
92 final StringBuilder sb = new StringBuilder("FlumeAvro[");
93 boolean first = true;
94 for (final Agent agent : agents) {
95 if (!first) {
96 sb.append(",");
97 }
98 sb.append(agent.getHost()).append(":").append(agent.getPort());
99 first = false;
100 }
101 sb.append("]");
102 return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
103 }
104
105
106
107
108
109 public Agent[] getAgents() {
110 return agents;
111 }
112
113
114
115
116
117 public int getCurrent() {
118 return current;
119 }
120
121 @Override
122 public synchronized void send(final FlumeEvent event, int delay, int retries) {
123 if (delay == 0) {
124 delay = DEFAULT_RECONNECTION_DELAY;
125 }
126 if (retries == 0) {
127 retries = DEFAULT_RECONNECTS;
128 }
129 if (client == null) {
130 client = connect(agents);
131 }
132 String msg = "No Flume agents are available";
133 if (client != null) {
134 final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
135 avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
136 avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
137
138 for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
139 avroEvent.getHeaders().put(entry.getKey(), entry.getValue());
140 }
141
142 final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
143 if (batch == null && batchSize > 1) {
144 return;
145 }
146
147 int i = 0;
148
149 msg = "Error writing to " + getName();
150
151 do {
152 try {
153 final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
154 if (!status.equals(Status.OK)) {
155 throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
156 ":" + agents[current].getPort());
157 }
158 return;
159 } catch (final Exception ex) {
160 if (i == retries - 1) {
161 msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
162 agents[current].getPort();
163 LOGGER.warn(msg, ex);
164 break;
165 }
166 sleep(delay);
167 }
168 } while (++i < retries);
169
170 for (int index = 0; index < agents.length; ++index) {
171 if (index == current) {
172 continue;
173 }
174 final Agent agent = agents[index];
175 i = 0;
176 do {
177 try {
178 transceiver = null;
179 final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
180 final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
181 if (!status.equals(Status.OK)) {
182 if (i == retries - 1) {
183 final String warnMsg = "RPC communication failed to " + getName() + " at " +
184 agent.getHost() + ":" + agent.getPort();
185 LOGGER.warn(warnMsg);
186 }
187 continue;
188 }
189 client = c;
190 current = i;
191 return;
192 } catch (final Exception ex) {
193 if (i == retries - 1) {
194 final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
195 agent.getPort();
196 LOGGER.warn(warnMsg, ex);
197 break;
198 }
199 sleep(delay);
200 }
201 } while (++i < retries);
202 }
203 }
204
205 throw new AppenderRuntimeException(msg);
206
207 }
208
209 private void sleep(final int delay) {
210 try {
211 Thread.sleep(delay);
212 } catch (final InterruptedException ex) {
213 Thread.currentThread().interrupt();
214 }
215 }
216
217
218
219
220
221
222 private AvroSourceProtocol connect(final Agent[] agents) {
223 int i = 0;
224 for (final Agent agent : agents) {
225 final AvroSourceProtocol server = connect(agent.getHost(), agent.getPort());
226 if (server != null) {
227 current = i;
228 return server;
229 }
230 ++i;
231 }
232 LOGGER.error("Flume manager " + getName() + " was unable to connect to any agents");
233 return null;
234 }
235
236 private AvroSourceProtocol connect(final String hostname, final int port) {
237 try {
238 if (transceiver == null) {
239 transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
240 }
241 } catch (final IOException ioe) {
242 LOGGER.error("Unable to create transceiver", ioe);
243 return null;
244 }
245 try {
246 return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver);
247 } catch (final IOException ioe) {
248 LOGGER.error("Unable to create Avro client");
249 return null;
250 }
251 }
252
253 @Override
254 protected void releaseSub() {
255 if (transceiver != null) {
256 try {
257 transceiver.close();
258 } catch (final IOException ioe) {
259 LOGGER.error("Attempt to clean up Avro transceiver failed", ioe);
260 }
261 }
262 client = null;
263 }
264
265
266
267
268 private static class EventList extends ArrayList<AvroFlumeEvent> {
269
270
271
272
273 private static final long serialVersionUID = -1599817377315957495L;
274
275 public synchronized List<AvroFlumeEvent> addAndGet(final AvroFlumeEvent event, final int batchSize) {
276 super.add(event);
277 if (this.size() >= batchSize) {
278 final List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>();
279 events.addAll(this);
280 clear();
281 return events;
282 } else {
283 return null;
284 }
285 }
286 }
287
288
289
290
291 private static class FactoryData {
292 private final String name;
293 private final Agent[] agents;
294 private final int batchSize;
295
296
297
298
299
300
301
302 public FactoryData(final String name, final Agent[] agents, final int batchSize) {
303 this.name = name;
304 this.agents = agents;
305 this.batchSize = batchSize;
306 }
307 }
308
309
310
311
312 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
313
314
315
316
317
318
319
320 public FlumeAvroManager createManager(final String name, final FactoryData data) {
321 try {
322
323 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
324 } catch (final Exception ex) {
325 LOGGER.error("Could not create FlumeAvroManager", ex);
326 }
327 return null;
328 }
329 }
330
331 }