Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backfill] allow externally partitioned segment uploads for upsert tables #13107

Merged
merged 9 commits into from
Jun 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, String r
if (llcSegmentName != null) {
return llcSegmentName.getPartitionGroupId();
}

UploadedRealtimeSegmentName uploadedRealtimeSegmentName = UploadedRealtimeSegmentName.of(segmentName);
if (uploadedRealtimeSegmentName != null) {
return uploadedRealtimeSegmentName.getPartitionId();
}

// Otherwise, retrieve the partition id from the segment zk metadata.
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), realtimeTableName, segmentName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;


/**
* Class to represent segment names like: {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
*
* <p>This naming convention is adopted to represent a segment uploaded to a realtime table. The naming
* convention has been kept semantically similar to {@link LLCSegmentName} but differs in following ways:
*
* <li> prefix to quickly identify the type/source of segment e.g. "uploaded"/"minion"
* <li> name of the table to which the segment belongs
* <li> partitionId which should be consistent as the stream partitioning in case of upsert realtime tables.
* <li> creationTime creation time of segment of the format yyyyMMdd'T'HHmm'Z'
* <li> suffix to uniquely identify segments created at the same time.
*
* Use {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} to generate segment names.
*/
public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
klsince marked this conversation as resolved.
Show resolved Hide resolved

private static final String SEPARATOR = "__";
private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
private final String _prefix;
private final String _tableName;
private final int _partitionId;
private final String _creationTime;
private final String _segmentName;
private final String _suffix;

public UploadedRealtimeSegmentName(String segmentName) {
try {
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
Preconditions.checkState(parts.length == 5,
"Uploaded segment name must be of the format {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}");
_prefix = parts[0];
_tableName = parts[1];
_partitionId = Integer.parseInt(parts[2]);
_creationTime = parts[3];
_suffix = parts[4];
_segmentName = segmentName;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid segment name: " + segmentName, e);
}
}

/**
* Constructor for UploadedRealtimeSegmentName.
* @param tableName
* @param partitionId
* @param msSinceEpoch
* @param prefix
* @param suffix
*/
public UploadedRealtimeSegmentName(String tableName, int partitionId, long msSinceEpoch, String prefix,
String suffix) {
klsince marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkArgument(
StringUtils.isNotBlank(tableName) && !tableName.contains(SEPARATOR) && StringUtils.isNotBlank(prefix)
&& !prefix.contains(SEPARATOR) && StringUtils.isNotBlank(suffix) && !suffix.contains(SEPARATOR),
"tableName, prefix and suffix must be non-null, non-empty and not contain '__'");
_tableName = tableName;
_partitionId = partitionId;
_creationTime = DATE_FORMATTER.print(msSinceEpoch);
_prefix = prefix;
_suffix = suffix;
_segmentName = Joiner.on(SEPARATOR).join(prefix, tableName, partitionId, _creationTime, suffix);
}

/**
* Checks if the segment name is of the format: {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
* @param segmentName
* @return boolean true if the segment name is of the format: {prefix}__{tableName}__{partitionId}__{creationTime}
* __{suffix}
*/
public static boolean isUploadedRealtimeSegmentName(String segmentName) {
int numSeparators = 0;
int index = 0;
while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
numSeparators++;
index += 2; // SEPARATOR.length()
}
return numSeparators == 4;
}

@Nullable
public static UploadedRealtimeSegmentName of(String segmentName) {
try {
return new UploadedRealtimeSegmentName(segmentName);
} catch (Exception e) {
return null;
}
}

public String getTableName() {
return _tableName;
}

public int getPartitionId() {
return _partitionId;
}

/**
* Returns the creation time in the format yyyyMMdd'T'HHmm'Z'
* To be used for only human readability and not for any computation
* @return
*/
public String getCreationTime() {
return _creationTime;
}

public String getSegmentName() {
return _segmentName;
}

public String getPrefix() {
return _prefix;
}

public String getSuffix() {
return _suffix;
}

/**
* Compares the string representation of the segment name.
* @param other the object to be compared.
* @return
*/
@Override
public int compareTo(UploadedRealtimeSegmentName other) {
Preconditions.checkState(_tableName.equals(other._tableName),
"Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName());
return _segmentName.compareTo(other._segmentName);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof UploadedRealtimeSegmentName)) {
return false;
}
UploadedRealtimeSegmentName that = (UploadedRealtimeSegmentName) o;
return _segmentName.equals(that._segmentName);
}

@Override
public int hashCode() {
return Objects.hash(_segmentName);
}

