Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions #12522

Merged · 13 commits · Mar 4, 2024

Conversation

@suddendust (Contributor) commented Feb 29, 2024

This PR adds two metrics in RealtimeSegmentDataManager:

1. REALTIME_ROWS_FETCHED: A new meter that tracks the number of rows fetched from the stream partition, i.e. how many rows are in each batch before any processing is done.

An earlier revision of this PR exposed this as a gauge (REALTIME_LAST_FETCHED_BATCH_SIZE) that could take the following values:

- 0: there were no messages in the last batch. This can happen when the stream partition has no messages or a very low volume of messages.
- > 0: messages were fetched in the last batch.
- -1: an exception occurred while fetching the message batch from the stream partition.

The no-messages case could then be detected with max_over_time(pinot_server_realtimeLastFetchedBatchSize_Value{}[15m]), which checks the maximum batch size consumed over the last 15 minutes; if it is 0, no ingestion has happened in that window, and it can be used as a source alert to silence other alerts such as RealtimeIngestionStopped. Based on the review discussion below, the gauge was replaced with the meter.

The meter can be used to detect whether there is no data in the stream partition:

rate(pinot_server_realtimeRowsFetched_Count{}[1m]) == 0

Why can't we reuse pinot_server_realtimeRowsConsumed?

pinot_server_realtimeRowsConsumed tracks the number of rows that were successfully indexed. Rows that fail transformation or indexing are not counted, which makes it hard to compute the total number of rows fetched from the partition stream (see the sketch after this list).

  2. STREAM_CONSUMER_CREATE_EXCEPTIONS: Incremented when we run into exceptions while trying to create a stream consumer.
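
To make the fetched-vs-consumed distinction concrete, here is a small, self-contained Java sketch (an editorial illustration, not Pinot code; the Row type and the sample rows are hypothetical): every row in a batch counts as fetched, but only rows that transform and index successfully count as consumed, so the consumed meter alone cannot distinguish an empty stream from rows being dropped.

import java.util.List;

public class FetchedVsConsumedDemo {
  // Hypothetical stand-in for a fetched batch: every row is "fetched",
  // but only rows that transform/index successfully are "consumed".
  record Row(String payload, boolean indexable) {}

  public static void main(String[] args) {
    List<Row> batch = List.of(
        new Row("ok-1", true),
        new Row("bad-schema", false),   // fails transformation
        new Row("ok-2", true),
        new Row("corrupt", false));     // fails indexing

    long rowsFetched = batch.size();                                   // analogous to REALTIME_ROWS_FETCHED
    long rowsConsumed = batch.stream().filter(Row::indexable).count(); // analogous to realtimeRowsConsumed

    // Prints fetched=4 consumed=2: an alert on the consumed meter alone cannot
    // tell "stream is empty" apart from "rows are being dropped".
    System.out.println("fetched=" + rowsFetched + " consumed=" + rowsConsumed);
  }
}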

Testing:

rate(pinot_server_realtimeRowsFetched_Count{}[1m]):

  • When there were messages in the last fetched batch: [screenshot]
  • When there is no data in the input stream: [screenshot upload did not complete]
  • Exposed metric: [screenshot]

STREAM_CONSUMER_CREATE_EXCEPTIONS: [screenshots]

@codecov-commenter commented Feb 29, 2024

Codecov Report

Attention: Patch coverage is 33.33333%, with 16 lines in your changes missing coverage. Please review.

Project coverage is 61.60%. Comparing base (59551e4) to head (8068a1b).
Report is 42 commits behind head on master.

Files Patch % Lines
...a/manager/realtime/RealtimeSegmentDataManager.java 27.27% 16 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12522      +/-   ##
============================================
- Coverage     61.75%   61.60%   -0.15%     
  Complexity      207      207              
============================================
  Files          2436     2451      +15     
  Lines        133233   133646     +413     
  Branches      20636    20689      +53     
============================================
+ Hits          82274    82331      +57     
- Misses        44911    45226     +315     
- Partials       6048     6089      +41     
Flag Coverage Δ
custom-integration1 ?
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 0.00% <0.00%> (-61.71%) ⬇️
java-21 61.60% <33.33%> (-0.03%) ⬇️
skip-bytebuffers-false 34.89% <33.33%> (-26.85%) ⬇️
skip-bytebuffers-true 61.59% <33.33%> (+33.86%) ⬆️
temurin 61.60% <33.33%> (-0.15%) ⬇️
unittests 61.60% <33.33%> (-0.15%) ⬇️
unittests1 46.77% <33.33%> (-0.12%) ⬇️
unittests2 27.64% <0.00%> (-0.09%) ⬇️

Flags with carried forward coverage won't be shown.


@gortiz (Contributor) commented Feb 29, 2024

I like the idea, but I'm not sure the current implementation is correct. Specifically, the scenario that concerns me is one where:

  • we fetch batches far more often than the Prometheus polling rate, and
  • the source mostly emits non-empty batches.

In that scenario it seems possible that REALTIME_LAST_FETCHED_BATCH_SIZE would be 0 each time Prometheus polls it, even though we are actually ingesting.

Instead of a gauge, we could have something like REALTIME_FETCHED_ROWS, which would be a counter. This metric would be non-decreasing, and we could just apply rate (or some other operation) to it. If it doesn't increase for a while, we can be sure that there is no data in the source and can therefore skip some alerts.
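
To illustrate the concern, here is a small, self-contained Java simulation (an editorial sketch with hypothetical names, not Pinot code): batches are fetched far more often than the metrics endpoint is polled and most batches are non-empty, yet a last-batch-size gauge can read 0 at every poll while a cumulative counter keeps growing.

import java.util.Random;

public class GaugeVsCounterSampling {
  public static void main(String[] args) {
    Random random = new Random(42);
    long lastBatchSizeGauge = 0;   // gauge: overwritten on every fetch
    long rowsFetchedCounter = 0;   // counter: only ever increases

    // 10 fetches happen between consecutive polls of the metrics endpoint.
    int fetchesPerPoll = 10;
    for (int poll = 1; poll <= 5; poll++) {
      for (int fetch = 0; fetch < fetchesPerPoll; fetch++) {
        // Most batches are non-empty, but in this run the batch fetched right
        // before each poll happens to be empty.
        int batchSize = (fetch == fetchesPerPoll - 1) ? 0 : 100 + random.nextInt(50);
        lastBatchSizeGauge = batchSize;
        rowsFetchedCounter += batchSize;
      }
      // At poll time the gauge looks like nothing is being ingested, while
      // rate()/increase() over the counter would clearly show progress.
      System.out.printf("poll %d: gauge=%d counter=%d%n", poll, lastBatchSizeGauge, rowsFetchedCounter);
    }
  }
}

With the counter, rate() or increase() over any reasonable window stays above zero as long as ingestion is actually happening.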

@suddendust (Contributor, PR author)

That's a valid case.

> If it doesn't increase for a while, we can be sure that there is no data in the source and can therefore skip some alerts.

We can't differentiate between this (valid) case and the case where there are exceptions fetching a batch, although such exceptions can be tracked using REALTIME_CONSUMPTION_EXCEPTIONS.

@ege-st (Contributor) commented Feb 29, 2024

This seems like a situation where a histogram would be the right choice: it would let us do percentile breakdowns of rows read, and I believe it also includes the counter that @gortiz suggested.

@gortiz (Contributor) commented Feb 29, 2024

Small tip: instead of rate you can also use increase. I don't think the performance difference would be important enough to justify changing it, but it may express what you want more cleanly.

Anyway, I think both functions are fine.
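
For reference, a tiny self-contained Java illustration of the relationship between the two functions (simplified: one counter, two samples, no counter resets or extrapolation): increase is the raw growth over the window, while rate is that growth divided by the window length in seconds.

public class RateVsIncrease {
  public static void main(String[] args) {
    // Two samples of a monotonically increasing counter, 60 seconds apart.
    double windowSeconds = 60.0;
    double counterAtStart = 10_000;
    double counterAtEnd = 10_240;

    // increase(m[1m]) ~ total growth over the window.
    double increase = counterAtEnd - counterAtStart;
    // rate(m[1m]) ~ per-second average growth over the window.
    double rate = increase / windowSeconds;

    // Either form works for "did anything get ingested?": both are 0 only
    // when the counter did not move at all during the window.
    System.out.println("increase=" + increase + " rows, rate=" + rate + " rows/s");
  }
}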

@gortiz (Contributor) commented Feb 29, 2024

LGTM. Unless there is some urgency, I'll wait til tomorrow to merge it just in case other people want to review it.

@zhtaoxiang (Contributor) left a comment

LGTM

REALTIME_ROWS_FILTERED("rows", false),
INVALID_REALTIME_ROWS_DROPPED("rows", false),
INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),
Contributor

this newly created metric is not used anywhere

Contributor (PR author)

That code wasn't checked-in, sorry about that!

Comment on lines +1688 to +1689
_currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
closePartitionGroupConsumer();
Contributor

Is it possible that those two lines throw exceptions? If so, then the exception is not related to consumer creation.

Should we only try/catch lines 1690 and 1691?

Contributor (PR author)

If they throw an exception, the consumer wouldn't get created, right? And _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset); does throw a runtime exception (at least in the code).

Contributor

If they throw an exception, is the old consumer closed? If not, the old consumer is still alive, right?

Contributor

I guess to be sure we should re-throw the exception in the catch part.

Contributor

Can you please help me understand how re-throwing the exception addresses the potential issue I mentioned above?

@suddendust (Contributor, PR author) commented Mar 4, 2024

In:

     1. _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
     2. closePartitionGroupConsumer();
     3. _partitionGroupConsumer =
            _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
        _partitionGroupConsumer.start(_currentOffset);

if the 1st or 2nd line throws an exception, the 3rd won't be executed. This is a valid case for incrementing this metric, no?

> then the exception is not related to consumer creation.

This metric tracks whether a consumer could not be created for any reason. In this case as well, the consumer isn't created, so we should bump it?

Earlier, I was swallowing this exception, which meant no runtime exception would be bubbled up the call stack. Re-throwing addressed that.

Please correct me if I am wrong.

Contributor

If the 1st or 2nd line throws an exception:

  1. the old consumer is still running, so maybe it can still be used for consuming data?
  2. the error is not really about creating the consumer, so in this case maybe we don't want to increase the metric? WDYT?

Contributor (PR author)

> the old consumer is still running, so maybe it can still be used for consuming data?

I am not sure from the control flow here whether the old consumer can be reused. If it can, then yes, we shouldn't bump the metric. If it can't, then we should.

But look at it this way: we want to be alerted in both cases, because the code tried to create a new consumer and intended to use it, but couldn't for some reason. We definitely want to look into why we couldn't even close the old consumer.
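
For illustration, a self-contained Java sketch of the pattern being discussed (the interfaces, field names, and the recreate method are simplified stand-ins, not the actual RealtimeSegmentDataManager code): any failure while checkpointing, closing the old consumer, or creating and starting the new one bumps the exception meter and is re-thrown rather than swallowed.

import java.util.function.Supplier;

public class ConsumerRecreateSketch {
  // Simplified stand-ins for the stream consumer and the metrics registry.
  interface PartitionGroupConsumer extends AutoCloseable {
    long checkpoint(long currentOffset) throws Exception;
    void start(long offset) throws Exception;
  }
  interface Metrics {
    void add(String meter, long count);
  }

  private PartitionGroupConsumer _consumer;
  private long _currentOffset;
  private final Metrics _metrics;
  private final Supplier<PartitionGroupConsumer> _factory;

  ConsumerRecreateSketch(Metrics metrics, Supplier<PartitionGroupConsumer> factory,
      PartitionGroupConsumer initialConsumer) {
    _metrics = metrics;
    _factory = factory;
    _consumer = initialConsumer;
  }

  void recreateStreamConsumer() {
    try {
      // If checkpointing or closing the old consumer fails, the new consumer
      // is never created either, so the failure is still counted below.
      _currentOffset = _consumer.checkpoint(_currentOffset);
      _consumer.close();
      _consumer = _factory.get();
      _consumer.start(_currentOffset);
    } catch (Exception e) {
      _metrics.add("STREAM_CONSUMER_CREATE_EXCEPTIONS", 1);
      // Re-throw instead of swallowing so the caller sees that recreation failed.
      throw new RuntimeException("Failed to recreate stream consumer", e);
    }
  }
}

The catch deliberately covers the whole path, matching the argument above that the consumer ends up not created in every failure case; whether the metric should also count checkpoint/close failures is exactly the judgment call debated in this thread.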

@@ -77,7 +77,7 @@ rules:
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.realtimeConsumptionExceptions\"><>(\\w+)"
name: "pinot_server_realtime_consumptionExceptions_$1"
cache: true
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed)\"><>(\\w+)"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed|realtimeRowsFetched|streamConsumerCreateExceptions)\"><>(\\w+)"
Contributor

Not related to this PR, but why do we need to add an enumeration here? I think we could just match any word here to fetch everything.

Contributor (PR author)

Yeah, I didn't want to add another change to this PR that would've increased the regression surface. This will be fixed in another PR.

@@ -427,6 +427,9 @@ protected boolean consumeLoop()
try {
messageBatch =
_partitionGroupConsumer.fetchMessages(_currentOffset, null, _streamConfig.getFetchTimeoutMillis());
// Track realtime rows fetched at the table level. This includes valid + invalid rows.
_serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FETCHED,
messageBatch.getMessageCount());
Contributor

I think this is not correct. It should be messageBatch.getUnfilteredMessageCount().

Contributor

From the interface:

  /**
   * @return number of available messages
   */
  int getMessageCount();

  /**
   * @return number of messages returned from the stream
   */
  default int getUnfilteredMessageCount() {
    return getMessageCount();
  }

Contributor (PR author)

Thanks for pointing this out
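
To illustrate the difference, a small self-contained Java example (the FilteringBatch class is hypothetical; the two interface methods mirror the MessageBatch snippet quoted above): a consumer that filters messages during the fetch reports only the surviving rows from getMessageCount(), so metering getUnfilteredMessageCount() is what captures everything pulled from the stream.

public class UnfilteredCountDemo {
  // Mirrors the two methods quoted from the interface above.
  interface MessageBatch {
    int getMessageCount();
    default int getUnfilteredMessageCount() {
      return getMessageCount();
    }
  }

  // Hypothetical batch from a consumer that drops some messages while fetching.
  static final class FilteringBatch implements MessageBatch {
    private final int _returnedFromStream;
    private final int _afterFiltering;

    FilteringBatch(int returnedFromStream, int afterFiltering) {
      _returnedFromStream = returnedFromStream;
      _afterFiltering = afterFiltering;
    }

    @Override
    public int getMessageCount() {
      return _afterFiltering;
    }

    @Override
    public int getUnfilteredMessageCount() {
      return _returnedFromStream;
    }
  }

  public static void main(String[] args) {
    MessageBatch batch = new FilteringBatch(100, 60);
    // Metering getMessageCount() would under-count by the 40 filtered rows;
    // getUnfilteredMessageCount() reflects everything fetched from the stream.
    System.out.println("available=" + batch.getMessageCount()
        + " fetched=" + batch.getUnfilteredMessageCount());
  }
}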

@gortiz (Contributor) left a comment

I think we are not using the correct metric, and we are ignoring the filtered documents.

@gortiz (Contributor) commented Mar 4, 2024

LGTM (this time for sure! :D). I'll wait a bit in case some other committer can take a look

@zhtaoxiang (Contributor) left a comment

LGTM

@xiangfu0 merged commit dbf55b3 into apache:master on Mar 4, 2024
21 checks passed