diff --git a/ext/sheets/gsheet/gsheet.go b/ext/sheets/gsheet/gsheet.go index a413c171d4..40af491871 100644 --- a/ext/sheets/gsheet/gsheet.go +++ b/ext/sheets/gsheet/gsheet.go @@ -3,6 +3,7 @@ package gsheet import ( "context" "errors" + "fmt" "google.golang.org/api/option" "google.golang.org/api/sheets/v4" @@ -21,7 +22,7 @@ type GSheets struct { func NewGSheets(ctx context.Context, creds string) (*GSheets, error) { srv, err := sheets.NewService(ctx, option.WithCredentialsJSON([]byte(creds))) if err != nil { - return nil, errors.New("not able to create sheets service") + return nil, fmt.Errorf("not able to create sheets service err: %w", err) } return &GSheets{srv: srv}, nil diff --git a/ext/store/maxcompute/external_table.go b/ext/store/maxcompute/external_table.go index d9b5ab33d7..4925a67d1e 100644 --- a/ext/store/maxcompute/external_table.go +++ b/ext/store/maxcompute/external_table.go @@ -38,9 +38,9 @@ func (e ExternalTableHandle) Create(res *resource.Resource) error { return err } - if err := e.mcSchema.Create(e.mcSQLExecutor.CurrentSchemaName(), true, ""); err != nil { - return errors.InternalError(EntitySchema, "error while creating schema on maxcompute", err) - } + //if err := e.mcSchema.Create(e.mcSQLExecutor.CurrentSchemaName(), true, ""); err != nil { + // return errors.InternalError(EntitySchema, "error while creating schema on maxcompute", err) + //} tableSchema, err := buildExternalTableSchema(table) if err != nil { diff --git a/ext/store/maxcompute/external_table_options.go b/ext/store/maxcompute/external_table_options.go index e928801d56..cc86d32369 100644 --- a/ext/store/maxcompute/external_table_options.go +++ b/ext/store/maxcompute/external_table_options.go @@ -20,6 +20,8 @@ const ( func handlerForFormat(format string) string { switch strings.ToUpper(format) { // the built-in text extractor for CSV and TSV + case GoogleSheet: + return "com.aliyun.odps.CsvStorageHandler" case CSV: return "com.aliyun.odps.CsvStorageHandler" case TSV: diff --git a/ext/store/maxcompute/sheet_sync.go b/ext/store/maxcompute/sheet_sync.go index 39add59a2e..c7b1c0499d 100644 --- a/ext/store/maxcompute/sheet_sync.go +++ b/ext/store/maxcompute/sheet_sync.go @@ -16,10 +16,10 @@ import ( ) const ( - GsheetCreds = "" - OSSCreds = "" - ExtLocation = "" - putTimeOut = time.Duration(time.Second * 10) + GsheetCredsKey = "GOOGLE_SHEETS_ACCOUNT" + OSSCredsKey = "OSS_CREDS" + ExtLocation = "" + putTimeOut = time.Duration(time.Second * 10) ) type SyncerService struct { @@ -66,7 +66,7 @@ func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error } func (s *SyncerService) getGsheet(ctx context.Context, tnnt tenant.Tenant, sheetURI string, range_ string) (string, error) { - secret, err := s.secretProvider.GetSecret(ctx, tnnt, GsheetCreds) + secret, err := s.secretProvider.GetSecret(ctx, tnnt, GsheetCredsKey) if err != nil { return "", err } @@ -75,7 +75,6 @@ func (s *SyncerService) getGsheet(ctx context.Context, tnnt tenant.Tenant, sheet if err != nil { return "", err } - return sheets.GetAsCSV(sheetURI, range_) } @@ -93,15 +92,18 @@ func (s *SyncerService) getBucketName(ctx context.Context, res *resource.Resourc } func (s *SyncerService) getObjectKey(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) { + components, err := getURNComponent(res) + if err != nil { + return "", err + } location, err := s.getLocation(ctx, res, et) if err != nil { return "", err } - parts := strings.Split(location, "/") if len(parts) > 4 { path := strings.Join(parts[4:], "/") - return fmt.Sprintf("%s/%s.csv", path, res.Name().String()), nil + return fmt.Sprintf("%s%s.csv", path, components.Name), nil } return "", errors.New("unable to get object path from location") } @@ -124,7 +126,7 @@ func (s *SyncerService) getLocation(ctx context.Context, res *resource.Resource, func (s *SyncerService) writeContentToLocation(ctx context.Context, tnnt tenant.Tenant, bucketName, objectKey, content string) error { // Setup oss bucket writer - creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCreds) + creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCredsKey) if err != nil { return err } @@ -133,32 +135,11 @@ func (s *SyncerService) writeContentToLocation(ctx context.Context, tnnt tenant. return err } - // oss put request - var putStatus chan int64 - - resp, err := ossClient.PutObject(ctx, &oss.PutObjectRequest{ + _, err = ossClient.PutObject(ctx, &oss.PutObjectRequest{ Bucket: &bucketName, Key: &objectKey, ContentType: oss.Ptr("text/csv"), Body: strings.NewReader(content), - ProgressFn: func(increment, transferred, total int64) { - putStatus <- total - }, - }, nil) - if err != nil { - return err - } - - for { - select { - case <-putStatus: - if resp.StatusCode != 200 { - return errors.New(fmt.Sprintf("error putting OSS object, status:%s", resp.Status)) - } - return nil - case <-time.After(putTimeOut): - return errors.New("put timeout") - } - - } + }) + return err }