Skip to content

Commit 3633495

Browse files
KKcorps and Kartik Khare authored
Add missing properties in CSV plugin (#9399)
* Add missing properties in CSV plugin
* Fix linting

Co-authored-by: Kartik Khare <[email protected]>
1 parent 9b3ac2a commit 3633495

File tree

3 files changed

+109
-3
lines changed

3 files changed

+109
-3
lines changed

pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVMessageDecoder.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Set;
3232
import org.apache.commons.csv.CSVFormat;
3333
import org.apache.commons.csv.CSVRecord;
34+
import org.apache.commons.csv.QuoteMode;
3435
import org.apache.commons.lang.StringUtils;
3536
import org.apache.pinot.spi.data.readers.GenericRow;
3637
import org.apache.pinot.spi.stream.StreamMessageDecoder;
@@ -49,6 +50,12 @@ public class CSVMessageDecoder implements StreamMessageDecoder<byte[]> {
4950
private static final String CONFIG_CSV_ESCAPE_CHARACTER = "escapeCharacter";
5051
private static final String CONFIG_CSV_MULTI_VALUE_DELIMITER = "multiValueDelimiter";
5152
public static final String NULL_STRING_VALUE = "nullStringValue";
53+
public static final String SKIP_HEADER = "skipHeader";
54+
public static final String IGNORE_EMPTY_LINES = "ignoreEmptyLines";
55+
public static final String IGNORE_SURROUNDING_SPACES = "ignoreSurroundingSpaces";
56+
public static final String QUOTE_CHARACTER = "quoteCharacter";
57+
public static final String QUOTE_MODE = "quoteMode";
58+
public static final String RECORD_SEPARATOR = "recordSeparator";
5259

5360
private CSVFormat _format;
5461
private CSVRecordExtractor _recordExtractor;
@@ -117,6 +124,36 @@ public void init(Map<String, String> props, Set<String> fieldsToRead, String top
117124
format = format.withNullString(nullString);
118125
}
119126

127+
String skipHeader = props.get(SKIP_HEADER);
128+
if (skipHeader != null) {
129+
format = format.withSkipHeaderRecord(Boolean.parseBoolean(skipHeader));
130+
}
131+
132+
String ignoreEmptyLines = props.get(IGNORE_EMPTY_LINES);
133+
if (ignoreEmptyLines != null) {
134+
format = format.withIgnoreEmptyLines(Boolean.parseBoolean(ignoreEmptyLines));
135+
}
136+
137+
String ignoreSurroundingSpaces = props.get(IGNORE_SURROUNDING_SPACES);
138+
if (ignoreSurroundingSpaces != null) {
139+
format = format.withIgnoreSurroundingSpaces(Boolean.parseBoolean(ignoreSurroundingSpaces));
140+
}
141+
142+
String quoteCharacter = props.get(QUOTE_CHARACTER);
143+
if (quoteCharacter != null) {
144+
format = format.withQuote(quoteCharacter.charAt(0));
145+
}
146+
147+
String quoteMode = props.get(QUOTE_MODE);
148+
if (quoteMode != null) {
149+
format = format.withQuoteMode(QuoteMode.valueOf(quoteMode));
150+
}
151+
152+
String recordSeparator = props.get(RECORD_SEPARATOR);
153+
if (recordSeparator != null) {
154+
format = format.withRecordSeparator(recordSeparator);
155+
}
156+
120157
_format = format;
121158

122159
_recordExtractor = new CSVRecordExtractor();

pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.csv.CSVFormat;
2727
import org.apache.commons.csv.CSVParser;
2828
import org.apache.commons.csv.CSVRecord;
29+
import org.apache.commons.csv.QuoteMode;
2930
import org.apache.commons.lang.StringUtils;
3031
import org.apache.pinot.spi.data.readers.GenericRow;
3132
import org.apache.pinot.spi.data.readers.RecordReader;
@@ -80,7 +81,7 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re
8081
}
8182
}
8283
char delimiter = config.getDelimiter();
83-
format = format.withDelimiter(delimiter).withIgnoreSurroundingSpaces(true);
84+
format = format.withDelimiter(delimiter);
8485
String csvHeader = config.getHeader();
8586
if (csvHeader == null) {
8687
format = format.withHeader();
@@ -89,9 +90,22 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re
8990
validateHeaderForDelimiter(delimiter, csvHeader, format);
9091
format = format.withHeader(StringUtils.split(csvHeader, delimiter));
9192
}
92-
Character commentMarker = config.getCommentMarker();
93-
format = format.withCommentMarker(commentMarker);
93+
94+
format = format.withCommentMarker(config.getCommentMarker());
9495
format = format.withEscape(config.getEscapeCharacter());
96+
format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines());
97+
format = format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces());
98+
format = format.withSkipHeaderRecord(config.isSkipHeader());
99+
format = format.withQuote(config.getQuoteCharacter());
100+
101+
if (config.getQuoteMode() != null) {
102+
format = format.withQuoteMode(QuoteMode.valueOf(config.getQuoteMode()));
103+
}
104+
105+
if (config.getRecordSeparator() != null) {
106+
format = format.withRecordSeparator(config.getRecordSeparator());
107+
}
108+
95109
String nullString = config.getNullStringValue();
96110
if (nullString != null) {
97111
format = format.withNullString(nullString);

pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ public class CSVRecordReaderConfig implements RecordReaderConfig {
3636
private Character _commentMarker; // Default is null
3737
private Character _escapeCharacter; // Default is null
3838
private String _nullStringValue;
39+
private boolean _skipHeader; // Default is false
40+
private boolean _ignoreEmptyLines = true; // Default is true
41+
private boolean _ignoreSurroundingSpaces = true; // Default is true
42+
private Character _quoteCharacter = '"'; // Default is the double quote
43+
private String _quoteMode; // Default is null
44+
private String _recordSeparator; // Default is null
45+
3946

4047
public String getFileFormat() {
4148
return _fileFormat;
@@ -101,6 +108,54 @@ public void setNullStringValue(String nullStringValue) {
101108
_nullStringValue = nullStringValue;
102109
}
103110

111+
/**
 * Returns the skip-header flag. CSVRecordReader passes this value to
 * {@code CSVFormat.withSkipHeaderRecord}. The backing field has no initializer,
 * so the value is {@code false} unless set.
 */
public boolean isSkipHeader() {
112+
return _skipHeader;
113+
}
114+
115+
/**
 * Sets the skip-header flag returned by {@link #isSkipHeader()}.
 *
 * @param skipHeader the new flag value
 */
public void setSkipHeader(boolean skipHeader) {
116+
_skipHeader = skipHeader;
117+
}
118+
119+
/**
 * Returns the ignore-empty-lines flag. CSVRecordReader passes this value to
 * {@code CSVFormat.withIgnoreEmptyLines}. The backing field is initialized to
 * {@code true}.
 */
public boolean isIgnoreEmptyLines() {
120+
return _ignoreEmptyLines;
121+
}
122+
123+
/**
 * Sets the ignore-empty-lines flag returned by {@link #isIgnoreEmptyLines()}.
 *
 * @param ignoreEmptyLines the new flag value
 */
public void setIgnoreEmptyLines(boolean ignoreEmptyLines) {
124+
_ignoreEmptyLines = ignoreEmptyLines;
125+
}
126+
127+
/**
 * Returns the ignore-surrounding-spaces flag. CSVRecordReader passes this value to
 * {@code CSVFormat.withIgnoreSurroundingSpaces}. The backing field is initialized
 * to {@code true}, which matches the value the reader previously hard-coded.
 */
public boolean isIgnoreSurroundingSpaces() {
128+
return _ignoreSurroundingSpaces;
129+
}
130+
131+
/**
 * Sets the ignore-surrounding-spaces flag returned by
 * {@link #isIgnoreSurroundingSpaces()}.
 *
 * @param ignoreSurroundingSpaces the new flag value
 */
public void setIgnoreSurroundingSpaces(boolean ignoreSurroundingSpaces) {
132+
_ignoreSurroundingSpaces = ignoreSurroundingSpaces;
133+
}
134+
135+
/**
 * Returns the configured quote character. CSVRecordReader passes this value to
 * {@code CSVFormat.withQuote}. The backing field is initialized to the double
 * quote character.
 */
public Character getQuoteCharacter() {
136+
return _quoteCharacter;
137+
}
138+
139+
/**
 * Sets the quote character returned by {@link #getQuoteCharacter()}.
 *
 * @param quoteCharacter the new quote character
 */
public void setQuoteCharacter(Character quoteCharacter) {
140+
_quoteCharacter = quoteCharacter;
141+
}
142+
143+
/**
 * Returns the configured quote mode name, or {@code null} when none was set.
 * CSVRecordReader resolves a non-null value with {@code QuoteMode.valueOf} and
 * leaves the format's quote mode untouched when the value is {@code null}.
 */
public String getQuoteMode() {
144+
return _quoteMode;
145+
}
146+
147+
/**
 * Sets the quote mode name returned by {@link #getQuoteMode()}.
 *
 * @param quoteMode the quote mode name; CSVRecordReader hands a non-null value to
 *     {@code QuoteMode.valueOf}, so it must be a name that method accepts
 */
public void setQuoteMode(String quoteMode) {
148+
_quoteMode = quoteMode;
149+
}
150+
151+
/**
 * Returns the configured record separator, or {@code null} when none was set.
 * CSVRecordReader calls {@code CSVFormat.withRecordSeparator} only when this
 * value is non-null.
 */
public String getRecordSeparator() {
152+
return _recordSeparator;
153+
}
154+
155+
/**
 * Sets the record separator returned by {@link #getRecordSeparator()}.
 *
 * @param recordSeparator the new record separator
 */
public void setRecordSeparator(String recordSeparator) {
156+
_recordSeparator = recordSeparator;
157+
}
158+
104159
@Override
105160
public String toString() {
106161
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);

0 commit comments

Comments
 (0)