
Detect expired messages in Kafka. Log and set a gauge. #12608

Merged: 6 commits into apache:master on Mar 9, 2024

Conversation

@vrajat (Collaborator) commented Mar 8, 2024

Pinot may take multiple hours between polls of a partition in a Kafka topic; one specific example is when Pinot takes a long time to flush a segment to disk. In the meantime, messages in Kafka can expire if the topic's retention time is small.
If auto.offset.reset is set to smallest, Kafka silently moves the offset to the first available message, leading to data loss.
RealtimeSegmentValidationManager is a cron job that runs every hour and detects when the offset of a segment in ZooKeeper is older than the smallest offset available in Kafka. However, since it runs only every hour, it may miss data loss that happens between runs.

This commit compares the startOffset to the batchFirstOffset:

  • startOffset: the offset requested by the database for the next batch.
  • batchFirstOffset: the first offset of the batch of messages received from the stream.

If startOffset < batchFirstOffset, the condition is logged and a meter is set to 1 (a sketch of this check follows).

This check is implemented only for the Kafka stream plugin.
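For illustration, here is a minimal sketch of the comparison described above. It is not the exact patch: the method name and the fields it touches are assumptions standing in for the segment data manager's internals, while ServerMeter.STREAM_DATA_LOSS and the addMeteredTableValue call appear in the diff excerpt quoted later in this thread, and the log format follows the quickstart output below.

// Sketch only; identifiers such as checkBatchForDataLoss, _partitionId, and
// _tableNameWithType are illustrative, not the exact names in the patch.
private void checkBatchForDataLoss(long startOffset, long batchFirstOffset) {
  if (batchFirstOffset > startOffset) {
    // Messages in [startOffset, batchFirstOffset) expired in Kafka before they were consumed.
    _segmentLogger.error("Data lost from offset: {} to: {} for partition: {} of table: {}",
        startOffset, batchFirstOffset, _partitionId, _tableNameWithType);
    _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
  } else {
    // Emit 0 so an alert that fires when the meter spikes to 1 also sees it return to 0.
    _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 0L);
  }
}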

@Jackie-Jiang (Contributor) left a comment

Mostly good!

);
} else {
// Record that this batch has no data loss.
_serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 0L);
@Jackie-Jiang (Contributor) commented on the diff:

We don't need to emit the meter when there is no loss.

@vrajat (Collaborator, Author) replied:

I am thinking of an alert that is triggered when this metric goes to 1. A spike is sufficient to trigger an alert. Without the else block, the metric will not be reset to 0.

@codecov-commenter commented Mar 8, 2024

Codecov Report

Attention: Patch coverage is 52.63158%, with 9 lines in your changes missing coverage. Please review.

Project coverage is 61.78%. Comparing base (59551e4) to head (f5d08a2).
Report is 101 commits behind head on master.

Files                                                    Patch %   Lines
...a/manager/realtime/RealtimeSegmentDataManager.java    11.11%    7 Missing and 1 partial ⚠️
...pinot/plugin/stream/kafka20/KafkaMessageBatch.java    66.66%    1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12608      +/-   ##
============================================
+ Coverage     61.75%   61.78%   +0.03%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2450      +14     
  Lines        133233   133764     +531     
  Branches      20636    20745     +109     
============================================
+ Hits          82274    82648     +374     
- Misses        44911    45007      +96     
- Partials       6048     6109      +61     
Flag                      Coverage Δ
custom-integration1       <0.01% <0.00%> (-0.01%) ⬇️
integration               <0.01% <0.00%> (-0.01%) ⬇️
integration1              <0.01% <0.00%> (-0.01%) ⬇️
integration2              0.00% <0.00%> (ø)
java-11                   61.74% <52.63%> (+0.03%) ⬆️
java-21                   61.66% <52.63%> (+0.04%) ⬆️
skip-bytebuffers-false    61.75% <52.63%> (+<0.01%) ⬆️
skip-bytebuffers-true     61.64% <52.63%> (+33.92%) ⬆️
temurin                   61.78% <52.63%> (+0.03%) ⬆️
unittests                 61.78% <52.63%> (+0.03%) ⬆️
unittests1                46.90% <33.33%> (+<0.01%) ⬆️
unittests2                27.70% <31.57%> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown.


@vrajat (Collaborator, Author) commented Mar 8, 2024

I tested this commit manually. Gist of the quickstart: https://gist.github.com/vrajat/5db2715535db325dde9af8e4a029da47
The workflow is as follows:

  • Insert a batch of 400 messages.
  • Pinot ingests these messages.
  • Pause consumption.
  • Insert another batch of 400 messages.
  • Delete records up to offset 500, so 100 of the not-yet-consumed records are missing (see the AdminClient sketch after this list).
  • Resume consumption.
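The delete step can be reproduced with Kafka's AdminClient. Below is a small sketch of that step only; the topic name, partitions, and offset follow the quickstart output underneath, while the broker address is an assumption.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

public class DeleteExpiredRecords {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Broker address is an assumption for a local quickstart cluster.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // Delete everything before offset 500 on both partitions of githubEvents.
      Map<TopicPartition, RecordsToDelete> request = Map.of(
          new TopicPartition("githubEvents", 0), RecordsToDelete.beforeOffset(500L),
          new TopicPartition("githubEvents", 1), RecordsToDelete.beforeOffset(500L));
      DeleteRecordsResult result = admin.deleteRecords(request);
      // Print the new low watermark per partition, matching the quickstart output.
      for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : result.lowWatermarks().entrySet()) {
        System.out.println("partition: " + entry.getKey() + "\tlow_watermark: " + entry.getValue().get().lowWatermark());
      }
    }
  }
}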

Output:

***** Cluster is running *****
***** Publish first batch of data to kafka streams *****
***** Starting githubEvents data stream and publishing to Kafka *****
***** Pausing consumption *****
***** Publish second batch of data to kafka streams *****
***** Starting githubEvents data stream and publishing to Kafka *****
Executing records delete operation
Records delete operation completed:
partition: githubEvents-1	low_watermark: 500
partition: githubEvents-0	low_watermark: 500
Offset is 500
Offset is 500
***** Publish third batch of data to kafka streams *****
***** Starting githubEvents data stream and publishing to Kafka *****
***** Resume consumption *****
You can always go to http://localhost:9000 to play around in the query console
2024/03/08 12:38:01.004 ERROR [PinotLLCRealtimeSegmentManager] [pool-23-thread-2] Data lost from offset: 400 to: 500 for partition: 0 of table: githubEvents_REALTIME
2024/03/08 12:38:01.013 ERROR [PinotLLCRealtimeSegmentManager] [pool-23-thread-2] Data lost from offset: 400 to: 500 for partition: 1 of table: githubEvents_REALTIME

@Jackie-Jiang (Contributor) left a comment

LGTM!

@Jackie-Jiang merged commit b11d53c into apache:master on Mar 9, 2024
19 checks passed
@vrajat deleted the segment_metrics2 branch on March 9, 2024 at 16:07