Skip to content

Commit 5dd8bec

Browse files
Kartik KhareKartik Khare
authored andcommitted
Add exceptions for incorrect time columns
1 parent 89230dc commit 5dd8bec

File tree

2 files changed

+50
-23
lines changed

2 files changed

+50
-23
lines changed

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class DataTypeTransformer implements RecordTransformer {
5353
private final boolean _continueOnError;
5454
private final boolean _validateTimeValues;
5555
private final String _timeColumnName;
56-
private final DateTimeFormatSpec _timeColumnSpec;
56+
private final DateTimeFormatSpec _timeFormatSpec;
5757

5858
public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
5959
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
@@ -69,11 +69,11 @@ public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
6969
DateTimeFormatSpec timeColumnSpec = null;
7070
if (StringUtils.isNotEmpty(_timeColumnName)) {
7171
DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(_timeColumnName);
72-
if (dateTimeFieldSpec != null) {
73-
timeColumnSpec = dateTimeFieldSpec.getFormatSpec();
74-
}
72+
Preconditions.checkState(dateTimeFieldSpec != null, "Failed to find spec for time column: %s from schema: %s",
73+
_timeColumnName, schema.getSchemaName());
74+
timeColumnSpec = dateTimeFieldSpec.getFormatSpec();
7575
}
76-
_timeColumnSpec = timeColumnSpec;
76+
_timeFormatSpec = timeColumnSpec;
7777
}
7878

7979
@Override
@@ -86,13 +86,19 @@ public GenericRow transform(GenericRow record) {
8686
continue;
8787
}
8888

89-
if (_validateTimeValues && _timeColumnSpec != null && column.equals(_timeColumnName)) {
90-
long timeInMs = _timeColumnSpec.fromFormatToMillis(value.toString());
89+
if (_validateTimeValues && _timeFormatSpec != null && column.equals(_timeColumnName)) {
90+
long timeInMs = _timeFormatSpec.fromFormatToMillis(value.toString());
9191
if (!TimeUtils.timeValueInValidRange(timeInMs)) {
92-
LOGGER.debug("Time value {} is not in valid range for column: {}, must be between: {}",
93-
timeInMs, _timeColumnName, TimeUtils.VALID_TIME_INTERVAL);
94-
record.putValue(column, null);
95-
continue;
92+
if (_continueOnError) {
93+
LOGGER.debug("Time value {} is not in valid range for column: {}, must be between: {}", timeInMs,
94+
_timeColumnName, TimeUtils.VALID_TIME_INTERVAL);
95+
record.putValue(column, null);
96+
continue;
97+
} else {
98+
throw new RuntimeException(
99+
String.format("Time value %s is not in valid range for column: %s, must be between: %s", timeInMs,
100+
_timeColumnName, TimeUtils.VALID_TIME_INTERVAL));
101+
}
96102
}
97103
}
98104

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public void testDataTypeTransformerIncorrectDataTypes() {
186186

187187
@Test
188188
public void testDataTypeTransformerInvalidTimestamp() {
189+
// Invalid Timestamp and Validation disabled
189190
String timeCol = "timeCol";
190191
Schema schema = new Schema.SchemaBuilder().addDateTime(timeCol, DataType.TIMESTAMP, "1:MILLISECONDS:TIMESTAMP",
191192
"1:MILLISECONDS").build();
@@ -194,31 +195,51 @@ public void testDataTypeTransformerInvalidTimestamp() {
194195

195196
RecordTransformer transformer = new DataTypeTransformer(tableConfig, schema);
196197
GenericRow record = getRecord();
197-
record.putValue(timeCol, 1);
198+
record.putValue(timeCol, 1L);
198199
for (int i = 0; i < NUM_ROUNDS; i++) {
199-
assertThrows(() -> transformer.transform(record));
200+
record = transformer.transform(record);
201+
assertNotNull(record);
202+
assertEquals(record.getValue(timeCol), 1L);
200203
}
201204

205+
// Invalid Timestamp and Validation enabled
202206
tableConfig =
203-
new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol).setValidateTimeValue(true)
207+
new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
208+
.setValidateTimeValue(true)
204209
.setTableName("testTable").build();
205210

206-
RecordTransformer transformerWithDefaultNulls = new DataTypeTransformer(tableConfig, schema);
211+
RecordTransformer transformerWithValidation = new DataTypeTransformer(tableConfig, schema);
207212
GenericRow record1 = getRecord();
208-
record1.putValue(timeCol, 1);
213+
record1.putValue(timeCol, 1L);
209214
for (int i = 0; i < NUM_ROUNDS; i++) {
210-
record1 = transformerWithDefaultNulls.transform(record1);
211-
assertNotNull(record1);
212-
assertNull(record1.getValue(timeCol));
215+
assertThrows(() -> transformerWithValidation.transform(record1));
213216
}
214217

218+
// Invalid timestamp, validation enabled and ignoreErrors enabled
219+
tableConfig =
220+
new TableConfigBuilder(TableType.OFFLINE).setTimeColumnName(timeCol)
221+
.setValidateTimeValue(true)
222+
.setContinueOnError(true)
223+
.setTableName("testTable").build();
224+
225+
transformer = new DataTypeTransformer(tableConfig, schema);
215226
GenericRow record2 = getRecord();
216-
Long currentTimeMillis = System.currentTimeMillis();
217-
record2.putValue(timeCol, currentTimeMillis);
227+
record2.putValue(timeCol, 1L);
218228
for (int i = 0; i < NUM_ROUNDS; i++) {
219-
record2 = transformerWithDefaultNulls.transform(record2);
229+
record2 = transformer.transform(record2);
220230
assertNotNull(record2);
221-
assertEquals(record2.getValue(timeCol), currentTimeMillis);
231+
assertNull(record2.getValue(timeCol));
232+
}
233+
234+
// Valid timestamp
235+
transformer = new DataTypeTransformer(TABLE_CONFIG, schema);
236+
GenericRow record3 = getRecord();
237+
Long currentTimeMillis = System.currentTimeMillis();
238+
record3.putValue(timeCol, currentTimeMillis);
239+
for (int i = 0; i < NUM_ROUNDS; i++) {
240+
record3 = transformer.transform(record3);
241+
assertNotNull(record3);
242+
assertEquals(record3.getValue(timeCol), currentTimeMillis);
222243
}
223244
}
224245

0 commit comments

Comments
 (0)