-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[partial-upsert] configure early release of _partitionGroupConsumerSemaphore in RealtimeSegmentDataManager #13256
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #13256 +/- ##
============================================
+ Coverage 61.75% 61.98% +0.23%
+ Complexity 207 198 -9
============================================
Files 2436 2550 +114
Lines 133233 140267 +7034
Branches 20636 21803 +1167
============================================
+ Hits 82274 86945 +4671
- Misses 44911 46722 +1811
- Partials 6048 6600 +552
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
cc: @Jackie-Jiang , @klsince for review. |
@JsonPropertyDescription("Whether to pause partial upsert table's partition ingestion during commit") | ||
private boolean _pausePartialUpsertPartitionIngestionDuringCommit; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rohityadav1993 can you also add it as a cluster level config where this table-level config will override the cluster-level value? In Uber, we would like to roll it out for almost all tables as data-correctness > data-ingestion-SLAs. For tables where ingestion lag is in order of 10s of minutes there we can make a call to use this table level config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The suggestion sounds good, do you know how it is being done for any other table config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can check BaseTableDataManager#init
it takes in InstanceDataManagerConfig
var
if (!_realtimeTableDataManager.isPartialUpsertEnabled() || !_tableConfig.getUpsertConfig() | ||
.isPausePartialUpsertPartitionIngestionDuringCommit()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should check if tableConfig.getUpsertConfig
is not null as well
@@ -959,7 +959,13 @@ AtomicBoolean getAcquiredConsumerSemaphore() { | |||
|
|||
@VisibleForTesting | |||
SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { | |||
closeStreamConsumers(); | |||
// for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on onDestroy() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: looks like offload()
closes the consumers
but anyway it looks right to me that the semaphore should be released in the end, when the segment gets replaced
@@ -94,6 +94,9 @@ public enum ConsistencyMode { | |||
@JsonPropertyDescription("Whether to drop out-of-order record") | |||
private boolean _dropOutOfOrderRecord; | |||
|
|||
@JsonPropertyDescription("Whether to pause partial upsert table's partition ingestion during commit") | |||
private boolean _pausePartialUpsertPartitionIngestionDuringCommit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should pause consumption by default. Consider renaming it to allowPartialUpsertConsumptionDuringCommit
and reverse the behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @tibrewalpratik17 What's your opinion on enabling the new feature by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on making this the default behaviour!
Would also suggest to add a cluster-level and table-level config to enable / disable it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making default as enabled sounds good. This is a behaviour change, I also think it makes sense to make this a cluster level property as well so it can be toggled at bulk.
Suggest to add |
…meSegmentDataManager
16eb518
to
e9002c5
Compare
@@ -1611,6 +1622,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf | |||
_segmentLogger | |||
.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", llcSegmentName, | |||
_segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); | |||
_allowPartialUpsertConsumptionDuringCommit = | |||
_realtimeTableDataManager.isPartialUpsertEnabled() ? _tableConfig.getUpsertConfig() != null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the config name as allowPartialUpsertConsumptionDuringCommit
or default.allow.partial.upsert.consumption.during.commit
makes a lot sense and it's false by default.
but this variable name _allowPartialUpsertConsumptionDuringCommit
here might be a bit less readable. Here we had to set it true for non-paritial-upsert table. I'd call it _allowConsumptionDuringCommit
, then it's more intuitive to leave it as true for non-paritial-upsert table
_allowConsumptionDuringCommit = !_realtimeTableDataManager.isPartialUpsertEnabled()? true : _tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit()
// nit: no need to check getUpsertConfig() is not null as isPartialUpsertEnabled() is true already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Updated.
a90c6a1
to
7e87d37
Compare
7e87d37
to
69e28c2
Compare
// server level config honoured only when table level config is not set to true | ||
if (!upsertConfig.isAllowPartialUpsertConsumptionDuringCommit()) { | ||
upsertConfig.setAllowPartialUpsertConsumptionDuringCommit(Boolean.parseBoolean( | ||
instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ALLOW_PARTIAL_UPSERT_CONSUMPTION_DURING_COMMIT, "false"))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we wanted to keep this true
by default right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah my bad! I see we have changed the behaviour from pause
to allow
. Then this makes sense!
bugfix
upsert
Relates to #13140
This PR configures if we should do an early release of
_partitionGroupConsumerSemaphore
throughcloseStreamConsumers()
during segment build and replace and segment download and replace.By skipping to make this call early, we rely on segment.onDestroy() method to release the semaphore which ensures that the new consuming segment does not start consumption until the previous mutable segment is completely replaced during commit.
This helps in avoiding any unnecessary data inconsistencies between replicas for partial upsert tables but also leads to a stopped consumption during segment commit.
New configs:
Use server configuration:
"pinot.server.instance.upsert.default.allow.partial.upsert.consumption.during.commit": "true/false"
to toggle consumption during commit for partial upsert table. By defualt the consumption during commit will be paused from this commit onwards.Use table upsert config:
allowPartialUpsertConsumptionDuringCommit: true/false
toggle consumption during commit for partial upsert table. If the table config is set as true then the server config is ignored.