Skip to content

Commit

Permalink
Handle create index with batch FlintJob (opensearch-project#2734) (op…
Browse files Browse the repository at this point in the history
…ensearch-project#2738)

* update grammar file



* batch job for create manual refresh index



* dispatcher test for index dml query



* borrow lease for refresh query, not batch



* spotlessApply



* add release note



* update comment



---------


(cherry picked from commit b959039)

Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 4541486 commit aa606a9
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 40 deletions.
5 changes: 4 additions & 1 deletion release-notes/opensearch-sql.release-notes-2.15.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Compatible with OpenSearch and OpenSearch Dashboards Version 2.15.0
* Add option to use LakeFormation in S3Glue data source ([#2624](https://github.com/opensearch-project/sql/pull/2624))
* Remove direct ClusterState access in LocalClusterState ([#2717](https://github.com/opensearch-project/sql/pull/2717))

### Bug Fixes
* Handle create index with batch FlintJob ([#2734](https://github.com/opensearch-project/sql/pull/2734))

### Maintenance
* Use EMR serverless bundled iceberg JAR ([#2632](https://github.com/opensearch-project/sql/pull/2632))
* Update maintainers list ([#2663](https://github.com/opensearch-project/sql/pull/2663))
Expand All @@ -28,4 +31,4 @@ Compatible with OpenSearch and OpenSearch Dashboards Version 2.15.0
* Abstract queryId generation ([#2695](https://github.com/opensearch-project/sql/pull/2695))
* Introduce SessionConfigSupplier to abstract settings ([#2707](https://github.com/opensearch-project/sql/pull/2707))
* Add accountId to data models ([#2709](https://github.com/opensearch-project/sql/pull/2709))
* Pass down request context to data accessors ([#2715](https://github.com/opensearch-project/sql/pull/2715))
* Pass down request context to data accessors ([#2715](https://github.com/opensearch-project/sql/pull/2715))
44 changes: 38 additions & 6 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,35 @@ lexer grammar SqlBaseLexer;
public void markUnclosedComment() {
has_unclosed_bracketed_comment = true;
}

/**
* When greater than zero, it's in the middle of parsing ARRAY/MAP/STRUCT type.
*/
public int complex_type_level_counter = 0;

/**
* Increase the counter by one when hits KEYWORD 'ARRAY', 'MAP', 'STRUCT'.
*/
public void incComplexTypeLevelCounter() {
complex_type_level_counter++;
}

/**
* Decrease the counter by one when hits close tag '>' && the counter greater than zero
* which means we are in the middle of complex type parsing. Otherwise, it's a dangling
* GT token and we do nothing.
*/
public void decComplexTypeLevelCounter() {
if (complex_type_level_counter > 0) complex_type_level_counter--;
}

/**
* If the counter is zero, it's a shift right operator. It can be closing tags of an complex
* type definition, such as MAP<INT, ARRAY<INT>>.
*/
public boolean isShiftRightOperator() {
return complex_type_level_counter == 0 ? true : false;
}
}

SEMICOLON: ';';
Expand Down Expand Up @@ -100,14 +129,15 @@ ANTI: 'ANTI';
ANY: 'ANY';
ANY_VALUE: 'ANY_VALUE';
ARCHIVE: 'ARCHIVE';
ARRAY: 'ARRAY';
ARRAY: 'ARRAY' {incComplexTypeLevelCounter();};
AS: 'AS';
ASC: 'ASC';
AT: 'AT';
AUTHORIZATION: 'AUTHORIZATION';
BETWEEN: 'BETWEEN';
BIGINT: 'BIGINT';
BINARY: 'BINARY';
BINDING: 'BINDING';
BOOLEAN: 'BOOLEAN';
BOTH: 'BOTH';
BUCKET: 'BUCKET';
Expand Down Expand Up @@ -137,6 +167,7 @@ COMMENT: 'COMMENT';
COMMIT: 'COMMIT';
COMPACT: 'COMPACT';
COMPACTIONS: 'COMPACTIONS';
COMPENSATION: 'COMPENSATION';
COMPUTE: 'COMPUTE';
CONCATENATE: 'CONCATENATE';
CONSTRAINT: 'CONSTRAINT';
Expand Down Expand Up @@ -257,7 +288,7 @@ LOCKS: 'LOCKS';
LOGICAL: 'LOGICAL';
LONG: 'LONG';
MACRO: 'MACRO';
MAP: 'MAP';
MAP: 'MAP' {incComplexTypeLevelCounter();};
MATCHED: 'MATCHED';
MERGE: 'MERGE';
MICROSECOND: 'MICROSECOND';
Expand Down Expand Up @@ -298,8 +329,6 @@ OVERWRITE: 'OVERWRITE';
PARTITION: 'PARTITION';
PARTITIONED: 'PARTITIONED';
PARTITIONS: 'PARTITIONS';
PERCENTILE_CONT: 'PERCENTILE_CONT';
PERCENTILE_DISC: 'PERCENTILE_DISC';
PERCENTLIT: 'PERCENT';
PIVOT: 'PIVOT';
PLACING: 'PLACING';
Expand Down Expand Up @@ -362,7 +391,7 @@ STATISTICS: 'STATISTICS';
STORED: 'STORED';
STRATIFY: 'STRATIFY';
STRING: 'STRING';
STRUCT: 'STRUCT';
STRUCT: 'STRUCT' {incComplexTypeLevelCounter();};
SUBSTR: 'SUBSTR';
SUBSTRING: 'SUBSTRING';
SYNC: 'SYNC';
Expand Down Expand Up @@ -439,8 +468,11 @@ NEQ : '<>';
NEQJ: '!=';
LT : '<';
LTE : '<=' | '!>';
GT : '>';
GT : '>' {decComplexTypeLevelCounter();};
GTE : '>=' | '!<';
SHIFT_LEFT: '<<';
SHIFT_RIGHT: '>>' {isShiftRightOperator()}?;
SHIFT_RIGHT_UNSIGNED: '>>>' {isShiftRightOperator()}?;

PLUS: '+';
MINUS: '-';
Expand Down
77 changes: 50 additions & 27 deletions spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ statement
| USE identifierReference #use
| USE namespace identifierReference #useNamespace
| SET CATALOG (errorCapturingIdentifier | stringLit) #setCatalog
| CREATE namespace (IF NOT EXISTS)? identifierReference
| CREATE namespace (IF errorCapturingNot EXISTS)? identifierReference
(commentSpec |
locationSpec |
(WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
Expand All @@ -92,7 +92,7 @@ statement
| createTableHeader (LEFT_PAREN createOrReplaceTableColTypeList RIGHT_PAREN)? tableProvider?
createTableClauses
(AS? query)? #createTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier
(tableProvider |
rowFormat |
Expand Down Expand Up @@ -141,7 +141,7 @@ statement
SET SERDE stringLit (WITH SERDEPROPERTIES propertyList)? #setTableSerDe
| ALTER TABLE identifierReference (partitionSpec)?
SET SERDEPROPERTIES propertyList #setTableSerDe
| ALTER (TABLE | VIEW) identifierReference ADD (IF NOT EXISTS)?
| ALTER (TABLE | VIEW) identifierReference ADD (IF errorCapturingNot EXISTS)?
partitionSpecLocation+ #addTablePartition
| ALTER TABLE identifierReference
from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
Expand All @@ -153,17 +153,19 @@ statement
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
VIEW (IF NOT EXISTS)? identifierReference
VIEW (IF errorCapturingNot EXISTS)? identifierReference
identifierCommentList?
(commentSpec |
schemaBinding |
(PARTITIONED ON identifierList) |
(TBLPROPERTIES propertyList))*
AS query #createView
| CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider
(OPTIONS propertyList)? #createTempViewUsing
| ALTER VIEW identifierReference AS? query #alterViewQuery
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
| ALTER VIEW identifierReference schemaBinding #alterViewSchemaBinding
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF errorCapturingNot EXISTS)?
identifierReference AS className=stringLit
(USING resource (COMMA resource)*)? #createFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference #dropFunction
Expand Down Expand Up @@ -224,7 +226,7 @@ statement
| SET .*? #setConfiguration
| RESET configKey #resetQuotedConfiguration
| RESET .*? #resetConfiguration
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE?
identifierReference (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS options=propertyList)? #createIndex
Expand Down Expand Up @@ -315,7 +317,7 @@ unsupportedHiveNativeCommands
;

createTableHeader
: CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? identifierReference
: CREATE TEMPORARY? EXTERNAL? TABLE (IF errorCapturingNot EXISTS)? identifierReference
;

replaceTableHeader
Expand All @@ -342,6 +344,10 @@ locationSpec
: LOCATION stringLit
;

schemaBinding
: WITH SCHEMA (BINDING | COMPENSATION | EVOLUTION | TYPE EVOLUTION)
;

commentSpec
: COMMENT stringLit
;
Expand All @@ -351,8 +357,8 @@ query
;

insertInto
: INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF NOT EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
: INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
Expand Down Expand Up @@ -389,6 +395,7 @@ describeFuncName
| comparisonOperator
| arithmeticOperator
| predicateOperator
| shiftOperator
| BANG
;

Expand Down Expand Up @@ -588,11 +595,11 @@ matchedClause
: WHEN MATCHED (AND matchedCond=booleanExpression)? THEN matchedAction
;
notMatchedClause
: WHEN NOT MATCHED (BY TARGET)? (AND notMatchedCond=booleanExpression)? THEN notMatchedAction
: WHEN errorCapturingNot MATCHED (BY TARGET)? (AND notMatchedCond=booleanExpression)? THEN notMatchedAction
;

notMatchedBySourceClause
: WHEN NOT MATCHED BY SOURCE (AND notMatchedBySourceCond=booleanExpression)? THEN notMatchedBySourceAction
: WHEN errorCapturingNot MATCHED BY SOURCE (AND notMatchedBySourceCond=booleanExpression)? THEN notMatchedBySourceAction
;

matchedAction
Expand Down Expand Up @@ -838,9 +845,11 @@ tableArgumentPartitioning
: ((WITH SINGLE PARTITION)
| ((PARTITION | DISTRIBUTE) BY
(((LEFT_PAREN partition+=expression (COMMA partition+=expression)* RIGHT_PAREN))
| (expression (COMMA invalidMultiPartitionExpression=expression)+)
| partition+=expression)))
((ORDER | SORT) BY
(((LEFT_PAREN sortItem (COMMA sortItem)* RIGHT_PAREN)
| (sortItem (COMMA invalidMultiSortItem=sortItem)+)
| sortItem)))?
;

Expand Down Expand Up @@ -956,28 +965,40 @@ booleanExpression
;

predicate
: NOT? kind=BETWEEN lower=valueExpression AND upper=valueExpression
| NOT? kind=IN LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN
| NOT? kind=IN LEFT_PAREN query RIGHT_PAREN
| NOT? kind=RLIKE pattern=valueExpression
| NOT? kind=(LIKE | ILIKE) quantifier=(ANY | SOME | ALL) (LEFT_PAREN RIGHT_PAREN | LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN)
| NOT? kind=(LIKE | ILIKE) pattern=valueExpression (ESCAPE escapeChar=stringLit)?
| IS NOT? kind=NULL
| IS NOT? kind=(TRUE | FALSE | UNKNOWN)
| IS NOT? kind=DISTINCT FROM right=valueExpression
: errorCapturingNot? kind=BETWEEN lower=valueExpression AND upper=valueExpression
| errorCapturingNot? kind=IN LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN
| errorCapturingNot? kind=IN LEFT_PAREN query RIGHT_PAREN
| errorCapturingNot? kind=RLIKE pattern=valueExpression
| errorCapturingNot? kind=(LIKE | ILIKE) quantifier=(ANY | SOME | ALL) (LEFT_PAREN RIGHT_PAREN | LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN)
| errorCapturingNot? kind=(LIKE | ILIKE) pattern=valueExpression (ESCAPE escapeChar=stringLit)?
| IS errorCapturingNot? kind=NULL
| IS errorCapturingNot? kind=(TRUE | FALSE | UNKNOWN)
| IS errorCapturingNot? kind=DISTINCT FROM right=valueExpression
;

errorCapturingNot
: NOT
| BANG
;

valueExpression
: primaryExpression #valueExpressionDefault
| operator=(MINUS | PLUS | TILDE) valueExpression #arithmeticUnary
| left=valueExpression operator=(ASTERISK | SLASH | PERCENT | DIV) right=valueExpression #arithmeticBinary
| left=valueExpression operator=(PLUS | MINUS | CONCAT_PIPE) right=valueExpression #arithmeticBinary
| left=valueExpression shiftOperator right=valueExpression #shiftExpression
| left=valueExpression operator=AMPERSAND right=valueExpression #arithmeticBinary
| left=valueExpression operator=HAT right=valueExpression #arithmeticBinary
| left=valueExpression operator=PIPE right=valueExpression #arithmeticBinary
| left=valueExpression comparisonOperator right=valueExpression #comparison
;

shiftOperator
: SHIFT_LEFT
| SHIFT_RIGHT
| SHIFT_RIGHT_UNSIGNED
;

datetimeUnit
: YEAR | QUARTER | MONTH
| WEEK | DAY | DAYOFYEAR
Expand Down Expand Up @@ -1143,7 +1164,7 @@ qualifiedColTypeWithPosition
;

colDefinitionDescriptorWithPosition
: NOT NULL
: errorCapturingNot NULL
| defaultExpression
| commentSpec
| colPosition
Expand All @@ -1162,7 +1183,7 @@ colTypeList
;

colType
: colName=errorCapturingIdentifier dataType (NOT NULL)? commentSpec?
: colName=errorCapturingIdentifier dataType (errorCapturingNot NULL)? commentSpec?
;

createOrReplaceTableColTypeList
Expand All @@ -1174,7 +1195,7 @@ createOrReplaceTableColType
;

colDefinitionOption
: NOT NULL
: errorCapturingNot NULL
| defaultExpression
| generationExpression
| commentSpec
Expand All @@ -1189,7 +1210,7 @@ complexColTypeList
;

complexColType
: errorCapturingIdentifier COLON? dataType (NOT NULL)? commentSpec?
: errorCapturingIdentifier COLON? dataType (errorCapturingNot NULL)? commentSpec?
;

whenClause
Expand Down Expand Up @@ -1296,7 +1317,7 @@ alterColumnAction
: TYPE dataType
| commentSpec
| colPosition
| setOrDrop=(SET | DROP) NOT NULL
| setOrDrop=(SET | DROP) errorCapturingNot NULL
| SET defaultExpression
| dropDefault=DROP DEFAULT
;
Expand Down Expand Up @@ -1343,6 +1364,7 @@ ansiNonReserved
| BIGINT
| BINARY
| BINARY_HEX
| BINDING
| BOOLEAN
| BUCKET
| BUCKETS
Expand All @@ -1365,6 +1387,7 @@ ansiNonReserved
| COMMIT
| COMPACT
| COMPACTIONS
| COMPENSATION
| COMPUTE
| CONCATENATE
| COST
Expand Down Expand Up @@ -1643,6 +1666,7 @@ nonReserved
| BIGINT
| BINARY
| BINARY_HEX
| BINDING
| BOOLEAN
| BOTH
| BUCKET
Expand Down Expand Up @@ -1672,6 +1696,7 @@ nonReserved
| COMMIT
| COMPACT
| COMPACTIONS
| COMPENSATION
| COMPUTE
| CONCATENATE
| CONSTRAINT
Expand Down Expand Up @@ -1824,8 +1849,6 @@ nonReserved
| PARTITION
| PARTITIONED
| PARTITIONS
| PERCENTILE_CONT
| PERCENTILE_DISC
| PERCENTLIT
| PIVOT
| PLACING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand Down Expand Up @@ -69,8 +68,6 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));

String clusterName = dispatchQueryRequest.getClusterName();
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
Expand Down
Loading

0 comments on commit aa606a9

Please sign in to comment.