Ability to track offsets of filtered events #12602

Merged 1 commit into apache:master on Apr 18, 2024

Conversation

tibrewalpratik17
Contributor

@tibrewalpratik17 tibrewalpratik17 commented Mar 8, 2024

label:
observability

  • Adds the ability to track the offsets of filtered events. The offsets are logged every minute, alongside the "Consumed x events" log. They go in a separate log line rather than being combined with the consumed-events line, which makes debugging easier. This only happens when the trackFilteredMessageOffset flag is enabled in the ingestion config.
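
For readers skimming the description, here is a minimal sketch of the behavior it describes: offsets of filtered rows are buffered only when the flag is on, and reported at most once per interval. This is not the code in the PR. The class `FilteredOffsetReporter`, its methods, and the explicit `nowMs` parameter are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not Pinot code: buffers filtered offsets and reports them once per interval. */
final class FilteredOffsetReporter {
  private final boolean trackFilteredOffsets; // stands in for the trackFilteredMessageOffset flag
  private final long intervalMs;
  private final List<Long> filteredOffsets = new ArrayList<>();
  private long lastReportMs;

  FilteredOffsetReporter(boolean trackFilteredOffsets, long intervalMs, long startMs) {
    this.trackFilteredOffsets = trackFilteredOffsets;
    this.intervalMs = intervalMs;
    this.lastReportMs = startMs;
  }

  /** Called for each row the filter drops; does nothing when tracking is disabled. */
  void onFiltered(long offset) {
    if (trackFilteredOffsets) {
      filteredOffsets.add(offset);
    }
  }

  /** Returns the log line to emit, or null if the interval has not elapsed or nothing was filtered. */
  String maybeReport(long nowMs) {
    if (nowMs - lastReportMs < intervalMs) {
      return null;
    }
    lastReportMs = nowMs;
    if (filteredOffsets.isEmpty()) {
      return null;
    }
    String line = "Filtered events with offsets: " + filteredOffsets;
    filteredOffsets.clear();
    return line;
  }

  public static void main(String[] args) {
    FilteredOffsetReporter reporter = new FilteredOffsetReporter(true, 60_000L, 0L);
    reporter.onFiltered(41L);
    reporter.onFiltered(57L);
    System.out.println(reporter.maybeReport(30_000L)); // interval not yet elapsed
    System.out.println(reporter.maybeReport(60_000L)); // one minute elapsed
  }
}
```

Passing the clock value in as a parameter keeps the sketch deterministic; real code would more likely read the time itself.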

@codecov-commenter

codecov-commenter commented Mar 8, 2024

Codecov Report

Attention: Patch coverage is 37.50000% with 10 lines in your changes missing coverage. Please review.

Project coverage is 62.08%. Comparing base (59551e4) to head (7d3579a).
Report is 1278 commits behind head on master.

Files with missing lines                                 Patch %   Lines
...a/manager/realtime/RealtimeSegmentDataManager.java    16.66%    8 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12602      +/-   ##
============================================
+ Coverage     61.75%   62.08%   +0.33%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2502      +66     
  Lines        133233   136479    +3246     
  Branches      20636    21124     +488     
============================================
+ Hits          82274    84732    +2458     
- Misses        44911    45465     +554     
- Partials       6048     6282     +234     
Flag                     Coverage Δ
custom-integration1      <0.01% <0.00%> (-0.01%) ⬇️
integration              <0.01% <0.00%> (-0.01%) ⬇️
integration1             <0.01% <0.00%> (-0.01%) ⬇️
integration2             0.00% <0.00%> (ø)
java-11                  <0.01% <0.00%> (-61.71%) ⬇️
java-21                  62.08% <37.50%> (+0.45%) ⬆️
skip-bytebuffers-false   62.06% <37.50%> (+0.32%) ⬆️
skip-bytebuffers-true    62.03% <37.50%> (+34.31%) ⬆️
temurin                  62.08% <37.50%> (+0.33%) ⬆️
unittests                62.08% <37.50%> (+0.33%) ⬆️
unittests1               46.50% <37.50%> (-0.39%) ⬇️
unittests2               28.01% <0.00%> (+0.28%) ⬆️


@@ -572,7 +573,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
  // Decode message
  StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
  msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
- StreamPartitionMsgOffset messageOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
+ int messageOffset = messagesAndOffsets.getMessageOffsetAtIndex(index);
Contributor

Does this give the accurate offset? Will it cause overflow?

Contributor

I don't see any usage of this method, so I guess it might already be deprecated.

Contributor Author

@tibrewalpratik17 tibrewalpratik17 Mar 8, 2024

I don't see it marked as deprecated, and there is nothing equivalent to getNextStreamPartitionMsgOffsetAtIndex, which returns a StreamPartitionMsgOffset (it holds a long value). Overflow is possible in some scenarios.

Will wait for @satishd to chime in here as he might have more context on this.
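
To make the overflow concern concrete, here is a small sketch of what happens when a long offset is narrowed to an int. It is independent of Pinot. The class and method names are hypothetical, and the sample offset is an arbitrary value above `Integer.MAX_VALUE`.

```java
/** Hypothetical illustration, not Pinot code: narrowing a long stream offset to int. */
final class OffsetNarrowingSketch {
  /** A plain cast silently wraps around for values above Integer.MAX_VALUE. */
  static int narrowUnchecked(long offset) {
    return (int) offset;
  }

  /** Math.toIntExact throws ArithmeticException instead of wrapping. */
  static int narrowChecked(long offset) {
    return Math.toIntExact(offset);
  }

  public static void main(String[] args) {
    long offset = 3_000_000_000L; // larger than Integer.MAX_VALUE (2147483647)
    System.out.println(narrowUnchecked(offset)); // prints -1294967296
    try {
      narrowChecked(offset);
    } catch (ArithmeticException e) {
      System.out.println("overflow detected");
    }
  }
}
```

A partition that has seen more than about 2.1 billion records would hit this, which is why a long-backed offset type is the safer choice for logging.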

Member

MessageBatch's getMessageOffsetAtIndex(index) javadoc says it gives the offset of the message at the given index. But the respective Kafka implementation does not seem to return that. What does that method really mean and what is it used for?

We need the Kafka message's record offset here, so we need an implementation similar to getNextStreamPartitionMsgOffsetAtIndex() that reads the record's offset from the metadata properties.

Contributor Author

@satishd I took a shot at adding getStreamPartitionMsgOffsetAtIndex to MessageBatch. Let me know if this looks good.
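
The distinction discussed in this thread, between the offset of the message itself and the offset to resume from, can be sketched as follows. This is not the MessageBatch interface or its Kafka implementation. `BatchOffsetSketch`, `Message`, `offsetAt`, and `nextOffsetAt` are hypothetical names, and the "next offset is record offset plus one" rule is assumed here because that is how Kafka offsets behave; other stream types may differ.

```java
import java.util.List;

/** Hypothetical illustration, not the Pinot MessageBatch API. */
final class BatchOffsetSketch {
  /** Stand-in for a stream message that carries its own record offset as metadata. */
  static final class Message {
    final long recordOffset;
    final String payload;

    Message(long recordOffset, String payload) {
      this.recordOffset = recordOffset;
      this.payload = payload;
    }
  }

  private final List<Message> messages;

  BatchOffsetSketch(List<Message> messages) {
    this.messages = messages;
  }

  /** Offset of the message at the given index; this is what a "filtered offsets" log should record. */
  long offsetAt(int index) {
    return messages.get(index).recordOffset;
  }

  /** Offset to resume consumption from after the message at the given index (Kafka-style assumption). */
  long nextOffsetAt(int index) {
    return offsetAt(index) + 1;
  }

  public static void main(String[] args) {
    BatchOffsetSketch batch =
        new BatchOffsetSketch(List.of(new Message(100L, "a"), new Message(105L, "b")));
    System.out.println(batch.offsetAt(1));     // prints 105
    System.out.println(batch.nextOffsetAt(1)); // prints 106
  }
}
```

Logging the next offset instead of the message's own offset would point at the wrong record, which is why a separate accessor is useful.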

Comment on lines +1756 to +1775
if (_trackFilteredMessageOffsets) {
  _segmentLogger.info("Filtered events with offsets: {}", _filteredMessageOffsets);
}
Contributor Author

Self-review: I used a nested if rather than a combined condition so that _filteredMessageOffsets gets cleared every time, in case someone later changes the logic that pushes to _filteredMessageOffsets.

Member

@satishd satishd left a comment

Thanks @tibrewalpratik17 for addressing the review comments. Overall LGTM.

@tibrewalpratik17 tibrewalpratik17 changed the title from "Improve logging during filtering event consumption" to "Ability to track offsets of filtered events" Apr 17, 2024
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

LGTM

@@ -1758,6 +1769,12 @@ private void updateCurrentDocumentCountMetrics() {
_segmentLogger.info(
    "Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}",
    rowsConsumed, consumedRate, _currentOffset, _numRowsConsumed, _numRowsIndexed);
if (_filteredMessageOffsets.size() > 0) {
  if (_trackFilteredMessageOffsets) {
Contributor

This check is redundant

Contributor Author

@tibrewalpratik17 tibrewalpratik17 Apr 18, 2024

I left it in on purpose: if anyone changes the logic that pushes to _filteredMessageOffsets in the future for some other use case, we can still ensure that the config is enabled.
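
The trade-off behind the "redundant" check can be sketched like this. It is a guess at the shape of the logic based on the two comments in this thread, not the merged code: the assumption is that the buffer is cleared inside the outer check but outside the flag check. `NestedCheckSketch` and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of nested vs combined checks, not the merged Pinot code. */
final class NestedCheckSketch {
  /** Combined condition: the buffer is only cleared when the flag is on. */
  static int flushCombined(List<Long> offsets, boolean trackFlag) {
    if (!offsets.isEmpty() && trackFlag) {
      System.out.println("Filtered events with offsets: " + offsets);
      offsets.clear();
    }
    return offsets.size();
  }

  /** Nested condition: logging is gated by the flag, clearing is not. */
  static int flushNested(List<Long> offsets, boolean trackFlag) {
    if (!offsets.isEmpty()) {
      if (trackFlag) {
        System.out.println("Filtered events with offsets: " + offsets);
      }
      offsets.clear();
    }
    return offsets.size();
  }

  public static void main(String[] args) {
    // Simulate a future change that pushes offsets even though the flag is off.
    List<Long> a = new ArrayList<>(List.of(7L, 9L));
    List<Long> b = new ArrayList<>(List.of(7L, 9L));
    System.out.println(flushCombined(a, false)); // prints 2: the buffer keeps growing
    System.out.println(flushNested(b, false));   // prints 0: the buffer is drained without logging
  }
}
```

While only flag-guarded code pushes to the buffer, the two forms behave the same, which is the reviewer's point. The nested form only matters if that invariant is later broken.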

Member

@satishd satishd left a comment

Thanks @tibrewalpratik17 for the updated PR. LGTM.

@Jackie-Jiang Jackie-Jiang merged commit 022e0a0 into apache:master Apr 18, 2024
20 checks passed
@tibrewalpratik17 tibrewalpratik17 deleted the update_error_log2 branch May 16, 2024 15:07