Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions #12522
Conversation
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

```
@@             Coverage Diff              @@
##             master    #12522     +/-   ##
============================================
- Coverage     61.75%    61.60%   -0.15%
  Complexity      207       207
============================================
  Files          2436      2451      +15
  Lines        133233    133646     +413
  Branches      20636     20689      +53
============================================
+ Hits          82274     82331      +57
- Misses        44911     45226     +315
- Partials       6048      6089      +41
```

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.
I like the idea, but I'm not sure if the current implementation is correct. Specifically, the scenario that concerns me is the one where:
In that scenario it seems possible that
Instead of being a gauge, we could have something like:
That's a valid case.
We can't differentiate between this (valid) case and the one where there are exceptions fetching a batch. Although such exceptions can be tracked using
This seems like a situation where a histogram would be the right choice: it would let us do percentile breakdowns of rows read, and I believe it also includes the counter that @gortiz suggested.
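To illustrate the histogram idea, here is a minimal sketch using the Dropwizard Metrics library for clarity (Pinot's own metrics abstraction differs, and the metric name here is made up):

```java
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;

public class RowsFetchedHistogramSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Histogram rowsFetched = registry.histogram("realtimeRowsFetched");

    // Record the size of each fetched batch.
    rowsFetched.update(0);    // an empty batch
    rowsFetched.update(150);  // a batch of 150 rows

    // A histogram gives percentile breakdowns of batch sizes...
    Snapshot snapshot = rowsFetched.getSnapshot();
    System.out.println("p95 batch size: " + snapshot.get95thPercentile());

    // ...and it also carries a monotonically increasing count of
    // observations, which may cover the counter use case mentioned above.
    System.out.println("batches observed: " + rowsFetched.getCount());
  }
}
```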
Small tip: instead of rate you can also use increase. I don't think the performance difference would be important enough to change it, but it may express what you want more cleanly. Anyway, I think both functions are fine.
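For illustration, the two forms side by side (metric name taken from this PR; label selectors omitted):

```promql
# Per-second average rate of rows fetched over the last minute.
rate(pinot_server_realtimeRowsFetched_Count{}[1m])

# Total increase over the last minute; reads more directly as
# "how many rows did we fetch in this window".
increase(pinot_server_realtimeRowsFetched_Count{}[1m])
```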
LGTM. Unless there is some urgency, I'll wait until tomorrow to merge it, just in case other people want to review it.
LGTM
```java
REALTIME_ROWS_FILTERED("rows", false),
INVALID_REALTIME_ROWS_DROPPED("rows", false),
INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),
```
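For context: each ServerMeter constant takes a unit string and a boolean that, judging from the surrounding constants, marks the meter as global rather than per-table (an assumption, not confirmed in this thread). Hypothetical emission sites, reusing fields from RealtimeSegmentDataManager (a fragment, not standalone code):

```java
// Per-table meter (boolean false): recorded against the table derived
// from _clientId.
_serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);

// Global meter (boolean true): aggregated across all tables.
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
```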
This newly created metric is not used anywhere.
That code wasn't checked-in, sorry about that!
```java
_currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
closePartitionGroupConsumer();
```
Is it possible that those 2 lines throw exceptions? If so, then the exception is not related to consumer creation.
Should we try-catch only lines 1690 and 1691?
If they throw an exception, the consumer wouldn't get created, right? And _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset); does throw a runtime exception (at least in the code).
If they throw an exception, is the old consumer closed? If not, the old consumer is still alive, right?
I guess to be sure we should re-throw the exception in the catch block.
Can you please help me understand how re-throwing the exception addresses the potential issue I mentioned above?
In:

1. _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
2. closePartitionGroupConsumer();
3. _partitionGroupConsumer =
       _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
   _partitionGroupConsumer.start(_currentOffset);

if the 1st or 2nd throws an exception, the 3rd won't be executed. This is a valid case for incrementing this metric, no?

> then the exception is not related to consumer creation.

This metric tracks whether a consumer could not be created, for any reason. In this case as well, the consumer isn't created, so we should bump it.

Earlier, I was swallowing this exception, which meant no runtime exception would bubble up the call stack. Re-throwing addressed that.

Please correct me if I am wrong.
If the 1st or 2nd line throws an exception:
- that means the old consumer is still running; maybe it can still be used for consuming data?
- the error is not really about creating a consumer; in this case, maybe we don't want to increase the metric. WDYT?
> that means the old consumer is still running; maybe it can still be used for consuming data?

I am not sure of the control flow here, or whether the old consumer can be reused. If it can be reused, then yes, we shouldn't bump the metric; if it can't, then we should.
But look at it this way: we want to be alerted in both cases, because the code tried creating a new consumer and hoped to use it, but couldn't for some reason. We definitely want to look into why we couldn't even close the old consumer.
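A self-contained sketch of the handling being discussed, with stubbed interfaces so it compiles on its own (the real Pinot types and signatures, e.g. the offset type, differ; an AtomicLong stands in for the ServerMeter):

```java
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerRecreationSketch {
  interface PartitionGroupConsumer {
    long checkpoint(long offset); // can throw a RuntimeException at runtime
    void start(long offset);
    void close();
  }

  interface StreamConsumerFactory {
    PartitionGroupConsumer createPartitionGroupConsumer();
  }

  private final AtomicLong _streamConsumerCreateExceptions = new AtomicLong();
  private final StreamConsumerFactory _factory;
  private PartitionGroupConsumer _consumer;
  private long _currentOffset;

  ConsumerRecreationSketch(StreamConsumerFactory factory) {
    _factory = factory;
    _consumer = factory.createPartitionGroupConsumer();
  }

  void recreateStreamConsumer() {
    try {
      // Steps 1 and 2: checkpoint, then close the old consumer.
      _currentOffset = _consumer.checkpoint(_currentOffset);
      _consumer.close();
      // Step 3: create and start the new consumer.
      _consumer = _factory.createPartitionGroupConsumer();
      _consumer.start(_currentOffset);
    } catch (RuntimeException e) {
      // Whichever step failed, no new consumer is running afterwards: bump
      // the metric, then re-throw instead of swallowing the exception so the
      // failure bubbles up the call stack.
      _streamConsumerCreateExceptions.incrementAndGet();
      throw e;
    }
  }
}
```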
This reverts commit 10766b6.
```diff
@@ -77,7 +77,7 @@ rules:
   - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.realtimeConsumptionExceptions\"><>(\\w+)"
     name: "pinot_server_realtime_consumptionExceptions_$1"
     cache: true
-  - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed)\"><>(\\w+)"
+  - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed|realtimeRowsFetched|streamConsumerCreateExceptions)\"><>(\\w+)"
```
Not related to this PR, but why do we need to add an enumeration here? I think we could just match any word here to fetch everything.
Yeah, I didn't want to add another change to this PR that would've increased the regression surface. This will be fixed in another PR.
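For reference, a hypothetical generalized version of that rule, replacing the enumeration with a generic word match (untested; the accompanying name: and cache: lines of the existing rule would stay as they are):

```yaml
# Hypothetical: capture any metric suffix instead of enumerating them.
# Group numbering is unchanged, so the existing name template still applies.
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(\\w+)\"><>(\\w+)"
```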
```diff
@@ -427,6 +427,9 @@ protected boolean consumeLoop()
       try {
         messageBatch =
             _partitionGroupConsumer.fetchMessages(_currentOffset, null, _streamConfig.getFetchTimeoutMillis());
+        // track realtime rows fetched on a table level. This included valid + invalid rows
+        _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FETCHED,
+            messageBatch.getMessageCount());
```
I think this is not correct. It should be messageBatch.getUnfilteredMessageCount().
From the interface:

```java
/**
 * @return number of available messages
 */
int getMessageCount();

/**
 * @return number of messages returned from the stream
 */
default int getUnfilteredMessageCount() {
  return getMessageCount();
}
```
Thanks for pointing this out
I think we are not using the correct metric, and we are ignoring the filtered documents.
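A sketch of the corrected call, reusing the variables from the diff above (a fragment, not standalone code):

```java
// Use the unfiltered count so that rows removed by stream-level filtering
// are still counted as fetched.
_serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FETCHED,
    messageBatch.getUnfilteredMessageCount());
```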
LGTM (this time for sure! :D). I'll wait a bit in case some other committer can take a look.
LGTM
This PR adds two metrics in RealtimeSegmentDataManager:

1. REALTIME_ROWS_FETCHED: a new gauge that can assume the following values:

   - 0: there were no messages in the last batch. This can happen when there are no messages, or a very low volume of messages, in the stream partition.
   - >0: messages were fetched in the last batch.
   - -1: there were exceptions fetching the message batch from the stream partition.

   This metric can be used to identify the first case (no messages in a stream partition) as max_over_time(pinot_server_realtimeLastFetchedBatchSize_Value{}[15m]): this expression checks the max batch size consumed in the last 15 minutes. If it is 0, no ingestion has happened in the last 15 minutes. We can use this as a source alert to silence other alerts such as RealtimeIngestionStopped.

   REALTIME_ROWS_FETCHED: a new meter that tracks the number of rows fetched from the stream partition, i.e. how many rows are in the batch before any processing. This metric can be used to detect that there is no data in the stream partition as rate(pinot_server_realtimeRowsFetched_Count{}[1m]) == 0.

   Why can't we reuse pinot_server_realtimeRowsConsumed? It tracks the number of rows that were successfully indexed; rows that failed transformation/indexing aren't counted, which makes it hard to calculate the total number of rows being fetched from the partition stream.

2. STREAM_CONSUMER_CREATE_EXCEPTIONS: a new meter, bumped when we face exceptions trying to create a stream consumer.

Testing (screenshots):
- rate(pinot_server_realtimeRowsFetched_Count{}[1m])
- STREAM_CONSUMER_CREATE_EXCEPTIONS
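As a hedged sketch of how the two row counts could combine in an alert (the _Count suffix for realtimeRowsConsumed is an assumption following the exporter naming shown above; label selectors omitted):

```promql
# Fire only when rows are arriving from the stream but none are being
# successfully consumed/indexed, i.e. ingestion is failing rather than
# the stream partition simply being idle.
rate(pinot_server_realtimeRowsFetched_Count{}[5m]) > 0
  and rate(pinot_server_realtimeRowsConsumed_Count{}[5m]) == 0
```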