Cleanup the consumer interfaces and legacy code #12697

Merged: 2 commits merged into apache:master from message_batch on Apr 7, 2024

Conversation

Contributor

@Jackie-Jiang commented Mar 22, 2024

Deprecated:

  • PartitionLevelConsumer: Use PartitionGroupConsumer instead
  • In PartitionGroupConsumer:
    • Remove endOffset from fetchMessages() because offsets might not be monotonically increasing for some consumers (e.g. Kinesis), and we should not use the end offset to filter messages
  • In MessageBatch (see the migration sketch after this list):
    • getMessageAtIndex(): Use getStreamMessage() instead
    • getMessageBytesAtIndex(): Use getStreamMessage() instead
    • getMessageLengthAtIndex(): Use getStreamMessage() instead
    • getMessageOffsetAtIndex(): Use getStreamMessage() instead, where offset info is embedded in the StreamMessageMetadata
    • getMetadataAtIndex(): Use getStreamMessage() instead
    • getNextStreamMessageOffsetAtIndex(): Use getOffsetOfNextBatch() instead
    • getNextStreamPartitionMsgOffsetAtIndex(): Use getOffsetOfNextBatch() instead
  • In StreamPartitionMsgOffset:
    • fromString(): Should be a static method
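
As a rough illustration of the intended migration for MessageBatch callers (a minimal sketch only; getMessageCount() and getMetadata() are assumed from the existing SPI and may differ slightly in signature):

// Before: deprecated index-based accessors
for (int i = 0; i < messageBatch.getMessageCount(); i++) {
  Object rawMessage = messageBatch.getMessageAtIndex(i);
  StreamPartitionMsgOffset nextOffset = messageBatch.getNextStreamPartitionMsgOffsetAtIndex(i);
  // ...
}

// After: read everything through the StreamMessage wrapper; the offset of the
// next batch comes from the batch itself
for (int i = 0; i < messageBatch.getMessageCount(); i++) {
  StreamMessage message = messageBatch.getStreamMessage(i);
  StreamMessageMetadata metadata = message.getMetadata();  // offset info is embedded here
  // ...
}
StreamPartitionMsgOffset offsetOfNextBatch = messageBatch.getOffsetOfNextBatch();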

Removed the Metadata Extractor interfaces because they are not well designed and not really pluggable (plugging at the consumer level should be good enough)

Add offset info into RowMetadata:

  • StreamPartitionMsgOffset getOffset()
  • StreamPartitionMsgOffset getNextOffset()

Bugfix:

  • With offset info properly attached to the message metadata, we can correctly track the offset of each message; previously, the next offset was recorded for each message (see the sketch below)
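
A hedged sketch of reading the new per-message offsets (streamMessage is a hypothetical StreamMessage instance; StreamMessageMetadata is assumed to expose the RowMetadata getters listed above):

StreamMessageMetadata metadata = streamMessage.getMetadata();
StreamPartitionMsgOffset offset = metadata.getOffset();          // offset of this message (new)
StreamPartitionMsgOffset nextOffset = metadata.getNextOffset();  // offset of the next message (new)
// The indexed row can now be tagged with its own offset rather than the next message's offset.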

Cleaned up the code to adapt to the above changes.


codecov-commenter commented Mar 22, 2024

Codecov Report

Attention: Patch coverage is 54.02844%, with 194 lines in your changes missing coverage. Please review.

Project coverage is 62.00%. Comparing base (59551e4) to head (082aada).
Report is 204 commits behind head on master.

Files Patch % Lines
...apache/pinot/plugin/stream/pulsar/PulsarUtils.java 53.26% 38 Missing and 5 partials ⚠️
...lugin/stream/kinesis/KinesisConnectionHandler.java 5.88% 32 Missing ⚠️
...java/org/apache/pinot/spi/stream/MessageBatch.java 25.00% 12 Missing and 3 partials ⚠️
...e/pinot/plugin/stream/kinesis/KinesisConsumer.java 65.85% 8 Missing and 6 partials ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 54.16% 8 Missing and 3 partials ⚠️
...in/stream/kafka20/KafkaPartitionLevelConsumer.java 68.57% 10 Missing and 1 partial ⚠️
...in/stream/kinesis/KinesisPartitionGroupOffset.java 33.33% 10 Missing ⚠️
...apache/pinot/spi/stream/StreamMessageMetadata.java 58.33% 10 Missing ⚠️
.../pulsar/PulsarPartitionLevelConnectionHandler.java 52.94% 5 Missing and 3 partials ⚠️
...ava/org/apache/pinot/spi/stream/StreamMessage.java 36.36% 7 Missing ⚠️
... and 13 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12697      +/-   ##
============================================
+ Coverage     61.75%   62.00%   +0.25%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2458      +22     
  Lines        133233   134667    +1434     
  Branches      20636    20812     +176     
============================================
+ Hits          82274    83500    +1226     
- Misses        44911    45021     +110     
- Partials       6048     6146      +98     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.94% <54.02%> (+0.23%) ⬆️
java-21 61.87% <54.02%> (+0.24%) ⬆️
skip-bytebuffers-false 61.97% <54.02%> (+0.22%) ⬆️
skip-bytebuffers-true 61.84% <54.02%> (+34.11%) ⬆️
temurin 62.00% <54.02%> (+0.25%) ⬆️
unittests 62.00% <54.02%> (+0.25%) ⬆️
unittests1 46.73% <45.36%> (-0.16%) ⬇️
unittests2 27.95% <43.60%> (+0.21%) ⬆️


@Jackie-Jiang
Contributor Author

I've reverted the change that removed the separate executor for Kinesis and Pulsar, so that this PR focuses only on the interface change.

Contributor

@swaminathanmanish left a comment

Overall this cleanup makes sense. Thanks for taking this up and explaining the rationale.
Completed core APIs & Kafka review. Looking through Kinesis and Pulsar.

_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
}
}
}
- _currentOffset = messageOffset;
+ _currentOffset = nextOffset;
_numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
_numRowsConsumed++;
streamMessageCount++;
}
Contributor

Thanks for cleaning this up. Hope this can be verified.

if (indexedMessageCount > 0) {
// Record Ingestion delay for this partition with metadata for last message we processed
updateIngestionDelay(_lastRowMetadata);
} else if (!hasTransformedRows && (msgMetadata != null)) {
// If all messages were filtered by transformation, we still attempt to update ingestion delay using
// the metadata for the last message we processed if any.
updateIngestionDelay(msgMetadata);
}

Contributor Author

The logic was added in #10418, and unfortunately it seems no test was added.

Since we already provided a default implementation of getLastMessageMetadata(), it should be safe to simplify this logic.
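
One possible shape of the simplification, as a rough sketch rather than the exact diff (it assumes getLastMessageMetadata() is the default method mentioned above and that messageBatch is in scope here):

if (indexedMessageCount > 0) {
  // Record ingestion delay with the metadata of the last message we indexed
  updateIngestionDelay(_lastRowMetadata);
} else {
  // All messages were filtered out; fall back to the batch-level metadata if present
  StreamMessageMetadata lastMessageMetadata = messageBatch.getLastMessageMetadata();
  if (lastMessageMetadata != null) {
    updateIngestionDelay(lastMessageMetadata);
  }
}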

@Override
public StreamMessage getStreamMessage(int index) {
  return _messageList.get(index);
}

public StreamPartitionMsgOffset getFirstMessageOffset() {
Contributor

Is this backwards compatible, since it can return null?

Contributor Author

We perform a null check after reading the value (the logic already exists), so this is actually a bug fix. When there is no value consumed, we should return null instead of a negative offset. It is backward compatible.
Added a @Nullable annotation to this method.
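
For reference, a minimal sketch of what the annotated accessor could look like (the backing field name is hypothetical):

@Nullable
public StreamPartitionMsgOffset getFirstMessageOffset() {
  // Return null when nothing was consumed in this batch, instead of a sentinel
  // negative offset; callers already null-check the returned value.
  return _firstMessageOffset;
}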

if (!records.isEmpty()) {
firstOffset = records.get(0).offset();
_lastFetchedOffset = records.get(records.size() - 1).offset();
offsetOfNextBatch = _lastFetchedOffset + 1;
Contributor

This makes sense and simplifies the logic. Hope we can comprehensively verify through integ tests.

Contributor Author

This is verified. If this value is not properly set, the consumption won't match the expected rows


public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig,
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
// TODO: Revisit the logic of using a separate executor to manage the request timeout. Currently it is not thread safe
Contributor

Is enableKeyValueStitch not used any more?

@KKcorps - Could you verify as well?

Contributor Author

It is handled within PulsarUtils.buildPulsarStreamMessage()

}
return buildKinesisMessageBatch(startOffset, messages, false);
Contributor

Is there still a race condition if the executor is running along with buildKinesisMessageBatch?

Contributor Author

Yes. This race condition exists in the current code as well. Will try to address it separately.

_shardIteratorType = kinesisConfig.getShardIteratorType();
_rpsLimit = kinesisConfig.getRpsLimit();
_executorService = Executors.newSingleThreadExecutor();
// TODO: Revisit the logic of using a separate executor to manage the request timeout. Currently it is not thread safe
Contributor

It would be good to understand why we created an executor.

Contributor Author

IMO we should not. Currently I believe we created it to enforce the timeout. @KKcorps to confirm

Contributor

@swaminathanmanish left a comment

Thanks for the cleanup.

@Jackie-Jiang merged commit 7f87cd3 into apache:master on Apr 7, 2024
21 checks passed
@Jackie-Jiang deleted the message_batch branch on April 7, 2024 at 21:19
Member

satishd commented Apr 18, 2024

Thanks @Jackie-Jiang for cleaning up and fixing these interfaces. It helps address issues like #12602.

Contributor

@jadami10 left a comment

Sorry for the delay reviewing. Everything looks backwards compatible to me and should still work for those of us who are within at least one version of being up to date with these interfaces.

* @return A {@link GenericRow} that encapsulates the headers in the ingested row
* Returns the stream offset of the message.
*/
@Nullable
Contributor

Out of curiosity, why do we use nulls and the @Nullable annotation instead of Optionals?

Contributor Author

Both historical and performance reasons:

  • Optional was introduced in Java 8 (it is handy when used along with functional APIs), and the majority of the code uses null
  • There could be performance overhead because it is an extra object wrapper (24 bytes) over a reference. In the majority of places it doesn't matter, but check out this interesting post

Contributor

that's a solid post. thank you for the reference!

if (metadata != null) {
if (metadata.getHeaders() != null) {
metadata.getHeaders().getFieldToValueMap()
.forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
}
metadata.getRecordMetadata()
.forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
if (metadata.getRecordMetadata() != null) {
Contributor

maybe

Optional.ofNullable(metadata.getRecordMetadata())
    .orElse(Collections.emptyMap())

is a better pattern for the future so we don't call get twice
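
Spelled out against the snippet above, the suggestion would look roughly like this (a sketch of the reviewer's idea using java.util.Optional and java.util.Collections, not the merged code):

Optional.ofNullable(metadata.getRecordMetadata())
    .orElse(Collections.emptyMap())
    .forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));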
