001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.avro.AvroRemoteException; 020 import org.apache.avro.ipc.NettyTransceiver; 021 import org.apache.avro.ipc.Transceiver; 022 import org.apache.avro.ipc.specific.SpecificRequestor; 023 import org.apache.flume.source.avro.AvroFlumeEvent; 024 import org.apache.flume.source.avro.AvroSourceProtocol; 025 import org.apache.flume.source.avro.Status; 026 import org.apache.logging.log4j.core.appender.AppenderRuntimeException; 027 import org.apache.logging.log4j.core.appender.ManagerFactory; 028 029 import java.io.IOException; 030 import java.net.InetSocketAddress; 031 import java.nio.ByteBuffer; 032 import java.util.ArrayList; 033 import java.util.HashMap; 034 import java.util.List; 035 import java.util.Map; 036 037 /** 038 * Manager for FlumeAvroAppenders. 039 */ 040 public class FlumeAvroManager extends AbstractFlumeManager { 041 042 /** 043 The default reconnection delay (500 milliseconds or .5 seconds). 044 */ 045 public static final int DEFAULT_RECONNECTION_DELAY = 500; 046 047 private static final int DEFAULT_RECONNECTS = 3; 048 049 private static ManagerFactory factory = new AvroManagerFactory(); 050 051 private AvroSourceProtocol client; 052 053 private final Agent[] agents; 054 055 private final int batchSize; 056 057 private final EventList events = new EventList(); 058 059 private int current = 0; 060 061 private Transceiver transceiver; 062 063 /** 064 * Constructor 065 * @param name The unique name of this manager. 066 * @param agents An array of Agents. 067 * @param batchSize The number of evetns to include in a batch. 068 */ 069 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize) { 070 super(name); 071 this.agents = agents; 072 this.batchSize = batchSize; 073 this.client = connect(agents); 074 } 075 076 /** 077 * Returns a FlumeAvroManager. 078 * @param name The name of the manager. 079 * @param agents The agents to use. 080 * @param batchSize The number of events to include in a batch. 081 * @return A FlumeAvroManager. 082 */ 083 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize) { 084 if (agents == null || agents.length == 0) { 085 throw new IllegalArgumentException("At least one agent is required"); 086 } 087 088 if (batchSize <= 0) { 089 batchSize = 1; 090 } 091 092 final StringBuilder sb = new StringBuilder("FlumeAvro["); 093 boolean first = true; 094 for (final Agent agent : agents) { 095 if (!first) { 096 sb.append(","); 097 } 098 sb.append(agent.getHost()).append(":").append(agent.getPort()); 099 first = false; 100 } 101 sb.append("]"); 102 return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize)); 103 } 104 105 /** 106 * Returns the agents. 107 * @return The agent array. 108 */ 109 public Agent[] getAgents() { 110 return agents; 111 } 112 113 /** 114 * Returns the index of the current agent. 115 * @return The index for the current agent. 116 */ 117 public int getCurrent() { 118 return current; 119 } 120 121 @Override 122 public synchronized void send(final FlumeEvent event, int delay, int retries) { 123 if (delay == 0) { 124 delay = DEFAULT_RECONNECTION_DELAY; 125 } 126 if (retries == 0) { 127 retries = DEFAULT_RECONNECTS; 128 } 129 if (client == null) { 130 client = connect(agents); 131 } 132 String msg = "No Flume agents are available"; 133 if (client != null) { 134 final AvroFlumeEvent avroEvent = new AvroFlumeEvent(); 135 avroEvent.setBody(ByteBuffer.wrap(event.getBody())); 136 avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>()); 137 138 for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) { 139 avroEvent.getHeaders().put(entry.getKey(), entry.getValue()); 140 } 141 142 final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null; 143 if (batch == null && batchSize > 1) { 144 return; 145 } 146 147 int i = 0; 148 149 msg = "Error writing to " + getName(); 150 151 do { 152 try { 153 final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch); 154 if (!status.equals(Status.OK)) { 155 throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() + 156 ":" + agents[current].getPort()); 157 } 158 return; 159 } catch (final Exception ex) { 160 if (i == retries - 1) { 161 msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" + 162 agents[current].getPort(); 163 LOGGER.warn(msg, ex); 164 break; 165 } 166 sleep(delay); 167 } 168 } while (++i < retries); 169 170 for (int index = 0; index < agents.length; ++index) { 171 if (index == current) { 172 continue; 173 } 174 final Agent agent = agents[index]; 175 i = 0; 176 do { 177 try { 178 transceiver = null; 179 final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort()); 180 final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch); 181 if (!status.equals(Status.OK)) { 182 if (i == retries - 1) { 183 final String warnMsg = "RPC communication failed to " + getName() + " at " + 184 agent.getHost() + ":" + agent.getPort(); 185 LOGGER.warn(warnMsg); 186 } 187 continue; 188 } 189 client = c; 190 current = i; 191 return; 192 } catch (final Exception ex) { 193 if (i == retries - 1) { 194 final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" + 195 agent.getPort(); 196 LOGGER.warn(warnMsg, ex); 197 break; 198 } 199 sleep(delay); 200 } 201 } while (++i < retries); 202 } 203 } 204 205 throw new AppenderRuntimeException(msg); 206 207 } 208 209 private void sleep(final int delay) { 210 try { 211 Thread.sleep(delay); 212 } catch (final InterruptedException ex) { 213 Thread.currentThread().interrupt(); 214 } 215 } 216 217 /** 218 * There is a very good chance that this will always return the first agent even if it isn't available. 219 * @param agents The list of agents to choose from 220 * @return The FlumeEventAvroServer. 221 */ 222 private AvroSourceProtocol connect(final Agent[] agents) { 223 int i = 0; 224 for (final Agent agent : agents) { 225 final AvroSourceProtocol server = connect(agent.getHost(), agent.getPort()); 226 if (server != null) { 227 current = i; 228 return server; 229 } 230 ++i; 231 } 232 LOGGER.error("Flume manager " + getName() + " was unable to connect to any agents"); 233 return null; 234 } 235 236 private AvroSourceProtocol connect(final String hostname, final int port) { 237 try { 238 if (transceiver == null) { 239 transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port)); 240 } 241 } catch (final IOException ioe) { 242 LOGGER.error("Unable to create transceiver", ioe); 243 return null; 244 } 245 try { 246 return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver); 247 } catch (final IOException ioe) { 248 LOGGER.error("Unable to create Avro client"); 249 return null; 250 } 251 } 252 253 @Override 254 protected void releaseSub() { 255 if (transceiver != null) { 256 try { 257 transceiver.close(); 258 } catch (final IOException ioe) { 259 LOGGER.error("Attempt to clean up Avro transceiver failed", ioe); 260 } 261 } 262 client = null; 263 } 264 265 /** 266 * Thread-safe List management of a batch. 267 */ 268 private static class EventList extends ArrayList<AvroFlumeEvent> { 269 270 /** 271 * Generated serial version ID. 272 */ 273 private static final long serialVersionUID = -1599817377315957495L; 274 275 public synchronized List<AvroFlumeEvent> addAndGet(final AvroFlumeEvent event, final int batchSize) { 276 super.add(event); 277 if (this.size() >= batchSize) { 278 final List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>(); 279 events.addAll(this); 280 clear(); 281 return events; 282 } else { 283 return null; 284 } 285 } 286 } 287 288 /** 289 * Factory data. 290 */ 291 private static class FactoryData { 292 private final String name; 293 private final Agent[] agents; 294 private final int batchSize; 295 296 /** 297 * Constructor. 298 * @param name The name of the Appender. 299 * @param agents The agents. 300 * @param batchSize The number of events to include in a batch. 301 */ 302 public FactoryData(final String name, final Agent[] agents, final int batchSize) { 303 this.name = name; 304 this.agents = agents; 305 this.batchSize = batchSize; 306 } 307 } 308 309 /** 310 * Avro Manager Factory. 311 */ 312 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> { 313 314 /** 315 * Create the FlumeAvroManager. 316 * @param name The name of the entity to manage. 317 * @param data The data required to create the entity. 318 * @return The FlumeAvroManager. 319 */ 320 public FlumeAvroManager createManager(final String name, final FactoryData data) { 321 try { 322 323 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize); 324 } catch (final Exception ex) { 325 LOGGER.error("Could not create FlumeAvroManager", ex); 326 } 327 return null; 328 } 329 } 330 331 }