config.go

package sqltocsvgzip

import (
	"compress/flate"
	"database/sql"
	"os"
	"runtime"

	"github.com/aws/aws-sdk-go/service/s3"
)

// minFileSize is the minimum part size S3 accepts for multipart
// uploads: every part except the last must be at least 5 MiB.
const (
	minFileSize = 5 * 1024 * 1024
)

// obj is a single compressed chunk queued for S3 multipart upload.
type obj struct {
	partNumber int64
	buf        []byte
}

// LogLevel controls how verbose the converter's logging is.
type LogLevel int

const (
	Error   LogLevel = 1
	Warn    LogLevel = 2
	Info    LogLevel = 3
	Debug   LogLevel = 4
	Verbose LogLevel = 5
)

// Converter does the actual work of converting the rows to CSV.
// There are a few settings you can override if you want to do
// some fancy stuff to your CSV.
type Converter struct {
	LogLevel              LogLevel
	Headers               []string // Column headers to use (default is rows.Columns())
	WriteHeaders          bool     // Flag to output headers in your CSV (default is true)
	TimeFormat            string   // Format string for any time.Time values (default is time's default)
	Delimiter             rune     // Delimiter to use in your CSV (default is comma)
	CsvBufferSize         int      // Size in bytes of the in-memory CSV buffer
	CompressionLevel      int      // Gzip compression level (a compress/flate constant)
	GzipGoroutines        int      // Number of goroutines gzipping in parallel
	GzipBatchPerGoroutine int      // Bytes of CSV handed to each gzip goroutine per batch
	S3Bucket              string   // Destination S3 bucket
	S3Region              string   // AWS region of the bucket
	S3Acl                 string   // Canned ACL to apply to the uploaded object
	S3Path                string   // Object key (path) within the bucket
	S3Upload              bool     // Stream the output to S3 instead of a local file
	UploadThreads         int      // Number of concurrent multipart upload workers
	UploadPartSize        int      // Size in bytes of each multipart upload part (min 5 MiB)
	RowCount              int64    // Number of rows converted so far

	s3Svc            *s3.S3
	s3Resp           *s3.CreateMultipartUploadOutput
	s3CompletedParts []*s3.CompletedPart
	rows             *sql.Rows
	rowPreProcessor  CsvPreProcessorFunc
	gzipBuf          []byte
	partNumber       int64
	uploadQ          chan *obj
	quit             chan bool
}

// CsvPreProcessorFunc is a function type for preprocessing your CSV.
// It takes the columns after they've been munged into strings but
// before they've been passed into the CSV writer.
//
// Return outputRow as false if you want the row skipped; otherwise
// return the processed row slice as you want it written to the CSV.
type CsvPreProcessorFunc func(row []string, columnNames []string) (outputRow bool, processedRow []string)

// SetRowPreProcessor lets you specify a CsvPreProcessorFunc for this conversion.
func (c *Converter) SetRowPreProcessor(processor CsvPreProcessorFunc) {
	c.rowPreProcessor = processor
}
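
// A minimal usage sketch for SetRowPreProcessor. The filter below is
// illustrative only; it skips rows whose first column is empty and
// passes everything else through unchanged:
//
//	converter.SetRowPreProcessor(func(row []string, columnNames []string) (bool, []string) {
//		if len(row) > 0 && row[0] == "" {
//			return false, nil // skip this row
//		}
//		return true, row // keep the row as-is
//	})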

// getLogLevel reads the LOG_LEVEL environment variable (ERROR, WARN,
// INFO, DEBUG or VERBOSE) and falls back to Info when it is unset or
// unrecognized.
func getLogLevel() (level LogLevel) {
	levels := map[string]LogLevel{
		"ERROR":   Error,
		"WARN":    Warn,
		"INFO":    Info,
		"DEBUG":   Debug,
		"VERBOSE": Verbose,
	}

	var ok bool
	if level, ok = levels[os.Getenv("LOG_LEVEL")]; !ok {
		level = Info
	}
	return
}

// WriteConfig will return a Converter which will write your CSV however you like
// but will allow you to set a bunch of non-default behaviour like overriding
// headers or injecting a pre-processing step into your conversion.
func WriteConfig(rows *sql.Rows) *Converter {
	return &Converter{
		rows:                  rows,
		WriteHeaders:          true,
		Delimiter:             ',',
		CsvBufferSize:         10 * 1024 * 1024,
		CompressionLevel:      flate.DefaultCompression,
		GzipGoroutines:        runtime.GOMAXPROCS(0), // Should be at least the number of cores. Not sure how it impacts cgroup limits.
		GzipBatchPerGoroutine: 512 * 1024,            // Should be at least 100 KB
		LogLevel:              getLogLevel(),
	}
}
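
// A minimal usage sketch for WriteConfig, assuming the Converter's
// WriteFile method from elsewhere in this package and an already-open
// *sql.DB named db (both outside this file; the query is hypothetical):
//
//	rows, err := db.Query("SELECT * FROM mytable")
//	if err != nil {
//		log.Fatal(err)
//	}
//	converter := sqltocsvgzip.WriteConfig(rows)
//	converter.Delimiter = '|' // override any defaults before writing
//	if err := converter.WriteFile("output.csv.gz"); err != nil {
//		log.Fatal(err)
//	}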

// UploadConfig sets the default values for the Converter struct and
// enables S3 upload, reading the bucket details from the S3_BUCKET,
// S3_PATH, S3_REGION and S3_ACL environment variables.
func UploadConfig(rows *sql.Rows) *Converter {
	return &Converter{
		rows:                  rows,
		WriteHeaders:          true,
		Delimiter:             ',',
		CompressionLevel:      flate.DefaultCompression,
		CsvBufferSize:         10 * 1024 * 1024,
		GzipGoroutines:        runtime.GOMAXPROCS(0), // Should be at least the number of cores. Not sure how it impacts cgroup limits.
		GzipBatchPerGoroutine: 512 * 1024,            // Should be at least 100 KB
		LogLevel:              getLogLevel(),
		S3Upload:              true,
		UploadThreads:         4,
		UploadPartSize:        50 * 1024 * 1024, // Must be at least minFileSize (5 MiB) for S3 multipart upload
		S3Bucket:              os.Getenv("S3_BUCKET"),
		S3Path:                os.Getenv("S3_PATH"),
		S3Region:              os.Getenv("S3_REGION"),
		S3Acl:                 os.Getenv("S3_ACL"),
	}
}
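
// A minimal usage sketch for UploadConfig, assuming the Converter's
// Upload method from elsewhere in this package, an already-open *sql.DB
// named db, and the S3_* environment variables set as above (all
// outside this file; the query is hypothetical):
//
//	rows, err := db.Query("SELECT * FROM mytable")
//	if err != nil {
//		log.Fatal(err)
//	}
//	converter := sqltocsvgzip.UploadConfig(rows)
//	if err := converter.Upload(); err != nil {
//		log.Fatal(err)
//	}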