@Override
public String toString() {
return _segmentName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,29 @@
*/
package org.apache.pinot.common.utils;

import java.util.HashMap;
import java.util.HashSet;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.fail;


public class SegmentUtilsTest {
private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
private static final String SEGMENT = "testSegment";
private static final String PARTITION_COLUMN = "partitionColumn";

@Test
public void testGetSegmentCreationTimeMs() {
Expand All @@ -35,4 +50,49 @@ public void testGetSegmentCreationTimeMs() {
segmentZKMetadata.setPushTime(2000L);
assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 2000L);
}

/**
 * Verifies that the partition id is taken from the segment ZK metadata for a segment name ("testSegment") that
 * carries no partition information itself.
 */
@Test
public void testGetRealtimeSegmentPartitionIdFromZkMetadata() {
  // Partition metadata for the partition column, holding the single partition id 3.
  HashSet<Integer> partitionIds = new HashSet<>();
  partitionIds.add(3);
  ColumnPartitionMetadata columnPartitionMetadata =
      new ColumnPartitionMetadata("modulo", 8, partitionIds, new HashMap<>());
  HashMap<String, ColumnPartitionMetadata> partitionMetadataByColumn = new HashMap<>();
  partitionMetadataByColumn.put(PARTITION_COLUMN, columnPartitionMetadata);

  // Segment ZK metadata mock that exposes the partition metadata built above.
  SegmentPartitionMetadata partitionMetadata = mock(SegmentPartitionMetadata.class);
  when(partitionMetadata.getColumnPartitionMap()).thenReturn(partitionMetadataByColumn);
  SegmentZKMetadata zkMetadata = mock(SegmentZKMetadata.class);
  when(zkMetadata.getPartitionMetadata()).thenReturn(partitionMetadata);

  // Helix manager mock that hands out a mocked property store.
  ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
  HelixManager helixManager = mock(HelixManager.class);
  when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);

  // Stub the static ZKMetadataProvider.getSegmentZKMetadata lookup for the duration of the try block.
  try (MockedStatic<ZKMetadataProvider> mockedProvider = Mockito.mockStatic(ZKMetadataProvider.class)) {
    when(ZKMetadataProvider.getSegmentZKMetadata(Mockito.any(ZkHelixPropertyStore.class), eq(TABLE_NAME_WITH_TYPE),
        eq(SEGMENT))).thenReturn(zkMetadata);

    Integer actualPartitionId =
        SegmentUtils.getRealtimeSegmentPartitionId(SEGMENT, TABLE_NAME_WITH_TYPE, helixManager, PARTITION_COLUMN);

    assertEquals(actualPartitionId, 3);
  }
}

/**
 * Verifies that the partition id (3) is read from an uploaded realtime segment name of the form
 * {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}.
 */
@Test
public void testGetRealtimeSegmentPartitionIdForUploadedRealtimeSegment() {
  String segmentName = "uploaded__table_name__3__100__1716185755000";

  try {
    // The helix manager is null on purpose: the lookup is expected to succeed from the name alone, and any
    // attempt to fall back to ZK metadata would surface as an exception reported below.
    Integer partitionId =
        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, "realtimeTableName", null, PARTITION_COLUMN);
    assertEquals(partitionId, 3);
  } catch (Exception e) {
    // Include the exception so that the cause is visible in the test report.
    fail("Exception should not be thrown: " + e);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import org.testng.Assert;
import org.testng.annotations.Test;

import static org.testng.Assert.*;


public class UploadedRealtimeSegmentNameTest {

  /**
   * Parsing a well-formed name exposes each of its five components through the getters.
   */
  @Test
  public void testSegmentNameParsing() {
    UploadedRealtimeSegmentName parsed =
        new UploadedRealtimeSegmentName("uploaded__table_name__1__20240530T0000Z__suffix");

    Assert.assertEquals(parsed.getPrefix(), "uploaded");
    Assert.assertEquals(parsed.getTableName(), "table_name");
    Assert.assertEquals(parsed.getPartitionId(), 1);
    Assert.assertEquals(parsed.getCreationTime(), "20240530T0000Z");
    Assert.assertEquals(parsed.getSuffix(), "suffix");
  }

  /**
   * Building a name from its components joins them with "__" and renders the epoch millis as
   * yyyyMMdd'T'HHmm'Z'.
   */
  @Test
  public void testSegmentNameGeneration() {
    long creationTimeMs = 1717027200000L;
    UploadedRealtimeSegmentName generated =
        new UploadedRealtimeSegmentName("tableName", 1, creationTimeMs, "uploaded", "2");

    Assert.assertEquals(generated.getSegmentName(), "uploaded__tableName__1__20240530T0000Z__2");
  }

  /**
   * A name with all five components is recognised; a name that lacks the suffix component is not.
   */
  @Test
  public void testIsUploadedRealtimeSegmentName() {
    String withSuffix = "uploaded__table__0__20220101T0000Z__suffix";
    String withoutSuffix = "uploaded__table__0__20220101T0000Z";

    Assert.assertTrue(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(withSuffix));
    Assert.assertFalse(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(withoutSuffix));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.ingestion.batch.common;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -29,6 +30,7 @@
import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
Expand Down Expand Up @@ -159,14 +161,25 @@ private SegmentNameGenerator getSegmentNameGenerator(SegmentGeneratorConfig segm
Boolean.parseBoolean(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)),
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig), dateTimeFormatSpec,
segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX),
appendUUIDToSegmentName);
segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX), appendUUIDToSegmentName);
case BatchConfigProperties.SegmentNameGeneratorType.INPUT_FILE:
String inputFileUri = _taskSpec.getCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY);
return new InputFileSegmentNameGenerator(segmentNameGeneratorConfigs.get(FILE_PATH_PATTERN),
segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE),
inputFileUri,
appendUUIDToSegmentName);
segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE), inputFileUri, appendUUIDToSegmentName);
case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
Preconditions.checkState(segmentGeneratorConfig.getCreationTime() != null,
"Creation time must be set for uploaded realtime segment name generator");
Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId() != -1,
"Valid partition id must be set for uploaded realtime segment name generator");
Comment on lines +172 to +173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also add sequence_id here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean for validation?

The sequenceId is sent as part of SegmentNameGenerator method interface

String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue);

long creationTime;
try {
creationTime = Long.parseLong(segmentGeneratorConfig.getCreationTime());
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Creation time must be a valid long value in segmentGeneratorConfig");
}
return new UploadedRealtimeSegmentNameGenerator(tableName,
segmentGeneratorConfig.getUploadedSegmentPartitionId(), creationTime,
segmentGeneratorConfig.getSegmentNamePrefix(), segmentGeneratorConfig.getSegmentNamePostfix());
default:
throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
}
Expand Down
Loading
Loading