001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import com.sleepycat.je.Cursor; 020 import com.sleepycat.je.CursorConfig; 021 import com.sleepycat.je.Database; 022 import com.sleepycat.je.DatabaseConfig; 023 import com.sleepycat.je.DatabaseEntry; 024 import com.sleepycat.je.Environment; 025 import com.sleepycat.je.EnvironmentConfig; 026 import com.sleepycat.je.LockMode; 027 import com.sleepycat.je.OperationStatus; 028 import com.sleepycat.je.StatsConfig; 029 import com.sleepycat.je.Transaction; 030 import org.apache.flume.Event; 031 import org.apache.flume.event.SimpleEvent; 032 import org.apache.logging.log4j.LoggingException; 033 import org.apache.logging.log4j.core.appender.ManagerFactory; 034 import org.apache.logging.log4j.core.config.Property; 035 import org.apache.logging.log4j.core.config.plugins.PluginManager; 036 import org.apache.logging.log4j.core.config.plugins.PluginType; 037 import org.apache.logging.log4j.core.helpers.FileUtils; 038 import org.apache.logging.log4j.core.helpers.SecretKeyProvider; 039 040 import javax.crypto.Cipher; 041 import javax.crypto.SecretKey; 042 import java.io.ByteArrayInputStream; 043 import java.io.ByteArrayOutputStream; 044 import java.io.DataInputStream; 045 import java.io.DataOutputStream; 046 import java.io.File; 047 import java.nio.charset.Charset; 048 import java.util.HashMap; 049 import java.util.Map; 050 import java.util.concurrent.LinkedBlockingQueue; 051 import java.util.concurrent.TimeUnit; 052 import java.util.concurrent.locks.ReadWriteLock; 053 import java.util.concurrent.locks.ReentrantReadWriteLock; 054 import java.util.logging.Level; 055 056 /** 057 * 058 */ 059 public class FlumePersistentManager extends FlumeAvroManager { 060 061 public static final String KEY_PROVIDER = "keyProvider"; 062 063 private static final Charset UTF8 = Charset.forName("UTF-8"); 064 065 private static final String SHUTDOWN = "Shutdown"; 066 067 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData"; 068 069 private static BDBManagerFactory factory = new BDBManagerFactory(); 070 071 private Database database; 072 073 private Environment environment; 074 075 private final WriterThread worker; 076 077 private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>(); 078 079 private final SecretKey secretKey; 080 081 private final int delay; 082 083 /** 084 * Constructor 085 * @param name The unique name of this manager. 086 * @param agents An array of Agents. 087 * @param batchSize The number of events to include in a batch. 088 * @param database The database to write to. 089 */ 090 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents, 091 final int batchSize, final int retries, final int connectionTimeout, 092 final int requestTimeout, final int delay, final Database database, 093 final Environment environment, SecretKey secretKey) { 094 super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout); 095 this.delay = delay; 096 this.database = database; 097 this.environment = environment; 098 this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey); 099 this.worker.start(); 100 this.secretKey = secretKey; 101 } 102 103 104 /** 105 * Returns a FlumeAvroManager. 106 * @param name The name of the manager. 107 * @param agents The agents to use. 108 * @param batchSize The number of events to include in a batch. 109 * @return A FlumeAvroManager. 110 */ 111 public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties, 112 int batchSize, final int retries, final int connectionTimeout, 113 final int requestTimeout, final int delay, final String dataDir) { 114 if (agents == null || agents.length == 0) { 115 throw new IllegalArgumentException("At least one agent is required"); 116 } 117 118 if (batchSize <= 0) { 119 batchSize = 1; 120 } 121 String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir; 122 123 final StringBuilder sb = new StringBuilder("FlumePersistent["); 124 boolean first = true; 125 for (final Agent agent : agents) { 126 if (!first) { 127 sb.append(","); 128 } 129 sb.append(agent.getHost()).append(":").append(agent.getPort()); 130 first = false; 131 } 132 sb.append("]"); 133 sb.append(" ").append(dataDirectory); 134 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, 135 connectionTimeout, requestTimeout, delay, dataDir, properties)); 136 } 137 138 @Override 139 public void send(final Event event) { 140 if (worker.isShutdown()) { 141 throw new LoggingException("Unable to record event"); 142 } 143 144 Map<String, String> headers = event.getHeaders(); 145 byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); 146 try { 147 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 148 DataOutputStream daos = new DataOutputStream(baos); 149 daos.writeInt(event.getBody().length); 150 daos.write(event.getBody(), 0, event.getBody().length); 151 daos.writeInt(event.getHeaders().size()); 152 for (Map.Entry<String, String> entry : headers.entrySet()) { 153 daos.writeUTF(entry.getKey()); 154 daos.writeUTF(entry.getValue()); 155 } 156 byte[] eventData = baos.toByteArray(); 157 if (secretKey != null) { 158 Cipher cipher = Cipher.getInstance("AES"); 159 cipher.init(Cipher.ENCRYPT_MODE, secretKey); 160 eventData = cipher.doFinal(eventData); 161 } 162 final DatabaseEntry key = new DatabaseEntry(keyData); 163 final DatabaseEntry data = new DatabaseEntry(eventData); 164 Transaction txn = environment.beginTransaction(null, null); 165 try { 166 database.put(txn, key, data); 167 txn.commit(); 168 queue.add(keyData); 169 } catch (Exception ex) { 170 if (txn != null) { 171 txn.abort(); 172 } 173 throw ex; 174 } 175 } catch (Exception ex) { 176 throw new LoggingException("Exception occurred writing log event", ex); 177 } 178 } 179 180 @Override 181 protected void releaseSub() { 182 LOGGER.debug("Shutting down FlumePersistentManager"); 183 worker.shutdown(); 184 try { 185 worker.join(); 186 } catch (InterruptedException ex) { 187 LOGGER.debug("Interrupted while waiting for worker to complete"); 188 } 189 try { 190 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig())); 191 database.close(); 192 } catch (final Exception ex) { 193 LOGGER.warn("Failed to close database", ex); 194 } 195 try { 196 environment.cleanLog(); 197 environment.close(); 198 } catch (final Exception ex) { 199 LOGGER.warn("Failed to close environment", ex); 200 } 201 super.releaseSub(); 202 } 203 204 private void doSend(final SimpleEvent event) { 205 LOGGER.debug("Sending event to Flume"); 206 super.send(event); 207 } 208 209 /** 210 * Factory data. 211 */ 212 private static class FactoryData { 213 private final String name; 214 private final Agent[] agents; 215 private final int batchSize; 216 private final String dataDir; 217 private final int retries; 218 private final int connectionTimeout; 219 private final int requestTimeout; 220 private final int delay; 221 private final Property[] properties; 222 223 /** 224 * Constructor. 225 * @param name The name of the Appender. 226 * @param agents The agents. 227 * @param batchSize The number of events to include in a batch. 228 * @param dataDir The directory for data. 229 */ 230 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, 231 final int connectionTimeout, final int requestTimeout, final int delay, 232 final String dataDir, final Property[] properties) { 233 this.name = name; 234 this.agents = agents; 235 this.batchSize = batchSize; 236 this.dataDir = dataDir; 237 this.retries = retries; 238 this.connectionTimeout = connectionTimeout; 239 this.requestTimeout = requestTimeout; 240 this.delay = delay; 241 this.properties = properties; 242 } 243 } 244 245 /** 246 * Avro Manager Factory. 247 */ 248 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> { 249 250 /** 251 * Create the FlumeKratiManager. 252 * @param name The name of the entity to manage. 253 * @param data The data required to create the entity. 254 * @return The FlumeKratiManager. 255 */ 256 @Override 257 public FlumePersistentManager createManager(final String name, final FactoryData data) { 258 SecretKey secretKey = null; 259 260 Database database; 261 Environment environment; 262 263 Map<String, String> properties = new HashMap<String, String>(); 264 if (data.properties != null) { 265 for (Property property : data.properties) { 266 properties.put(property.getName(), property.getValue()); 267 } 268 } 269 270 try { 271 272 File dir = new File(data.dataDir); 273 FileUtils.mkdir(dir, true); 274 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig(); 275 dbEnvConfig.setTransactional(true); 276 dbEnvConfig.setAllowCreate(true); 277 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS); 278 environment = new Environment(dir, dbEnvConfig); 279 final DatabaseConfig dbConfig = new DatabaseConfig(); 280 dbConfig.setTransactional(true); 281 dbConfig.setAllowCreate(true); 282 database = environment.openDatabase(null, name, dbConfig); 283 } catch (final Exception ex) { 284 LOGGER.error("Could not create FlumePersistentManager", ex); 285 return null; 286 } 287 288 try { 289 String key = null; 290 for (Map.Entry<String, String> entry : properties.entrySet()) { 291 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) { 292 key = entry.getValue(); 293 } 294 } 295 if (key != null) { 296 final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class); 297 manager.collectPlugins(); 298 final Map<String, PluginType> plugins = manager.getPlugins(); 299 if (plugins != null) { 300 boolean found = false; 301 for (Map.Entry<String, PluginType> entry : plugins.entrySet()) { 302 if (entry.getKey().equalsIgnoreCase(key)) { 303 found = true; 304 Class cl = entry.getValue().getPluginClass(); 305 try { 306 SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance(); 307 secretKey = provider.getSecretKey(); 308 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName()); 309 } catch (Exception ex) { 310 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", 311 cl.getName()); 312 } 313 break; 314 } 315 } 316 if (!found) { 317 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); 318 } 319 } else { 320 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); 321 } 322 } 323 } catch (Exception ex) { 324 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); 325 } 326 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, 327 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey); 328 } 329 } 330 331 private static class WriterThread extends Thread { 332 private volatile boolean shutdown = false; 333 private final Database database; 334 private final Environment environment; 335 private final FlumePersistentManager manager; 336 private final LinkedBlockingQueue<byte[]> queue; 337 private final SecretKey secretKey; 338 private final int batchSize; 339 340 public WriterThread(Database database, Environment environment, FlumePersistentManager manager, 341 LinkedBlockingQueue<byte[]> queue, int batchsize, SecretKey secretKey) { 342 this.database = database; 343 this.environment = environment; 344 this.manager = manager; 345 this.queue = queue; 346 this.batchSize = batchsize; 347 this.secretKey = secretKey; 348 this.setDaemon(true); 349 } 350 351 public void shutdown() { 352 LOGGER.debug("Writer thread shutting down"); 353 this.shutdown = true; 354 if (queue.size() == 0) { 355 queue.add(SHUTDOWN.getBytes(UTF8)); 356 } 357 } 358 359 public boolean isShutdown() { 360 return shutdown; 361 } 362 363 @Override 364 public void run() { 365 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay); 366 long nextBatch = System.currentTimeMillis() + manager.delay; 367 while (!shutdown) { 368 long now = System.currentTimeMillis(); 369 if (database.count() >= batchSize || (database.count() > 0 && nextBatch < now)) { 370 nextBatch = now + manager.delay; 371 try { 372 boolean errors = false; 373 DatabaseEntry key = new DatabaseEntry(); 374 final DatabaseEntry data = new DatabaseEntry(); 375 376 queue.clear(); 377 OperationStatus status; 378 if (batchSize > 1) { 379 Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT); 380 try { 381 status = cursor.getFirst(key, data, null); 382 383 BatchEvent batch = new BatchEvent(); 384 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { 385 SimpleEvent event = createEvent(data); 386 if (event != null) { 387 batch.addEvent(event); 388 } 389 status = cursor.getNext(key, data, null); 390 } 391 try { 392 manager.send(batch); 393 } catch (Exception ioe) { 394 LOGGER.error("Error sending events", ioe); 395 break; 396 } 397 cursor.close(); 398 cursor = null; 399 Transaction txn = environment.beginTransaction(null, null); 400 try { 401 for (Event event : batch.getEvents()) { 402 try { 403 Map<String, String> headers = event.getHeaders(); 404 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8)); 405 database.delete(txn, key); 406 } catch (Exception ex) { 407 LOGGER.error("Error deleting key from database", ex); 408 } 409 } 410 txn.commit(); 411 } catch (Exception ex) { 412 LOGGER.error("Unable to commit transaction", ex); 413 if (txn != null) { 414 txn.abort(); 415 } 416 } 417 } catch (Exception ex) { 418 LOGGER.error("Error reading database", ex); 419 shutdown = true; 420 break; 421 } finally { 422 if (cursor != null) { 423 cursor.close(); 424 } 425 } 426 } else { 427 Transaction txn = environment.beginTransaction(null, null); 428 Cursor cursor = database.openCursor(txn, null); 429 try { 430 status = cursor.getFirst(key, data, LockMode.RMW); 431 while (status == OperationStatus.SUCCESS) { 432 SimpleEvent event = createEvent(data); 433 if (event != null) { 434 try { 435 manager.doSend(event); 436 } catch (Exception ioe) { 437 errors = true; 438 LOGGER.error("Error sending event", ioe); 439 break; 440 } 441 if (!errors) { 442 try { 443 cursor.delete(); 444 } catch (Exception ex) { 445 LOGGER.error("Unable to delete event", ex); 446 } 447 } 448 } 449 status = cursor.getNext(key, data, LockMode.RMW); 450 } 451 if (cursor != null) { 452 cursor.close(); 453 cursor = null; 454 } 455 txn.commit(); 456 txn = null; 457 } catch (Exception ex) { 458 LOGGER.error("Error reading or writing to database", ex); 459 shutdown = true; 460 break; 461 } finally { 462 if (cursor != null) { 463 cursor.close(); 464 } 465 if (txn != null) { 466 txn.abort(); 467 } 468 } 469 } 470 if (errors) { 471 Thread.sleep(manager.delay); 472 continue; 473 } 474 } catch (Exception ex) { 475 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); 476 } 477 } else { 478 while (!shutdown && (database.count() == 0 || database.count() < batchSize && nextBatch > now)) { 479 try { 480 long interval = nextBatch - now; 481 queue.poll(interval, TimeUnit.MILLISECONDS); 482 } catch (InterruptedException ie) { 483 LOGGER.warn("WriterThread interrupted, continuing"); 484 } catch (Exception ex) { 485 LOGGER.error("WriterThread encountered an exception waiting for work", ex); 486 break; 487 } 488 now = System.currentTimeMillis(); 489 if (database.count() == 0) { 490 nextBatch = now + manager.delay; 491 } 492 } 493 LOGGER.debug("WriterThread ready to work"); 494 } 495 } 496 LOGGER.trace("WriterThread exiting"); 497 } 498 499 private SimpleEvent createEvent(DatabaseEntry data) { 500 SimpleEvent event = new SimpleEvent(); 501 try { 502 byte[] eventData = data.getData(); 503 if (secretKey != null) { 504 Cipher cipher = Cipher.getInstance("AES"); 505 cipher.init(Cipher.DECRYPT_MODE, secretKey); 506 eventData = cipher.doFinal(eventData); 507 } 508 ByteArrayInputStream bais = new ByteArrayInputStream(eventData); 509 DataInputStream dais = new DataInputStream(bais); 510 int length = dais.readInt(); 511 byte[] bytes = new byte[length]; 512 dais.read(bytes, 0, length); 513 event.setBody(bytes); 514 length = dais.readInt(); 515 Map<String, String> map = new HashMap<String, String>(length); 516 for (int i = 0; i < length; ++i) { 517 String headerKey = dais.readUTF(); 518 String value = dais.readUTF(); 519 map.put(headerKey, value); 520 } 521 event.setHeaders(map); 522 return event; 523 } catch (Exception ex) { 524 LOGGER.error("Error retrieving event", ex); 525 return null; 526 } 527 } 528 529 } 530 }