Abstract class for fetching data from multiple partitions from the same broker.
A delayed fetch request, which is satisfied (or, more accurately, unblocked) if:
Case A: This broker is no longer the leader for some partitions it tries to fetch
A delayed produce request, which is satisfied (or, more accurately, unblocked) if, for every partition it produces to:
Case A: This broker is not the leader: unblock; should return an error.
Case B: This broker is the leader:
  B.1: If there was a localError (when writing to the local log): unblock; should return an error.
  B.2: Otherwise, at least requiredAcks replicas should be caught up to this request.
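The per-partition rules above can be expressed as a small decision function. This is a minimal Python sketch, assuming a simplified partition view: PartitionStatus, partition_unblocked, produce_unblocked and the "not_leader" error label are hypothetical names, and required_acks is assumed to be a positive replica count (the special value -1 is not modeled).

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class PartitionStatus:
    # Hypothetical per-partition view of a pending produce request.
    is_leader: bool             # does this broker lead the partition?
    local_error: Optional[str]  # error from the local log append, if any
    caught_up_replicas: int     # replicas caught up to this request


def partition_unblocked(status: PartitionStatus, required_acks: int) -> Tuple[bool, Optional[str]]:
    """Return (unblocked, error) for a single partition."""
    if not status.is_leader:
        return True, "not_leader"          # Case A: unblock with an error
    if status.local_error is not None:
        return True, status.local_error    # Case B.1: unblock with the local error
    # Case B.2: keep waiting until enough replicas have caught up
    return status.caught_up_replicas >= required_acks, None


def produce_unblocked(statuses: Dict[Tuple[str, int], PartitionStatus], required_acks: int) -> bool:
    # The delayed produce is unblocked only when every partition is unblocked.
    return all(partition_unblocked(s, required_acks)[0] for s in statuses.values())
```

Note that an errored partition counts as unblocked, so a request mixing one failed partition and one fully replicated partition is released with a partial error.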
A request whose processing needs to be delayed for at most the given delayMs. The associated keys are used for bookkeeping and represent the "trigger" that causes this request to check whether it is satisfied; for example, a key could be a (topic, partition) pair.
Keys used for delayed request metrics recording
The purgatory holding delayed fetch requests
Logic to handle the various Kafka requests
Configuration settings for the kafka server
This class registers the broker in ZooKeeper to allow other brokers and consumers to detect failures. It uses an ephemeral znode with the path /brokers/[0...N] --> advertisedHost:advertisedPort.
Right now our definition of health is fairly naive: if we are registered in ZooKeeper we are healthy; otherwise we are dead.
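The registration and the naive health definition can be illustrated without a real ensemble. This is a minimal Python sketch: FakeZooKeeper is a hypothetical in-memory stand-in for ephemeral znodes, and Healthcheck, its methods and the session names are illustrative assumptions; only the /brokers/<id> path and the host:port value follow the description above.

```python
from typing import Dict, Tuple


class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper ephemeral znodes (illustration only)."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Tuple[str, str]] = {}  # path -> (data, owning session)

    def create_ephemeral(self, path: str, data: str, session: str) -> None:
        if path in self.nodes:
            raise FileExistsError(path)
        self.nodes[path] = (data, session)

    def exists(self, path: str) -> bool:
        return path in self.nodes

    def end_session(self, session: str) -> None:
        # Ephemeral nodes vanish when the session that created them ends.
        self.nodes = {p: v for p, v in self.nodes.items() if v[1] != session}


class Healthcheck:
    def __init__(self, zk: FakeZooKeeper, broker_id: int, host: str, port: int, session: str) -> None:
        self.zk = zk
        self.path = f"/brokers/{broker_id}"  # path layout as described above
        self.data = f"{host}:{port}"          # advertisedHost:advertisedPort
        self.session = session

    def register(self) -> None:
        self.zk.create_ephemeral(self.path, self.data, self.session)

    def is_healthy(self) -> bool:
        # Naive definition: registered means healthy, absent means dead.
        return self.zk.exists(self.path)
```

Because the znode is ephemeral, a crashed broker's registration disappears with its session, which is what lets other brokers and consumers detect the failure.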
A thread that answers kafka requests.
Represents the lifecycle of a single Kafka broker. Handles all functionality required to start up and shut down a single Kafka node.
This trait defines a leader elector. If the existing leader is dead, this class will handle automatic re-election and, if it succeeds, invoke the leader state change callback.
This class saves out a map of topic/partition=>offsets to a file
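One way to save such a map is sketched below in Python. The on-disk layout (a version line, an entry count, then one "topic partition offset" line per entry) and the function names are assumptions for illustration. The sketch writes to a temporary file in the same directory and swaps it in with os.replace, so a reader sees either the old checkpoint or the new one, never a half-written file.

```python
import os
import tempfile
from typing import Dict, Tuple


def write_checkpoint(path: str, offsets: Dict[Tuple[str, int], int]) -> None:
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        f.write("0\n")                  # assumed format version
        f.write(f"{len(offsets)}\n")    # number of entries that follow
        for (topic, partition), offset in sorted(offsets.items()):
            f.write(f"{topic} {partition} {offset}\n")
        f.flush()
        os.fsync(f.fileno())            # make the data durable before the swap
    os.replace(tmp_path, path)          # swap in the new file in one step


def read_checkpoint(path: str) -> Dict[Tuple[str, int], int]:
    with open(path) as f:
        lines = f.read().splitlines()
    if not lines or lines[0] != "0":
        raise ValueError(f"unsupported checkpoint version in {path}")
    count = int(lines[1])
    entries: Dict[Tuple[str, int], int] = {}
    for line in lines[2:2 + count]:
        topic, partition, offset = line.split()
        entries[(topic, int(partition))] = int(offset)
    if len(entries) != count:
        raise ValueError(f"expected {count} entries in {path}, found {len(entries)}")
    return entries
```

Splitting on whitespace assumes topic names never contain spaces.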
Configuration settings for built-in offset management
The maximum allowed metadata for any offset commit.
Batch size for reading from the offsets segments when loading offsets into the cache.
Offsets older than this retention period will be discarded.
Frequency at which to check for stale offsets.
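The metadata limit and the retention settings above can be read as a commit-time check plus a periodic sweep over the offsets cache. This Python sketch is hypothetical: the cache layout, OffsetEntry and the function names are assumptions, metadata size is measured in characters, and a caller is assumed to invoke sweep_stale_offsets once per check interval.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

Key = Tuple[str, str, int]  # (group, topic, partition): an assumed cache key


@dataclass
class OffsetEntry:
    offset: int
    metadata: str
    commit_timestamp_ms: int


def metadata_allowed(metadata: str, max_metadata_size: int) -> bool:
    # Reject commits whose metadata exceeds the configured maximum.
    return len(metadata) <= max_metadata_size


def sweep_stale_offsets(cache: Dict[Key, OffsetEntry], now_ms: int, retention_ms: int) -> List[Key]:
    # Discard entries committed longer ago than the retention period.
    stale = [k for k, e in cache.items() if now_ms - e.commit_timestamp_ms > retention_ms]
    for k in stale:
        del cache[k]
    return stale
```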
The number of partitions for the offset commit topic (should not change after deployment).
The offsets topic segment bytes should be kept relatively small to facilitate faster log compaction and faster offset loads
The replication factor for the offset commit topic (set higher to ensure availability).
Compression codec for the offsets topic - compression should be turned on in order to achieve "atomic" commits.
The offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. (Similar to the producer request timeout.)
The required acks before the commit can be accepted. In general, the default (-1) should not be overridden.
The purgatory holding delayed producer requests
A helper class for dealing with asynchronous requests with a timeout. A DelayedRequest has a request to delay and also a list of keys that can trigger the action. Implementations can add customized logic to control what it means for a given request to be satisfied. For example, we could be waiting for a user-specified number of acks on a given (topic, partition) before we can respond to a request, or waiting for a given number of bytes to accumulate on a given request before responding to it (in the simple case we might wait for at least one byte to avoid busy waiting).
For us the key is generally a (topic, partition) pair. By calling
    val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest)
we check whether a request is already satisfied and, if not, add the request to the watch list for all of its keys.
It is up to the user to then call
    val satisfied = update(key, request)
when a request relevant to the given key occurs. This triggers bookkeeping logic and returns any requests satisfied by this new request.
An implementation provides two helper functions. The first is
    def checkSatisfied(request: R, delayed: T): Boolean
which returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed request delayed. This method will likely also need to do whatever bookkeeping is necessary.
The second function is
    def expire(delayed: T)
which handles delayed requests that have hit their time limit without being satisfied.
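A compact Python rendering of this contract follows. It mirrors checkAndMaybeWatch, update, checkSatisfied and expire in snake_case. The class layout, the expire_due sweep and the min-bytes example subclasses are assumptions made for illustration, not the actual implementation. Passing request=None to check_satisfied stands for "check against what has happened so far".

```python
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Dict, Hashable, Iterable, List, Optional


class DelayedRequest:
    def __init__(self, keys: Iterable[Hashable], delay_ms: int, now_ms: int) -> None:
        self.keys = list(keys)                # triggers that may satisfy this request
        self.deadline_ms = now_ms + delay_ms  # wait at most delay_ms
        self.satisfied = False                # set once satisfied or expired


class RequestPurgatory(ABC):
    def __init__(self) -> None:
        self.watchers: Dict[Hashable, List[DelayedRequest]] = defaultdict(list)

    @abstractmethod
    def check_satisfied(self, request: Optional[Any], delayed: DelayedRequest) -> bool: ...

    @abstractmethod
    def expire(self, delayed: DelayedRequest) -> None: ...

    def check_and_maybe_watch(self, delayed: DelayedRequest) -> bool:
        # Already satisfied? Otherwise watch it on every one of its keys.
        if self.check_satisfied(None, delayed):
            delayed.satisfied = True
            return True
        for key in delayed.keys:
            self.watchers[key].append(delayed)
        return False

    def update(self, key: Hashable, request: Any) -> List[DelayedRequest]:
        # Re-check the requests watching `key`; return the newly satisfied ones.
        done, waiting = [], []
        for delayed in self.watchers.get(key, []):
            if delayed.satisfied:
                continue  # completed earlier through another key
            if self.check_satisfied(request, delayed):
                delayed.satisfied = True
                done.append(delayed)
            else:
                waiting.append(delayed)
        self.watchers[key] = waiting
        return done

    def expire_due(self, now_ms: int) -> List[DelayedRequest]:
        # Hand each request past its deadline to expire(), exactly once.
        expired = []
        for waiting in self.watchers.values():
            for delayed in waiting:
                if not delayed.satisfied and delayed.deadline_ms <= now_ms:
                    delayed.satisfied = True
                    self.expire(delayed)
                    expired.append(delayed)
        for key in list(self.watchers):
            self.watchers[key] = [d for d in self.watchers[key] if not d.satisfied]
        return expired


class MinBytesFetch(DelayedRequest):
    # Example request: wait until at least min_bytes have accumulated.
    def __init__(self, keys: Iterable[Hashable], delay_ms: int, now_ms: int, min_bytes: int) -> None:
        super().__init__(keys, delay_ms, now_ms)
        self.min_bytes = min_bytes
        self.accumulated = 0


class MinBytesPurgatory(RequestPurgatory):
    def __init__(self) -> None:
        super().__init__()
        self.expired: List[DelayedRequest] = []

    def check_satisfied(self, request: Optional[Any], delayed: DelayedRequest) -> bool:
        if request is not None:
            delayed.accumulated += request  # here a request is a byte count
        return delayed.accumulated >= delayed.min_bytes

    def expire(self, delayed: DelayedRequest) -> None:
        self.expired.append(delayed)
```

A request watched on several keys stays in each key's list until that key is next updated or swept; the satisfied flag keeps it from being completed twice.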
This class initiates and carries out topic config changes.
It works as follows.
Config is stored under the path /brokers/topics/<topic_name>/config. This znode stores the topic overrides for this topic (but no defaults) in properties format.
To avoid watching all topics for changes, we instead have a notification path, /brokers/config_changes. The TopicConfigManager has a child watch on this path.
To update a topic config, we first update the topic config properties. Then we create a new sequential znode under the change path containing the name of the topic that was updated, say /brokers/config_changes/config_change_13321. This is just a notification; the actual config change is stored only once, under the /brokers/topics/<topic_name>/config path.
This will fire a watcher on all brokers. The watcher works as follows. It reads all the config change notifications and keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds, it checks whether the notification is older than a static expiration time (say, 10 minutes) and, if so, deletes the notification. For any new changes, it reads the new configuration, combines it with the defaults, and updates the log config for all logs it has for that topic (if any).
Note that config is always read from the config path in ZooKeeper; the notification is just a trigger to do so. So if a broker is down and misses a change, that is fine: when it restarts it will load the full config anyway. Note also that if there are two consecutive config changes, it is possible that only the last one will be applied (since by the time the broker reads the config, both changes may have been made). In this case the broker would needlessly refresh the config twice, but that is harmless.
On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions on startup where a change might be missed between the initial config load and registering for change notifications.
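The watcher's bookkeeping can be sketched in Python as a pure function over the notifications it reads. The function names, the (topic, creation time) representation of a notification, and passing last_applied = -1 to model a restart (so that every notification is re-processed) are assumptions; reading the config from ZooKeeper and updating the log configs are left out.

```python
from typing import Dict, List, Tuple

PREFIX = "config_change_"


def sequence_number(znode_name: str) -> int:
    # Sequential znodes end in a numeric suffix, e.g. config_change_13321.
    return int(znode_name[len(PREFIX):])


def process_notifications(
    notifications: Dict[str, Tuple[str, int]],  # znode name -> (topic, creation time ms)
    last_applied: int,
    now_ms: int,
    expiration_ms: int,
) -> Tuple[List[str], List[str], int]:
    """Return (topics to refresh, notifications to delete, new highest applied suffix)."""
    to_refresh: List[str] = []
    to_delete: List[str] = []
    highest = last_applied
    for name in sorted(notifications, key=sequence_number):
        seq = sequence_number(name)
        topic, created_ms = notifications[name]
        if seq <= last_applied:
            # Already applied: delete it once it is older than the expiration time.
            if now_ms - created_ms > expiration_ms:
                to_delete.append(name)
        else:
            # New change: re-read this topic's config from its config path.
            to_refresh.append(topic)
            highest = max(highest, seq)
    return to_refresh, to_delete, highest


def effective_config(defaults: Dict[str, str], overrides: Dict[str, str]) -> Dict[str, str]:
    # Topic overrides take precedence over the broker defaults.
    return {**defaults, **overrides}
```

Two pending changes to the same topic yield that topic twice in the refresh list, matching the harmless double refresh described above.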
This class handles ZooKeeper-based leader election using an ephemeral path. The election module does not handle session expiration; instead, it assumes the caller will handle it, probably by trying to re-elect. If the existing leader is dead, this class handles automatic re-election and, if it succeeds, invokes the leader state change callback.
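The election pattern can be simulated in Python without a real ZooKeeper ensemble. FakeZooKeeper, LeaderElector, the /controller path and the session names are hypothetical. The sketch shows only the pattern: whoever creates the ephemeral node first is leader, the others watch it, and its deletion triggers a new election. As stated above, expiration of the elector's own session is not handled.

```python
from typing import Callable, Dict, List, Optional, Tuple


class FakeZooKeeper:
    """In-memory stand-in for ephemeral znodes with one-shot delete watches (illustration only)."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Tuple[str, str]] = {}  # path -> (data, session)
        self.delete_watches: Dict[str, List[Callable[[], object]]] = {}

    def create_ephemeral(self, path: str, data: str, session: str) -> bool:
        if path in self.nodes:
            return False
        self.nodes[path] = (data, session)
        return True

    def get(self, path: str) -> Optional[str]:
        entry = self.nodes.get(path)
        return entry[0] if entry else None

    def watch_delete(self, path: str, callback: Callable[[], object]) -> None:
        self.delete_watches.setdefault(path, []).append(callback)

    def close_session(self, session: str) -> None:
        # Remove the session's ephemeral nodes, then fire the delete watches.
        owned = [p for p, (_, s) in self.nodes.items() if s == session]
        for path in owned:
            del self.nodes[path]
            for callback in self.delete_watches.pop(path, []):
                callback()


class LeaderElector:
    def __init__(self, zk: FakeZooKeeper, path: str, candidate_id: str,
                 session: str, on_become_leader: Callable[[], None]) -> None:
        self.zk, self.path = zk, path
        self.candidate_id, self.session = candidate_id, session
        self.on_become_leader = on_become_leader
        self.leader_id: Optional[str] = None

    def elect(self) -> bool:
        if self.zk.create_ephemeral(self.path, self.candidate_id, self.session):
            self.leader_id = self.candidate_id
            self.on_become_leader()  # leader state change callback
        else:
            self.leader_id = self.zk.get(self.path)
            # Re-run the election when the current leader's node disappears.
            self.zk.watch_delete(self.path, self.elect)
        return self.leader_id == self.candidate_id
```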
Broker states are the possible states that a Kafka broker can be in. A broker should be in only one state at a time. The expected state transitions between the defined states are:
                   +-----------+
                   |Not Running|
                   +-----+-----+
                         |
                         v
                   +-----+-----+
                   |Starting   +--+
                   +-----+-----+  | +----+------------+
                         |        +>+RecoveringFrom   |
                         v          |UncleanShutdown  |
    +----------+   +-----+-----+    +-------+---------+
    |RunningAs |   |RunningAs  |            |
    |Controller+<->+Broker     +<-----------+
    +----------+   +-----+-----+
         |               |
         |               v
         |         +-----+------------+
         |-------->+PendingControlled |
                   |Shutdown          |
                   +-----+------------+
                         |
                         v
                   +-----+----------+
                   |BrokerShutting  |
                   |Down            |
                   +-----+----------+
                         |
                         v
                   +-----+-----+
                   |Not Running|
                   +-----------+
Custom states are also allowed, for cases where different scenarios call for custom Kafka states.
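The expected transitions can also be written down as data. This Python sketch encodes the edges of the state diagram in a dictionary and rejects any other move. State, EXPECTED_TRANSITIONS and BrokerStateTracker are hypothetical names. Enforcing the transitions is a choice made here for illustration (the description only states the expected order), and custom states are not modeled.

```python
from enum import Enum
from typing import Dict, Set


class State(Enum):
    NOT_RUNNING = "NotRunning"
    STARTING = "Starting"
    RECOVERING_FROM_UNCLEAN_SHUTDOWN = "RecoveringFromUncleanShutdown"
    RUNNING_AS_BROKER = "RunningAsBroker"
    RUNNING_AS_CONTROLLER = "RunningAsController"
    PENDING_CONTROLLED_SHUTDOWN = "PendingControlledShutdown"
    BROKER_SHUTTING_DOWN = "BrokerShuttingDown"


# Edges taken from the diagram: each state maps to the states it may move to.
EXPECTED_TRANSITIONS: Dict[State, Set[State]] = {
    State.NOT_RUNNING: {State.STARTING},
    State.STARTING: {State.RECOVERING_FROM_UNCLEAN_SHUTDOWN, State.RUNNING_AS_BROKER},
    State.RECOVERING_FROM_UNCLEAN_SHUTDOWN: {State.RUNNING_AS_BROKER},
    State.RUNNING_AS_BROKER: {State.RUNNING_AS_CONTROLLER, State.PENDING_CONTROLLED_SHUTDOWN},
    State.RUNNING_AS_CONTROLLER: {State.RUNNING_AS_BROKER, State.PENDING_CONTROLLED_SHUTDOWN},
    State.PENDING_CONTROLLED_SHUTDOWN: {State.BROKER_SHUTTING_DOWN},
    State.BROKER_SHUTTING_DOWN: {State.NOT_RUNNING},
}


class BrokerStateTracker:
    def __init__(self) -> None:
        self.current = State.NOT_RUNNING  # a broker is in exactly one state

    def transition(self, new_state: State) -> None:
        allowed = EXPECTED_TRANSITIONS.get(self.current, set())
        if new_state not in allowed:
            raise ValueError(f"unexpected transition {self.current.value} -> {new_state.value}")
        self.current = new_state
```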