Cleanup the consumer interfaces and legacy code #12697
Conversation
**Codecov Report** — Attention: Patch coverage is

Additional details and impacted files:

```
@@             Coverage Diff              @@
##             master   #12697      +/-   ##
============================================
+ Coverage     61.75%   62.00%    +0.25%
+ Complexity      207      198        -9
============================================
  Files          2436     2458       +22
  Lines        133233   134667     +1434
  Branches      20636    20812      +176
============================================
+ Hits          82274    83500     +1226
- Misses        44911    45021      +110
- Partials       6048     6146       +98
```
Force-pushed 1e66caf to a9f3cf7, then a9f3cf7 to d7d7a51.
Resolved review threads on ...tion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
Force-pushed d7d7a51 to 9a75dbf.
I've reverted the change of removing the separate executor for Kinesis and Pulsar, so that this PR focuses only on the interface change.
Force-pushed 9a75dbf to f4bab13.
Overall this cleanup makes sense. Thanks for taking this up and explaining the rationale.
Completed the core APIs & Kafka review. Looking through Kinesis and Pulsar.
```diff
         _segmentLogger.error(errorMessage, e);
         _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
       }
     }
   }
-  _currentOffset = messageOffset;
+  _currentOffset = nextOffset;
   _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
   _numRowsConsumed++;
   streamMessageCount++;
 }
```
Thanks for cleaning this up. Hope this can be verified.
```java
if (indexedMessageCount > 0) {
  // Record ingestion delay for this partition with metadata for the last message we processed
  updateIngestionDelay(_lastRowMetadata);
} else if (!hasTransformedRows && (msgMetadata != null)) {
  // If all messages were filtered by transformation, we still attempt to update ingestion delay using
  // the metadata for the last message we processed, if any.
  updateIngestionDelay(msgMetadata);
}
```
The logic was added in #10418, and unfortunately it seems no test was added. Since we already provide a default implementation of `getLastMessageMetadata()`, it should be safe to simplify this logic.
```java
@Override
public StreamMessage getStreamMessage(int index) {
  return _messageList.get(index);
}

public StreamPartitionMsgOffset getFirstMessageOffset() {
```
Is this backwards compatible, since it can return null?
We perform a null check after reading the value (the logic already exists), so this is actually a bug fix: when no value has been consumed, we should return null instead of a negative offset. It is backward compatible.
Added a `@Nullable` annotation to this method.
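For illustration, a minimal self-contained sketch of the pattern being discussed, using a hypothetical simplified batch class (not the actual Pinot `MessageBatch` interface): return null when nothing was consumed instead of a negative sentinel offset, and let the caller do the null check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified message batch: getFirstMessageOffset() returns null
// when nothing was consumed, rather than a negative sentinel offset.
class MessageBatchSketch {
    private final List<Long> offsets = new ArrayList<>();

    void add(long offset) {
        offsets.add(offset);
    }

    /** @return the offset of the first message, or null if the batch is empty */
    Long getFirstMessageOffset() {
        return offsets.isEmpty() ? null : offsets.get(0);
    }

    public static void main(String[] args) {
        MessageBatchSketch empty = new MessageBatchSketch();
        // Caller performs a null check instead of comparing against a negative sentinel
        Long first = empty.getFirstMessageOffset();
        System.out.println(first == null ? "no messages consumed" : "first offset: " + first);

        MessageBatchSketch batch = new MessageBatchSketch();
        batch.add(42L);
        System.out.println("first offset: " + batch.getFirstMessageOffset());
    }
}
```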
```java
if (!records.isEmpty()) {
  firstOffset = records.get(0).offset();
  _lastFetchedOffset = records.get(records.size() - 1).offset();
  offsetOfNextBatch = _lastFetchedOffset + 1;
}
```
This makes sense and simplifies the logic. Hope we can comprehensively verify it through integration tests.
This is verified. If this value is not properly set, the consumption won't match the expected rows.
Force-pushed f4bab13 to 5c693ce, then 5c693ce to 082aada.
```java
public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig,
    PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
  // TODO: Revisit the logic of using a separate executor to manage the request timeout. Currently it is not thread safe
```
Is `enableKeyValueStitch` not used anymore?
@KKcorps - Could you verify as well?
It is handled within `PulsarUtils.buildPulsarStreamMessage()`.
```java
  }
  return buildKinesisMessageBatch(startOffset, messages, false);
```
Is there still a race condition if the executor is running alongside `buildKinesisMessageBatch()`?
Yes. This race condition exists in the current code as well. Will try to address it separately.
```java
_shardIteratorType = kinesisConfig.getShardIteratorType();
_rpsLimit = kinesisConfig.getRpsLimit();
_executorService = Executors.newSingleThreadExecutor();
// TODO: Revisit the logic of using a separate executor to manage the request timeout. Currently it is not thread safe
```
It would be good to understand why we created an executor.
IMO we should not. Currently I believe we created it to enforce the timeout. @KKcorps to confirm.
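As a hedged illustration of the pattern being questioned (toy names, not the actual Kinesis consumer code): a single-thread executor is one way to enforce a timeout around a blocking client call that has no deadline parameter, via `Future.get(timeout)`. The catch, as noted in the TODO above, is that the task may keep running and mutating shared state after the timeout fires.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: run a blocking fetch on a separate single-thread executor so the
// caller can bound it with Future.get(timeout). If the fetch task keeps
// mutating shared state after the timeout fires, that shared mutable state
// is what makes the pattern not thread safe.
class TimedFetchSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    List<String> fetchWithTimeout(long timeoutMs) throws Exception {
        Future<List<String>> future = executor.submit(this::blockingFetch);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Best effort: the task may still be running and touching shared state here.
            future.cancel(true);
            return List.of();
        }
    }

    // Stand-in for a client call with no built-in deadline parameter.
    private List<String> blockingFetch() {
        return List.of("record-1", "record-2");
    }

    void shutdown() {
        executor.shutdown();
    }
}
```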
Thanks for the cleanup.
Thanks @Jackie-Jiang for cleaning up and fixing these interfaces. It helps address issues like #12602.
Sorry for the delay in reviewing. Everything looks backwards compatible to me and should still work for those of us within at least one version up to date with these interfaces.
```diff
- * @return A {@link GenericRow} that encapsulates the headers in the ingested row
+ * Returns the stream offset of the message.
   */
+ @Nullable
```
Out of curiosity, why do we use nulls and the `@Nullable` annotation instead of `Optional`s?
Both a historic reason and a performance reason:
- `Optional` was introduced in Java 8 (it is handy when used with functional APIs), and the majority of the code uses `null`
- There could be performance overhead because it is an extra object wrapper (24 bytes) over a reference. In the majority of places it doesn't matter, but check out this interesting post
that's a solid post. thank you for the reference!
```diff
 if (metadata != null) {
   if (metadata.getHeaders() != null) {
     metadata.getHeaders().getFieldToValueMap()
         .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
   }
-  metadata.getRecordMetadata()
-      .forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
+  if (metadata.getRecordMetadata() != null) {
```
Maybe `Optional.ofNullable(metadata.getRecordMetadata()).orElse(Collections.emptyMap())` is a better pattern for the future, so we don't call the getter twice.
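A small self-contained sketch of the suggested pattern, using a hypothetical metadata holder (not the actual Pinot `RowMetadata`): calling the getter once and defaulting to an empty map avoids both the double call and the explicit null check.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical metadata holder whose record-metadata map may be null.
class MetadataSketch {
    private final Map<String, String> recordMetadata;

    MetadataSketch(Map<String, String> recordMetadata) {
        this.recordMetadata = recordMetadata;
    }

    Map<String, String> getRecordMetadata() {
        return recordMetadata;
    }

    // Call the getter once; default to an empty map when it returns null.
    static Map<String, String> prefixed(MetadataSketch metadata, String prefix) {
        Map<String, String> out = new HashMap<>();
        Optional.ofNullable(metadata.getRecordMetadata())
            .orElse(Collections.emptyMap())
            .forEach((key, value) -> out.put(prefix + key, value));
        return out;
    }
}
```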
Deprecated:
- `PartitionLevelConsumer`: Use `PartitionGroupConsumer` instead
- `PartitionGroupConsumer`: `endOffset` from `fetchMessages()`, because offset might not be monotonically increasing for some consumers (e.g. `Kinesis`), and we should not use end offset to filter messages
- `MessageBatch`:
  - `getMessageAtIndex()`: Use `getStreamMessage()` instead
  - `getMessageBytesAtIndex()`: Use `getStreamMessage()` instead
  - `getMessageLengthAtIndex()`: Use `getStreamMessage()` instead
  - `getMessageOffsetAtIndex()`: Use `getStreamMessage()` instead, where offset info is embedded in the `StreamMessageMetadata`
  - `getMetadataAtIndex()`: Use `getStreamMessage()` instead
  - `getNextStreamMessageOffsetAtIndex()`: Use `getOffsetOfNextBatch()` instead
  - `getNextStreamPartitionMsgOffsetAtIndex()`: Use `getOffsetOfNextBatch()` instead
- `StreamPartitionMsgOffset`:
  - `fromString()`: Should be a static method

Removed the Metadata Extractor interfaces because they are not well designed and not really pluggable (plugging at the consumer level should be good enough).

Added offset info into `RowMetadata`:
- `StreamPartitionMsgOffset getOffset()`
- `StreamPartitionMsgOffset getNextOffset()`

Bugfix:

Cleaned up the code to adapt to the above changes.
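To illustrate the direction of the migration with toy types (not the real Pinot interfaces): the parallel per-index accessors are replaced by a single message accessor that bundles payload and offset, and a single method for the offset of the next batch.

```java
// Toy illustration of the interface direction: instead of parallel per-index
// accessors (message bytes, length, offset, metadata), a single accessor
// returns a message object that bundles payload and offset info, and the
// batch exposes only the offset of the next batch.
class StreamMessageSketch {
    final byte[] value;
    final long offset; // offset info embedded with the message

    StreamMessageSketch(byte[] value, long offset) {
        this.value = value;
        this.offset = offset;
    }
}

class BatchSketch {
    private final StreamMessageSketch[] messages;

    BatchSketch(StreamMessageSketch[] messages) {
        this.messages = messages;
    }

    // Replaces getMessageAtIndex() / getMessageBytesAtIndex() /
    // getMessageOffsetAtIndex() / getMetadataAtIndex()
    StreamMessageSketch getStreamMessage(int index) {
        return messages[index];
    }

    // Replaces getNextStreamMessageOffsetAtIndex() /
    // getNextStreamPartitionMsgOffsetAtIndex()
    long getOffsetOfNextBatch() {
        return messages.length == 0 ? 0 : messages[messages.length - 1].offset + 1;
    }
}
```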