kafka.log

LogCleaner

class LogCleaner extends Logging with KafkaMetricsGroup

The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
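The obsolescence rule can be shown with a small sketch. It works on an in-memory sequence of records. `KeyedRecord` and `Obsolescence` are hypothetical names used only for illustration and are not part of Kafka.

```scala
// Hypothetical record type: only the key and the offset matter for the rule.
final case class KeyedRecord(key: String, offset: Long)

object Obsolescence {
  /** Records (K, O) for which a later record (K, O') with O < O' exists. */
  def obsolete(records: Seq[KeyedRecord]): Seq[KeyedRecord] = {
    // Highest offset observed for each key anywhere in the log.
    val latest: Map[String, Long] =
      records.groupBy(_.key).map { case (k, rs) => k -> rs.map(_.offset).max }
    // A record is obsolete when its key reappears at a strictly higher offset.
    records.filter(r => r.offset < latest(r.key))
  }
}
```

With records `a@0, b@1, a@2`, only `a@0` is obsolete, because key `a` reappears at offset 2.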

Each log can be thought of as being split into two sections of segments: a "clean" section that has previously been cleaned, followed by a "dirty" section that has not yet been cleaned. The active log segment is always excluded from cleaning.

The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy and cleans it. A log's dirtiness is estimated as the ratio of the bytes in its dirty section to the total bytes in the log.
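A sketch of the selection step, assuming each log is summarised only by the byte sizes of its clean and dirty sections (active segment already excluded) and by whether it uses the dedupe policy. `LogSummary` and `DirtiestLog` are hypothetical types; the sketch illustrates the ratio described above and nothing more.

```scala
// Hypothetical per-log summary; not a Kafka type.
final case class LogSummary(name: String, cleanBytes: Long, dirtyBytes: Long, compacted: Boolean) {
  def totalBytes: Long = cleanBytes + dirtyBytes
  // Dirtiness estimate: dirty bytes / total bytes (0.0 for an empty log).
  def dirtyRatio: Double =
    if (totalBytes == 0L) 0.0 else dirtyBytes.toDouble / totalBytes
}

object DirtiestLog {
  /** Picks the dedupe-policy log with the highest dirty ratio, if any. */
  def select(logs: Seq[LogSummary]): Option[LogSummary] = {
    val candidates = logs.filter(_.compacted)
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.dirtyRatio))
  }
}
```

Logs that do not use the dedupe policy are never candidates, however dirty they are.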

To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of the implementation of the mapping.
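A minimal sketch of the map-building pass. It uses a plain `mutable.HashMap` as a stand-in for `kafka.log.OffsetMap`, whose actual implementation is not reproduced here; `Entry` and `OffsetMapBuilder` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical entry read from the dirty section of a log.
final case class Entry(key: String, offset: Long)

object OffsetMapBuilder {
  /** Builds key => last offset; assumes entries arrive in increasing offset order. */
  def build(dirty: Iterator[Entry]): mutable.Map[String, Long] = {
    val map = mutable.HashMap.empty[String, Long]
    // Later entries overwrite earlier ones, so each key ends at its highest offset.
    dirty.foreach(e => map.put(e.key, e.offset))
    map
  }
}
```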

Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any message whose key appears in the offset map with a higher offset than the message's own (i.e. messages whose key appears again later, in the dirty section of the log).
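The recopy filter can be sketched as a pure function over one segment, assuming the offset map is available as an immutable `Map`. `Msg` and `Recopy` are hypothetical names; real segments are file-backed, which this sketch ignores.

```scala
// Hypothetical message type for one log segment.
final case class Msg(key: String, offset: Long, value: String)

object Recopy {
  /** Keeps a message unless the offset map records a later offset for its key. */
  def clean(segment: Seq[Msg], offsetMap: Map[String, Long]): Seq[Msg] =
    segment.filter { m =>
      // Keys absent from the map were not seen in the dirty section: keep them.
      offsetMap.get(m.key).forall(latest => m.offset >= latest)
    }
}
```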

To avoid segments shrinking to very small sizes over repeated cleanings, successive segments are merged during a cleaning if their combined log and index sizes, measured before the cleaning begins, are less than the maximum log and index sizes.
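The merging rule can be sketched as a greedy pass over consecutive segments, using their pre-clean sizes and the strict "less than" test described above. `SegmentInfo` and `SegmentGrouping` are hypothetical, and the greedy strategy is an assumption made for illustration.

```scala
// Hypothetical pre-clean size summary of one segment.
final case class SegmentInfo(baseOffset: Long, logBytes: Long, indexBytes: Long)

object SegmentGrouping {
  /** Groups consecutive segments whose combined sizes stay below both limits. */
  def group(segments: List[SegmentInfo], maxLogBytes: Long, maxIndexBytes: Long): List[List[SegmentInfo]] = {
    // Groups are accumulated newest-first, and each group holds its segments in reverse.
    val reversed = segments.foldLeft(List.empty[List[SegmentInfo]]) { (groups, seg) =>
      groups match {
        case current :: done
            if current.map(_.logBytes).sum + seg.logBytes < maxLogBytes &&
              current.map(_.indexBytes).sum + seg.indexBytes < maxIndexBytes =>
          (seg :: current) :: done // seg fits: extend the current group
        case _ =>
          List(seg) :: groups // seg does not fit: start a new group
      }
    }
    reversed.map(_.reverse).reverse
  }
}
```

Each resulting group would then be recopied into a single cleaned segment.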

Cleaned segments are swapped into the log as they become available.

One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.

Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed). Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
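A sketch of the delete-marker rule for a record that survives deduplication. A `None` value stands in for a null payload, and `cleanSinceMs` is a hypothetical timestamp for when the record's segment entered the clean section; `LogRecord` and `TombstoneRetention` are likewise illustrative names, not Kafka's.

```scala
// Hypothetical record; value = None models a null payload (delete marker).
final case class LogRecord(key: String, offset: Long, value: Option[String])

object TombstoneRetention {
  /** Ordinary records are kept; delete markers only until they outlive the retention period. */
  def retain(rec: LogRecord, cleanSinceMs: Long, nowMs: Long, deleteRetentionMs: Long): Boolean =
    rec.value.isDefined || nowMs - cleanSinceMs <= deleteRetentionMs
}
```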

Linear Supertypes

KafkaMetricsGroup, Logging, AnyRef, Any

Instance Constructors

  1. new LogCleaner(config: CleanerConfig, logDirs: Array[File], logs: Pool[TopicAndPartition, Log], time: Time = kafka.utils.SystemTime)

    config

    Configuration parameters for the cleaner

    logDirs

    The directories where offset checkpoints reside

    logs

    The pool of logs

    time

    A way to control the passage of time

Value Members

  1. final def !=(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  2. final def !=(arg0: Any): Boolean

    Definition Classes
    Any
  3. final def ##(): Int

    Definition Classes
    AnyRef → Any
  4. final def ==(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  5. final def ==(arg0: Any): Boolean

    Definition Classes
    Any
  6. def abortAndPauseCleaning(topicAndPartition: TopicAndPartition): Unit

    Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. This call blocks until the cleaning of the partition is aborted and paused.

  7. def abortCleaning(topicAndPartition: TopicAndPartition): Unit

    Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of the partition is aborted.

  8. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  9. def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit

    TODO: For testing, a way to know when work has completed. This method blocks until the cleaner has processed up to the given offset on the specified topic/partition.

  10. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  11. val config: CleanerConfig

    Configuration parameters for the cleaner

  12. def debug(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  13. def debug(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  14. def debug(msg: ⇒ String): Unit

    Definition Classes
    Logging
  15. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  16. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  17. def error(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  18. def error(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  19. def error(msg: ⇒ String): Unit

    Definition Classes
    Logging
  20. def fatal(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  21. def fatal(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  22. def fatal(msg: ⇒ String): Unit

    Definition Classes
    Logging
  23. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  24. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  25. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  26. def info(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  27. def info(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  28. def info(msg: ⇒ String): Unit

    Definition Classes
    Logging
  29. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  30. val logDirs: Array[File]

    The directories where offset checkpoints reside

  31. var logIdent: String

    Attributes
    protected
    Definition Classes
    Logging
  32. lazy val logger: Logger

    Definition Classes
    Logging
  33. val loggerName: String

    Definition Classes
    Logging
  34. val logs: Pool[TopicAndPartition, Log]

    The pool of logs

  35. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  36. def newGauge[T](name: String, metric: Gauge[T], tags: Map[String, String] = Map.empty): Gauge[T]

    Definition Classes
    KafkaMetricsGroup
  37. def newHistogram(name: String, biased: Boolean = true, tags: Map[String, String] = Map.empty): Histogram

    Definition Classes
    KafkaMetricsGroup
  38. def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: Map[String, String] = Map.empty): Meter

    Definition Classes
    KafkaMetricsGroup
  39. def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: Map[String, String] = Map.empty): Timer

    Definition Classes
    KafkaMetricsGroup
  40. final def notify(): Unit

    Definition Classes
    AnyRef
  41. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  42. def removeMetric(name: String, tags: Map[String, String] = Map.empty): Unit

    Definition Classes
    KafkaMetricsGroup
  43. def resumeCleaning(topicAndPartition: TopicAndPartition): Unit

    Resume the cleaning of a paused partition. This call blocks until the cleaning of the partition is resumed.

  44. def shutdown(): Unit

    Stop the background cleaning

  45. def startup(): Unit

    Start the background cleaning

  46. def swallow(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  47. def swallowDebug(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  48. def swallowError(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  49. def swallowInfo(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  50. def swallowTrace(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  51. def swallowWarn(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  52. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  53. def toString(): String

    Definition Classes
    AnyRef → Any
  54. def trace(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  55. def trace(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  56. def trace(msg: ⇒ String): Unit

    Definition Classes
    Logging
  57. def updateCheckpoints(dataDir: File): Unit

    Update the checkpoint file in the given data directory, removing topics and partitions that no longer exist.

  58. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  59. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  60. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  61. def warn(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  62. def warn(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  63. def warn(msg: ⇒ String): Unit

    Definition Classes
    Logging
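
The checkpoint-related members above, `updateCheckpoints` and `awaitCleaned`, can be approximated in memory. This is a hedged sketch: checkpoints are modelled as a hypothetical `Map` from (topic, partition) to offset instead of Kafka's on-disk checkpoint file, `CheckpointSketch` is not a Kafka type, and treating the timeout as milliseconds is an assumption.

```scala
object CheckpointSketch {
  // Hypothetical (topic, partition) key.
  type Partition = (String, Int)

  /** Drops checkpoint entries for partitions that no longer exist. */
  def prune(checkpoints: Map[Partition, Long], live: Set[Partition]): Map[Partition, Long] =
    checkpoints.filter { case (tp, _) => live.contains(tp) }

  /** Polls until the checkpoint for tp reaches offset or timeoutMs elapses. */
  def awaitOffset(read: () => Map[Partition, Long], tp: Partition, offset: Long, timeoutMs: Long): Boolean = {
    def reached: Boolean = read().get(tp).exists(_ >= offset)
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!reached && System.currentTimeMillis() < deadline) Thread.sleep(10)
    reached
  }
}
```

Unlike `awaitCleaned`, which returns `Unit`, the sketch returns a `Boolean` so a caller can tell whether the offset was reached before the timeout.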

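The blocking contract of `abortCleaning`, `abortAndPauseCleaning` and `resumeCleaning` can be modelled as a small per-partition state machine guarded by a monitor. The sketch below is hypothetical (`CleaningTracker`, its states and its method names are not Kafka's API) and assumes that a cleaner thread periodically checks for abort requests while it works.

```scala
import scala.collection.mutable

// Hypothetical per-partition cleaning states.
sealed trait CleaningState
case object InProgress extends CleaningState
case object Aborted extends CleaningState
case object Paused extends CleaningState

final class CleaningTracker {
  private val states = mutable.Map.empty[String, CleaningState]

  /** Cleaner thread: claims a partition unless it is already busy or paused. */
  def tryStart(tp: String): Boolean = synchronized {
    if (states.contains(tp)) false
    else { states(tp) = InProgress; true }
  }

  /** Cleaner thread: polled during cleaning so that an abort request is noticed. */
  def isAborted(tp: String): Boolean = synchronized { states.get(tp).contains(Aborted) }

  /** Cleaner thread: finishes, or gives up after an abort, and wakes any waiters. */
  def finish(tp: String): Unit = synchronized {
    states.get(tp) match {
      case Some(InProgress) => states.remove(tp)
      case Some(Aborted)    => states(tp) = Paused // acknowledge the abort
      case _                => ()
    }
    notifyAll()
  }

  /** Aborts any in-progress cleaning and pauses the partition; blocks until paused. */
  def abortAndPause(tp: String): Unit = synchronized {
    states.get(tp) match {
      case None             => states(tp) = Paused
      case Some(InProgress) => states(tp) = Aborted
      case _                => ()
    }
    while (!states.get(tp).contains(Paused)) wait()
  }

  /** Resumes cleaning of a paused partition. */
  def resume(tp: String): Unit = synchronized {
    if (states.get(tp).contains(Paused)) states.remove(tp)
  }

  /** Aborts without leaving the partition paused. */
  def abort(tp: String): Unit = { abortAndPause(tp); resume(tp) }

  def stateOf(tp: String): Option[CleaningState] = synchronized { states.get(tp) }
}
```

Here `abort` is built from `abortAndPause` followed by `resume`, which keeps a single blocking path.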