Enhance Pulsar consumer #12812
88ba624 to 1b57668
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #12812 +/- ##
============================================
+ Coverage 61.75% 61.99% +0.24%
+ Complexity 207 198 -9
============================================
Files 2436 2462 +26
Lines 133233 134696 +1463
Branches 20636 20812 +176
============================================
+ Hits 82274 83507 +1233
- Misses 44911 45052 +141
- Partials 6048 6137 +89
1b57668 to cdf1193
Thanks Jackie. From the code, it does not look like there are any changes in behavior, except that it's done in a different way, without the executor. Just wanted to confirm.
// Seek to the start message id if necessary
// NOTE: Use Objects.equals() to check reference first for performance.
if (!Objects.equals(startMessageId, _nextMessageId)) {
To clarify, when startMessageId and _nextMessageId are the same, does it mean we are done with the current batch and starting from the next batch?
When they are the same, we don't need to seek to the message id again.
OK. It's the caching logic to avoid re-seeking.
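To illustrate the caching described in this thread, here is a minimal, self-contained sketch. It is not the PR's code: `SeekCacheSketch`, `SeekableReader`, `positionAt`, and `advanceTo` are hypothetical names, and message ids are modelled as plain strings rather than Pulsar `MessageId` objects. Only the `Objects.equals(startMessageId, _nextMessageId)` check is taken from the diff.

```java
import java.util.Objects;

public class SeekCacheSketch {
  /** Hypothetical stand-in for a reader that supports seeking; not the Pulsar API. */
  public interface SeekableReader {
    void seek(String messageId);
  }

  private final SeekableReader _reader;
  // Id the reader is expected to be positioned at, or null if unknown.
  private String _nextMessageId;

  public SeekCacheSketch(SeekableReader reader) {
    _reader = reader;
  }

  /** Seeks only when the requested start differs from the cached position. */
  public void positionAt(String startMessageId) {
    // Objects.equals() compares references first, then falls back to equals().
    if (!Objects.equals(startMessageId, _nextMessageId)) {
      _reader.seek(startMessageId);
      _nextMessageId = startMessageId;
    }
  }

  /** Records where the next fetch is expected to start once a batch has been read. */
  public void advanceTo(String nextMessageId) {
    _nextMessageId = nextMessageId;
  }
}
```

If a caller fetches consecutive batches, each fetch starts where the previous one ended, so the ids match and no seek is issued. A seek happens only on the first fetch or when the caller jumps to a different position.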
break;
}
while (_reader.hasMessageAvailable() && System.currentTimeMillis() < endTimeMs) {
  messages.add(PulsarUtils.buildPulsarStreamMessage(_reader.readNext(), _config));
Assuming that readNext will block.
readNext() shouldn't block. We call readNext() only when hasMessageAvailable() returns true.
Makes sense if hasMessageAvailable() is used. I read this and thought it blocks:
https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L49
It doesn't rely on the async task to be interrupted. But essentially both of them will gather as many messages as possible within the timeout.
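The executor-free loop discussed above can be sketched on its own as follows. This is an illustration under stated assumptions, not the PR's implementation: `BoundedFetchSketch`, `SimpleReader`, `fetch`, and `readerOver` are hypothetical names, the reader is an in-memory stand-in rather than a Pulsar `Reader`, and computing `endTimeMs` as the current time plus a timeout is assumed, since the diff does not show where it is set.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BoundedFetchSketch {
  /** Hypothetical stand-in exposing the two reader calls used in the loop. */
  public interface SimpleReader<T> {
    boolean hasMessageAvailable();

    T readNext();
  }

  /** Collects messages until none are available or the deadline passes. */
  public static <T> List<T> fetch(SimpleReader<T> reader, long timeoutMs) {
    List<T> messages = new ArrayList<>();
    // Assumption: the deadline is derived from a caller-supplied timeout.
    long endTimeMs = System.currentTimeMillis() + timeoutMs;
    // readNext() is called only after hasMessageAvailable() returns true,
    // so the loop is not expected to wait for a message that is not there yet.
    while (reader.hasMessageAvailable() && System.currentTimeMillis() < endTimeMs) {
      messages.add(reader.readNext());
    }
    return messages;
  }

  /** In-memory reader over a fixed list, for demonstration only. */
  public static <T> SimpleReader<T> readerOver(List<T> items) {
    Queue<T> queue = new ArrayDeque<>(items);
    return new SimpleReader<T>() {
      @Override
      public boolean hasMessageAvailable() {
        return !queue.isEmpty();
      }

      @Override
      public T readNext() {
        return queue.remove();
      }
    };
  }
}
```

Because the deadline is checked in the loop condition, no background task has to be interrupted when the timeout expires. The trade-off is that the deadline is only checked between messages, so a single slow read can overrun it.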
LGTM
java.util.ConcurrentModificationException
#10503)