/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.logging.log4j.flume.appender;

import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;

/**
 * Manager for FlumeAvroAppenders.
 *
 * <p>Events are either forwarded one at a time (batch size of 1) or collected into a
 * {@link BatchEvent} that is sent once it holds {@code batchSize} events or once the
 * configured delay has elapsed since the first event of the batch was added. The delay is
 * only evaluated when an event arrives; there is no background timer.</p>
 *
 * <p>The RPC client, the pending batch and the send deadline are guarded by this manager's
 * monitor.</p>
 */
public class FlumeAvroManager extends AbstractFlumeManager {

    /** Upper bound applied to the configured number of retries. */
    private static final int MAX_RECONNECTS = 3;

    /** Timeouts (in ms) below this value are not passed on to the RPC client. */
    private static final int MINIMUM_TIMEOUT = 1000;

    /** Conversion factor; declared as a long so the multiplication cannot overflow an int. */
    private static final long NANOS_PER_MILLI = 1000000L;

    private static final AvroManagerFactory factory = new AvroManagerFactory();

    private final Agent[] agents;

    private final int batchSize;

    private final long delayNanos;
    private final int delayMillis;

    private final int retries;

    private final int connectTimeoutMillis;

    private final int requestTimeoutMillis;

    /** Index of the agent named in error messages; never changes in this implementation. */
    private final int current = 0;

    private RpcClient rpcClient = null;

    private BatchEvent batchEvent = new BatchEvent();

    /** Value of {@link System#nanoTime()} at which an incomplete batch becomes due. */
    private long nextSend = 0;

