Enhance Kinesis consumer #12806
Conversation
Jackie-Jiang commented Apr 7, 2024 (edited)
- Do not use a separate thread to fetch Kinesis records (this can fix the potential race condition); see the fetch sketch after this list
- Cache the shard iterator
- Return the message batch immediately without combining multiple of them (timeout is ignored)
- Change the default max records per fetch to 10,000 (Kinesis default)
- Remove some unused dependencies
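As rough orientation, here is a minimal, hypothetical sketch of what the synchronous fetch path described above could look like. The AWS SDK calls are real, but the class structure and field names (modeled on the diff snippets quoted later in this conversation) are assumptions, not the actual Pinot implementation.

```java
// Hedged sketch only: a synchronous fetch that reuses a cached shard iterator and returns
// whatever getRecords() gives back immediately (no batching across calls, no fetcher thread).
// Field and config names are assumptions modeled on the diff fragments in this PR.
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

class KinesisFetchSketch {
  private final KinesisClient _kinesisClient;
  private final int _numMaxRecordsToFetch;
  private String _nextShardIterator;  // cached between calls instead of being re-resolved every time

  KinesisFetchSketch(KinesisClient kinesisClient, int numMaxRecordsToFetch) {
    _kinesisClient = kinesisClient;
    _numMaxRecordsToFetch = numMaxRecordsToFetch;
  }

  GetRecordsResponse fetch(String shardIterator) {
    // Single-threaded, blocking call; rate limiting is elided here (see the limiter sketch below).
    GetRecordsRequest request =
        GetRecordsRequest.builder().shardIterator(shardIterator).limit(_numMaxRecordsToFetch).build();
    GetRecordsResponse response = _kinesisClient.getRecords(request);
    // Cache the iterator for the next fetch.
    _nextShardIterator = response.nextShardIterator();
    // Returned immediately; the caller's timeout is not used to accumulate multiple batches.
    return response;
  }
}
```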
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@             Coverage Diff              @@
##             master    #12806     +/-   ##
============================================
+ Coverage     61.75%    62.24%    +0.49%
+ Complexity      207       198        -9
============================================
  Files          2436      2527       +91
  Lines        133233    138410     +5177
  Branches      20636     21400      +764
============================================
+ Hits          82274     86152     +3878
- Misses        44911     45836      +925
- Partials       6048      6422      +374
============================================

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
Force-pushed from cbb2faf to d588079
// NOTE: Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we
// start getting ProvisionedThroughputExceededException. Rate limit the requests to avoid this.
long currentTimeMs = System.currentTimeMillis();
Do we need our own custom rate limiter here? Does the Kinesis client provide an option to handle this, instead of us having this logic?
I didn't find one in the Kinesis client. It seems it will just throw LimitExceededException. The RPS is currently configured on the Pinot side though, so I guess it makes sense to rate limit on the Pinot side.
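For context, the alternative to a Pinot-side limiter would be reacting to the throttling exception after the fact. A hedged sketch of that reactive approach (method names and the backoff value are illustrative, not the PR's code, which instead rate limits proactively):

```java
// Hedged sketch of the reactive alternative discussed here: instead of rate limiting up front,
// catch the per-shard throttling exception and back off before retrying.
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;

class ReactiveThrottlingSketch {
  static GetRecordsResponse getRecordsWithBackoff(KinesisClient client, String shardIterator, int limit)
      throws InterruptedException {
    GetRecordsRequest request = GetRecordsRequest.builder().shardIterator(shardIterator).limit(limit).build();
    while (true) {
      try {
        return client.getRecords(request);
      } catch (ProvisionedThroughputExceededException e) {
        // Exceeded the per-shard getRecords quota; wait a bit before retrying (illustrative backoff).
        Thread.sleep(200L);
      }
    }
  }
}
```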
} else {
  LOGGER.warn(message + ": " + throwable.getMessage());
// TODO: Revisit this logic to see if we always miss the first message when consuming from a new shard
Could you add more explanation for this? Why would we miss the first message?
Ack
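For what it's worth, one possible (unconfirmed) cause for skipping the first record on a new shard is the shard iterator type used when positioning by sequence number. A hedged illustration of the difference, not the PR's actual logic:

```java
// Illustration only: the ShardIteratorType decides whether the record at the starting sequence
// number is included. Positioning with AFTER_SEQUENCE_NUMBER skips the record carrying that
// sequence number, while AT_SEQUENCE_NUMBER includes it.
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

class ShardIteratorTypeSketch {
  static String iteratorFor(KinesisClient client, String streamName, String shardId,
      String startSequenceNumber, boolean startInclusive) {
    return client.getShardIterator(GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardId(shardId)
        .shardIteratorType(
            startInclusive ? ShardIteratorType.AT_SEQUENCE_NUMBER : ShardIteratorType.AFTER_SEQUENCE_NUMBER)
        .startingSequenceNumber(startSequenceNumber)
        .build()).shardIterator();
  }
}
```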
// Read records
GetRecordsRequest getRecordRequest =
    GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
This can be empty, right, even if the stream has some data, given how Kinesis works? We'll be returning a response even if it's empty.
We need some tests to verify the behavior. The consumer can handle an empty message batch, but the consumption lag might be set to 0 because it thinks there are no more messages. Added a TODO to revisit.
Good point. We can also add a metric to track this when it happens.
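As a hedged illustration of the two points above (the MetricsEmitter interface and the metric name are hypothetical placeholders, not Pinot's metrics API), an empty getRecords response can be told apart from "fully caught up" via millisBehindLatest:

```java
// Hedged sketch only: detect an empty fetch that is NOT caught up, so the consumption lag is not
// reported as 0, and surface it via a (hypothetical) metric.
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

class EmptyFetchCheckSketch {
  // Hypothetical metrics hook, standing in for whatever metrics system the consumer uses.
  interface MetricsEmitter {
    void increment(String name);
  }

  /** Returns true if the batch was empty even though the shard still has unread data. */
  static boolean handleEmptyFetch(GetRecordsResponse response, MetricsEmitter metrics) {
    if (!response.records().isEmpty()) {
      return false;
    }
    Long millisBehindLatest = response.millisBehindLatest();
    if (millisBehindLatest != null && millisBehindLatest > 0) {
      // Empty batch but we are still behind the tip of the shard: do not treat the lag as 0,
      // and emit a metric so this case is visible in monitoring.
      metrics.increment("kinesisEmptyFetchWhileBehind");
      return true;
    }
    return false;
  }
}
```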
Force-pushed from d588079 to 597e8ee
LGTM other than clarifications.
 * Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we start
 * getting {@link ProvisionedThroughputExceededException}. Rate limit the requests to avoid this.
 */
private void rateLimitRequests() {
Thanks for creating a separate method. I guess since this is a special kind of rate limiter that needs to block until we are ready to fetch again, we cannot leverage off-the-shelf ones like Guava's.
If Kinesis has a limit, don't we need to adhere to that limit? So does getRpsLimit() need to be what the Kinesis limit is?
The Kinesis limit is not very straightforward, so I guess we need to iterate on this to get the best settings.
long currentTimeMs = System.currentTimeMillis();
int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
if (currentTimeSeconds == _currentSecond) {
  if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
This can be done later. A log.info or metric would help debug if rate limiting becomes an issue.
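For reference, a hedged reconstruction of what a blocking per-second rate limiter with such a log could look like; the field names mirror the fragment above, while the overall body, the sleep logic, and the log message are assumptions, not the PR's actual code:

```java
// Hedged reconstruction of the per-second rate limiter quoted above, with the debug log suggested
// in the review. Field names mirror the diff fragment; the blocking details are assumptions.
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GetRecordsRateLimiterSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(GetRecordsRateLimiterSketch.class);

  private final int _rpsLimit;
  private int _currentSecond = -1;
  private int _numRequestsInCurrentSecond;

  GetRecordsRateLimiterSketch(int rpsLimit) {
    _rpsLimit = rpsLimit;
  }

  synchronized void rateLimitRequests() {
    while (true) {
      long currentTimeMs = System.currentTimeMillis();
      int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
      if (currentTimeSeconds != _currentSecond) {
        // Entered a new second: reset the counter and let the request through.
        _currentSecond = currentTimeSeconds;
        _numRequestsInCurrentSecond = 1;
        return;
      }
      if (_numRequestsInCurrentSecond < _rpsLimit) {
        _numRequestsInCurrentSecond++;
        return;
      }
      // Limit reached within this second: log for visibility and block until the next second starts.
      LOGGER.info("Hit Kinesis getRecords RPS limit ({}), waiting for the next second", _rpsLimit);
      try {
        Thread.sleep(TimeUnit.SECONDS.toMillis(currentTimeSeconds + 1L) - currentTimeMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}
```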
Force-pushed from 597e8ee to 1f5d462
// Get the shard iterator
String shardIterator;
if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
  shardIterator = _nextShardIterator;
We will need to handle the case here where nextShardIterator has expired (since it has a time limit of 5 minutes).
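As a hedged illustration of that case (not the PR's actual handling), an expired cached iterator surfaces as ExpiredIteratorException and can be refreshed by resolving a new iterator at the cached sequence number before retrying:

```java
// Hedged sketch: retry once with a freshly resolved iterator when the cached one has expired.
// Method and variable names here are illustrative, not the PR's code.
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

class ExpiredIteratorSketch {
  static GetRecordsResponse getRecordsWithRefresh(KinesisClient client, String streamName, String shardId,
      String shardIterator, String startSequenceNumber, int limit) {
    try {
      return client.getRecords(GetRecordsRequest.builder().shardIterator(shardIterator).limit(limit).build());
    } catch (ExpiredIteratorException e) {
      // Cached iterators expire after roughly 5 minutes; resolve a fresh one at the cached position and retry.
      String freshIterator = client.getShardIterator(GetShardIteratorRequest.builder()
          .streamName(streamName)
          .shardId(shardId)
          .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
          .startingSequenceNumber(startSequenceNumber)
          .build()).shardIterator();
      return client.getRecords(GetRecordsRequest.builder().shardIterator(freshIterator).limit(limit).build());
    }
  }
}
```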
Force-pushed from 1f5d462 to bd15dac
Force-pushed from bd15dac to 4d71bf3
Force-pushed from 4d71bf3 to a8a02e6
LGTM! Have tested it out and the changes work well for Kinesis.
* Enhance Kinesis consumer
* Simplify the handling
* Address comments