-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ability to track offsets of filtered events #12602
Ability to track offsets of filtered events #12602
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #12602 +/- ##
============================================
+ Coverage 61.75% 62.08% +0.33%
+ Complexity 207 198 -9
============================================
Files 2436 2502 +66
Lines 133233 136479 +3246
Branches 20636 21124 +488
============================================
+ Hits 82274 84732 +2458
- Misses 44911 45465 +554
- Partials 6048 6282 +234
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
...re/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
Outdated
Show resolved
Hide resolved
@@ -572,7 +573,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi | |||
// Decode message | |||
StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index)); | |||
msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata(); | |||
StreamPartitionMsgOffset messageOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index); | |||
int messageOffset = messagesAndOffsets.getMessageOffsetAtIndex(index); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this give the accurate offset? Will it cause overflow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see usage of this method, so I guess it might already be deprecated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see it marked as deprecated and there is nothing similar to getNextStreamPartitionMsgOffsetAtIndex
which returns StreamPartitionMsgOffset (it has long value). There can be scenarios of overflow.
Will wait for @satishd to chime in here as he might have more context on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageBatch's getMessageOffsetAtIndex(index)
javadoc says it gives the offset of the message at the given index. But the respective Kafka implementation does not seem to return that. What does that method really mean and what is it used for?
We need Kafka message's record offset here. So, we need a similar implementation like getNextStreamPartitionMsgOffsetAtIndex()
to get it from the metadata properties and get the message record's offset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd gave a shot to having a getStreamPartitionMsgOffsetAtIndex
in MessageBatch. Let me know if this looks good.
...re/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
Show resolved
Hide resolved
if (_trackFilteredMessageOffsets) { | ||
_segmentLogger.info("Filtered events with offsets: {}", _filteredMessageOffsets); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self-review: adding a nested-if instead of combining it to ensure that _filteredMessageOffsets gets clear every time just in case in future someone changes the logic of pushing to _filteredMessageOffsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tibrewalpratik17 for addressing the review comments. Overall LGTM.
...re/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
Show resolved
Hide resolved
...pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
Outdated
Show resolved
Hide resolved
...re/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
Show resolved
Hide resolved
f8ca87f
to
799b623
Compare
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
Outdated
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
Outdated
Show resolved
Hide resolved
.../pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
Outdated
Show resolved
Hide resolved
5b68ba8
to
7d3579a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -1758,6 +1769,12 @@ private void updateCurrentDocumentCountMetrics() { | |||
_segmentLogger.info( | |||
"Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}", | |||
rowsConsumed, consumedRate, _currentOffset, _numRowsConsumed, _numRowsIndexed); | |||
if (_filteredMessageOffsets.size() > 0) { | |||
if (_trackFilteredMessageOffsets) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is redundant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left it on purpose if anyone changes the logic to pushing to _filteredMessageOffsets in the future for any other usecase we can still ensure that the config is enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tibrewalpratik17 for the updated PR. LGTM.
label:
observability
Consumed x events
log. Added a separate log and did not clubbing with consumed events for easy debuggability. This will only happen whentrackFilteredMessageOffset
flag is enabled in Ingestion configs.