001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.Event; 020 import org.apache.flume.api.RpcClient; 021 import org.apache.flume.api.RpcClientFactory; 022 import org.apache.logging.log4j.core.appender.AppenderRuntimeException; 023 import org.apache.logging.log4j.core.appender.ManagerFactory; 024 025 import java.util.Properties; 026 027 /** 028 * Manager for FlumeAvroAppenders. 029 */ 030 public class FlumeAvroManager extends AbstractFlumeManager { 031 032 private static final int MAX_RECONNECTS = 3; 033 034 private static AvroManagerFactory factory = new AvroManagerFactory(); 035 036 private final Agent[] agents; 037 038 private final int batchSize; 039 040 private final int retries; 041 042 private final int connectTimeout; 043 044 private final int requestTimeout; 045 046 private int current = 0; 047 048 private RpcClient rpcClient = null; 049 050 /** 051 * Constructor 052 * @param name The unique name of this manager. 053 * @param agents An array of Agents. 054 * @param batchSize The number of events to include in a batch. 055 * @param retries The number of times to retry connecting before giving up. 056 * @param connectTimeout The connection timeout in ms. 057 * @param requestTimeout The request timeout in ms. 058 * 059 */ 060 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize, 061 final int retries, final int connectTimeout, final int requestTimeout) { 062 super(name); 063 this.agents = agents; 064 this.batchSize = batchSize; 065 this.retries = retries; 066 this.connectTimeout = connectTimeout; 067 this.requestTimeout = requestTimeout; 068 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout); 069 } 070 071 /** 072 * Returns a FlumeAvroManager. 073 * @param name The name of the manager. 074 * @param agents The agents to use. 075 * @param batchSize The number of events to include in a batch. 076 * @return A FlumeAvroManager. 077 */ 078 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, 079 final int retries, final int connectTimeout, final int requestTimeout) { 080 if (agents == null || agents.length == 0) { 081 throw new IllegalArgumentException("At least one agent is required"); 082 } 083 084 if (batchSize <= 0) { 085 batchSize = 1; 086 } 087 088 final StringBuilder sb = new StringBuilder("FlumeAvro["); 089 boolean first = true; 090 for (final Agent agent : agents) { 091 if (!first) { 092 sb.append(","); 093 } 094 sb.append(agent.getHost()).append(":").append(agent.getPort()); 095 first = false; 096 } 097 sb.append("]"); 098 return getManager(sb.toString(), factory, 099 new FactoryData(name, agents, batchSize, retries, connectTimeout, requestTimeout)); 100 } 101 102 /** 103 * Returns the agents. 104 * @return The agent array. 105 */ 106 public Agent[] getAgents() { 107 return agents; 108 } 109 110 /** 111 * Returns the index of the current agent. 112 * @return The index for the current agent. 113 */ 114 public int getCurrent() { 115 return current; 116 } 117 118 public int getRetries() { 119 return retries; 120 } 121 122 public int getConnectTimeout() { 123 return connectTimeout; 124 } 125 126 public int getRequestTimeout() { 127 return requestTimeout; 128 } 129 130 public synchronized void send(final BatchEvent events) { 131 if (rpcClient == null) { 132 rpcClient = connect(agents, retries, connectTimeout, requestTimeout); 133 } 134 135 if (rpcClient != null) { 136 try { 137 LOGGER.trace("Sending batch of {} events", events.getEvents().size()); 138 rpcClient.appendBatch(events.getEvents()); 139 } catch (final Exception ex) { 140 rpcClient.close(); 141 rpcClient = null; 142 String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" + 143 agents[current].getPort(); 144 LOGGER.warn(msg, ex); 145 throw new AppenderRuntimeException("No Flume agents are available"); 146 } 147 } 148 } 149 150 @Override 151 public synchronized void send(final Event event) { 152 if (rpcClient == null) { 153 rpcClient = connect(agents, retries, connectTimeout, requestTimeout); 154 } 155 156 if (rpcClient != null) { 157 try { 158 rpcClient.append(event); 159 } catch (final Exception ex) { 160 rpcClient.close(); 161 rpcClient = null; 162 String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" + 163 agents[current].getPort(); 164 LOGGER.warn(msg, ex); 165 throw new AppenderRuntimeException("No Flume agents are available"); 166 } 167 } 168 } 169 170 /** 171 * There is a very good chance that this will always return the first agent even if it isn't available. 172 * @param agents The list of agents to choose from 173 * @return The FlumeEventAvroServer. 174 */ 175 176 private RpcClient connect(final Agent[] agents, int retries, int connectTimeout, int requestTimeout) { 177 try { 178 Properties props = new Properties(); 179 180 props.put("client.type", agents.length > 1 ? "default_failover" : "default"); 181 182 int count = 1; 183 StringBuilder sb = new StringBuilder(); 184 for (Agent agent : agents) { 185 if (sb.length() > 0) { 186 sb.append(" "); 187 } 188 String hostName = "host" + count++; 189 props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort()); 190 sb.append(hostName); 191 } 192 props.put("hosts", sb.toString()); 193 if (batchSize > 0) { 194 props.put("batch-size", Integer.toString(batchSize)); 195 } 196 if (retries > 1) { 197 if (retries > MAX_RECONNECTS) { 198 retries = MAX_RECONNECTS; 199 } 200 props.put("max-attempts", Integer.toString(retries * agents.length)); 201 } 202 if (requestTimeout >= 1000) { 203 props.put("request-timeout", Integer.toString(requestTimeout)); 204 } 205 if (connectTimeout >= 1000) { 206 props.put("connect-timeout", Integer.toString(connectTimeout)); 207 } 208 return RpcClientFactory.getInstance(props); 209 } catch (Exception ex) { 210 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage()); 211 return null; 212 } 213 } 214 215 @Override 216 protected void releaseSub() { 217 if (rpcClient != null) { 218 try { 219 rpcClient.close(); 220 } catch (final Exception ex) { 221 LOGGER.error("Attempt to close RPC client failed", ex); 222 } 223 } 224 rpcClient = null; 225 } 226 227 /** 228 * Factory data. 229 */ 230 private static class FactoryData { 231 private final String name; 232 private final Agent[] agents; 233 private final int batchSize; 234 private final int retries; 235 private final int conntectTimeout; 236 private final int requestTimeout; 237 238 /** 239 * Constructor. 240 * @param name The name of the Appender. 241 * @param agents The agents. 242 * @param batchSize The number of events to include in a batch. 243 */ 244 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, 245 final int connectTimeout, final int requestTimeout) { 246 this.name = name; 247 this.agents = agents; 248 this.batchSize = batchSize; 249 this.retries = retries; 250 this.conntectTimeout = connectTimeout; 251 this.requestTimeout = requestTimeout; 252 } 253 } 254 255 /** 256 * Avro Manager Factory. 257 */ 258 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> { 259 260 /** 261 * Create the FlumeAvroManager. 262 * @param name The name of the entity to manage. 263 * @param data The data required to create the entity. 264 * @return The FlumeAvroManager. 265 */ 266 @Override 267 public FlumeAvroManager createManager(final String name, final FactoryData data) { 268 try { 269 270 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries, 271 data.conntectTimeout, data.requestTimeout); 272 } catch (final Exception ex) { 273 LOGGER.error("Could not create FlumeAvroManager", ex); 274 } 275 return null; 276 } 277 } 278 279 }