Skip to content

Commit

Permalink
Merge pull request #63 from influxdata/develop
Browse files Browse the repository at this point in the history
Release v0.1.3
  • Loading branch information
GeorgeMac authored Oct 21, 2019
2 parents 5c62df9 + 9df6bfb commit 50e51c7
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 28 deletions.
53 changes: 27 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,33 @@ A home for InfluxDB’s 2.x's golang client. This client is not compatible with


## Example:
```
influx, err := influxdb.New(myHTTPInfluxAddress, myToken, influxdb.WithHTTPClient(myHTTPClient))
if err != nil {
panic(err) // error handling here; normally we wouldn't use fmt but it works for the example
}
// we use client.NewRowMetric for the example because it's easy, but if you need extra performance
// it is fine to manually build the []client.Metric{}.
myMetrics := []influxdb.Metric{
influxdb.NewRowMetric(
map[string]interface{}{"memory": 1000, "cpu": 0.93},
"system-metrics",
map[string]string{"hostname": "hal9000"},
time.Date(2018, 3, 4, 5, 6, 7, 8, time.UTC)),
influxdb.NewRowMetric(
map[string]interface{}{"memory": 1000, "cpu": 0.93},
"system-metrics",
map[string]string{"hostname": "hal9000"},
time.Date(2018, 3, 4, 5, 6, 7, 9, time.UTC)),
}
// The actual write..., this method can be called concurrently.
if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil {
log.Fatal(err) // as above use your own error handling here.
}
influx.Close() // closes the client. After this the client is useless.

```go
influx, err := influxdb.New(myHTTPInfluxAddress, myToken, influxdb.WithHTTPClient(myHTTPClient))
if err != nil {
panic(err) // error handling here; normally we wouldn't use fmt but it works for the example
}

// we use client.NewRowMetric for the example because it's easy, but if you need extra performance
// it is fine to manually build the []client.Metric{}.
myMetrics := []influxdb.Metric{
influxdb.NewRowMetric(
map[string]interface{}{"memory": 1000, "cpu": 0.93},
"system-metrics",
map[string]string{"hostname": "hal9000"},
time.Date(2018, 3, 4, 5, 6, 7, 8, time.UTC)),
influxdb.NewRowMetric(
map[string]interface{}{"memory": 1000, "cpu": 0.93},
"system-metrics",
map[string]string{"hostname": "hal9000"},
time.Date(2018, 3, 4, 5, 6, 7, 9, time.UTC)),
}

// The actual write..., this method can be called concurrently.
if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil {
log.Fatal(err) // as above use your own error handling here.
}
influx.Close() // closes the client. After this the client is useless.
```

## Releases
Expand Down
2 changes: 1 addition & 1 deletion e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestE2E(t *testing.T) {

r, err := influx.QueryCSV(
context.Background(),
`from(bucket:bucket)|>range(start:-1000h)|>group()`,
`from(bucket:bucket)|>range(start:-10000h)|>group(columns:["_field"])`,
`e2e-test-org`,
struct {
Bucket string `flux:"bucket"`
Expand Down
9 changes: 9 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package influxdb

import (
"bytes"
"compress/gzip"
"context"
"encoding/csv"
"encoding/json"
Expand Down Expand Up @@ -89,6 +90,14 @@ func (c *Client) QueryCSV(ctx context.Context, flux string, org string, extern .
}
defer func() { cleanup() }()

switch resp.Header.Get("Content-Encoding") {
case "gzip":
resp.Body, err = gzip.NewReader(resp.Body)
if err != nil {
return nil, err
}
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
r := io.LimitReader(resp.Body, 1<<14) // only support errors that are 16kB long, more than that and something is probably wrong.
gerr := &Error{Code: resp.Status}
Expand Down
14 changes: 13 additions & 1 deletion writer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package writer

import "time"
import (
"context"
"time"
)

// Config is a structure used to configure a point writer
type Config struct {
ctxt context.Context
size int
flushInterval time.Duration
retry bool
Expand All @@ -20,6 +24,7 @@ type Options []Option
// applies the callee options and returns the config
func (o Options) Config() Config {
config := Config{
ctxt: context.Background(),
size: defaultBufferSize,
flushInterval: defaultFlushInterval,
}
Expand All @@ -36,6 +41,13 @@ func (o Options) Apply(c *Config) {
}
}

// WithContext sets the context.Context used for each flush
func WithContext(ctxt context.Context) Option {
return func(c *Config) {
c.ctxt = ctxt
}
}

// WithBufferSize sets the size of the underlying buffer on the point writer
func WithBufferSize(size int) Option {
return func(c *Config) {
Expand Down
3 changes: 3 additions & 0 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func New(writer BucketMetricWriter, bkt, org string, opts ...Option) *PointWrite
buffered = NewBufferedWriterSize(bucket, config.size)
)

// set bucket write context to provided context
bucket.ctxt = config.ctxt

if config.retry {
// configure automatic retries for transient errors
retry := NewRetryWriter(bucket, config.retryOptions...)
Expand Down

0 comments on commit 50e51c7

Please sign in to comment.