1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.Event;
20 import org.apache.flume.api.RpcClient;
21 import org.apache.flume.api.RpcClientFactory;
22 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
23 import org.apache.logging.log4j.core.appender.ManagerFactory;
24
25 import java.util.Properties;
26
27
28
29
30 public class FlumeAvroManager extends AbstractFlumeManager {
31
32 private static final int MAX_RECONNECTS = 3;
33
34 private static AvroManagerFactory factory = new AvroManagerFactory();
35
36 private final Agent[] agents;
37
38 private final int batchSize;
39
40 private final int retries;
41
42 private final int connectTimeout;
43
44 private final int requestTimeout;
45
46 private int current = 0;
47
48 private RpcClient rpcClient = null;
49
50
51
52
53
54
55
56
57
58
59
60 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
61 final int retries, final int connectTimeout, final int requestTimeout) {
62 super(name);
63 this.agents = agents;
64 this.batchSize = batchSize;
65 this.retries = retries;
66 this.connectTimeout = connectTimeout;
67 this.requestTimeout = requestTimeout;
68 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
69 }
70
71
72
73
74
75
76
77
78 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
79 final int retries, final int connectTimeout, final int requestTimeout) {
80 if (agents == null || agents.length == 0) {
81 throw new IllegalArgumentException("At least one agent is required");
82 }
83
84 if (batchSize <= 0) {
85 batchSize = 1;
86 }
87
88 final StringBuilder sb = new StringBuilder("FlumeAvro[");
89 boolean first = true;
90 for (final Agent agent : agents) {
91 if (!first) {
92 sb.append(",");
93 }
94 sb.append(agent.getHost()).append(":").append(agent.getPort());
95 first = false;
96 }
97 sb.append("]");
98 return getManager(sb.toString(), factory,
99 new FactoryData(name, agents, batchSize, retries, connectTimeout, requestTimeout));
100 }
101
102
103
104
105
106 public Agent[] getAgents() {
107 return agents;
108 }
109
110
111
112
113
114 public int getCurrent() {
115 return current;
116 }
117
118 public int getRetries() {
119 return retries;
120 }
121
122 public int getConnectTimeout() {
123 return connectTimeout;
124 }
125
126 public int getRequestTimeout() {
127 return requestTimeout;
128 }
129
130 public synchronized void send(final BatchEvent events) {
131 if (rpcClient == null) {
132 rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
133 }
134
135 if (rpcClient != null) {
136 try {
137 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
138 rpcClient.appendBatch(events.getEvents());
139 } catch (final Exception ex) {
140 rpcClient.close();
141 rpcClient = null;
142 String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
143 agents[current].getPort();
144 LOGGER.warn(msg, ex);
145 throw new AppenderRuntimeException("No Flume agents are available");
146 }
147 }
148 }
149
150 @Override
151 public synchronized void send(final Event event) {
152 if (rpcClient == null) {
153 rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
154 }
155
156 if (rpcClient != null) {
157 try {
158 rpcClient.append(event);
159 } catch (final Exception ex) {
160 rpcClient.close();
161 rpcClient = null;
162 String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
163 agents[current].getPort();
164 LOGGER.warn(msg, ex);
165 throw new AppenderRuntimeException("No Flume agents are available");
166 }
167 }
168 }
169
170
171
172
173
174
175
176 private RpcClient connect(final Agent[] agents, int retries, int connectTimeout, int requestTimeout) {
177 try {
178 Properties props = new Properties();
179
180 props.put("client.type", agents.length > 1 ? "default_failover" : "default");
181
182 int count = 1;
183 StringBuilder sb = new StringBuilder();
184 for (Agent agent : agents) {
185 if (sb.length() > 0) {
186 sb.append(" ");
187 }
188 String hostName = "host" + count++;
189 props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
190 sb.append(hostName);
191 }
192 props.put("hosts", sb.toString());
193 if (batchSize > 0) {
194 props.put("batch-size", Integer.toString(batchSize));
195 }
196 if (retries > 1) {
197 if (retries > MAX_RECONNECTS) {
198 retries = MAX_RECONNECTS;
199 }
200 props.put("max-attempts", Integer.toString(retries * agents.length));
201 }
202 if (requestTimeout >= 1000) {
203 props.put("request-timeout", Integer.toString(requestTimeout));
204 }
205 if (connectTimeout >= 1000) {
206 props.put("connect-timeout", Integer.toString(connectTimeout));
207 }
208 return RpcClientFactory.getInstance(props);
209 } catch (Exception ex) {
210 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
211 return null;
212 }
213 }
214
215 @Override
216 protected void releaseSub() {
217 if (rpcClient != null) {
218 try {
219 rpcClient.close();
220 } catch (final Exception ex) {
221 LOGGER.error("Attempt to close RPC client failed", ex);
222 }
223 }
224 rpcClient = null;
225 }
226
227
228
229
230 private static class FactoryData {
231 private final String name;
232 private final Agent[] agents;
233 private final int batchSize;
234 private final int retries;
235 private final int conntectTimeout;
236 private final int requestTimeout;
237
238
239
240
241
242
243
244 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
245 final int connectTimeout, final int requestTimeout) {
246 this.name = name;
247 this.agents = agents;
248 this.batchSize = batchSize;
249 this.retries = retries;
250 this.conntectTimeout = connectTimeout;
251 this.requestTimeout = requestTimeout;
252 }
253 }
254
255
256
257
258 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
259
260
261
262
263
264
265
266 @Override
267 public FlumeAvroManager createManager(final String name, final FactoryData data) {
268 try {
269
270 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
271 data.conntectTimeout, data.requestTimeout);
272 } catch (final Exception ex) {
273 LOGGER.error("Could not create FlumeAvroManager", ex);
274 }
275 return null;
276 }
277 }
278
279 }