kafka.controller

KafkaController

class KafkaController extends Logging with KafkaMetricsGroup

Linear Supertypes

  KafkaMetricsGroup, Logging, AnyRef, Any

Instance Constructors

  1. new KafkaController(config: KafkaConfig, zkUtils: ZkUtils, brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None)

Type Members

  1. class SessionExpirationListener extends IZkStateListener with Logging

Value Members

  1. final def !=(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  2. final def ##(): Int

    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  5. val brokerState: BrokerState

  6. def clientId: String

  7. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  8. val config: KafkaConfig

  9. val controllerContext: ControllerContext

  10. def debug(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  11. def debug(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  12. def debug(msg: ⇒ String): Unit

    Definition Classes
    Logging
  13. var deleteTopicManager: TopicDeletionManager

  14. def epoch: Int

  15. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  16. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  17. def error(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  18. def error(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  19. def error(msg: ⇒ String): Unit

    Definition Classes
    Logging
  20. def fatal(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  21. def fatal(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  22. def fatal(msg: ⇒ String): Unit

    Definition Classes
    Logging
  23. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  24. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  25. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  26. def incrementControllerEpoch(zkClient: ZkClient): Unit

  27. def info(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  28. def info(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  29. def info(msg: ⇒ String): Unit

    Definition Classes
    Logging
  30. def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext): Unit

  31. def isActive(): Boolean

    Returns true if this broker is the current controller.

  32. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  33. var logIdent: String

    Attributes
    protected
    Definition Classes
    Logging
  34. lazy val logger: Logger

    Definition Classes
    Logging
  35. val loggerName: String

    Definition Classes
    Logging
  36. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  37. def newGauge[T](name: String, metric: Gauge[T], tags: Map[String, String] = Map.empty): Gauge[T]

    Definition Classes
    KafkaMetricsGroup
  38. def newHistogram(name: String, biased: Boolean = true, tags: Map[String, String] = Map.empty): Histogram

    Definition Classes
    KafkaMetricsGroup
  39. def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: Map[String, String] = Map.empty): Meter

    Definition Classes
    KafkaMetricsGroup
  40. def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: Map[String, String] = Map.empty): Timer

    Definition Classes
    KafkaMetricsGroup
  41. final def notify(): Unit

    Definition Classes
    AnyRef
  42. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  43. val offlinePartitionSelector: OfflinePartitionLeaderSelector

  44. def onBrokerFailure(deadBrokers: Seq[Int]): Unit

    This callback is invoked by the replica state machine's broker change listener with the list of failed brokers as input. It does the following:

    1. Marks partitions with dead leaders as offline.
    2. Triggers the OnlinePartition state change for all new/offline partitions.
    3. Invokes the OfflineReplica state change on the input list of failed brokers.
    4. If no partitions are affected, sends an UpdateMetadataRequest to live or shutting down brokers.

    Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because the partition state machine will refresh our cache for us when performing leader election for all new/offline partitions coming online.
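
    As a rough illustration of step 1, the sketch below picks out the partitions whose leader is in the list of failed brokers. It is not the controller's code: the `TopicAndPartition` case class is a simplified stand-in, and the `leaders` map and the `partitionsWithDeadLeaders` helper are hypothetical names introduced only for this example.

```scala
object BrokerFailureSketch {
  // Hypothetical stand-in for kafka.common.TopicAndPartition.
  final case class TopicAndPartition(topic: String, partition: Int)

  /** Returns the partitions whose current leader is one of the failed brokers. */
  def partitionsWithDeadLeaders(
      leaders: Map[TopicAndPartition, Int],
      deadBrokers: Seq[Int]): Set[TopicAndPartition] = {
    val dead = deadBrokers.toSet
    // Keep only the entries led by a dead broker, then return their keys.
    leaders.filter { case (_, leader) => dead.contains(leader) }.keySet
  }
}
```

    In this model, the returned set is what step 1 would mark as offline before the partition state machine attempts a new leader election in step 2.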

  45. def onBrokerStartup(newBrokers: Seq[Int]): Unit

    This callback is invoked by the replica state machine's broker change listener, with the list of newly started brokers as input. It does the following:

    1. Sends an update metadata request to all live and shutting down brokers.
    2. Triggers the OnlinePartition state change for all new/offline partitions.
    3. Checks whether there are reassigned replicas assigned to any newly started brokers. If so, it performs the reassignment logic for each topic/partition.

    Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point, for two reasons:

    1. The partition state machine, when triggering the online state change, will refresh leader and ISR for only those partitions currently new or offline (rather than every partition this controller is aware of).
    2. Even if we do refresh the cache, there is no guarantee that the leader and ISR request will still be valid by the time it reaches every broker. Brokers check the leader epoch to determine the validity of the request.
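
    The leader epoch check mentioned in the second reason can be pictured with the sketch below. It assumes a broker accepts an update only when the update carries a strictly newer leader epoch than the one it already holds; the `LeaderAndIsrUpdate` case class and `isStillValid` are hypothetical names for illustration, not the broker's actual request type or method.

```scala
object LeaderEpochSketch {
  // Hypothetical, simplified view of a leader-and-ISR update for one partition.
  final case class LeaderAndIsrUpdate(leader: Int, leaderEpoch: Int, isr: List[Int])

  /** Assumption: an update is valid only if its leader epoch is strictly newer. */
  def isStillValid(currentLeaderEpoch: Int, update: LeaderAndIsrUpdate): Boolean =
    update.leaderEpoch > currentLeaderEpoch
}
```

    Under this assumption, a request built from a stale cache is simply rejected by the broker, which is why refreshing the cache for every partition buys nothing here.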

  46. def onControllerFailover(): Unit

    This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller. It does the following on the become-controller state change:

    1. Registers the controller epoch changed listener.
    2. Increments the controller epoch.
    3. Initializes the controller's context object that holds cache objects for current topics, live brokers and leaders for all existing partitions.
    4. Starts the controller's channel manager.
    5. Starts the replica state machine.
    6. Starts the partition state machine.

    If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller. This ensures another controller election will be triggered and there will always be an actively serving controller.
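
    The resign-on-failure behaviour can be sketched as follows. This is a minimal model rather than the controller's implementation: `becomeController`, the named `steps`, and the `resign` callback are hypothetical, and the steps are plain functions standing in for the numbered actions above.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object FailoverSketch {
  /**
   * Runs the named steps in order. If any step throws a non-fatal exception,
   * calls `resign` and stops. Returns the names of the steps that completed.
   */
  def becomeController(steps: Seq[(String, () => Unit)], resign: () => Unit): Seq[String] = {
    val completed = ArrayBuffer.empty[String]
    val outcome = Try {
      steps.foreach { case (name, step) =>
        step()
        completed += name
      }
    }
    outcome match {
      case Success(_) => ()       // all steps ran; this broker is now serving as controller
      case Failure(_) => resign() // give up leadership so another election can be triggered
    }
    completed.toList
  }
}
```

    Calling `resign` on any failure keeps a half-initialized controller from holding on to leadership, at the cost of an extra election.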

  47. def onControllerResignation(): Unit

    This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is required to clean up internal controller data structures.

  48. def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]): Unit

    This callback is invoked by the topic change callback with the set of newly created partitions as input. It does the following:

    1. Moves the newly created partitions to the NewPartition state.
    2. Moves the newly created partitions from the NewPartition state to the OnlinePartition state.

  49. def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]): Unit

    This callback is invoked by the partition state machine's topic change listener with the list of new topics and partitions as input. It does the following:

    1. Registers the partition change listener. This is not required until KAFKA-347.
    2. Invokes the new partition callback.
    3. Sends a metadata request with the new topic to all brokers so they allow requests for that topic to be served.

  50. def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext): Unit

    This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener. Reassigning replicas for a partition goes through a few steps listed in the code.

    RAR = Reassigned replicas
    OAR = Original list of replicas for partition
    AR = current assigned replicas

    1. Update AR in ZK with OAR + RAR.
    2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update of the leader epoch in zookeeper.
    3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
    4. Wait until all replicas in RAR are in sync with the leader.
    5. Move all replicas in RAR to OnlineReplica state.
    6. Set AR to RAR in memory.
    7. If the leader is not in RAR, elect a new leader from RAR. If a new leader needs to be elected from RAR, a LeaderAndIsr request will be sent. If not, then the leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent. In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in RAR - OAR back in the isr.
    8. Move all replicas in OAR - RAR to OfflineReplica state. As part of the OfflineReplica state change, we shrink the isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr request ONLY to the leader to notify it of the shrunk isr. After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
    9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = true) to the replicas in OAR - RAR to physically delete the replicas on disk.
    10. Update AR in ZK with RAR.
    11. Update the /admin/reassign_partitions path in ZK to remove this partition.
    12. After electing the leader, the replicas and isr information changes, so resend the update metadata request to every broker.

    For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK may go through the following transition.

    AR                 leader/isr
    {1,2,3}            1/{1,2,3}           (initial state)
    {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
    {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
    {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
    {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
    {4,5,6}            4/{4,5,6}           (step 10)

    Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently. This way, if the controller crashes before that step, we can still recover.
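
    The set arithmetic behind these transitions can be reproduced with a small model. The sketch below is only an illustration under stated assumptions: `State` and `transitions` are hypothetical names, the initial leader is taken to be the first replica in OAR with every OAR replica in sync, and the new leader is taken to be the first replica in RAR. None of this is the controller's code.

```scala
object ReassignmentSketch {
  // Simplified snapshot of the assigned replicas and leader/isr for one partition.
  final case class State(ar: Set[Int], leader: Int, isr: Set[Int])

  /** Returns the labelled states a partition passes through while moving from OAR to RAR. */
  def transitions(oar: Seq[Int], rar: Seq[Int]): List[(String, State)] = {
    require(oar.nonEmpty && rar.nonEmpty, "OAR and RAR must be non-empty")
    val oarSet = oar.toSet
    val rarSet = rar.toSet

    // Assumption: the first OAR replica leads and all of OAR is in sync.
    val initial = State(ar = oarSet, leader = oar.head, isr = oarSet)
    // Steps 1-2: AR becomes OAR + RAR.
    val step2 = initial.copy(ar = oarSet ++ rarSet)
    // Step 4: all replicas in RAR have joined the ISR.
    val step4 = step2.copy(isr = step2.isr ++ rarSet)
    // Step 7: keep the leader if it is in RAR, otherwise pick one from RAR.
    val leader = if (rarSet.contains(step4.leader)) step4.leader else rar.head
    val step7 = step4.copy(leader = leader)
    // Step 8: shrink the ISR by OAR - RAR.
    val step8 = step7.copy(isr = step7.isr -- (oarSet -- rarSet))
    // Step 10: AR becomes RAR.
    val step10 = step8.copy(ar = rarSet)

    List(
      "initial state" -> initial,
      "step 2" -> step2,
      "step 4" -> step4,
      "step 7" -> step7,
      "step 8" -> step8,
      "step 10" -> step10)
  }
}
```

    Calling `ReassignmentSketch.transitions(Seq(1, 2, 3), Seq(4, 5, 6))` walks through the same sequence of AR and leader/isr values as the example above.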

  51. def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false): Unit

  52. val partitionStateMachine: PartitionStateMachine

  53. def removeMetric(name: String, tags: Map[String, String] = Map.empty): Unit

    Definition Classes
    KafkaMetricsGroup
  54. def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition): Unit

  55. def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean): Unit

  56. def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch]

    Removes the given replica from a partition's ISR, provided it is not the current leader and sufficient replicas remain in the ISR.

    topic

    The topic of the partition

    partition

    The partition ID

    replicaId

    The ID of the replica to remove

    returns

    the new leaderAndIsr (with the replica removed if it was present), or None if leaderAndIsr is empty.
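
    The documented contract can be modelled as a pure function. The sketch below follows the description above literally and is not the controller's implementation: the `LeaderAndIsr` case class is simplified, and `minIsr` is a hypothetical parameter standing in for "sufficient remaining replicas".

```scala
object IsrSketch {
  // Hypothetical, simplified leader/ISR record for one partition.
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  /**
   * Returns None when no leader/ISR information exists. Otherwise returns the
   * record with `replicaId` removed, but only if the replica is in the ISR,
   * is not the leader, and at least `minIsr` replicas would remain.
   */
  def removeReplicaFromIsr(
      current: Option[LeaderAndIsr],
      replicaId: Int,
      minIsr: Int = 1): Option[LeaderAndIsr] =
    current.map { li =>
      val removable =
        li.isr.contains(replicaId) &&
          replicaId != li.leader &&
          li.isr.size - 1 >= minIsr
      if (removable) li.copy(isr = li.isr.filterNot(_ == replicaId)) else li
    }
}
```

    Returning the unchanged record when removal is not allowed mirrors the "with the replica removed if it was present" wording of the return value.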

  57. val replicaStateMachine: ReplicaStateMachine

    Permalink
  58. def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: (AbstractRequestResponse) ⇒ Unit = null): Unit

  59. def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]): Unit

    Sends the leader information for selected partitions to selected brokers so that they can correctly respond to metadata requests.

    brokers

    The brokers that the update metadata request should be sent to

  60. def shutdown(): Unit

    Invoked when the controller module of a Kafka server is shutting down. If the broker was the current controller, it shuts down the partition and replica state machines. If not, those shutdowns are no-ops. It also shuts down the controller channel manager, if one exists (i.e. if it was the current controller).

  61. def shutdownBroker(id: Int): Set[TopicAndPartition]

    On clean shutdown, the controller first determines the partitions that the shutting down broker leads, and moves leadership of those partitions to another broker that is in that partition's ISR.

    id

    Id of the broker to shut down.

    returns

    The set of partitions that the broker still leads.
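
    A minimal model of this leadership move is sketched below. It assumes leadership goes to the first other broker found in the partition's ISR and leaves the ISR itself untouched; the case classes and the tuple return value are hypothetical simplifications, not the signature or logic of the method documented here.

```scala
object ControlledShutdownSketch {
  // Hypothetical stand-ins for the controller's types.
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  /** Returns the updated leader/ISR map and the partitions that broker `id` still leads. */
  def shutdownBroker(
      id: Int,
      state: Map[TopicAndPartition, LeaderAndIsr])
      : (Map[TopicAndPartition, LeaderAndIsr], Set[TopicAndPartition]) = {
    val updated = state.map { case (tp, li) =>
      // Only partitions led by the shutting down broker need a new leader.
      val successor = if (li.leader == id) li.isr.find(_ != id) else None
      tp -> successor.fold(li)(next => li.copy(leader = next))
    }
    // Partitions with no other in-sync replica keep their old leader.
    val stillLed = updated.filter { case (_, li) => li.leader == id }.keySet
    (updated, stillLed)
  }
}
```

    In this model, a non-empty result means some partitions had no other in-sync replica to take over.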

  62. def startup(): Unit

    Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker is the controller. It merely registers the session expiration listener and starts the controller leader elector.

  63. def swallow(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  64. def swallowDebug(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  65. def swallowError(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  66. def swallowInfo(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  67. def swallowTrace(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  68. def swallowWarn(action: ⇒ Unit): Unit

    Definition Classes
    Logging
  69. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  70. def toString(): String

    Definition Classes
    AnyRef → Any
  71. def trace(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  72. def trace(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  73. def trace(msg: ⇒ String): Unit

    Definition Classes
    Logging
  74. def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition, newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]): Unit

  75. def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = ...): Unit

  76. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  77. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  78. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  79. def warn(msg: ⇒ String, e: ⇒ Throwable): Unit

    Definition Classes
    Logging
  80. def warn(e: ⇒ Throwable): Any

    Definition Classes
    Logging
  81. def warn(msg: ⇒ String): Unit

    Definition Classes
    Logging
