Class

kafka.controller

TopicDeletionManager

Related Doc: package controller


class TopicDeletionManager extends Logging

This manages the state machine for topic deletion.

  1. TopicCommand issues topic deletion by creating a new admin path /admin/delete_topics/<topic>.
  2. The controller listens for child changes on /admin/delete_topics and starts topic deletion for the respective topics.
  3. The controller has a background thread that handles topic deletion. The purpose of this background thread is to accommodate the TTL feature, when we have it. The thread is signaled whenever deletion for a topic needs to be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the controller. In the future, it can be triggered based on the configured TTL for the topic. A topic is ineligible for deletion in the following scenarios:
     3.1 A broker hosting one of the replicas for that topic goes down.
     3.2 Partition reassignment for partitions of that topic is in progress.
     3.3 Preferred replica election for partitions of that topic is in progress (though this is not strictly required, since it holds the controller lock for the entire duration from start to end).
  4. Topic deletion is resumed when:
     4.1 A broker hosting one of the replicas for that topic is started.
     4.2 Preferred replica election for partitions of that topic completes.
     4.3 Partition reassignment for partitions of that topic completes.
  5. Every replica for a topic being deleted is in one of three states:
     5.1 TopicDeletionStarted. A replica enters the TopicDeletionStarted phase when the onPartitionDeletion callback is invoked. This happens when the child change watch for /admin/delete_topics fires on the controller. As part of this state change, the controller sends StopReplicaRequests to all replicas. It registers a callback for the StopReplicaResponse when deletePartition=true, thereby invoking a callback when a response for delete replica is received from every replica.
     5.2 TopicDeletionSuccessful. deleteTopicStopReplicaCallback() moves replicas from TopicDeletionStarted to TopicDeletionSuccessful depending on the error codes in StopReplicaResponse.
     5.3 TopicDeletionFailed. deleteTopicStopReplicaCallback() moves replicas from TopicDeletionStarted to TopicDeletionFailed depending on the error codes in StopReplicaResponse. In general, if a broker dies and it hosted replicas for topics being deleted, the controller marks the respective replicas as TopicDeletionFailed in the onBrokerFailure callback. The reason is that if a broker fails after the replica enters TopicDeletionStarted state but before the request is sent, the replica could mistakenly remain in TopicDeletionStarted state, and topic deletion would not be retried when the broker comes back up.
  6. The delete topic thread marks a topic successfully deleted only if all replicas are in TopicDeletionSuccessful state. It then starts the topic deletion teardown mode, where it deletes all topic state from the controllerContext as well as from ZooKeeper. This is the only time the /brokers/topics/<topic> path gets deleted. On the other hand, if no replica is in TopicDeletionStarted state and at least one replica is in TopicDeletionFailed state, it marks the topic for deletion retry.
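The decision in step 6 can be illustrated with a minimal sketch. It is not the Kafka implementation: every type and method name below is hypothetical, and the treatment of an empty replica list (keep waiting) is an assumption that the description does not cover.

```scala
// Hypothetical names throughout; this only models the rule described in step 6.
sealed trait ReplicaDeletionState
case object DeletionStarted extends ReplicaDeletionState
case object DeletionSuccessful extends ReplicaDeletionState
case object DeletionFailed extends ReplicaDeletionState

sealed trait TopicOutcome
case object CompleteDeletion extends TopicOutcome // tear down all topic state
case object RetryDeletion extends TopicOutcome    // mark the topic for deletion retry
case object AwaitResponses extends TopicOutcome   // some replicas have not answered yet

object TopicDeletionSketch {
  /** Maps the states of all replicas of one topic to the action for that topic. */
  def outcome(states: Seq[ReplicaDeletionState]): TopicOutcome =
    if (states.nonEmpty && states.forall(_ == DeletionSuccessful)) CompleteDeletion
    else if (!states.contains(DeletionStarted) && states.contains(DeletionFailed)) RetryDeletion
    else AwaitResponses // assumption: also used when no replica state is known
}
```

In this sketch a single replica still in the started state keeps the topic in AwaitResponses, even if other replicas have already failed, which matches the "no replica is in TopicDeletionStarted state" condition for a retry.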

Linear Supertypes
Logging, AnyRef, Any

Instance Constructors

  1. new TopicDeletionManager(controller: KafkaController, initialTopicsToBeDeleted: Set[String] = Set.empty, initialTopicsIneligibleForDeletion: Set[String] = Set.empty)


    initialTopicsToBeDeleted

    The topics that are queued up for deletion in zookeeper at the time of controller failover

    initialTopicsIneligibleForDeletion

    The topics ineligible for deletion due to any of the conditions mentioned in #3 above

Type Members

  1. class DeleteTopicsThread extends ShutdownableThread

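The class description says the deletion thread is signaled whenever deletion needs to be started or resumed, and the value members include a ReentrantLock, a Condition, and an AtomicBoolean. One plausible way to combine the three is sketched below. The field names mirror those members, but the class DeletionSignal and its two methods are hypothetical, and the real DeleteTopicsThread may be wired differently.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch of a flag-plus-condition handoff; not the Kafka source.
class DeletionSignal {
  private val deleteLock = new ReentrantLock()
  private val deleteTopicsCond = deleteLock.newCondition()
  private val deleteTopicStateChanged = new AtomicBoolean(false)

  /** Called by event handlers when deletion should start or resume. */
  def signal(): Unit = {
    deleteTopicStateChanged.set(true)
    deleteLock.lock()
    try deleteTopicsCond.signal()
    finally deleteLock.unlock()
  }

  /** Called by the worker thread; blocks until a state change has been flagged. */
  def awaitStateChange(): Unit = {
    deleteLock.lock()
    try {
      // compareAndSet consumes the flag; the loop guards against spurious wakeups.
      while (!deleteTopicStateChanged.compareAndSet(true, false))
        deleteTopicsCond.await()
    } finally deleteLock.unlock()
  }
}
```

Setting the flag before signaling means a signal that arrives while the worker is busy is not lost: the next call to awaitStateChange sees the flag and returns without waiting.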

Value Members

  1. final def !=(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  2. final def ##(): Int

    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  5. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  6. val controllerContext: ControllerContext

  7. def debug(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  8. def debug(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  9. def debug(msg: ⇒ String): Unit

    Definition Classes
    Logging
  10. val deleteLock: ReentrantLock

  11. val deleteTopicStateChanged: AtomicBoolean

  12. val deleteTopicsCond: Condition

  13. var deleteTopicsThread: DeleteTopicsThread

  14. def enqueueTopicsForDeletion(topics: Set[String]): Unit

    Invoked by the child change listener on /admin/delete_topics to queue up the topics for deletion. Each topic is added to the topicsToBeDeleted list and is removed from it only when topic deletion has completed successfully, i.e. when all replicas of all partitions of that topic have been deleted.

    topics

    Topics that should be deleted

  15. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  16. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  17. def error(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  18. def error(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  19. def error(msg: ⇒ String): Unit

    Definition Classes
    Logging
  20. def failReplicaDeletion(replicas: Set[PartitionAndReplica]): Unit

    Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas are moved from ReplicaDeletionStarted to ReplicaDeletionIneligible state, and the topic is added to the list of topics ineligible for deletion until further notice. The delete topic thread is notified so that it can retry topic deletion once it has received a response for all replicas of a topic to be deleted.

    replicas

    Replicas for which deletion has failed

  21. def fatal(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  22. def fatal(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  23. def fatal(msg: ⇒ String): Unit

    Definition Classes
    Logging
  24. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  25. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  26. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  27. def info(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  28. def info(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  29. def info(msg: ⇒ String): Unit

    Definition Classes
    Logging
  30. val isDeleteTopicEnabled: Boolean

  31. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  32. def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition): Boolean

  33. def isTopicDeletionInProgress(topic: String): Boolean

  34. def isTopicIneligibleForDeletion(topic: String): Boolean

  35. def isTopicQueuedUpForDeletion(topic: String): Boolean

  36. var logIdent: String

    Attributes
    protected
    Definition Classes
    Logging
  37. lazy val logger: Logger

    Definition Classes
    Logging
  38. val loggerName: String

    Definition Classes
    Logging
  39. def markTopicIneligibleForDeletion(topics: Set[String]): Unit

    Halts topic deletion when any of the following holds: 1. replicas of the topic are down; 2. partition reassignment is in progress for some partitions of the topic; 3. preferred replica election is in progress for some partitions of the topic.

    topics

    Topics that should be marked ineligible for deletion. No-op if a topic was not previously queued up for deletion.

  40. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  41. final def notify(): Unit

    Definition Classes
    AnyRef
  42. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  43. val partitionStateMachine: PartitionStateMachine

  44. val partitionsToBeDeleted: Set[TopicAndPartition]

  45. val replicaStateMachine: ReplicaStateMachine

  46. def resumeDeletionForTopics(topics: Set[String] = Set.empty): Unit

    Invoked when any event that can possibly resume topic deletion occurs. These events include: 1. A new broker starts up. Any replicas belonging to topics queued up for deletion can be deleted, since the broker is up. 2. Partition reassignment completes. Any partitions belonging to topics queued up for deletion have finished reassignment. 3. Preferred replica election completes. Any partitions belonging to topics queued up for deletion have finished preferred replica election.

    topics

    Topics for which deletion can be resumed

  47. def shutdown(): Unit

    Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.

  48. def start(): Unit


    Invoked at the end of new controller initiation

  49. def swallow(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  50. def swallowDebug(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  51. def swallowError(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  52. def swallowInfo(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  53. def swallowTrace(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  54. def swallowWarn(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  55. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  56. def toString(): String

    Definition Classes
    AnyRef → Any
  57. val topicsIneligibleForDeletion: Set[String]

  58. val topicsToBeDeleted: Set[String]

  59. def trace(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  60. def trace(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  61. def trace(msg: ⇒ String): Unit

    Definition Classes
    Logging
  62. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  63. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  64. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  65. def warn(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  66. def warn(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  67. def warn(msg: ⇒ String): Unit

    Definition Classes
    Logging
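The relationship between topicsToBeDeleted and topicsIneligibleForDeletion, as described for enqueueTopicsForDeletion, markTopicIneligibleForDeletion, and resumeDeletionForTopics above, can be modeled with two sets. The sketch below is a simplified model under stated assumptions: the class DeletionQueueSketch is hypothetical, it leaves out replicas, ZooKeeper, and the deletion thread, and the assumption that resuming simply removes a topic from the ineligible set is not confirmed by this page.

```scala
import scala.collection.mutable

// Hypothetical model of the set bookkeeping only; method names mirror the documented API.
class DeletionQueueSketch {
  val topicsToBeDeleted = mutable.Set.empty[String]
  val topicsIneligibleForDeletion = mutable.Set.empty[String]

  /** Queues topics; they stay queued until deletion completes successfully. */
  def enqueueTopicsForDeletion(topics: Set[String]): Unit =
    topicsToBeDeleted ++= topics

  /** Only topics already queued for deletion can become ineligible (no-op otherwise). */
  def markTopicIneligibleForDeletion(topics: Set[String]): Unit =
    topicsIneligibleForDeletion ++= topics.filter(t => topicsToBeDeleted.contains(t))

  /** Assumption: resuming clears the ineligible mark; the topic remains queued. */
  def resumeDeletionForTopics(topics: Set[String]): Unit =
    topicsIneligibleForDeletion --= topics

  def isTopicQueuedUpForDeletion(topic: String): Boolean =
    topicsToBeDeleted.contains(topic)

  def isTopicIneligibleForDeletion(topic: String): Boolean =
    topicsIneligibleForDeletion.contains(topic)
}
```

Keeping the ineligible set as a subset of the queued set means a topic that was never queued cannot block anything, which is one way to read the "no-op" note on markTopicIneligibleForDeletion.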
