Skip to content

Commit

Permalink
fix: upload csv to oss
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryashbhardwaj committed Dec 17, 2024
1 parent 9ba1e8c commit 67c6f99
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 37 deletions.
3 changes: 2 additions & 1 deletion ext/sheets/gsheet/gsheet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gsheet
import (
"context"
"errors"
"fmt"

"google.golang.org/api/option"
"google.golang.org/api/sheets/v4"
Expand All @@ -21,7 +22,7 @@ type GSheets struct {
func NewGSheets(ctx context.Context, creds string) (*GSheets, error) {
srv, err := sheets.NewService(ctx, option.WithCredentialsJSON([]byte(creds)))
if err != nil {
return nil, errors.New("not able to create sheets service")
return nil, fmt.Errorf("not able to create sheets service err: %w", err)
}

return &GSheets{srv: srv}, nil
Expand Down
6 changes: 3 additions & 3 deletions ext/store/maxcompute/external_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func (e ExternalTableHandle) Create(res *resource.Resource) error {
return err
}

if err := e.mcSchema.Create(e.mcSQLExecutor.CurrentSchemaName(), true, ""); err != nil {
return errors.InternalError(EntitySchema, "error while creating schema on maxcompute", err)
}
//if err := e.mcSchema.Create(e.mcSQLExecutor.CurrentSchemaName(), true, ""); err != nil {
// return errors.InternalError(EntitySchema, "error while creating schema on maxcompute", err)
//}

tableSchema, err := buildExternalTableSchema(table)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions ext/store/maxcompute/external_table_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
func handlerForFormat(format string) string {
switch strings.ToUpper(format) {
// the built-in text extractor for CSV and TSV
case GoogleSheet:
return "com.aliyun.odps.CsvStorageHandler"
case CSV:
return "com.aliyun.odps.CsvStorageHandler"
case TSV:
Expand Down
47 changes: 14 additions & 33 deletions ext/store/maxcompute/sheet_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
)

const (
GsheetCreds = ""
OSSCreds = ""
ExtLocation = ""
putTimeOut = time.Duration(time.Second * 10)
GsheetCredsKey = "GOOGLE_SHEETS_ACCOUNT"
OSSCredsKey = "OSS_CREDS"
ExtLocation = ""
putTimeOut = time.Duration(time.Second * 10)
)

type SyncerService struct {
Expand Down Expand Up @@ -66,7 +66,7 @@ func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error
}

func (s *SyncerService) getGsheet(ctx context.Context, tnnt tenant.Tenant, sheetURI string, range_ string) (string, error) {
secret, err := s.secretProvider.GetSecret(ctx, tnnt, GsheetCreds)
secret, err := s.secretProvider.GetSecret(ctx, tnnt, GsheetCredsKey)
if err != nil {
return "", err
}
Expand All @@ -75,7 +75,6 @@ func (s *SyncerService) getGsheet(ctx context.Context, tnnt tenant.Tenant, sheet
if err != nil {
return "", err
}

return sheets.GetAsCSV(sheetURI, range_)
}

Expand All @@ -93,15 +92,18 @@ func (s *SyncerService) getBucketName(ctx context.Context, res *resource.Resourc
}

func (s *SyncerService) getObjectKey(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) {
components, err := getURNComponent(res)
if err != nil {
return "", err
}
location, err := s.getLocation(ctx, res, et)
if err != nil {
return "", err
}

parts := strings.Split(location, "/")
if len(parts) > 4 {
path := strings.Join(parts[4:], "/")
return fmt.Sprintf("%s/%s.csv", path, res.Name().String()), nil
return fmt.Sprintf("%s%s.csv", path, components.Name), nil
}
return "", errors.New("unable to get object path from location")
}
Expand All @@ -124,7 +126,7 @@ func (s *SyncerService) getLocation(ctx context.Context, res *resource.Resource,

func (s *SyncerService) writeContentToLocation(ctx context.Context, tnnt tenant.Tenant, bucketName, objectKey, content string) error {
// Setup oss bucket writer
creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCreds)
creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCredsKey)
if err != nil {
return err
}
Expand All @@ -133,32 +135,11 @@ func (s *SyncerService) writeContentToLocation(ctx context.Context, tnnt tenant.
return err
}

// oss put request
var putStatus chan int64

resp, err := ossClient.PutObject(ctx, &oss.PutObjectRequest{
_, err = ossClient.PutObject(ctx, &oss.PutObjectRequest{
Bucket: &bucketName,
Key: &objectKey,
ContentType: oss.Ptr("text/csv"),
Body: strings.NewReader(content),
ProgressFn: func(increment, transferred, total int64) {
putStatus <- total
},
}, nil)
if err != nil {
return err
}

for {
select {
case <-putStatus:
if resp.StatusCode != 200 {
return errors.New(fmt.Sprintf("error putting OSS object, status:%s", resp.Status))
}
return nil
case <-time.After(putTimeOut):
return errors.New("put timeout")
}

}
})
return err
}

0 comments on commit 67c6f99

Please sign in to comment.