Enhance Pulsar consumer #12812

Merged
merged 2 commits into from
Apr 11, 2024

Conversation

Jackie-Jiang
Contributor

@Jackie-Jiang Jackie-Jiang commented Apr 8, 2024

@Jackie-Jiang added the dependencies, ingestion, bugfix, refactor, and real-time labels Apr 8, 2024
@Jackie-Jiang removed the dependencies label Apr 8, 2024
@codecov-commenter

codecov-commenter commented Apr 8, 2024

Codecov Report

Attention: Patch coverage is 71.79487%, with 11 lines in your changes missing coverage. Please review.

Project coverage is 61.99%. Comparing base (59551e4) to head (cdf1193).
Report is 242 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...in/stream/pulsar/PulsarPartitionLevelConsumer.java | 56.52% | 7 Missing and 3 partials ⚠️ |
| ...pache/pinot/plugin/stream/pulsar/PulsarConfig.java | 93.33% | 1 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12812      +/-   ##
============================================
+ Coverage     61.75%   61.99%   +0.24%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2462      +26     
  Lines        133233   134696    +1463     
  Branches      20636    20812     +176     
============================================
+ Hits          82274    83507    +1233     
- Misses        44911    45052     +141     
- Partials       6048     6137      +89     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration1 | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration2 | 0.00% <0.00%> (ø) |
| java-11 | 61.94% <71.79%> (+0.23%) ⬆️ |
| java-21 | 61.88% <71.79%> (+0.25%) ⬆️ |
| skip-bytebuffers-false | 61.98% <71.79%> (+0.23%) ⬆️ |
| skip-bytebuffers-true | 61.84% <71.79%> (+34.11%) ⬆️ |
| temurin | 61.99% <71.79%> (+0.24%) ⬆️ |
| unittests | 61.99% <71.79%> (+0.24%) ⬆️ |
| unittests1 | 46.70% <ø> (-0.19%) ⬇️ |
| unittests2 | 27.94% <71.79%> (+0.21%) ⬆️ |


Contributor

@swaminathanmanish swaminathanmanish left a comment

Thanks Jackie. From the code, it does not look like there are any changes in behavior except that it's done in a different way without the executor. Just wanted to confirm.


// Seek to the start message id if necessary
// NOTE: Use Objects.equals() to check reference first for performance.
if (!Objects.equals(startMessageId, _nextMessageId)) {
Contributor

To clarify: when startMessageId and nextMessageId are the same, does it mean we are done with the current batch and are starting from the next batch?

Contributor Author

When they are the same, we don't need to seek the message id again

Contributor

> When they are the same, we don't need to seek the message id again

OK, it's the caching logic to avoid re-seeking.
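
For readers following this thread, here is a minimal sketch of the seek-skipping check discussed above. It is not the code from this PR: message ids are modeled as `Long` offsets, the seek is replaced by a counter, and `SeekCacheSketch`, `fetch`, and `seekCount` are hypothetical names used only for illustration. It only shows why a fetch that starts exactly where the previous one ended can skip the seek.

```java
import java.util.Objects;

// Hypothetical stand-in for the consumer: message ids are modeled as Long
// offsets and the expensive seek is replaced by a counter.
final class SeekCacheSketch {
  private Long nextMessageId; // where the reader is positioned; null before the first fetch
  private int seekCount;      // number of simulated seeks performed so far

  // Simulates fetching `count` messages starting at `startMessageId`.
  void fetch(Long startMessageId, int count) {
    // Objects.equals() returns true immediately for identical references and
    // only falls back to equals() otherwise.
    if (!Objects.equals(startMessageId, nextMessageId)) {
      seekCount++; // stands in for a real seek on the underlying reader
    }
    // Remember the position right after the batch that was just read.
    nextMessageId = startMessageId + count;
  }

  int seekCount() {
    return seekCount;
  }

  public static void main(String[] args) {
    SeekCacheSketch consumer = new SeekCacheSketch();
    consumer.fetch(0L, 10); // first fetch: nothing cached yet, so it seeks
    consumer.fetch(10L, 5); // continues where the last batch ended: no seek
    consumer.fetch(3L, 2);  // jumps to a different position: seeks again
    System.out.println(consumer.seekCount());
  }
}
```

In this sketch only the first and third calls count as seeks, which matches the "caching logic" reading above.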

break;
}
while (_reader.hasMessageAvailable() && System.currentTimeMillis() < endTimeMs) {
messages.add(PulsarUtils.buildPulsarStreamMessage(_reader.readNext(), _config));
Contributor

Assuming that readNext will block.

Contributor Author

readNext() shouldn't block. We call readNext() only when hasMessageAvailable() returns true.

Contributor

> readNext() shouldn't block. We call readNext() only when hasMessageAvailable() returns true.

Makes sense if hasMessageAvailable() is used. I read this and thought it blocks:
https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L49
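
As an illustration of the loop discussed in this thread, the sketch below collects messages until either nothing more is available or a deadline passes. It is a simplified model rather than the PR's code: `SimpleReader` is a hypothetical interface that mirrors only the two calls quoted above (it is not the Pulsar `Reader` API), and `inMemoryReader` is a test helper invented for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class TimedFetchSketch {
  // Hypothetical minimal reader; it only mirrors the two calls quoted above.
  interface SimpleReader<T> {
    boolean hasMessageAvailable();
    T readNext();
  }

  // Collects messages until none are available or the deadline passes.
  static <T> List<T> fetch(SimpleReader<T> reader, long timeoutMs) {
    long endTimeMs = System.currentTimeMillis() + timeoutMs;
    List<T> messages = new ArrayList<>();
    // readNext() is only reached after hasMessageAvailable() returned true,
    // so it is not expected to wait for a message to arrive.
    while (reader.hasMessageAvailable() && System.currentTimeMillis() < endTimeMs) {
      messages.add(reader.readNext());
    }
    return messages;
  }

  // In-memory reader used only to exercise the loop.
  static <T> SimpleReader<T> inMemoryReader(List<T> items) {
    Iterator<T> it = items.iterator();
    return new SimpleReader<T>() {
      @Override public boolean hasMessageAvailable() { return it.hasNext(); }
      @Override public T readNext() { return it.next(); }
    };
  }

  public static void main(String[] args) {
    List<String> batch = fetch(inMemoryReader(List.of("m1", "m2", "m3")), 1000);
    System.out.println(batch);
  }
}
```

Because the deadline is checked before each read, a timeout of zero returns an empty batch even when messages are available, and no separate task has to be interrupted to stop the loop.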

@Jackie-Jiang
Contributor Author

> Thanks Jackie. From the code, it does not look like there are any changes in behavior except that it's done in a different way without the executor. Just wanted to confirm.

The new code doesn't rely on an async task being interrupted, but essentially both versions gather as many messages as possible within the timeout.

Contributor

@swaminathanmanish swaminathanmanish left a comment

LGTM

@Jackie-Jiang Jackie-Jiang merged commit eacbf12 into apache:master Apr 11, 2024
19 checks passed
@Jackie-Jiang Jackie-Jiang deleted the enhance_pulsar_consumer branch April 11, 2024 21:56