Detect expired messages in Kafka. Log and set a gauge. #12608
Conversation
Mostly good!
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
);
} else {
  // Record that this batch has no data loss.
  _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 0L);
We don't need to emit the meter when there is no loss.
I am thinking of an alert that is triggered when this metric goes to 1; a single spike is sufficient to trigger it. Without the else block, the metric will never be reset to 0.
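The emit-0-on-healthy-batches pattern under discussion can be sketched as follows. The `Meter` interface and metric name here are hypothetical stand-ins for Pinot's `ServerMetrics` API, not the actual Pinot code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Pinot's ServerMetrics; names are illustrative.
interface Meter {
  void add(String metricName, long value);
}

public class DataLossMeterSketch {
  // Emit 1 when a batch shows data loss and 0 otherwise, so the time
  // series drops back to 0 after a lossy batch and an alert on
  // "metric == 1" can reset itself.
  static void recordBatch(Meter meter, boolean dataLoss) {
    meter.add("STREAM_DATA_LOSS", dataLoss ? 1L : 0L);
  }

  public static void main(String[] args) {
    List<Long> emitted = new ArrayList<>();
    Meter meter = (name, value) -> emitted.add(value);
    recordBatch(meter, false); // healthy batch
    recordBatch(meter, true);  // lossy batch: spike to 1
    recordBatch(meter, false); // next healthy batch resets to 0
    System.out.println(emitted); // [0, 1, 0]
  }
}
```

Without the final `recordBatch(meter, false)` emission, the series would stay at 1 and the alert would never clear.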
...re/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #12608 +/- ##
============================================
+ Coverage 61.75% 61.78% +0.03%
+ Complexity 207 198 -9
============================================
Files 2436 2450 +14
Lines 133233 133764 +531
Branches 20636 20745 +109
============================================
+ Hits 82274 82648 +374
- Misses 44911 45007 +96
- Partials 6048 6109 +61
I tested this commit manually. Gist of the quickstart: https://gist.github.com/vrajat/5db2715535db325dde9af8e4a029da47
Output:
LGTM!
Pinot may take multiple hours between polls of a partition in a Kafka topic; one specific case is when Pinot takes a long time to flush a segment to disk. In the meantime, messages in Kafka can expire if the retention time is short.

If auto.offset.reset is set to smallest, Kafka silently moves the offset to the first available message, leading to data loss. RealtimeSegmentValidationManager is a cron that runs every hour and detects when the offset of a segment in ZooKeeper is behind the smallest offset available in Kafka. However, since it runs only every hour, it can miss data loss that occurs between runs.
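For context, auto.offset.reset is a standard Kafka consumer property. A minimal sketch of how it is set (the property key and values are real Kafka configuration; the surrounding class is illustrative):

```java
import java.util.Properties;

public class OffsetResetConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // "earliest" is the modern name for the older "smallest". With this
    // setting, a consumer whose requested offset has already expired
    // silently jumps to the earliest available offset instead of
    // failing, which masks the expired (lost) messages.
    props.put("auto.offset.reset", "earliest");
    // "latest" would instead skip ahead to the newest message; "none"
    // throws an exception, making the loss visible but halting
    // consumption.
    System.out.println(props.getProperty("auto.offset.reset"));
  }
}
```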
This commit compares the startOffset to the batchFirstOffset. If startOffset < batchFirstOffset, it logs the condition and sets a meter to 1.

This check is implemented only for the Kafka stream consumer plugin.
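The check described above can be sketched as follows; `hasDataLoss` is a hypothetical helper for illustration, not the actual Pinot method:

```java
public class ExpiredMessageCheck {
  // If the offset we asked Kafka to start from is smaller than the first
  // offset actually returned in the batch, the messages in between have
  // already expired and are unrecoverable: data loss.
  static boolean hasDataLoss(long startOffset, long batchFirstOffset) {
    return startOffset < batchFirstOffset;
  }

  public static void main(String[] args) {
    // Asked for offset 100 but the batch starts at 150: offsets
    // 100..149 expired, so this is data loss.
    System.out.println(hasDataLoss(100L, 150L)); // true
    // Batch starts exactly where we asked: no loss.
    System.out.println(hasDataLoss(150L, 150L)); // false
  }
}
```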