Value Members
-
final
def
!=(arg0: AnyRef): Boolean
-
final
def
!=(arg0: Any): Boolean
-
final
def
##(): Int
-
final
def
==(arg0: AnyRef): Boolean
-
final
def
==(arg0: Any): Boolean
-
def
abortAndPauseCleaning(topicAndPartition: TopicAndPartition): Unit
-
def
abortCleaning(topicAndPartition: TopicAndPartition): Unit
-
final
def
asInstanceOf[T0]: T0
-
def
awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit
-
def
clone(): AnyRef
-
-
def
debug(msg: ⇒ String, e: ⇒ Throwable): Unit
-
def
debug(e: ⇒ Throwable): Any
-
def
debug(msg: ⇒ String): Unit
-
final
def
eq(arg0: AnyRef): Boolean
-
def
equals(arg0: Any): Boolean
-
def
error(msg: ⇒ String, e: ⇒ Throwable): Unit
-
def
error(e: ⇒ Throwable): Any
-
def
error(msg: ⇒ String): Unit
-
def
fatal(msg: ⇒ String, e: ⇒ Throwable): Unit
-
def
fatal(e: ⇒ Throwable): Any
-
def
fatal(msg: ⇒ String): Unit
-
def
finalize(): Unit
-
final
def
getClass(): java.lang.Class[_]
-
def
hashCode(): Int
-
def
info(msg: ⇒ String, e: ⇒ Throwable): Unit
-
def
info(e: ⇒ Throwable): Any
-
def
info(msg: ⇒ String): Unit
-
final
def
isInstanceOf[T0]: Boolean
-
val
logDirs: Array[File]
-
var
logIdent: String
-
lazy val
logger: Logger
-
val
loggerName: String
-
-
final
def
ne(arg0: AnyRef): Boolean
-
def
newGauge[T](name: String, metric: Gauge[T], tags: Map[String, String]): Gauge[T]
-
def
newHistogram(name: String, biased: Boolean = true, tags: Map[String, String] = Map.empty): Histogram
-
def
newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: Map[String, String] = Map.empty): Meter
-
def
newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: Map[String, String] = Map.empty): Timer
-
final
def
notify(): Unit
-
final
def
notifyAll(): Unit
-
def
removeMetric(name: String, tags: Map[String, String] = Map.empty): Unit
-
def
resumeCleaning(topicAndPartition: TopicAndPartition): Unit
-
def
shutdown(): Unit
-
def
startup(): Unit
-
def
swallow(action: ⇒ Unit): Unit
-
def
swallowDebug(action: ⇒ Unit): Unit
-
def
swallowError(action: ⇒ Unit): Unit
-
def
swallowInfo(action: ⇒ Unit): Unit
-
def
swallowTrace(action: ⇒ Unit): Unit
-
def
swallowWarn(action: ⇒ Unit): Unit
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
-
def
toString(): String
-
def
trace(msg: ⇒ String, e: ⇒ Throwable): Unit
-
def
trace(e: ⇒ Throwable): Any
-
def
trace(msg: ⇒ String): Unit
-
def
updateCheckpoints(dataDir: File): Unit
-
final
def
wait(): Unit
-
final
def
wait(arg0: Long, arg1: Int): Unit
-
final
def
wait(arg0: Long): Unit
-
def
warn(msg: ⇒ String, e: ⇒ Throwable): Unit
-
def
warn(e: ⇒ Throwable): Any
-
def
warn(msg: ⇒ String): Unit
Inherited from AnyRef
Inherited from Any
The cleaner is responsible for removing obsolete records from logs that have the "dedupe" retention policy. A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O' (i.e. a later message with the same key supersedes it).
Each log can be thought of as being split into two sections of segments: a "clean" section, which has previously been cleaned, followed by a "dirty" section, which has not yet been cleaned. The active log segment is always excluded from cleaning.
The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy and cleans it. The dirtiness of a log is estimated as the ratio of bytes in the dirty section of the log to the total bytes in the log.
To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of the implementation of the mapping.
Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any message whose key appears in the offset map with a higher offset than the message's own offset (i.e. messages whose key appears again later, in the dirty section of the log).
To prevent segments from shrinking to very small sizes over repeated cleanings, we merge successive segments during a cleaning if their combined log and index sizes are less than the maximum log and index sizes in effect before the cleaning began.
Cleaned segments are swapped into the log as they become available.
One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
Messages with a null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. The cleaner will only retain delete records for a limited period of time, so that they do not accumulate and consume space indefinitely. This period of time is configurable on a per-topic basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed). Delete markers in the clean section of the log that are older than this time will not be retained when log segments are recopied as part of cleaning.