/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.PluginManager;
import org.apache.logging.log4j.core.config.plugins.PluginType;
import org.apache.logging.log4j.core.helpers.FileUtils;
import org.apache.logging.log4j.core.helpers.SecretKeyProvider;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Flume manager that first writes every event to a Berkeley DB (JE) database and then lets a
 * background {@link WriterThread} read the stored events back, forward them through the parent
 * {@link FlumeAvroManager}, and delete them once they have been sent.
 * <p>
 * When a {@link SecretKey} is available the serialized event is encrypted before it is stored
 * and decrypted when it is read back.
 */
public class FlumePersistentManager extends FlumeAvroManager {

    /** Name of the configuration property that selects the SecretKeyProvider plugin. */
    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Marker placed on the queue to wake the writer thread when it is asked to stop. */
    private static final String SHUTDOWN = "Shutdown";

    /** Directory used when the caller does not supply a data directory. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    private static final BDBManagerFactory factory = new BDBManagerFactory();

    private final Database database;

    private final WriterThread worker;

    /** Carries the keys of newly stored events; its only purpose is to wake the writer thread. */
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();

    private final SecretKey secretKey;

    /** Wait, in milliseconds, used by the writer thread between polls and after a send failure. */
    private final int delay;

    /**
     * Constructor. Starts the background writer thread.
     * @param name The unique name of this manager.
     * @param shortName The short name, passed through to the parent manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries Passed through to the parent manager.
     * @param connectionTimeout Passed through to the parent manager.
     * @param requestTimeout Passed through to the parent manager.
     * @param delay The wait, in milliseconds, used by the writer thread.
     * @param database The database to write to.
     * @param secretKey The key used to encrypt stored events, or null to store them unencrypted.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     SecretKey secretKey) {
        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
        this.delay = delay;
        this.database = database;
        this.secretKey = secretKey;
        this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
        // Start the worker last so that it never observes a partially initialized manager.
        this.worker.start();
    }


    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties for the manager; a {@link #KEY_PROVIDER} entry enables encryption.
     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
     * @param retries Passed through to the manager that is created.
     * @param connectionTimeout Passed through to the manager that is created.
     * @param requestTimeout Passed through to the manager that is created.
     * @param delay The wait, in milliseconds, used by the writer thread.
     * @param dataDir The directory for the database; null or empty selects the default directory.
     * @return A FlumePersistentManager.
     * @throws IllegalArgumentException if no agents are supplied.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
                                                    int batchSize, final int retries, final int connectionTimeout,
                                                    final int requestTimeout, final int delay, final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;

        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(",");
            }
            sb.append(agent.getHost()).append(":").append(agent.getPort());
            first = false;
        }
        sb.append("]");
        sb.append(" ").append(dataDirectory);
        // Pass the resolved directory, not the raw argument, so a null or empty dataDir
        // really falls back to the default instead of failing when the directory is opened.
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delay, dataDirectory, properties));
    }

    /**
     * Serializes the event (body followed by its headers), optionally encrypts it, stores it in
     * the database under the event's GUID header, and wakes the writer thread.
     * @param event The event to persist.
     * @throws LoggingException if the writer thread has been shut down, the event has no GUID
     * header, or the event cannot be stored.
     */
    @Override
    public synchronized void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        final Map<String, String> headers = event.getHeaders();
        final String guid = headers.get(FlumeEvent.GUID);
        if (guid == null) {
            throw new LoggingException("Unable to record event: no " + FlumeEvent.GUID + " header is present");
        }
        final byte[] keyData = guid.getBytes(UTF8);
        try {
            final byte[] body = event.getBody();
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream daos = new DataOutputStream(baos);
            daos.writeInt(body.length);
            daos.write(body, 0, body.length);
            daos.writeInt(headers.size());
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                // NOTE(review): "AES" leaves mode and padding to the provider default - confirm this
                // is acceptable. Changing the transformation would make already stored data unreadable.
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            database.put(null, key, data);
            queue.add(keyData);
        } catch (Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    /**
     * Stops the writer thread, waits for it to finish, closes the database and its environment,
     * and then releases the parent manager's resources.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        worker.shutdown();
        boolean interrupted = false;
        try {
            worker.join();
        } catch (InterruptedException ex) {
            LOGGER.debug("Interrupted while waiting for worker to complete");
            interrupted = true;
        }
        Environment environment = null;
        try {
            environment = database.getEnvironment();
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        if (environment != null) {
            try {
                environment.close();
            } catch (final Exception ex) {
                LOGGER.warn("Failed to close environment", ex);
            }
        }
        try {
            super.releaseSub();
        } finally {
            // Restore the interrupt status only after all close operations have run, so that the
            // pending interrupt cannot disturb the database I/O performed while closing.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Forwards a single event through the parent manager, bypassing the persistence done by
     * {@link #send(Event)}.
     * @param event The event to send.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delay;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries Passed through to the manager that is created.
         * @param connectionTimeout Passed through to the manager that is created.
         * @param requestTimeout Passed through to the manager that is created.
         * @param delay The wait, in milliseconds, used by the writer thread.
         * @param dataDir The directory for data.
         * @param properties Properties for the manager; may be null.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delay,
                           final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delay = delay;
            this.properties = properties;
        }
    }

    /**
     * Factory that opens the Berkeley DB database and creates the FlumePersistentManager.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager, or null if the database could not be opened.
         */
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            final Map<String, String> properties = new HashMap<String, String>();
            if (data.properties != null) {
                for (final Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

            Environment environment = null;
            final Database database;
            try {
                final File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(false);
                dbEnvConfig.setAllowCreate(true);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(false);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                // Do not leak the environment when it was opened but the database could not be.
                if (environment != null) {
                    try {
                        environment.close();
                    } catch (final Exception closeEx) {
                        LOGGER.warn("Failed to close environment", closeEx);
                    }
                }
                return null;
            }

            final SecretKey secretKey = findSecretKey(properties);
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
        }

        /**
         * Looks up the SecretKeyProvider plugin named by the {@link #KEY_PROVIDER} property and asks
         * it for a key. Any failure is logged and results in encryption being disabled.
         * @param properties The configured properties.
         * @return The secret key, or null if no provider is configured or it could not be used.
         */
        private SecretKey findSecretKey(final Map<String, String> properties) {
            SecretKey secretKey = null;
            try {
                String key = null;
                for (final Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                    }
                }
                if (key == null) {
                    return null;
                }
                final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
                manager.collectPlugins();
                final Map<String, PluginType> plugins = manager.getPlugins();
                boolean found = false;
                if (plugins != null) {
                    for (final Map.Entry<String, PluginType> entry : plugins.entrySet()) {
                        if (entry.getKey().equalsIgnoreCase(key)) {
                            found = true;
                            final Class<?> cl = entry.getValue().getPluginClass();
                            try {
                                final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                secretKey = provider.getSecretKey();
                                LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                            } catch (final Exception ex) {
                                // Pass the exception along so the cause of the failure is not lost.
                                LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                    cl.getName(), ex);
                            }
                            break;
                        }
                    }
                }
                if (!found) {
                    LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                }
            } catch (final Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return secretKey;
        }
    }

