|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Objectorg.apache.kafka.clients.producer.internals.RecordAccumulator
public final class RecordAccumulator
This class acts as a queue that accumulates records into MemoryRecords
instances to be sent to the server.
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
Nested Class Summary | |
---|---|
static class |
RecordAccumulator.ReadyCheckResult
|
static class |
RecordAccumulator.RecordAppendResult
|
Constructor Summary | |
---|---|
RecordAccumulator(int batchSize,
long totalSize,
long lingerMs,
long retryBackoffMs,
boolean blockOnBufferFull,
Metrics metrics,
Time time)
Create a new record accumulator |
Method Summary | |
---|---|
RecordAccumulator.RecordAppendResult |
append(TopicPartition tp,
byte[] key,
byte[] value,
CompressionType compression,
Callback callback)
Add a record to the accumulator, return the append result |
void |
close()
Close this accumulator and force all the record buffers to be drained |
void |
deallocate(RecordBatch batch)
Deallocate the record batch |
java.util.Map<java.lang.Integer,java.util.List<RecordBatch>> |
drain(Cluster cluster,
java.util.Set<Node> nodes,
int maxSize,
long now)
Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified size on a per-node basis. |
boolean |
hasUnsent()
|
RecordAccumulator.ReadyCheckResult |
ready(Cluster cluster,
long nowMs)
Get a list of nodes whose partitions are ready to be sent, and the time to when any partition will be ready if no partitions are ready yet; If the ready nodes list is non-empty, the timeout value will be 0. |
void |
reenqueue(RecordBatch batch,
long now)
Re-enqueue the given record batch in the accumulator to retry |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Constructor Detail |
---|
public RecordAccumulator(int batchSize, long totalSize, long lingerMs, long retryBackoffMs, boolean blockOnBufferFull, Metrics metrics, Time time)
batchSize
- The size to use when allocating MemoryRecords
instancestotalSize
- The maximum memory the record accumulator can use.lingerMs
- An artificial delay time to add before declaring a records instance that isn't full ready for
sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
latency for potentially better throughput due to more batching (and hence fewer, larger requests).retryBackoffMs
- An artificial delay time to retry the produce request upon receiving an error. This avoids
exhausting all retries in a short period of time.blockOnBufferFull
- If true block when we are out of memory; if false throw an exception when we are out of
memorymetrics
- The metricstime
- The time instance to useMethod Detail |
---|
public RecordAccumulator.RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws java.lang.InterruptedException
The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
tp
- The topic/partition to which this record is being sentkey
- The key for the recordvalue
- The value for the recordcompression
- The compression codec for the recordcallback
- The user-supplied callback to execute when the request is complete
java.lang.InterruptedException
public void reenqueue(RecordBatch batch, long now)
public RecordAccumulator.ReadyCheckResult ready(Cluster cluster, long nowMs)
A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the following are true :
public boolean hasUnsent()
public java.util.Map<java.lang.Integer,java.util.List<RecordBatch>> drain(Cluster cluster, java.util.Set<Node> nodes, int maxSize, long now)
cluster
- The current cluster metadatanodes
- The list of node to drainmaxSize
- The maximum number of bytes to drainnow
- The current unix time in milliseconds
RecordBatch
for each node specified with total size less than the requested maxSize.
TODO: There may be a starvation issue due to iteration orderpublic void deallocate(RecordBatch batch)
public void close()
|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |