2828import java .time .Duration ;
2929import java .time .Instant ;
3030import java .util .ArrayList ;
31+ import java .util .Arrays ;
3132import java .util .Base64 ;
33+ import java .util .Collections ;
3234import java .util .List ;
3335import javax .annotation .Nonnull ;
3436import javax .annotation .Nullable ;
5355 */
5456public class FluxCsvParser {
5557
58+ private static final String ANNOTATION_DATATYPE = "#datatype" ;
59+ private static final String ANNOTATION_GROUP = "#group" ;
60+ private static final String ANNOTATION_DEFAULT = "#default" ;
61+ private static final List <String > ANNOTATIONS = Arrays
62+ .asList (ANNOTATION_DATATYPE , ANNOTATION_GROUP , ANNOTATION_DEFAULT );
63+
5664 private enum ParsingState {
5765 NORMAL ,
5866
@@ -125,6 +133,7 @@ public void parseFluxResponse(@Nonnull final BufferedSource bufferedSource,
125133 int tableId = -1 ;
126134 boolean startNewTable = false ;
127135 FluxTable table = null ;
136+ List <String > groups = Collections .emptyList ();
128137 for (CSVRecord csvRecord : parser ) {
129138
130139 if (cancellable .isCancelled ()) {
@@ -157,10 +166,11 @@ public void parseFluxResponse(@Nonnull final BufferedSource bufferedSource,
157166
158167 String token = csvRecord .get (0 );
159168 //// start new table
160- if ("#datatype" . equals (token )) {
169+ if (ANNOTATIONS . contains (token ) && ! startNewTable ) {
161170 startNewTable = true ;
162171
163172 table = new FluxTable ();
173+ groups = Collections .emptyList ();
164174 consumer .accept (tableIndex , cancellable , table );
165175 tableIndex ++;
166176 tableId = -1 ;
@@ -171,18 +181,17 @@ public void parseFluxResponse(@Nonnull final BufferedSource bufferedSource,
171181 }
172182
173183 //#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
174- if ("#datatype" .equals (token )) {
184+ if (ANNOTATION_DATATYPE .equals (token )) {
175185 addDataTypes (table , toList (csvRecord ));
176186
177- } else if ("#group" .equals (token )) {
178- addGroups (table , toList (csvRecord ));
179-
180- } else if ("#default" .equals (token )) {
187+ } else if (ANNOTATION_GROUP .equals (token )) {
188+ groups = toList (csvRecord );
189+ } else if (ANNOTATION_DEFAULT .equals (token )) {
181190 addDefaultEmptyValues (table , toList (csvRecord ));
182-
183191 } else {
184192 // parse column names
185193 if (startNewTable ) {
194+ addGroups (table , groups );
186195 addColumnNamesAndTags (table , toList (csvRecord ));
187196 startNewTable = false ;
188197 continue ;
@@ -297,12 +306,12 @@ private void addGroups(@Nonnull final FluxTable table, @Nonnull final List<Strin
297306 Arguments .checkNotNull (table , "table" );
298307 Arguments .checkNotNull (groups , "groups" );
299308
300- for (int i = 0 ; i < groups .size (); i ++) {
309+ for (int i = 0 ; i < table . getColumns () .size (); i ++) {
301310
302311 FluxColumn fluxColumn = getFluxColumn (i , table );
303312
304313 String group = groups .get (i );
305- fluxColumn .setGroup (Boolean .valueOf (group ));
314+ fluxColumn .setGroup (Boolean .parseBoolean (group ));
306315 }
307316 }
308317
0 commit comments