Skip to content

Commit

Permalink
Allow for states larger than record size
Browse files Browse the repository at this point in the history
  • Loading branch information
SoerenHenning committed Jan 5, 2024
1 parent 5753c8d commit 0a8e043
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ConsumerResult accept(TimestampedRecord record, State state) {
state.setData(data);

if (initCountRandom) {
// Take first 32 bytes of record or less if record is smaller as seed for random
// Take first 32 bytes of record (or less if record is smaller) as seed for random
final long seedForRandom = hasher.hashBytesToLong(record.getData(), 0, Math.min(record.getData().length, 32));
final SplittableRandom random = new SplittableRandom(seedForRandom);
countInit = random.nextInt(outputRate);
Expand All @@ -84,7 +84,8 @@ public ConsumerResult accept(TimestampedRecord record, State state) {
stateBuffer.position(stateBuffer.position() + Long.BYTES); // start timestamp
}
stateBuffer.putLong(record.getTimestamp()); // end timestamp
stateBuffer.put(record.getData(), 0, stateBuffer.remaining()); // fill with data from record
final int bytesToCopy = Math.min(stateBuffer.remaining(), record.getData().length);
stateBuffer.put(record.getData(), 0, bytesToCopy); // fill with data from record

LOGGER.debug("{}: count = {}", name, count);

Expand Down

0 comments on commit 0a8e043

Please sign in to comment.