org.apache.kafka.clients.producer.internals
Class RecordAccumulator

java.lang.Object
  extended by org.apache.kafka.clients.producer.internals.RecordAccumulator

public final class RecordAccumulator
extends java.lang.Object

This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.


Nested Class Summary
static class RecordAccumulator.ReadyCheckResult
           
static class RecordAccumulator.RecordAppendResult
           
 
Constructor Summary
RecordAccumulator(int batchSize, long totalSize, long lingerMs, long retryBackoffMs, boolean blockOnBufferFull, Metrics metrics, Time time)
          Create a new record accumulator
 
Method Summary
 RecordAccumulator.RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback)
          Add a record to the accumulator and return the append result
 void close()
          Close this accumulator and force all the record buffers to be drained
 void deallocate(RecordBatch batch)
          Deallocate the record batch
 java.util.Map<java.lang.Integer,java.util.List<RecordBatch>> drain(Cluster cluster, java.util.Set<Node> nodes, int maxSize, long now)
          Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified size on a per-node basis.
 boolean hasUnsent()
           
 RecordAccumulator.ReadyCheckResult ready(Cluster cluster, long nowMs)
          Get a list of nodes whose partitions are ready to be sent, along with the earliest time at which any partition will be ready if none are ready yet; if the ready-nodes list is non-empty, the timeout value will be 0.
 void reenqueue(RecordBatch batch, long now)
          Re-enqueue the given record batch in the accumulator to retry
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

RecordAccumulator

public RecordAccumulator(int batchSize,
                         long totalSize,
                         long lingerMs,
                         long retryBackoffMs,
                         boolean blockOnBufferFull,
                         Metrics metrics,
                         Time time)
Create a new record accumulator

Parameters:
batchSize - The size to use when allocating MemoryRecords instances
totalSize - The maximum memory the record accumulator can use.
lingerMs - An artificial delay time to add before declaring a records instance that isn't full ready for sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some latency for potentially better throughput due to more batching (and hence fewer, larger requests).
retryBackoffMs - An artificial delay before retrying a produce request that failed with an error. This avoids exhausting all retries in a short period of time.
blockOnBufferFull - If true, block when we are out of memory; if false, throw an exception when we are out of memory
metrics - The metrics
time - The time instance to use
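
A minimal construction sketch follows. It assumes the Metrics and SystemTime classes from org.apache.kafka.common (metrics and utils packages), and the numeric values are illustrative assumptions, not recommended settings; RecordAccumulator is an internal producer class and is normally constructed by KafkaProducer itself.

// Illustrative construction; values are assumptions, not recommendations.
Metrics metrics = new Metrics();          // org.apache.kafka.common.metrics.Metrics
Time time = new SystemTime();             // org.apache.kafka.common.utils.SystemTime
RecordAccumulator accumulator = new RecordAccumulator(16384,      // batchSize: 16 KB per MemoryRecords instance
                                                      33554432L,  // totalSize: 32 MB buffer in total
                                                      5L,         // lingerMs
                                                      100L,       // retryBackoffMs
                                                      true,       // blockOnBufferFull
                                                      metrics,
                                                      time);
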
Method Detail

append

public RecordAccumulator.RecordAppendResult append(TopicPartition tp,
                                                   byte[] key,
                                                   byte[] value,
                                                   CompressionType compression,
                                                   Callback callback)
                                            throws java.lang.InterruptedException
Add a record to the accumulator and return the append result

The append result contains the future metadata and flags indicating whether the appended batch is full and whether a new batch was created

Parameters:
tp - The topic/partition to which this record is being sent
key - The key for the record
value - The value for the record
compression - The compression codec for the record
callback - The user-supplied callback to execute when the request is complete
Throws:
java.lang.InterruptedException
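
The sketch below shows one way append might be called, continuing the accumulator constructed earlier. It passes null for the callback and assumes RecordAppendResult exposes future, batchIsFull and newBatchCreated fields; those names are not documented on this page. Note that append can throw InterruptedException when blocking on a full buffer.

// Append sketch; TopicPartition and CompressionType come from org.apache.kafka.common,
// and the RecordAppendResult field names used below are assumptions.
TopicPartition tp = new TopicPartition("my-topic", 0);
RecordAccumulator.RecordAppendResult result =
    accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
if (result.batchIsFull || result.newBatchCreated) {
    // A batch filled up or a new one was started; the sender can be woken to check readiness.
}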

reenqueue

public void reenqueue(RecordBatch batch,
                      long now)
Re-enqueue the given record batch in the accumulator to retry


ready

public RecordAccumulator.ReadyCheckResult ready(Cluster cluster,
                                                long nowMs)
Get a list of nodes whose partitions are ready to be sent, along with the earliest time at which any partition will be ready if none are ready yet; if the ready-nodes list is non-empty, the timeout value will be 0. Also return a flag indicating whether any of the accumulated partition batches have an unknown leader.

A destination node is ready to send data if ANY one of its partitions is not backing off the send and ANY of the following are true (see the sketch after the list):

  1. The record set is full
  2. The record set has sat in the accumulator for at least lingerMs milliseconds
  3. The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready).
  4. The accumulator has been closed

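A minimal readiness-check sketch, assuming a Cluster instance (obtained from producer metadata) is in scope and that ReadyCheckResult exposes readyNodes, nextReadyCheckDelayMs and unknownLeadersExist fields; those field names are assumptions, not documented here.

// Readiness sketch; cluster is assumed to be an existing org.apache.kafka.common.Cluster.
long now = time.milliseconds();
RecordAccumulator.ReadyCheckResult check = accumulator.ready(cluster, now);
if (!check.readyNodes.isEmpty()) {
    // Some nodes have sendable data; per the contract above, the timeout is 0.
} else {
    // Nothing is ready yet; re-check no later than check.nextReadyCheckDelayMs from now.
}
if (check.unknownLeadersExist) {
    // Some accumulated partitions have no known leader; a metadata refresh is needed.
}
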

hasUnsent

public boolean hasUnsent()
Returns:
Whether there is any unsent record in the accumulator.

drain

public java.util.Map<java.lang.Integer,java.util.List<RecordBatch>> drain(Cluster cluster,
                                                                          java.util.Set<Node> nodes,
                                                                          int maxSize,
                                                                          long now)
Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.

Parameters:
cluster - The current cluster metadata
nodes - The list of nodes to drain
maxSize - The maximum number of bytes to drain
now - The current unix time in milliseconds
Returns:
A list of RecordBatch for each specified node, with total size less than the requested maxSize. TODO: There may be a starvation issue due to iteration order
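
The sketch below shows how a sender-style loop might pair ready with drain, continuing the earlier sketches. maxRequestSize is a hypothetical per-request byte limit, and the returned map is assumed to be keyed by node id; neither detail is spelled out on this page.

// Drain sketch; maxRequestSize is illustrative and the map key is assumed to be the node id.
int maxRequestSize = 1048576; // 1 MB, illustrative only
RecordAccumulator.ReadyCheckResult check = accumulator.ready(cluster, now);
java.util.Map<java.lang.Integer,java.util.List<RecordBatch>> batchesByNode =
    accumulator.drain(cluster, check.readyNodes, maxRequestSize, now);
for (java.util.Map.Entry<java.lang.Integer,java.util.List<RecordBatch>> entry : batchesByNode.entrySet()) {
    // Build one produce request per node from entry.getValue() and send it, then call
    // deallocate(batch) for each RecordBatch once its request completes, or
    // reenqueue(batch, now) if the batch must be retried.
}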

deallocate

public void deallocate(RecordBatch batch)
Deallocate the record batch


close

public void close()
Close this accumulator and force all the record buffers to be drained