001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.util.Properties; 020import java.util.concurrent.TimeUnit; 021 022import org.apache.flume.Event; 023import org.apache.flume.api.RpcClient; 024import org.apache.flume.api.RpcClientFactory; 025import org.apache.logging.log4j.core.appender.AppenderLoggingException; 026import org.apache.logging.log4j.core.appender.ManagerFactory; 027 028/** 029 * Manager for FlumeAvroAppenders. 030 */ 031public class FlumeAvroManager extends AbstractFlumeManager { 032 033 private static final int MAX_RECONNECTS = 3; 034 private static final int MINIMUM_TIMEOUT = 1000; 035 036 private static AvroManagerFactory factory = new AvroManagerFactory(); 037 038 private final Agent[] agents; 039 040 private final int batchSize; 041 042 private final long delayNanos; 043 private final int delayMillis; 044 045 private final int retries; 046 047 private final int connectTimeoutMillis; 048 049 private final int requestTimeoutMillis; 050 051 private final int current = 0; 052 053 private RpcClient rpcClient = null; 054 055 private BatchEvent batchEvent = new BatchEvent(); 056 private long nextSend = 0; 057 058 /** 059 * Constructor 060 * @param name The unique name of this manager. 061 * @param agents An array of Agents. 062 * @param batchSize The number of events to include in a batch. 063 * @param retries The number of times to retry connecting before giving up. 064 * @param connectTimeout The connection timeout in ms. 065 * @param requestTimeout The request timeout in ms. 066 * 067 */ 068 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize, 069 final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) { 070 super(name); 071 this.agents = agents; 072 this.batchSize = batchSize; 073 this.delayMillis = delayMillis; 074 this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis); 075 this.retries = retries; 076 this.connectTimeoutMillis = connectTimeout; 077 this.requestTimeoutMillis = requestTimeout; 078 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout); 079 } 080 081 /** 082 * Returns a FlumeAvroManager. 083 * @param name The name of the manager. 084 * @param agents The agents to use. 085 * @param batchSize The number of events to include in a batch. 086 * @param delayMillis The number of milliseconds to wait before sending an incomplete batch. 087 * @param retries The number of times to retry connecting before giving up. 088 * @param connectTimeoutMillis The connection timeout in ms. 089 * @param requestTimeoutMillis The request timeout in ms. 090 * @return A FlumeAvroManager. 091 */ 092 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis, 093 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 094 if (agents == null || agents.length == 0) { 095 throw new IllegalArgumentException("At least one agent is required"); 096 } 097 098 if (batchSize <= 0) { 099 batchSize = 1; 100 } 101 102 final StringBuilder sb = new StringBuilder("FlumeAvro["); 103 boolean first = true; 104 for (final Agent agent : agents) { 105 if (!first) { 106 sb.append(','); 107 } 108 sb.append(agent.getHost()).append(':').append(agent.getPort()); 109 first = false; 110 } 111 sb.append(']'); 112 return getManager(sb.toString(), factory, 113 new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis)); 114 } 115 116 /** 117 * Returns the agents. 118 * @return The agent array. 119 */ 120 public Agent[] getAgents() { 121 return agents; 122 } 123 124 /** 125 * Returns the index of the current agent. 126 * @return The index for the current agent. 127 */ 128 public int getCurrent() { 129 return current; 130 } 131 132 public int getRetries() { 133 return retries; 134 } 135 136 public int getConnectTimeoutMillis() { 137 return connectTimeoutMillis; 138 } 139 140 public int getRequestTimeoutMillis() { 141 return requestTimeoutMillis; 142 } 143 144 public int getBatchSize() { 145 return batchSize; 146 } 147 148 public int getDelayMillis() { 149 return delayMillis; 150 } 151 152 public synchronized void send(final BatchEvent events) { 153 if (rpcClient == null) { 154 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); 155 } 156 157 if (rpcClient != null) { 158 try { 159 LOGGER.trace("Sending batch of {} events", events.getEvents().size()); 160 rpcClient.appendBatch(events.getEvents()); 161 } catch (final Exception ex) { 162 rpcClient.close(); 163 rpcClient = null; 164 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 165 agents[current].getPort(); 166 LOGGER.warn(msg, ex); 167 throw new AppenderLoggingException("No Flume agents are available"); 168 } 169 } else { 170 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 171 agents[current].getPort(); 172 LOGGER.warn(msg); 173 throw new AppenderLoggingException("No Flume agents are available"); 174 } 175 } 176 177 @Override 178 public synchronized void send(final Event event) { 179 if (batchSize == 1) { 180 if (rpcClient == null) { 181 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); 182 } 183 184 if (rpcClient != null) { 185 try { 186 rpcClient.append(event); 187 } catch (final Exception ex) { 188 rpcClient.close(); 189 rpcClient = null; 190 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 191 agents[current].getPort(); 192 LOGGER.warn(msg, ex); 193 throw new AppenderLoggingException("No Flume agents are available"); 194 } 195 } else { 196 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 197 agents[current].getPort(); 198 LOGGER.warn(msg); 199 throw new AppenderLoggingException("No Flume agents are available"); 200 } 201 } else { 202 batchEvent.addEvent(event); 203 final int eventCount = batchEvent.getEvents().size(); 204 if (eventCount == 1) { 205 nextSend = System.nanoTime() + delayNanos; 206 } 207 if (eventCount >= batchSize || System.nanoTime() >= nextSend) { 208 send(batchEvent); 209 batchEvent = new BatchEvent(); 210 } 211 } 212 } 213 214 /** 215 * There is a very good chance that this will always return the first agent even if it isn't available. 216 * @param agents The list of agents to choose from 217 * @return The FlumeEventAvroServer. 218 */ 219 private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 220 try { 221 final Properties props = new Properties(); 222 223 props.put("client.type", "default_failover"); 224 225 int agentCount = 1; 226 final StringBuilder sb = new StringBuilder(); 227 for (final Agent agent : agents) { 228 if (sb.length() > 0) { 229 sb.append(' '); 230 } 231 final String hostName = "host" + agentCount++; 232 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort()); 233 sb.append(hostName); 234 } 235 props.put("hosts", sb.toString()); 236 if (batchSize > 0) { 237 props.put("batch-size", Integer.toString(batchSize)); 238 } 239 if (retries > 1) { 240 if (retries > MAX_RECONNECTS) { 241 retries = MAX_RECONNECTS; 242 } 243 props.put("max-attempts", Integer.toString(retries * agents.length)); 244 } 245 if (requestTimeoutMillis >= MINIMUM_TIMEOUT) { 246 props.put("request-timeout", Integer.toString(requestTimeoutMillis)); 247 } 248 if (connectTimeoutMillis >= MINIMUM_TIMEOUT) { 249 props.put("connect-timeout", Integer.toString(connectTimeoutMillis)); 250 } 251 return RpcClientFactory.getInstance(props); 252 } catch (final Exception ex) { 253 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage()); 254 return null; 255 } 256 } 257 258 @Override 259 protected void releaseSub() { 260 if (rpcClient != null) { 261 try { 262 synchronized(this) { 263 try { 264 if (batchSize > 1 && batchEvent.getEvents().size() > 0) { 265 send(batchEvent); 266 } 267 } catch (final Exception ex) { 268 LOGGER.error("Error sending final batch: {}", ex.getMessage()); 269 } 270 } 271 rpcClient.close(); 272 } catch (final Exception ex) { 273 LOGGER.error("Attempt to close RPC client failed", ex); 274 } 275 } 276 rpcClient = null; 277 } 278 279 /** 280 * Factory data. 281 */ 282 private static class FactoryData { 283 private final String name; 284 private final Agent[] agents; 285 private final int batchSize; 286 private final int delayMillis; 287 private final int retries; 288 private final int conntectTimeoutMillis; 289 private final int requestTimeoutMillis; 290 291 /** 292 * Constructor. 293 * @param name The name of the Appender. 294 * @param agents The agents. 295 * @param batchSize The number of events to include in a batch. 296 */ 297 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis, 298 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 299 this.name = name; 300 this.agents = agents; 301 this.batchSize = batchSize; 302 this.delayMillis = delayMillis; 303 this.retries = retries; 304 this.conntectTimeoutMillis = connectTimeoutMillis; 305 this.requestTimeoutMillis = requestTimeoutMillis; 306 } 307 } 308 309 /** 310 * Avro Manager Factory. 311 */ 312 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> { 313 314 /** 315 * Create the FlumeAvroManager. 316 * @param name The name of the entity to manage. 317 * @param data The data required to create the entity. 318 * @return The FlumeAvroManager. 319 */ 320 @Override 321 public FlumeAvroManager createManager(final String name, final FactoryData data) { 322 try { 323 324 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis, 325 data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis); 326 } catch (final Exception ex) { 327 LOGGER.error("Could not create FlumeAvroManager", ex); 328 } 329 return null; 330 } 331 } 332 333}