kafka.controller

TopicDeletionManager

class TopicDeletionManager extends Logging

This manages the state machine for topic deletion.

  1. TopicCommand issues topic deletion by creating a new admin path /admin/delete_topics/<topic>.
  2. The controller listens for child changes on /admin/delete_topics and starts topic deletion for the respective topics.
  3. The controller has a background thread that handles topic deletion. The purpose of this background thread is to accommodate the TTL feature, when we have it. The thread is signaled whenever deletion for a topic needs to be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the controller. In the future, it can be triggered based on the configured TTL for the topic. A topic is ineligible for deletion in the following scenarios:
     3.1 A broker hosting one of the replicas for that topic goes down.
     3.2 Partition reassignment for partitions of that topic is in progress.
     3.3 Preferred replica election for partitions of that topic is in progress (though this is not strictly required, since it holds the controller lock for the entire duration from start to end).
  4. Topic deletion is resumed when:
     4.1 A broker hosting one of the replicas for that topic is started.
     4.2 Preferred replica election for partitions of that topic completes.
     4.3 Partition reassignment for partitions of that topic completes.
  5. Every replica for a topic being deleted is in one of three states:
     5.1 TopicDeletionStarted. A replica enters the TopicDeletionStarted phase when the onPartitionDeletion callback is invoked. This happens when the child change watch for /admin/delete_topics fires on the controller. As part of this state change, the controller sends StopReplicaRequests to all replicas. It registers a callback for the StopReplicaResponse when deletePartition=true, so that a callback is invoked when a response to the delete request is received from every replica.
     5.2 TopicDeletionSuccessful. deleteTopicStopReplicaCallback() moves replicas from TopicDeletionStarted to TopicDeletionSuccessful depending on the error codes in the StopReplicaResponse.
     5.3 TopicDeletionFailed. deleteTopicStopReplicaCallback() moves replicas from TopicDeletionStarted to TopicDeletionFailed depending on the error codes in the StopReplicaResponse. In general, if a broker dies and it hosted replicas for topics being deleted, the controller marks the respective replicas as TopicDeletionFailed in the onBrokerFailure callback. The reason is that if a broker fails after the replica has entered the TopicDeletionStarted state but before the request is sent, the replica could mistakenly remain in TopicDeletionStarted and topic deletion would not be retried when the broker comes back up.
  6. The delete topic thread marks a topic as successfully deleted only if all replicas are in the TopicDeletionSuccessful state. It then starts the topic deletion teardown mode, in which it deletes all topic state from the controllerContext as well as from zookeeper. This is the only time the /brokers/topics/<topic> path gets deleted. On the other hand, if no replica is in the TopicDeletionStarted state and at least one replica is in the TopicDeletionFailed state, it marks the topic for deletion retry.
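
The completion and retry rule in item 6 can be sketched as a small pure function. This is an illustration only, not the controller's code: the object name TopicDeletionSketch, the TopicOutcome type, and the decide function are hypothetical, and treating an empty replica list as "keep waiting" is an assumption that the description above does not address.

```scala
object TopicDeletionSketch {
  // The three per-replica states named in item 5 of the class description.
  sealed trait ReplicaDeletionState
  case object TopicDeletionStarted extends ReplicaDeletionState
  case object TopicDeletionSuccessful extends ReplicaDeletionState
  case object TopicDeletionFailed extends ReplicaDeletionState

  // Hypothetical result of one pass of the delete topic thread over a topic.
  sealed trait TopicOutcome
  case object CompleteDeletion extends TopicOutcome // all replicas deleted: tear down topic state
  case object RetryDeletion extends TopicOutcome    // nothing pending and at least one failure
  case object AwaitResponses extends TopicOutcome   // some replicas have not responded yet

  def decide(replicaStates: Iterable[ReplicaDeletionState]): TopicOutcome = {
    val states = replicaStates.toSeq
    // Assumption: a topic with no known replicas is not treated as deleted.
    if (states.nonEmpty && states.forall(_ == TopicDeletionSuccessful)) CompleteDeletion
    else if (!states.contains(TopicDeletionStarted) && states.contains(TopicDeletionFailed)) RetryDeletion
    else AwaitResponses
  }
}
```

A topic with one successful and one failed replica is retried, while a topic with any replica still in TopicDeletionStarted is left alone until the outstanding responses arrive.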

Linear Supertypes
Logging, AnyRef, Any

Instance Constructors

  1. new TopicDeletionManager(controller: KafkaController, initialTopicsToBeDeleted: Set[String] = scala.collection.Set.empty[String], initialTopicsIneligibleForDeletion: Set[String] = scala.collection.Set.empty[String])

    controller
    initialTopicsToBeDeleted

    The topics that are queued up for deletion in zookeeper at the time of controller failover

    initialTopicsIneligibleForDeletion

    The topics ineligible for deletion due to any of the conditions mentioned in #3 above

Type Members

  1. class DeleteTopicsThread extends ShutdownableThread

Value Members

  1. final def !=(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  2. final def !=(arg0: Any): Boolean

    Definition Classes
    Any
  3. final def ##(): Int

    Definition Classes
    AnyRef → Any
  4. final def ==(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  5. final def ==(arg0: Any): Boolean

    Definition Classes
    Any
  6. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  7. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  8. val controllerContext: ControllerContext

  9. def debug(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  10. def debug(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  11. def debug(msg: ⇒ String): Unit

    Definition Classes
    Logging
  12. val deleteLock: ReentrantLock

  13. val deleteTopicStateChanged: AtomicBoolean

  14. val deleteTopicsCond: Condition

  15. var deleteTopicsThread: DeleteTopicsThread

  16. def enqueueTopicsForDeletion(topics: Set[String]): Unit

    Invoked by the child change listener on /admin/delete_topics to queue up the topics for deletion. Each topic is added to the topicsToBeDeleted list and is removed from it only when topic deletion has completed successfully, i.e. when all replicas of all partitions of that topic have been deleted.

    topics

    Topics that should be deleted

  17. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  18. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  19. def error(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  20. def error(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  21. def error(msg: ⇒ String): Unit

    Definition Classes
    Logging
  22. def failReplicaDeletion(replicas: Set[PartitionAndReplica]): Unit

    Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas are moved from the ReplicaDeletionStarted state to the ReplicaDeletionIneligible state, and the topic is added to the list of topics ineligible for deletion until further notice. The delete topic thread is notified so that it can retry topic deletion once it has received a response for all replicas of a topic to be deleted.

    replicas

    Replicas for which deletion has failed
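
The bookkeeping described for failReplicaDeletion can be sketched with immutable values. This is a simplified model under stated assumptions, not the actual implementation: the fields of PartitionAndReplica, the DeletionState container, and the extra queued parameter are all hypothetical, and skipping replicas whose topic is not queued for deletion is an assumption drawn from the phrase "replicas for topics to be deleted".

```scala
object FailReplicaDeletionSketch {
  // Hypothetical shape; the real PartitionAndReplica is not shown on this page.
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  sealed trait State
  case object ReplicaDeletionStarted extends State
  case object ReplicaDeletionIneligible extends State

  // Hypothetical container for the state this method touches.
  final case class DeletionState(
    replicaStates: Map[PartitionAndReplica, State],
    ineligibleTopics: Set[String])

  def failReplicaDeletion(
      s: DeletionState,
      replicas: Set[PartitionAndReplica],
      queued: Set[String]): DeletionState = {
    // Assumption: only replicas of topics queued for deletion are affected.
    val affected = replicas.filter(r => queued(r.topic))
    DeletionState(
      // Move each affected replica to the ineligible state.
      s.replicaStates ++ affected.map(r => r -> (ReplicaDeletionIneligible: State)),
      // Mark the owning topics ineligible until deletion is resumed.
      s.ineligibleTopics ++ affected.map(_.topic))
  }
}
```

Notifying the delete topic thread is left out of this sketch; it only shows the state change.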

  23. def fatal(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  24. def fatal(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  25. def fatal(msg: ⇒ String): Unit

    Definition Classes
    Logging
  26. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  27. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  28. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  29. def info(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  30. def info(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  31. def info(msg: ⇒ String): Unit

    Definition Classes
    Logging
  32. val isDeleteTopicEnabled: Boolean

  33. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  34. def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition): Boolean

  35. def isTopicDeletionInProgress(topic: String): Boolean

  36. def isTopicIneligibleForDeletion(topic: String): Boolean

  37. def isTopicQueuedUpForDeletion(topic: String): Boolean

  38. var logIdent: String

    Attributes
    protected
    Definition Classes
    Logging
  39. lazy val logger: Logger

    Definition Classes
    Logging
  40. val loggerName: String

    Definition Classes
    Logging
  41. def markTopicIneligibleForDeletion(topics: Set[String]): Unit

    Halts topic deletion when any of the following holds:
      1. Replicas of the topic are down.
      2. Partition reassignment is in progress for some partitions of the topic.
      3. Preferred replica election is in progress for some partitions of the topic.

    topics

    Topics that should be marked ineligible for deletion. No-op if the topic was not previously queued up for deletion.
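
The no-op rule for topics that were never queued can be expressed as a set intersection. The following is a minimal sketch over plain immutable sets; the object IneligibilitySketch and the markIneligible function are hypothetical names, and the real method returns Unit and presumably updates controller state in place.

```scala
object IneligibilitySketch {
  // Returns the updated set of ineligible topics.
  // Topics that are not currently queued for deletion are ignored (the no-op case).
  def markIneligible(
      queued: Set[String],
      ineligible: Set[String],
      topics: Set[String]): Set[String] =
    ineligible ++ (topics intersect queued)
}
```

For example, marking Set("a", "x") with only "a" queued adds just "a" to the ineligible set.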

  42. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  43. final def notify(): Unit

    Definition Classes
    AnyRef
  44. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  45. val partitionStateMachine: PartitionStateMachine

  46. val partitionsToBeDeleted: Set[TopicAndPartition]

  47. val replicaStateMachine: ReplicaStateMachine

  48. def resumeDeletionForTopics(topics: Set[String] = Set.empty): Unit

    Invoked when any event that can possibly resume topic deletion occurs. These events include:
      1. A new broker starts up. Any replicas belonging to topics queued up for deletion can be deleted, since the broker is up.
      2. Partition reassignment completes. Any partitions belonging to topics queued up for deletion have finished reassignment.
      3. Preferred replica election completes. Any partitions belonging to topics queued up for deletion have finished preferred replica election.

    topics

    Topics for which deletion can be resumed
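
The members deleteLock, deleteTopicsCond and deleteTopicStateChanged suggest a standard lock and condition hand-off between the controller and the delete topic thread. The sketch below shows one way such signaling could work. It is an assumption about how those three members cooperate, not a copy of the implementation, and the method names signalStateChange and awaitStateChange are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

object DeletionSignalSketch {
  val deleteLock = new ReentrantLock()
  val deleteTopicsCond = deleteLock.newCondition()
  val deleteTopicStateChanged = new AtomicBoolean(false)

  // Called by any event that may start or resume topic deletion.
  def signalStateChange(): Unit = {
    deleteTopicStateChanged.set(true)
    deleteLock.lock()
    try deleteTopicsCond.signal()
    finally deleteLock.unlock()
  }

  // Called by the background thread: blocks until a change has been flagged,
  // then clears the flag so the next call waits for a new event.
  def awaitStateChange(): Unit = {
    deleteLock.lock()
    try {
      // Looping on the flag guards against spurious wakeups.
      while (!deleteTopicStateChanged.compareAndSet(true, false))
        deleteTopicsCond.await()
    } finally deleteLock.unlock()
  }
}
```

Because the flag is checked while holding the lock and the signal is sent under the same lock, a change flagged just before the thread starts waiting is not lost.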

  49. def shutdown(): Unit

    Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.

  50. def start(): Unit

    Invoked at the end of new controller initiation

  51. def swallow(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  52. def swallowDebug(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  53. def swallowError(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  54. def swallowInfo(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  55. def swallowTrace(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  56. def swallowWarn(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  57. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  58. def toString(): String

    Definition Classes
    AnyRef → Any
  59. val topicsIneligibleForDeletion: Set[String]

  60. val topicsToBeDeleted: Set[String]

  61. def trace(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  62. def trace(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  63. def trace(msg: ⇒ String): Unit

    Definition Classes
    Logging
  64. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  65. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  66. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  67. def warn(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  68. def warn(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  69. def warn(msg: ⇒ String): Unit

    Definition Classes
    Logging