    /**
     * Constructor. Attempts to connect immediately; if that fails the manager is still created
     * and the connection is retried on the next send.
     *
     * @param name The unique name of this manager.
     * @param shortName The name of the Appender (not used by this class).
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectTimeout The connection timeout in ms.
     * @param requestTimeout The request timeout in ms.
     */
    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
                               final int delayMillis, final int retries, final int connectTimeout,
                               final int requestTimeout) {
        super(name);
        this.agents = agents;
        this.batchSize = batchSize;
        this.delayMillis = delayMillis;
        // Multiply as long: an int product overflows for delays above ~2147 ms.
        this.delayNanos = delayMillis * NANOS_PER_MILLI;
        this.retries = retries;
        this.connectTimeoutMillis = connectTimeout;
        this.requestTimeoutMillis = requestTimeout;
        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
    }

    /**
     * Returns a FlumeAvroManager. The manager is keyed by the list of agent host:port pairs,
     * not by the appender name.
     *
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param batchSize The number of events to include in a batch; values below 1 are treated as 1.
     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectTimeoutMillis The connection timeout in ms.
     * @param requestTimeoutMillis The request timeout in ms.
     * @return A FlumeAvroManager.
     * @throws IllegalArgumentException if {@code agents} is null or empty.
     */
    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
                                              final int delayMillis, final int retries,
                                              final int connectTimeoutMillis, final int requestTimeoutMillis) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }

        final StringBuilder sb = new StringBuilder("FlumeAvro[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(',');
            }
            sb.append(agent.getHost()).append(':').append(agent.getPort());
            first = false;
        }
        sb.append(']');
        return getManager(sb.toString(), factory,
                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis,
                        requestTimeoutMillis));
    }

    /**
     * Returns the agents.
     * @return The agent array.
     */
    public Agent[] getAgents() {
        return agents;
    }

    /**
     * Returns the index of the current agent.
     * @return The index for the current agent.
     */
    public int getCurrent() {
        return current;
    }

    /**
     * Returns the configured number of connection retries.
     * @return The retry count.
     */
    public int getRetries() {
        return retries;
    }

    /**
     * Returns the connection timeout.
     * @return The connection timeout in ms.
     */
    public int getConnectTimeoutMillis() {
        return connectTimeoutMillis;
    }

    /**
     * Returns the request timeout.
     * @return The request timeout in ms.
     */
    public int getRequestTimeoutMillis() {
        return requestTimeoutMillis;
    }

    /**
     * Returns the number of events sent per batch.
     * @return The batch size.
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Returns the delay before an incomplete batch is sent.
     * @return The delay in ms.
     */
    public int getDelayMillis() {
        return delayMillis;
    }

    /**
     * Sends a batch of events, connecting first if there is no open client.
     *
     * @param events The batch to send.
     * @throws AppenderLoggingException if no client could be created or the write failed.
     */
    public synchronized void send(final BatchEvent events) {
        final RpcClient client = ensureClient();
        try {
            LOGGER.trace("Sending batch of {} events", events.getEvents().size());
            client.appendBatch(events.getEvents());
        } catch (final Exception ex) {
            throw sendFailed(ex);
        }
    }

    /**
     * Sends a single event. With a batch size of 1 the event is written immediately; otherwise
     * it is added to the pending batch, which is sent once it is full or its delay has expired.
     * If sending the batch fails the pending events are kept and sent again with the next event.
     *
     * @param event The event to send.
     * @throws AppenderLoggingException if no client could be created or the write failed.
     */
    @Override
    public synchronized void send(final Event event) {
        if (batchSize == 1) {
            final RpcClient client = ensureClient();
            try {
                client.append(event);
            } catch (final Exception ex) {
                throw sendFailed(ex);
            }
            return;
        }

        batchEvent.addEvent(event);
        final int eventCount = batchEvent.getEvents().size();
        if (eventCount == 1) {
            // The delay is measured from the first event of each batch.
            nextSend = System.nanoTime() + delayNanos;
        }
        // nanoTime values must be compared by difference, never directly, to stay correct on overflow.
        if (eventCount >= batchSize || System.nanoTime() - nextSend >= 0) {
            send(batchEvent);
            batchEvent = new BatchEvent();
        }
    }

    /**
     * Returns the open client, creating one if necessary. Callers must hold this manager's monitor.
     *
     * @return The client; never null.
     * @throws AppenderLoggingException if no client could be created.
     */
    private RpcClient ensureClient() {
        if (rpcClient == null) {
            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
        }
        if (rpcClient == null) {
            LOGGER.warn(unableToWriteMessage());
            throw new AppenderLoggingException("No Flume agents are available");
        }
        return rpcClient;
    }

    /**
     * Discards the client after a failed write, logs the failure and builds the exception to throw.
     *
     * @param ex The failure raised while writing.
     * @return The exception the caller should throw, carrying {@code ex} as its cause.
     */
    private AppenderLoggingException sendFailed(final Exception ex) {
        closeClientQuietly();
        LOGGER.warn(unableToWriteMessage(), ex);
        return new AppenderLoggingException("No Flume agents are available", ex);
    }

    /**
     * Builds the warning text naming this manager and the current agent.
     * @return The message.
     */
    private String unableToWriteMessage() {
        final Agent agent = agents[current];
        return "Unable to write to " + getName() + " at " + agent.getHost() + ':' + agent.getPort();
    }

    /**
     * Closes the client, if any, and clears the field. A failure to close is logged rather than
     * thrown so that it can neither mask the original error nor leave a broken client in place.
     */
    private void closeClientQuietly() {
        final RpcClient client = rpcClient;
        rpcClient = null;
        if (client != null) {
            try {
                client.close();
            } catch (final Exception ex) {
                LOGGER.error("Attempt to close RPC client failed", ex);
            }
        }
    }

    /**
     * Creates an RPC client of type {@code default_failover} configured with every agent.
     * There is a very good chance that this will always return the first agent even if it isn't available.
     *
     * @param agents The list of agents to choose from.
     * @param retries The configured number of retries; only applied when greater than 1, and
     *        capped at {@link #MAX_RECONNECTS}.
     * @param connectTimeoutMillis The connection timeout in ms; ignored when below {@link #MINIMUM_TIMEOUT}.
     * @param requestTimeoutMillis The request timeout in ms; ignored when below {@link #MINIMUM_TIMEOUT}.
     * @return The client, or null if it could not be created.
     */
    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis,
                              final int requestTimeoutMillis) {
        try {
            final Properties props = new Properties();

            props.setProperty("client.type", "default_failover");

            int agentCount = 1;
            final StringBuilder sb = new StringBuilder();
            for (final Agent agent : agents) {
                if (sb.length() > 0) {
                    sb.append(' ');
                }
                final String hostName = "host" + agentCount++;
                props.setProperty("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
                sb.append(hostName);
            }
            props.setProperty("hosts", sb.toString());
            if (batchSize > 0) {
                props.setProperty("batch-size", Integer.toString(batchSize));
            }
            if (retries > 1) {
                if (retries > MAX_RECONNECTS) {
                    retries = MAX_RECONNECTS;
                }
                props.setProperty("max-attempts", Integer.toString(retries * agents.length));
            }
            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
                props.setProperty("request-timeout", Integer.toString(requestTimeoutMillis));
            }
            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
                props.setProperty("connect-timeout", Integer.toString(connectTimeoutMillis));
            }
            return RpcClientFactory.getInstance(props);
        } catch (final Exception ex) {
            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
            return null;
        }
    }

    /**
     * Sends any pending batch and closes the client. Nothing is done when there is no open client.
     */
    @Override
    protected void releaseSub() {
        synchronized (this) {
            if (rpcClient == null) {
                return;
            }
            try {
                if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
                    send(batchEvent);
                }
            } catch (final Exception ex) {
                LOGGER.error("Error sending final batch: {}", ex.getMessage());
            }
            // A failed send has already discarded the client, so the close must tolerate null.
            closeClientQuietly();
        }
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final int delayMillis;
        private final int retries;
        private final int connectTimeoutMillis;
        private final int requestTimeoutMillis;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectTimeoutMillis The connection timeout in ms.
         * @param requestTimeoutMillis The request timeout in ms.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
                           final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.delayMillis = delayMillis;
            this.retries = retries;
            this.connectTimeoutMillis = connectTimeoutMillis;
            this.requestTimeoutMillis = requestTimeoutMillis;
        }
    }

    /**
     * Avro Manager Factory.
     */
    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {

        /**
         * Create the FlumeAvroManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumeAvroManager, or null if it could not be created.
         */
        @Override
        public FlumeAvroManager createManager(final String name, final FactoryData data) {
            try {
                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
                        data.retries, data.connectTimeoutMillis, data.requestTimeoutMillis);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumeAvroManager", ex);
            }
            return null;
        }
    }

}