
Add offset based lag metrics #13298

Merged 3 commits into apache:master on Jun 7, 2024

Conversation

@KKcorps (Contributor) commented Jun 3, 2024

Adds new methods and a metric that report ingestion delay as the difference between the latest upstream offset and the latest consumed offset.

This only works for the Kafka consumer, because the Kinesis consumer does not yet expose methods to fetch the latest offset from upstream.
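For context, the core idea reduces to comparing the consumer's current position with the partition's end offset. The snippet below is a minimal standalone sketch using the raw Kafka client API, not the PR's actual Pinot code; it assumes a consumer that is already assigned to the partition.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetLagSketch {
  // Offset lag = latest offset available upstream minus the offset consumed so far.
  static long computeOffsetLag(KafkaConsumer<byte[], byte[]> consumer, TopicPartition partition) {
    // endOffsets() asks the broker for the next offset that would be written to the partition.
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(partition));
    long latestOffset = endOffsets.get(partition);
    // position() is the offset of the next record this consumer would read.
    long currentOffset = consumer.position(partition);
    return Math.max(latestOffset - currentOffset, 0L);
  }
}

Each such end-offset lookup is an extra round trip to the broker per partition, which is the overhead discussed further down in this thread.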

@codecov-commenter commented Jun 3, 2024

Codecov Report

Attention: Patch coverage is 60.00000% with 16 lines in your changes missing coverage. Please review.

Project coverage is 35.21%. Comparing base (59551e4) to head (cc2fbfa).
Report is 565 commits behind head on master.

Files Patch % Lines
...e/data/manager/realtime/IngestionDelayTracker.java 54.83% 9 Missing and 5 partials ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 83.33% 0 Missing and 1 partial ⚠️
...ata/manager/realtime/RealtimeTableDataManager.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #13298       +/-   ##
=============================================
- Coverage     61.75%   35.21%   -26.54%     
=============================================
  Files          2436     2469       +33     
  Lines        133233   136105     +2872     
  Branches      20636    21059      +423     
=============================================
- Hits          82274    47933    -34341     
- Misses        44911    84618    +39707     
+ Partials       6048     3554     -2494     
Flag Coverage Δ
custom-integration1 ?
integration 0.00% <0.00%> (-0.01%) ⬇️
integration1 ?
integration2 0.00% <0.00%> (ø)
java-11 46.57% <60.00%> (-15.14%) ⬇️
java-21 0.00% <0.00%> (-61.63%) ⬇️
skip-bytebuffers-false 46.57% <60.00%> (-15.18%) ⬇️
skip-bytebuffers-true 0.00% <0.00%> (-27.73%) ⬇️
temurin 35.21% <60.00%> (-26.54%) ⬇️
unittests 46.57% <60.00%> (-15.18%) ⬇️
unittests1 46.57% <60.00%> (-0.32%) ⬇️
unittests2 ?

Flags with carried forward coverage won't be shown.


@swaminathanmanish (Contributor) left a comment

lgtm otherwise.

@@ -174,6 +188,22 @@ private long getIngestionDelayMs(long ingestionTimeMs) {
return agedIngestionDelayMs;
}

private long getPartitionOffsetLag(IngestionOffsets offset) {
if (offset == null) {
Contributor:

Curious, can this even be null?

@KKcorps (Contributor Author):

Yeah, we pass null when we want to reset the metric.

// Compute aged delay for current partition
// TODO: Support other types of offsets
if (!(msgOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
return 0;
Contributor:

What are other types that can fall here? Basically we need to be able to explain this metric.

@KKcorps (Contributor Author):

Other than Kafka's long offset, Kinesis returns a BigInteger offset and Pulsar returns a String offset composed of 3 longs.
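To make the guard above concrete, here is a hedged reconstruction of what the long-only branch presumably computes; it is not the exact PR code. It assumes org.apache.pinot.spi.stream.StreamPartitionMsgOffset and LongMsgOffset, with LongMsgOffset exposing its value via getOffset().

  // Only Kafka-style long offsets can be subtracted directly; Kinesis sequence numbers
  // (BigInteger) and Pulsar MessageIds (three components encoded in a String) fall through
  // and report 0 until dedicated handling is added.
  private long computeOffsetLag(StreamPartitionMsgOffset msgOffset, StreamPartitionMsgOffset latestOffset) {
    if (!(msgOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
      return 0;
    }
    long consumed = ((LongMsgOffset) msgOffset).getOffset();
    long latest = ((LongMsgOffset) latestOffset).getOffset();
    return Math.max(latest - consumed, 0L);
  }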

@KKcorps KKcorps merged commit fc358a8 into apache:master Jun 7, 2024
20 checks passed
gortiz pushed a commit to gortiz/pinot that referenced this pull request Jun 14, 2024
* Add offset based lag metrics

* Add tests

* Refactoring

---------

Co-authored-by: Kartik Khare <[email protected]>
@Jackie-Jiang (Contributor):

We are making one extra metadata read per message batch. Have you verified the overhead? Should we add a config to turn this on/off?
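For illustration of the on/off switch being suggested, a gate like the following could sit in front of the extra fetch. This is only a sketch; the class, method, and property name are hypothetical, not an actual Pinot config key.

import java.util.Map;

// Hypothetical helper; "offset.lag.tracking.enable" is an illustrative property name,
// not a real Pinot stream config key.
final class OffsetLagToggle {
  static boolean isOffsetLagTrackingEnabled(Map<String, String> streamConfigMap) {
    // Defaulting to disabled would make the extra per-batch metadata read opt-in.
    return Boolean.parseBoolean(streamConfigMap.getOrDefault("offset.lag.tracking.enable", "false"));
  }
}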

@jadami10 (Contributor):

We're upgrading to 1.2 now, and this increased our Kafka offset requests in our QA environment by 3 orders of magnitude. I think at minimum we need a way to turn this off (internally we're just commenting the metric out).

But we likely need a way to ensure offset requests to Kafka don't scale with the number of clusters/tables/partitions being consumed.

@jadami10 (Contributor):

The other big issue I see is that if we fail to request the latest offset, we stop publishing both the time-based and the offset-based ingestion lag.

@KKcorps (Contributor Author) commented Sep 19, 2024

Hi @jadami10, thanks for bringing this up.

The earlier metric was really noisy since it relied on the time column value instead of the ingestion time, which led to false positives.

Would making this metric configurable help? That way you'd be able to disable it without code changes.

Let me also see if there's a way to reduce the frequency.

@jadami10 (Contributor):

> The earlier metric was really noisy since it relied on the time column value instead of the ingestion time, which led to false positives.

That sounds like a misuse of StreamMessageMetadata. There are two fields in https://github.com/apache/pinot/blob/master/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java#L71, getRecordIngestionTimeMs and getFirstStreamRecordIngestionTimeMs, with two corresponding metrics to distinguish between source time and publish time.
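As a rough illustration of how those two accessors separate the lags (only a sketch; the printed labels are mine, and the mapping of which field is publish time versus source time follows the method names rather than any code in this PR):

import org.apache.pinot.spi.stream.StreamMessageMetadata;

final class TimeLagSketch {
  // Derives a publish-time lag and a source-time lag from the two metadata fields
  // cited above; both accessors return epoch millis.
  static void printTimeBasedLags(StreamMessageMetadata metadata) {
    long now = System.currentTimeMillis();
    long publishLagMs = now - metadata.getRecordIngestionTimeMs();
    long sourceLagMs = now - metadata.getFirstStreamRecordIngestionTimeMs();
    System.out.println("publish-time lag (ms): " + publishLagMs);
    System.out.println("source-time lag (ms): " + sourceLagMs);
  }
}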

> Would making this metric configurable help? That way you'd be able to disable it without code changes.

Only if we don't have a way to cap frequency. And it should be off by default.

> Let me also see if there's a way to reduce the frequency.

I think a key part here is we need to cap the frequency. For large Pinot deployments, you may have thousands of tables and hundreds of thousands of partitions consumed. So the baseline is O(100k) calls. But adding a new table consuming N partitions shouldn't add N more calls. We effectively need a global throttle, though I don't think there's a way to prevent starvation with large enough scale.
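One possible shape of such a global throttle, sketched here with a Guava RateLimiter shared across all consuming partitions on a server (the class and method names are hypothetical, not existing Pinot code):

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import com.google.common.util.concurrent.RateLimiter;

// Hypothetical server-wide cap on latest-offset fetches: every partition shares one limiter,
// so adding tables or partitions cannot multiply the request rate to the upstream brokers.
final class SharedOffsetFetchThrottle {
  // e.g. at most 10 latest-offset requests per second across the whole server
  private static final RateLimiter LIMITER = RateLimiter.create(10.0);

  // Returns a freshly fetched latest offset when a permit is available, otherwise the last known value.
  static long latestOffsetOrCached(LongSupplier fetchLatestOffset, AtomicLong lastKnown) {
    if (LIMITER.tryAcquire()) {
      lastKnown.set(fetchLatestOffset.getAsLong());
    }
    return lastKnown.get();
  }
}

With enough partitions competing for the same permits, some will rarely win one and their gauge will go stale, which is the starvation concern raised above.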
