001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.avro.AvroRemoteException; 020 import org.apache.avro.ipc.NettyTransceiver; 021 import org.apache.avro.ipc.Transceiver; 022 import org.apache.avro.ipc.specific.SpecificRequestor; 023 import org.apache.flume.source.avro.AvroFlumeEvent; 024 import org.apache.flume.source.avro.AvroSourceProtocol; 025 import org.apache.flume.source.avro.Status; 026 import org.apache.logging.log4j.core.appender.AppenderRuntimeException; 027 import org.apache.logging.log4j.core.appender.ManagerFactory; 028 029 import java.io.IOException; 030 import java.net.InetSocketAddress; 031 import java.nio.ByteBuffer; 032 import java.util.ArrayList; 033 import java.util.HashMap; 034 import java.util.List; 035 import java.util.Map; 036 037 /** 038 * Manager for FlumeAvroAppenders. 039 */ 040 public class FlumeAvroManager extends AbstractFlumeManager { 041 042 /** 043 The default reconnection delay (500 milliseconds or .5 seconds). 044 */ 045 public static final int DEFAULT_RECONNECTION_DELAY = 500; 046 047 private static final int DEFAULT_RECONNECTS = 3; 048 049 private static ManagerFactory factory = new AvroManagerFactory(); 050 051 private AvroSourceProtocol client; 052 053 private final Agent[] agents; 054 055 private final int batchSize; 056 057 private final EventList events = new EventList(); 058 059 private int current = 0; 060 061 private Transceiver transceiver; 062 063 /** 064 * Constructor 065 * @param name The unique name of this manager. 066 * @param agents An array of Agents. 067 * @param batchSize The number of evetns to include in a batch. 068 */ 069 protected FlumeAvroManager(String name, String shortName, Agent[] agents, int batchSize) { 070 super(name); 071 this.agents = agents; 072 this.batchSize = batchSize; 073 this.client = connect(agents); 074 } 075 076 /** 077 * Returns a FlumeAvroManager. 078 * @param agents The agents to use. 079 * @param batchSize The number of events to include in a batch. 080 * @return A FlumeAvroManager. 081 */ 082 public static FlumeAvroManager getManager(String name, Agent[] agents, int batchSize) { 083 if (agents == null || agents.length == 0) { 084 throw new IllegalArgumentException("At least one agent is required"); 085 } 086 087 if (batchSize <= 0) { 088 batchSize = 1; 089 } 090 091 StringBuilder sb = new StringBuilder("FlumeAvro["); 092 boolean first = true; 093 for (Agent agent : agents) { 094 if (!first) { 095 sb.append(","); 096 } 097 sb.append(agent.getHost()).append(":").append(agent.getPort()); 098 first = false; 099 } 100 sb.append("]"); 101 return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize)); 102 } 103 104 /** 105 * Returns the agents. 106 * @return The agent array. 107 */ 108 public Agent[] getAgents() { 109 return agents; 110 } 111 112 /** 113 * Returns the index of the current agent. 114 * @return The index for the current agent. 115 */ 116 public int getCurrent() { 117 return current; 118 } 119 120 @Override 121 public synchronized void send(FlumeEvent event, int delay, int retries) { 122 if (delay == 0) { 123 delay = DEFAULT_RECONNECTION_DELAY; 124 } 125 if (retries == 0) { 126 retries = DEFAULT_RECONNECTS; 127 } 128 AvroFlumeEvent avroEvent = new AvroFlumeEvent(); 129 avroEvent.body = ByteBuffer.wrap(event.getBody()); 130 avroEvent.headers = new HashMap<CharSequence, CharSequence>(); 131 132 for (Map.Entry<String, String> entry : event.getHeaders().entrySet()) { 133 avroEvent.headers.put(entry.getKey(), entry.getValue()); 134 } 135 136 List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null; 137 if (batch == null && batchSize > 1) { 138 return; 139 } 140 141 int i = 0; 142 143 String msg = "Error writing to " + getName(); 144 145 do { 146 try { 147 Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch); 148 if (!status.equals(Status.OK)) { 149 throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() + 150 ":" + agents[current].getPort()); 151 } 152 return; 153 } catch (Exception ex) { 154 if (i == retries - 1) { 155 msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" + 156 agents[current].getPort(); 157 LOGGER.warn(msg, ex); 158 break; 159 } 160 sleep(delay); 161 } 162 } while (++i < retries); 163 164 for (int index = 0; index < agents.length; ++index) { 165 if (index == current) { 166 continue; 167 } 168 Agent agent = agents[index]; 169 i = 0; 170 do { 171 try { 172 transceiver = null; 173 AvroSourceProtocol c = connect(agent.getHost(), agent.getPort()); 174 Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch); 175 if (!status.equals(Status.OK)) { 176 if (i == retries - 1) { 177 String warnMsg = "RPC communication failed to " + getName() + " at " + 178 agent.getHost() + ":" + agent.getPort(); 179 LOGGER.warn(warnMsg); 180 } 181 continue; 182 } 183 client = c; 184 current = i; 185 return; 186 } catch (Exception ex) { 187 if (i == retries - 1) { 188 String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" + 189 agent.getPort(); 190 LOGGER.warn(warnMsg, ex); 191 break; 192 } 193 sleep(delay); 194 } 195 } while (++i < retries); 196 } 197 198 throw new AppenderRuntimeException(msg); 199 200 } 201 202 private void sleep(int delay) { 203 try { 204 Thread.sleep(delay); 205 } catch (InterruptedException ex) { 206 Thread.currentThread().interrupt(); 207 } 208 } 209 210 /** 211 * There is a very good chance that this will always return the first agent even if it isn't available. 212 * @param agents The list of agents to choose from 213 * @return The FlumeEventAvroServer. 214 */ 215 private AvroSourceProtocol connect(Agent[] agents) { 216 int i = 0; 217 for (Agent agent : agents) { 218 AvroSourceProtocol server = connect(agent.getHost(), agent.getPort()); 219 if (server != null) { 220 current = i; 221 return server; 222 } 223 ++i; 224 } 225 throw new AppenderRuntimeException("Unable to connect to any agents"); 226 } 227 228 private AvroSourceProtocol connect(String hostname, int port) { 229 try { 230 if (transceiver == null) { 231 transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port)); 232 } 233 } catch (IOException ioe) { 234 LOGGER.error("Unable to create transceiver", ioe); 235 return null; 236 } 237 try { 238 return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver); 239 } catch (IOException ioe) { 240 LOGGER.error("Unable to create Avro client"); 241 return null; 242 } 243 } 244 245 @Override 246 protected void releaseSub() { 247 if (transceiver != null) { 248 try { 249 transceiver.close(); 250 } catch (IOException ioe) { 251 LOGGER.error("Attempt to clean up Avro transceiver failed", ioe); 252 } 253 } 254 client = null; 255 } 256 257 /** 258 * Thread-safe List management of a batch. 259 */ 260 private static class EventList extends ArrayList<AvroFlumeEvent> { 261 262 public synchronized List<AvroFlumeEvent> addAndGet(AvroFlumeEvent event, int batchSize) { 263 super.add(event); 264 if (this.size() >= batchSize) { 265 List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>(); 266 events.addAll(this); 267 clear(); 268 return events; 269 } else { 270 return null; 271 } 272 } 273 } 274 275 /** 276 * Factory data. 277 */ 278 private static class FactoryData { 279 private String name; 280 private Agent[] agents; 281 private int batchSize; 282 283 /** 284 * Constructor. 285 * @param name The name of the Appender. 286 * @param agents The agents. 287 * @param batchSize The number of events to include in a batch. 288 */ 289 public FactoryData(String name, Agent[] agents, int batchSize) { 290 this.name = name; 291 this.agents = agents; 292 this.batchSize = batchSize; 293 } 294 } 295 296 /** 297 * Avro Manager Factory. 298 */ 299 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> { 300 301 /** 302 * Create the FlumeAvroManager. 303 * @param name The name of the entity to manage. 304 * @param data The data required to create the entity. 305 * @return The FlumeAvroManager. 306 */ 307 public FlumeAvroManager createManager(String name, FactoryData data) { 308 try { 309 310 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize); 311 } catch (Exception ex) { 312 LOGGER.error("Could not create FlumeAvroManager", ex); 313 } 314 return null; 315 } 316 } 317 318 }