    /**
     * Daemon thread that reads stored events from the database, sends them through the manager,
     * and deletes the ones that were sent.
     */
    private static class WriterThread extends Thread {
        private volatile boolean shutdown = false;
        private final Database database;
        private final FlumePersistentManager manager;
        private final LinkedBlockingQueue<byte[]> queue;
        private final SecretKey secretKey;
        private final int batchSize;

        /**
         * Constructor.
         * @param database The database holding the stored events.
         * @param manager The owning manager, used to send events and to read the delay.
         * @param queue The queue used to wake this thread when events are stored.
         * @param batchsize The number of events to include in a batch.
         * @param secretKey The key used to decrypt stored events, or null if they are not encrypted.
         */
        public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue,
                            int batchsize, SecretKey secretKey) {
            this.database = database;
            this.manager = manager;
            this.queue = queue;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
        }

        /**
         * Requests that the thread stop. If the queue is empty a marker entry is added so that a
         * thread blocked waiting for work wakes up and notices the request.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            if (queue.size() == 0) {
                queue.add(SHUTDOWN.getBytes(UTF8));
            }
        }

        /**
         * @return true once shutdown has been requested or the thread stopped after a database error.
         */
        public boolean isShutdown() {
            return shutdown;
        }

        /**
         * Main loop. Events are sent when at least a full batch is stored, or when any events are
         * stored and the delay has elapsed since the previous send; otherwise the thread waits on
         * the queue for up to the delay.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started");
            long lastBatch = System.currentTimeMillis();
            while (!shutdown) {
                final long stored = database.count();
                final long now = System.currentTimeMillis();
                // A partial batch is flushed only AFTER the delay has elapsed since the last send.
                if (stored >= batchSize || stored > 0 && now - lastBatch >= manager.delay) {
                    lastBatch = now;
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();
                        final Cursor cursor = database.openCursor(null, null);
                        try {
                            // The queued keys are only wake-up signals; everything stored is read
                            // from the database below.
                            queue.clear();
                            try {
                                errors = batchSize > 1
                                    ? sendBatch(cursor, key, data)
                                    : sendIndividually(cursor, key, data);
                            } catch (final Exception ex) {
                                LOGGER.error("Error reading database", ex);
                                shutdown = true;
                                break;
                            }
                        } finally {
                            cursor.close();
                        }
                        if (errors) {
                            // Delivery failed; the events are still stored, so wait and retry.
                            Thread.sleep(manager.delay);
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    try {
                        if (database.count() >= batchSize) {
                            continue;
                        }
                        queue.poll(manager.delay, TimeUnit.MILLISECONDS);
                        LOGGER.debug("WriterThread notified of work");
                    } catch (final InterruptedException ie) {
                        // The loop re-checks the shutdown flag, so the interrupt only ends the wait.
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

        /**
         * Reads up to batchSize events from the start of the database, sends them as one batch and,
         * if the send succeeded, deletes each sent event by its GUID header.
         * @param cursor The open cursor to read with.
         * @param key Receives the key of each record read.
         * @param data Receives the data of each record read.
         * @return true if the batch could not be sent and should be retried later.
         */
        private boolean sendBatch(final Cursor cursor, final DatabaseEntry key, final DatabaseEntry data) {
            final BatchEvent batch = new BatchEvent();
            OperationStatus status = cursor.getFirst(key, data, LockMode.RMW);
            for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                final SimpleEvent event = createEvent(data);
                if (event != null) {
                    batch.addEvent(event);
                }
                status = cursor.getNext(key, data, LockMode.RMW);
            }
            try {
                manager.send(batch);
            } catch (final Exception ioe) {
                // Keep the thread alive and the events stored so the batch is retried.
                LOGGER.error("Error sending events", ioe);
                return true;
            }
            for (final Event event : batch.getEvents()) {
                try {
                    final Map<String, String> headers = event.getHeaders();
                    final DatabaseEntry sentKey = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                    database.delete(null, sentKey);
                } catch (final Exception ex) {
                    LOGGER.error("Error deleting key from database", ex);
                }
            }
            return false;
        }

        /**
         * Sends the stored events one at a time, deleting each record after it has been sent, and
         * stops at the first event that cannot be sent.
         * @param cursor The open cursor to read with.
         * @param key Receives the key of each record read.
         * @param data Receives the data of each record read.
         * @return true if an event could not be sent and should be retried later.
         */
        private boolean sendIndividually(final Cursor cursor, final DatabaseEntry key, final DatabaseEntry data) {
            OperationStatus status = cursor.getFirst(key, data, LockMode.RMW);
            while (status == OperationStatus.SUCCESS) {
                final SimpleEvent event = createEvent(data);
                if (event != null) {
                    try {
                        manager.doSend(event);
                    } catch (final Exception ioe) {
                        LOGGER.error("Error sending event", ioe);
                        return true;
                    }
                    try {
                        cursor.delete();
                    } catch (final Exception ex) {
                        LOGGER.error("Unable to delete event", ex);
                    }
                }
                status = cursor.getNext(key, data, LockMode.RMW);
            }
            return false;
        }

        /**
         * Rebuilds an event from a stored record: decrypts it when a key is configured, then reads
         * the body followed by the headers in the layout written by the manager's send method.
         * @param data The stored record.
         * @return The event, or null if the record could not be decrypted or parsed.
         */
        private SimpleEvent createEvent(DatabaseEntry data) {
            final SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    final Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                final DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                final byte[] bytes = new byte[length];
                // readFully fails on a truncated record instead of silently returning a short body.
                dais.readFully(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                final Map<String, String> map = new HashMap<String, String>(length);
                for (int i = 0; i < length; ++i) {
                    final String headerKey = dais.readUTF();
                    final String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (final Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }
}