Loading
Amr Ali Eissa

Sr. Software Engineer

Sr. Data Engineer

Sr. Software Engineer

Sr. Data Engineer

Amr Ali Eissa

Sr. Software Engineer

Sr. Data Engineer

Sr. Software Engineer

Sr. Data Engineer

Blog Post

Consuming Kafka Topics From a Specific Timestamp Using Java

Consuming Kafka Topics From a Specific Timestamp Using Java

When working with Apache Kafka, you may sometimes need to start consuming messages from a particular point in time. This can be crucial for:

  • Replaying events from a known moment.
  • Debugging issues that started at (or after) a certain time.
  • Recovering data while avoiding unnecessary older messages.

In this article, we’ll show how to consume messages from a formatted timestamp (e.g., 2025-02-18 14:24:50), and also explain two essential Kafka consumer configurations:

  • max.partition.fetch.bytes
  • max.poll.records

Step 1: Set Up the Kafka Consumer & Key Configurations

Begin by creating a Java consumer, ensuring you provide the essential properties and pay special attention to the two configurations outlined below:

props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MB
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
 

Understanding max.partition.fetch.bytes

  • Meaning: Limits the maximum number of bytes the consumer can fetch per partition in a single request.
  • Impact: Helps control network traffic and consumer memory usage. If your messages are large, increasing this value can prevent fetch issues. However, avoid making it too large, as it can cause memory pressure.
  • Example: Default is often 1MB (1048576 bytes). Increase it to handle bigger messages in a single fetch, but ensure you have enough memory to hold the data.

Understanding max.poll.records

  • Meaning: Specifies the maximum number of records returned in a single poll() call.
  • Impact: Helps you tune the trade-off between throughput and memory usage. High values can increase throughput but also risk hitting memory or processing bottlenecks in your application if messages are processed in larger batches.
  • Example: Setting this to 500 (as above) means you’ll get at most 500 records at a time—even if you can fetch more data within max.partition.fetch.bytes.

Key Difference

  • max.partition.fetch.bytes governs data size in bytes (limiting large messages).
  • max.poll.records governs the total number of messages returned per poll (limiting large batches).

Step 2: Assign Partitions & Seek From a Formatted Timestamp

To consume messages starting from a human-readable timestamp like "2025-02-18 14:24:50", you’ll need to:

  1. Convert the timestamp string into milliseconds (epoch time).
  2. Assign partitions and seek offsets based on that timestamp.
  3. Handle the case where no messages exist at or after that timestamp.

2a: Parse the Timestamp

String timestampStr = "2025-02-18 14:24:50";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestampMillis;

try {
    Date parsedDate = dateFormat.parse(timestampStr);
    timestampMillis = parsedDate.getTime();
} catch (ParseException e) {
    throw new RuntimeException("Invalid timestamp format. Please use yyyy-MM-dd HH:mm:ss", e);
}
 

This converts your human-friendly date/time string to milliseconds since epoch.

2b: Subscribe & Poll Once to Ensure Partition Assignment

consumer.subscribe(Pattern.compile("test-topic"));

// Do an initial poll to ensure partitions are assigned
consumer.poll(Duration.ofSeconds(1));
 

A brief poll helps the consumer discover its assigned partitions.

2c: Seek the Consumer to the Timestamp-Based Offsets

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
    timestampsToSearch.put(partition, timestampMillis);
}

// Fetch offsets for the requested timestamps
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);

// Seek each partition to the correct offset
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
    TopicPartition partition = entry.getKey();
    OffsetAndTimestamp offsetAndTimestamp = entry.getValue();

    if (offsetAndTimestamp != null) {
        // We found messages at or after the given timestamp
        consumer.seek(partition, offsetAndTimestamp.offset());
    } else {
        // No messages exist at or after the given timestamp, so we decide how to handle
        // In this case, we move consumer to the end of the partition.
        consumer.seekToEnd(Collections.singleton(partition));
    }
}

Tip: You can also decide to seekToBeginning if you want to restart consumption from the earliest offset when no message at or after the timestamp is found. It depends on your business requirements. The code above demonstrates seeking to the end to skip messages entirely if none match your requested time.

Step 3: Continuously Consume & Process Messages

With the consumer positioned at the correct offsets, simply fetch and process messages in a loop:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Consumed message: key=%s, value=%s, offset=%d, timestamp=%s%n",
                record.key(),
                record.value(),
                record.offset(),
                new Date(record.timestamp()).toString());
    }
}
 This loop will continuously fetch and process messages starting from the calculated offsets.
 

Additional Considerations

  • Exception Handling
    • Wrap your consumption logic with robust error handling, especially around parsing timestamps and polling messages.
  • Memory & Throughput Trade-offs
    • Choose appropriate values for max.partition.fetch.bytes and max.poll.records. Start with the defaults or slightly adjusted settings, then monitor your application’s throughput and memory usage, tuning further if needed.
  • Handling No Messages at the Timestamp
    • If offsetsForTimes returns null for a given partition (meaning Kafka found no messages at or after your timestamp), decide whether to skip that partition entirely, jump to the earliest offset, or gracefully shut down if your application requires data from that time.
  • Group Management
    • If using a consumer group, keep in mind that partition rebalances can occur.

 

Summary

In this article, we explored:

Timestamp-Based Consumption

  • How to parse a human-friendly date/time and use it to seek Kafka offsets.

Key Consumer Configurations

  • max.partition.fetch.bytes: Governs the maximum data size fetched per partition in a single request.
  • max.poll.records: Limits how many messages are returned in a single poll.

Handling Edge Cases

  • When no messages exist for or after a certain timestamp, deciding whether to seek to the end or beginning of the partition.

By leveraging these techniques, you can precisely start consumption from the moment in time that matters to your use case, while also tuning your application’s performance and resource usage. This level of control can be extremely valuable for event replay, debugging, and targeted data recovery scenarios in Apache Kafka.

Tags: