Skip to content

Commit

Permalink
[GH-15810] Allow the user to adjust parquet import timezone [nocheck] (
Browse files Browse the repository at this point in the history
…#16304)

* [GH-15810] Allow the user to adjust parquet import timezone

* new approach

* new approach part 2

* add export option

* fix test

* fix java tests compilation
  • Loading branch information
krasinski authored Oct 29, 2024
1 parent 57bc954 commit a7c8c08
Show file tree
Hide file tree
Showing 31 changed files with 204 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void gbm_example_flow() {
null,
(byte)'\\',
false,
false,
null).execute().body();
System.out.println("parseSetupBody: " + parseSetupBody);

Expand Down Expand Up @@ -140,6 +141,7 @@ public static void gbm_example_flow() {
null,
null,
parseSetupBody.escapechar,
false,
null).execute().body();
System.out.println("parseBody: " + parseBody);

Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/FramesHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public FramesV3 export(int version, FramesV3 s) {
if (s.parallel) {
Log.warn("Parallel export to a single file is not supported for parquet format! Export will continue with a parquet-specific setup.");
}
s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum));
s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum, s.tz_adjust_from_local));
} else {
Frame.CSVStreamParams csvParms = new Frame.CSVStreamParams()
.setSeparator(s.separator)
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/ParseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ParseV3 parse(int version, ParseV3 parse) {
new ParseWriter.ParseErr[0], parse.chunk_size,
parse.decrypt_tool != null ? parse.decrypt_tool.key() : null, parse.skipped_columns,
parse.custom_non_data_line_markers != null ? parse.custom_non_data_line_markers.getBytes(): null,
parse.escapechar, parse.force_col_types);
parse.escapechar, parse.force_col_types, parse.tz_adjust_to_local);

if (parse.source_frames == null)
throw new H2OIllegalArgumentException("Data for Frame '" + parse.destination_frame.name + "' is not available. Please check that the path is valid (for all H2O nodes).'");
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/FramesV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class FramesV3 extends RequestSchemaV3<Frames, FramesV3> {
@API(help="Specifies if checksum should be written next to data files on export (if supported by export format).")
public boolean write_checksum = true;

@API(help="Specifies if the timezone should be adjusted from local to UTC timezone (parquet only).")
public boolean tz_adjust_from_local = false;

@API(help="Field separator (default ',')")
public byte separator = Frame.CSVStreamParams.DEFAULT_SEPARATOR;

Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class ParseSetupV3 extends RequestSchemaV3<ParseSetup, ParseSetupV3> {
" will happen without setting this parameter. Defaults to false.", direction=API.Direction.INPUT)
public boolean force_col_types;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
public boolean tz_adjust_to_local;

@Override
public ParseSetup fillImpl(ParseSetup impl) {
ParseSetup parseSetup = fillImpl(impl, new String[] {"parse_type"});
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public class ParseV3 extends RequestSchemaV3<Iced, ParseV3> {

@API(help="One ASCII character used to escape other characters.", direction=API.Direction.INOUT)
public byte escapechar = ParseSetup.DEFAULT_ESCAPE_CHAR;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
public boolean tz_adjust_to_local;
}
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/fvec/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@ public static Job export(Frame fr, String path, String frameName, boolean overwr
return job.start(t, fr.anyVec().nChunks());
}

public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum) {
public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
// Validate input
if (H2O.getPM().isFileAccessDenied(path)) {
throw new H2OFileAccessDeniedException("File " + path + " access denied");
Expand All @@ -1638,7 +1638,7 @@ public static Job exportParquet(Frame fr, String path, boolean overwrite, String
}
Job job = new Job<>(fr._key, "water.fvec.Frame", "Export dataset");

H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum);
H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum, tzAdjustFromLocal);
return job.start(t, fr.anyVec().nChunks());
}

Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/ARFFParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ static ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, boolean singleQu
naStrings = addDefaultNAs(naStrings, ncols);

// Return the final setup
return new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data, nonDataLineMarkers, escapechar);
return new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data, nonDataLineMarkers, escapechar, false);
}

private static String[][] addDefaultNAs(String[][] naStrings, int nCols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

public interface BinaryFormatExporter {

H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum);
H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal);

boolean supports(ExportFileFormat format);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public abstract class BinaryParserProvider extends ParserProvider {
@Deprecated
public final ParseSetup guessSetup(ByteVec v, byte[] bits, byte sep, int ncols, boolean singleQuotes, int checkHeader, String[] columnNames, byte[] columnTypes, String[][] domains, String[][] naStrings) {
ParseSetup ps = new ParseSetup(null, sep, singleQuotes, checkHeader,
ncols, columnNames, columnTypes, domains, naStrings, null);
ncols, columnNames, columnTypes, domains, naStrings, null, false);
return guessSetup(v, bits, ps);
}

Expand Down
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/parser/CsvParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ else if (ParseUUID.isUUID(str))
}
//FIXME should set warning message and let fall through
return new ParseSetup(CSV_INFO, GUESS_SEP, singleQuotes, checkHeader, 1, null, ctypes, domains, naStrings, data, new ParseWriter.ParseErr[0],FileVec.DFLT_CHUNK_SIZE,
nonDataLineMarkers, escapechar);
nonDataLineMarkers, escapechar, false);
}
}
data[0] = determineTokens(lines[0], sep, singleQuotes, escapechar);
Expand Down Expand Up @@ -791,7 +791,7 @@ else if (ParseUUID.isUUID(str))

// Assemble the setup understood so far
ParseSetup resSetup = new ParseSetup(CSV_INFO, sep, singleQuotes, checkHeader, ncols, labels, null, null /*domains*/, naStrings, data,
nonDataLineMarkers, escapechar);
nonDataLineMarkers, escapechar, false);

// now guess the types
if (columnTypes == null || ncols != columnTypes.length) {
Expand Down
52 changes: 33 additions & 19 deletions h2o-core/src/main/java/water/parser/ParseSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ParseSetup extends Iced {
int[] _parse_columns_indices; // store column indices to be parsed into the final file
byte[] _nonDataLineMarkers;
boolean _force_col_types = false; // at end of parsing, change column type to users specified ones
boolean _tz_adjust_to_local = false;
String[] _orig_column_types; // copy over the original column type setup before translating to byte[]

String[] _synthetic_column_names; // Columns with constant values to be added to parsed Frame
Expand Down Expand Up @@ -73,35 +74,35 @@ public ParseSetup(ParseSetup ps) {
ps._separator, ps._single_quotes, ps._check_header, ps._number_columns,
ps._column_names, ps._column_types, ps._domains, ps._na_strings, ps._data,
new ParseWriter.ParseErr[0], ps._chunk_size, ps._decrypt_tool, ps._skipped_columns,
ps._nonDataLineMarkers, ps._escapechar);
ps._nonDataLineMarkers, ps._escapechar, ps._tz_adjust_to_local);
}

public static ParseSetup makeSVMLightSetup(){
return new ParseSetup(SVMLight_INFO, ParseSetup.GUESS_SEP,
false,ParseSetup.NO_HEADER,1,null,new byte[]{Vec.T_NUM},null,null,null, new ParseWriter.ParseErr[0],
null);
null, false);
}

// This method was called during guess setup, lot of things are null, like ctypes.
// when it is called again, it either contains the guess column types or it will have user defined column types
public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, byte[] nonDataLineMarkers, byte escapeChar) {
int chunkSize, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
chunkSize, null, null, nonDataLineMarkers, escapeChar);
chunkSize, null, null, nonDataLineMarkers, escapeChar, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar) {
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false);
chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers,
byte escapeChar, boolean force_col_types) {
byte escapeChar, boolean force_col_types, boolean tz_adjust_to_local) {
_parse_type = parse_type;
_separator = sep;
_nonDataLineMarkers = nonDataLineMarkers;
Expand All @@ -119,6 +120,7 @@ public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int che
_skipped_columns = skipped_columns;
_escapechar = escapeChar;
_force_col_types = force_col_types;
_tz_adjust_to_local = tz_adjust_to_local;
setParseColumnIndices(ncols, _skipped_columns);
}

Expand Down Expand Up @@ -172,7 +174,7 @@ ps.column_names, strToColumnTypes(ps.column_types),
ps.chunk_size,
ps.decrypt_tool != null ? ps.decrypt_tool.key() : null, ps.skipped_columns,
ps.custom_non_data_line_markers != null ? ps.custom_non_data_line_markers.getBytes() : null,
ps.escapechar, ps.force_col_types);
ps.escapechar, ps.force_col_types, ps.tz_adjust_to_local);
this._force_col_types = ps.force_col_types;
this._orig_column_types = this._force_col_types ? (ps.column_types == null ? null : ps.column_types.clone()) : null;
}
Expand All @@ -185,9 +187,9 @@ ps.column_names, strToColumnTypes(ps.column_types),
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar) {
String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar, tzAdjustToLocal);
}

/**
Expand All @@ -198,30 +200,30 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, byte escapeChar) {
String[][] domains, String[][] naStrings, String[][] data, byte escapeChar, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data) {
String[][] domains, String[][] naStrings, String[][] data, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers) {
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, false);
}

/**
Expand All @@ -230,7 +232,7 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec
* Typically used by file type parsers for returning final invalid results
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[][] data, ParseWriter.ParseErr[] errs) {
this(parseType, sep, singleQuotes, checkHeader, ncols, null, null, null, null, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
this(parseType, sep, singleQuotes, checkHeader, ncols, null, null, null, null, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, false);
}

/**
Expand Down Expand Up @@ -258,6 +260,10 @@ public String[] getOrigColumnTypes() {
public boolean getForceColTypes() {
return _force_col_types;
}

public boolean gettzAdjustToLocal() {
return _tz_adjust_to_local;
}

public byte[] getColumnTypes() { return _column_types; }

Expand Down Expand Up @@ -558,6 +564,7 @@ public GuessSetupTsk(ParseSetup userSetup) {
}
if (_gblSetup==null)
throw new RuntimeException("This H2O node couldn't find the file(s) to parse. Please check files and/or working directories.");
_gblSetup.settzAdjustToLocal(_userSetup.gettzAdjustToLocal());
_gblSetup.setFileName(FileUtils.keyToFileName(key));
}

Expand Down Expand Up @@ -587,6 +594,7 @@ public void reduce(GuessSetupTsk other) {
else
_gblSetup._na_strings = _userSetup._na_strings;
}
_gblSetup._tz_adjust_to_local = _gblSetup._tz_adjust_to_local || _userSetup._tz_adjust_to_local;
// if(_gblSetup._errs != null)
for(ParseWriter.ParseErr err:_gblSetup._errs)
Log.warn("ParseSetup: " + err.toString());
Expand All @@ -600,6 +608,7 @@ private ParseSetup mergeSetups(ParseSetup setupA, ParseSetup setupB, String file
}
ParseSetup mergedSetup = setupA;

mergedSetup._tz_adjust_to_local = setupA._tz_adjust_to_local || setupB._tz_adjust_to_local;
mergedSetup._check_header = unifyCheckHeader(setupA._check_header, setupB._check_header);

mergedSetup._separator = unifyColumnSeparators(setupA._separator, setupB._separator);
Expand Down Expand Up @@ -707,7 +716,7 @@ public static ParseSetup guessSetup(ByteVec bv, byte [] bits, ParseSetup userSet
*/
private ParseSetup toInitialSetup() {
return new ParseSetup(_parse_type, _separator, _single_quotes, _check_header, GUESS_COL_CNT, _column_names,
_column_types, null, null, null, _nonDataLineMarkers, _escapechar);
_column_types, null, null, null, _nonDataLineMarkers, _escapechar, false);
}

/**
Expand Down Expand Up @@ -878,6 +887,11 @@ public ParseSetup setForceColTypes(boolean force_col_types) {
return this;
}

public ParseSetup settzAdjustToLocal(boolean tz_adjust_to_local) {
this._tz_adjust_to_local = tz_adjust_to_local;
return this;
}

public ParseSetup setDomains(String[][] domains) {
this._domains = domains;
return this;
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/ParserProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final ParseSetup guessSetup(ByteVec v, byte[] bits, ParseSetup userSetup)
*/
protected ParseSetup guessSetup_impl(ByteVec v, byte[] bits, ParseSetup userSetup) {
ParseSetup ps = guessInitSetup(v, bits, userSetup);
return guessFinalSetup(v, bits, ps);
return guessFinalSetup(v, bits, ps).settzAdjustToLocal(userSetup._tz_adjust_to_local);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/SVMLightParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static ParseSetup guessSetup(byte [] bits) {
if(lastNewline > 0) bits = Arrays.copyOf(bits,lastNewline+1);
SVMLightParser p = new SVMLightParser(new ParseSetup(SVMLight_INFO,
ParseSetup.GUESS_SEP, false,ParseSetup.GUESS_HEADER,ParseSetup.GUESS_COL_CNT,
null,null,null,null,null), null);
null,null,null,null,null, false), null);
SVMLightInspectParseWriter dout = new SVMLightInspectParseWriter();
p.parseChunk(0,new ByteAryData(bits,0), dout);
if (dout._ncols > 0 && dout._nlines > 0 && dout._nlines > dout._invalidLines)
Expand Down
Loading

0 comments on commit a7c8c08

Please sign in to comment.