diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index 62f7740..f07246c 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -9,57 +9,56 @@ jobs: os: [ubuntu-latest, macOS-latest] runs-on: ${{matrix.os}} steps: - - name: Checkout code - uses: actions/checkout@v1 - with: - path: src/github.com/nats-io/nats-surveyor - - name: Setup Go - uses: actions/setup-go@v1 - with: - go-version: ${{matrix.go}} - - name: Install deps - shell: bash --noprofile --norc -x -eo pipefail {0} - env: - GO111MODULE: 'on' - run: | - export GOPATH="$RUNNER_WORKSPACE" - go get -v github.com/wadey/gocovmerge - go get -v github.com/golangci/golangci-lint/cmd/golangci-lint - - name: Lint - shell: bash --noprofile --norc -x -eo pipefail {0} - env: - GO111MODULE: 'on' - run: | - export GOPATH="$RUNNER_WORKSPACE" + - name: Checkout code + uses: actions/checkout@v1 + with: + path: src/github.com/nats-io/nats-surveyor + - name: Setup Go + uses: actions/setup-go@v1 + with: + go-version: ${{matrix.go}} + - name: Install deps + shell: bash --noprofile --norc -x -eo pipefail {0} + env: + GO111MODULE: "on" + run: | + export GOPATH="$RUNNER_WORKSPACE" + go get -v github.com/wadey/gocovmerge + go get -v github.com/golangci/golangci-lint/cmd/golangci-lint + - name: Lint + shell: bash --noprofile --norc -x -eo pipefail {0} + env: + GO111MODULE: "on" + run: | + export GOPATH="$RUNNER_WORKSPACE" - go vet ./... - $(go env GOPATH)/bin/golangci-lint run \ - --no-config --exclude-use-default=false --max-same-issues=0 \ - --disable errcheck \ - --enable golint \ - --enable stylecheck \ - --enable interfacer \ - --enable unconvert \ - --enable dupl \ - --enable gocyclo \ - --enable gofmt \ - --enable goimports \ - --enable misspell \ - --enable unparam \ - --enable nakedret \ - --enable prealloc \ - --enable scopelint \ - --enable gocritic \ - --enable gochecknoinits \ - ./... + go vet ./... 
+ $(go env GOPATH)/bin/golangci-lint run \ + --no-config --exclude-use-default=false --max-same-issues=0 \ + --disable errcheck \ + --enable golint \ + --enable stylecheck \ + --enable interfacer \ + --enable unconvert \ + --enable dupl \ + --enable gocyclo \ + --enable gofmt \ + --enable goimports \ + --enable misspell \ + --enable unparam \ + --enable nakedret \ + --enable prealloc \ + --enable scopelint \ + --enable gocritic \ + ./... - - name: Run tests - shell: bash --noprofile --norc -x -eo pipefail {0} - env: - GO111MODULE: 'on' - CODECOV_TOKEN: ${{secrets.CODECOV_TOKEN}} - run: | - export GOPATH="$RUNNER_WORKSPACE" + - name: Run tests + shell: bash --noprofile --norc -x -eo pipefail {0} + env: + GO111MODULE: "on" + CODECOV_TOKEN: ${{secrets.CODECOV_TOKEN}} + run: | + export GOPATH="$RUNNER_WORKSPACE" - go test -v -race -p 1 ./... - ./scripts/cov.sh CI + go test -v -race -p 1 ./... + ./scripts/cov.sh CI diff --git a/.gitignore b/.gitignore index f1c181e..ae2f47c 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out + +nats-surveyor diff --git a/README.md b/README.md index 33156e5..76608e5 100644 --- a/README.md +++ b/README.md @@ -327,6 +327,20 @@ security thread to the node it is running on. More information can be found [here](https://github.com/prometheus/prometheus/issues/5976). +## Service Observations + +Services can be observed by creating JSON files in the `observations` directory. Here's an example: + +``` +{ + "name": "email.subscribe", + "topic": "monitor.email.subscribe", + "credential": "/observations/email.subscribe.cred" +} +``` + +Place this in `observations/email.subscribe.json` and create a credential giving access to this topic in `observations/email.subscribe.cred`. When you restart the service, any observations published by the NATS system will be tracked and graphed. 
+ ## TODO - [ ] Windows builds diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index c2872f7..4d0a74d 100644 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -1,18 +1,18 @@ -version: '3' +version: "3" networks: monitor-net: driver: bridge services: - surveyor: image: synadia/nats-surveyor:${SURVEYOR_DOCKER_TAG} - container_name: nats-surveyor + container_name: nats-surveyor volumes: - $NATS_SURVEYOR_CREDS:/etc/surveyor/SYS.creds + - ./observations:/observations entrypoint: /nats-surveyor - command: -c ${NATS_SURVEYOR_SERVER_COUNT} -creds /etc/surveyor/SYS.creds -s "${NATS_SURVEYOR_SERVERS}" + command: -c ${NATS_SURVEYOR_SERVER_COUNT} -creds /etc/surveyor/SYS.creds -s "${NATS_SURVEYOR_SERVERS}" -observe /observations networks: - monitor-net labels: @@ -31,7 +31,7 @@ services: org.label-schema.group: "nats-monitoring" depends_on: - surveyor - + grafana: image: grafana/grafana:${GRAFANA_DOCKER_TAG} container_name: grafana diff --git a/docker-compose/grafana/provisioning/dashboards/observations-dashboard.json b/docker-compose/grafana/provisioning/dashboards/observations-dashboard.json new file mode 100644 index 0000000..824a5e3 --- /dev/null +++ b/docker-compose/grafana/provisioning/dashboards/observations-dashboard.json @@ -0,0 +1,827 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 1, + "id": 3, + "iteration": 1573823403202, + "links": [], + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "description": "Percentile of service time spent in the service ", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 0 + }, + "id": 14, + "legend": { + "alignAsTable": true, + 
"avg": true, + "current": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.10, sum(rate(nats_latency_service_duration_bucket{service=~\"${service}\"}[$range])) by (le,service))", + "legendFormat": "{{service}} 10th %", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.50, sum(rate(nats_latency_service_duration_bucket{service=~\"${service}\"}[$range])) by (le,service))", + "legendFormat": "{{service}} 50th %", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(nats_latency_service_duration_bucket{service=~\"${service}\"}[$range])) by (le,service))", + "legendFormat": "{{service}} 95th %", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Service Duration Percentiles", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": "duration", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "description": "Percentile of total time spent in the service and in the total which includes network overhead ", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 0 + }, + "id": 15, + "legend": { + "alignAsTable": true, 
+ "avg": true, + "current": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.10, sum(rate(nats_latency_total_duration_bucket{service=~\"${service}\"}[$range])) by (le,service))", + "legendFormat": "{{service}} 10th %", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.50, sum(rate(nats_latency_total_duration_bucket{service=~\"${service}\"}[$range])) by (le,service))", + "legendFormat": "{{service}} 50th %", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(nats_latency_total_duration_bucket{service=~\"${service}\"}[$range])) by (le,service))", + "legendFormat": "{{service}} 95th %", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Total Duration Percentiles", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": "duration", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "description": "Average times taken to serve a request.\n\n* **service time** is the time taken to execute the service\n* **total time** includes network latencies between all parties", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 8, + "x": 
16, + "y": 0 + }, + "id": 2, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(nats_latency_service_duration_sum{service=~\"${service}\"}[$range]) / rate(nats_latency_service_duration_count{service=~\"${service}\"}[$range])) by (service)", + "legendFormat": "{{service}} service time", + "refId": "A" + }, + { + "expr": "sum(rate(nats_latency_total_duration_sum{service=~\"${service}\"}[$range]) / rate(nats_latency_total_duration_count{service=~\"${service}\"}[$range])) by (service)", + "legendFormat": "{{service}} total time", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Average Service Duration", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": "duration", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "description": "Time distribution spent traversing the NATS cluster", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 8 + }, + "id": 4, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + 
"hideEmpty": true, + "hideZero": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "/10th/", + "color": "#1F60C4" + }, + { + "alias": "/50th/", + "color": "#8AB8FF" + }, + { + "alias": "/95th/", + "color": "#C0D8FF" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.10, sum(rate(nats_latency_system_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 10th %", + "refId": "D" + }, + { + "expr": "histogram_quantile(0.50, sum(rate(nats_latency_system_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 50th %", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(nats_latency_system_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 95th %", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "System RTT Times Percentiles", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": "rtt time", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "description": "Response time distribution between NATS and the Requesting Client", + "fill": 1, + 
"fillGradient": 0, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 8 + }, + "id": 16, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "/10th/", + "color": "#3274D9" + }, + { + "alias": "/50th/", + "color": "#8AB8FF" + }, + { + "alias": "/95th/", + "color": "#C0D8FF" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.10, sum(rate(nats_latency_requestor_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 10th %", + "refId": "D" + }, + { + "expr": "histogram_quantile(0.50, sum(rate(nats_latency_requestor_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 50th %", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(nats_latency_requestor_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 95th %", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Requestor RTT Times Percentiles", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": "rtt time", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": 
false, + "datasource": null, + "description": "Response time distribution between NATS and the Service", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 8, + "x": 16, + "y": 8 + }, + "id": 17, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "/10th/", + "color": "#3274D9" + }, + { + "alias": "/50th/", + "color": "#8AB8FF" + }, + { + "alias": "/95th/", + "color": "#C0D8FF" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.10, sum(rate(nats_latency_responder_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 10th %", + "refId": "D" + }, + { + "expr": "histogram_quantile(0.50, sum(rate(nats_latency_responder_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 50th %", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(nats_latency_responder_rtt_bucket{service=~\"${service}\"}[$range])) by (service, le))", + "legendFormat": "{{service}} 95th %", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Responder RTT Times Percentiles", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": "rtt time", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + 
"align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "description": "Number of observations received", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 16 + }, + "id": 6, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(nats_latency_observation_count{service=~\"${service}\"}[$range])) by (service)", + "legendFormat": "{{service}} observations", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Observations Received", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": "observations / second", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "refresh": "10s", + "schemaVersion": 20, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": ".+", + "current": { + "text": "All", + "value": [ + "$__all" + ] + }, + "datasource": "Prometheus", + "definition": "label_values(nats_latency_total_duration_sum, service)", + "hide": 0, + "includeAll": true, + "label": "Service", + "multi": true, + "name": "service", + "options": [], + "query": 
"label_values(nats_latency_total_duration_sum, service)", + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "auto": false, + "auto_count": 30, + "auto_min": "10s", + "current": { + "text": "1m", + "value": "1m" + }, + "hide": 0, + "label": "Range", + "name": "range", + "options": [ + { + "selected": true, + "text": "1m", + "value": "1m" + }, + { + "selected": false, + "text": "5m", + "value": "5m" + }, + { + "selected": false, + "text": "10m", + "value": "10m" + } + ], + "query": "1m,5m,10m", + "refresh": 2, + "skipUrlSync": false, + "type": "interval" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ] + }, + "timezone": "", + "title": "Service Observations", + "uid": "FVNA1y1Zz", + "version": 61 +} \ No newline at end of file diff --git a/docker-compose/observations/.keep b/docker-compose/observations/.keep new file mode 100644 index 0000000..e69de29 diff --git a/go.sum b/go.sum index 06a994c..e57f7a2 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/nats-io/nats-surveyor v0.0.0-20191023152254-42d8b22e920a h1:aw5bZGh/axh6uXx0e2rvebV+yQae3odOhRxyw+zdQXs= -github.com/nats-io/nats-surveyor v0.0.0-20191023152254-42d8b22e920a/go.mod h1:+C51JIQxDsdZ+iP0KRpjCducVuMNYTX/jzduzRYVUlQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/main.go b/main.go index 90b0489..67d914c 100644 --- a/main.go +++ b/main.go @@ -51,6 +51,7 @@ func main() { flag.StringVar(&opts.HTTPUser, "http_user", 
"", "Enable basic auth and set user name for HTTP scrapes.") flag.StringVar(&opts.HTTPPassword, "http_pass", "", "Set the password for HTTP scrapes. NATS bcrypt supported.") flag.StringVar(&opts.Prefix, "prefix", "", "Replace the default prefix for all the metrics.") + flag.StringVar(&opts.ObservationConfigDir, "observe", "", "Listen for observation statistics based on config files in a directory.") flag.Parse() if printVersion { @@ -60,9 +61,12 @@ func main() { s, err := surveyor.NewSurveyor(opts) if err != nil { - log.Fatalf("couldn't start surveyor: %v", err) + log.Fatalf("couldn't start surveyor: %v", err) + } + err = s.Start() + if err != nil { + log.Fatalf("couldn't start surveyor: %s", err) } - s.Start() // Setup the interrupt handler to gracefully exit. c := make(chan os.Signal, 1) diff --git a/surveyor/observation.go b/surveyor/observation.go new file mode 100644 index 0000000..58da939 --- /dev/null +++ b/surveyor/observation.go @@ -0,0 +1,188 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package surveyor + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "os" + "strings" + + server "github.com/nats-io/nats-server/v2/server" + nats "github.com/nats-io/nats.go" + "github.com/prometheus/client_golang/prometheus" +) + +// ServiceObsListener listens for observations from nats service latency checks +type ServiceObsListener struct { + nc *nats.Conn + opts *serviceObsOptions + sopts *Options +} + +type serviceObsOptions struct { + ServiceName string `json:"name"` + Topic string `json:"topic"` + Credentials string `json:"credential"` +} + +func (o *serviceObsOptions) Validate() error { + errs := []string{} + + if o.ServiceName == "" { + errs = append(errs, fmt.Sprintf("name is required")) + } + + if o.Topic == "" { + errs = append(errs, fmt.Sprintf("topic is required")) + } + + if o.Credentials == "" { + errs = append(errs, fmt.Sprintf("credential is required")) + } else { + _, err := os.Stat(o.Credentials) + if err != nil { + errs = append(errs, fmt.Sprintf("invalid credential file: %s", err)) + } + } + + if len(errs) == 0 { + return nil + } + + return errors.New(strings.Join(errs, ", ")) +} + +var ( + observationsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName("nats", "survey", "observerations_count"), + Help: "Number of observations that are running", + }) + + observationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "nats_latency_observation_count", + Help: "Number of observations received by this surveyor across all services", + }, []string{"service", "app"}) + + invalidObservationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "nats_latency_observation_error_count", + Help: "Number of observations received by this surveyor across all services that could not be handled", + }, []string{"service"}) + + serviceLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "nats_latency_service_duration", + Help: "Time spent serving the request in 
the service", + }, []string{"service", "app"}) + + totalLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "nats_latency_total_duration", + Help: "Total time spent serving a service including network overheads", + }, []string{"service", "app"}) + + requestorRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "nats_latency_requestor_rtt", + Help: "The RTT to the client making a request", + }, []string{"service", "app"}) + + responderRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "nats_latency_responder_rtt", + Help: "The RTT to the service serving the request", + }, []string{"service", "app"}) + + systemRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "nats_latency_system_rtt", + Help: "The RTT within the NATS system - time traveling clusters, gateways and leaf nodes", + }, []string{"service", "app"}) +) + +func init() { + prometheus.MustRegister(invalidObservationsReceived) + prometheus.MustRegister(observationsReceived) + prometheus.MustRegister(serviceLatency) + prometheus.MustRegister(totalLatency) + prometheus.MustRegister(requestorRTT) + prometheus.MustRegister(responderRTT) + prometheus.MustRegister(systemRTT) + prometheus.MustRegister(observationsGauge) +} + +// NewServiceObservation creates a new performance observation listener +func NewServiceObservation(f string, sopts Options) (*ServiceObsListener, error) { + js, err := ioutil.ReadFile(f) + if err != nil { + return nil, err + } + + opts := &serviceObsOptions{} + err = json.Unmarshal(js, opts) + if err != nil { + return nil, fmt.Errorf("invalid service observation configuration: %s: %s", f, err) + } + + err = opts.Validate() + if err != nil { + return nil, fmt.Errorf("invalid service observation configuration: %s: %s", f, err) + } + + sopts.Name = fmt.Sprintf("%s (observing %s)", sopts.Name, opts.ServiceName) + sopts.Credentials = opts.Credentials + nc, err := connect(&sopts) + if err != nil { + return nil, fmt.Errorf("nats 
connection failed: %s", err) + } + + return &ServiceObsListener{ + nc: nc, + opts: opts, + sopts: &sopts, + }, nil +} + +// Start starts listening for observations +func (o *ServiceObsListener) Start() error { + _, err := o.nc.Subscribe(o.opts.Topic, o.observationHandler) + if err != nil { + return fmt.Errorf("could not subscribe to observation topic for %s (%s): %s", o.opts.ServiceName, o.opts.Topic, err) + } + + observationsGauge.Inc() + log.Printf("Started observing stats on %s for %s", o.opts.Topic, o.opts.ServiceName) + + return nil +} + +func (o *ServiceObsListener) observationHandler(m *nats.Msg) { + obs := &server.ServiceLatency{} + err := json.Unmarshal(m.Data, obs) + if err != nil { + invalidObservationsReceived.WithLabelValues(o.opts.ServiceName).Inc() + log.Printf("Unparsable observation received on %s: %s", o.opts.Topic, err) + return + } + + observationsReceived.WithLabelValues(o.opts.ServiceName, obs.AppName).Inc() + serviceLatency.WithLabelValues(o.opts.ServiceName, obs.AppName).Observe(obs.ServiceLatency.Seconds()) + totalLatency.WithLabelValues(o.opts.ServiceName, obs.AppName).Observe(obs.TotalLatency.Seconds()) + requestorRTT.WithLabelValues(o.opts.ServiceName, obs.AppName).Observe(obs.NATSLatency.Requestor.Seconds()) + responderRTT.WithLabelValues(o.opts.ServiceName, obs.AppName).Observe(obs.NATSLatency.Responder.Seconds()) + systemRTT.WithLabelValues(o.opts.ServiceName, obs.AppName).Observe(obs.NATSLatency.System.Seconds()) +} + +// Stop closes the connection to the network +func (o *ServiceObsListener) Stop() { + o.nc.Close() +} diff --git a/surveyor/observation_test.go b/surveyor/observation_test.go new file mode 100644 index 0000000..a79c90c --- /dev/null +++ b/surveyor/observation_test.go @@ -0,0 +1,107 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package surveyor
+
+import (
+	"encoding/json"
+	"testing"
+	"time"
+
+	server "github.com/nats-io/nats-server/v2/server"
+	st "github.com/nats-io/nats-surveyor/test"
+	ptu "github.com/prometheus/client_golang/prometheus/testutil"
+)
+
+func TestServiceObservation_Load(t *testing.T) {
+	sc := st.NewSuperCluster(t)
+	defer sc.Shutdown()
+
+	opt := getTestOptions()
+	obs, err := NewServiceObservation("testdata/goodobs/good.json", *opt)
+	if err != nil {
+		t.Fatalf("observation load error: %s", err)
+	}
+	obs.Stop()
+
+	_, err = NewServiceObservation("testdata/badobs/missing.json", *opt)
+	if err == nil || err.Error() != "open testdata/badobs/missing.json: no such file or directory" {
+		t.Fatalf("observation load error: %v", err)
+	}
+
+	_, err = NewServiceObservation("testdata/badobs/bad.json", *opt)
+	if err == nil || err.Error() != "invalid service observation configuration: testdata/badobs/bad.json: name is required, topic is required, credential is required" {
+		t.Fatalf("observation load error: %v", err)
+	}
+
+	_, err = NewServiceObservation("testdata/badobs/badauth.json", *opt)
+	if err == nil || err.Error() != "nats connection failed: nats: Authorization Violation" {
+		t.Fatalf("observation load error: %v", err)
+	}
+}
+
+func TestServiceObservation_Handle(t *testing.T) {
+	sc := st.NewSuperCluster(t)
+	defer sc.Shutdown()
+
+	opt := getTestOptions()
+	obs, err := NewServiceObservation("testdata/goodobs/good.json", *opt)
+	if err != nil {
+		t.Fatalf("observation load error: %s", err)
+	}
+	defer obs.Stop()
+	err = obs.Start()
+	if err != nil {
+		t.Fatalf("subscribe failed: %s", err)
+	}
+
+	for i := 0; i < 10; i++ {
+		observation := &server.ServiceLatency{
+			AppName:      "testing",
+			RequestStart: time.Now(),
+			TotalLatency: time.Second,
+			NATSLatency: server.NATSLatency{
+				Requestor: 333 * time.Microsecond,
+				Responder: 333 * time.Microsecond,
+				System:    333 * time.Microsecond,
+			},
+		}
+		oj, err := json.Marshal(observation)
+		if err != nil {
+			t.Fatalf("encode error: %s", err)
+		}
+
+		err = sc.Clients[0].Publish("testing.topic", oj)
+		if err != nil {
+			t.Fatalf("publish error: %s", err)
+		}
+	}
+
+	sc.Clients[0].Flush()
+
+	// delivery is asynchronous: give the subscription time to handle the messages before asserting
+	time.Sleep(100 * time.Millisecond)
+	if ptu.ToFloat64(observationsReceived) != 10.0 {
+		t.Fatalf("process error: metrics not handled")
+	}
+
+	err = sc.Clients[0].Publish("testing.topic", []byte{})
+	if err != nil {
+		t.Fatalf("publish error: %s", err)
+	}
+
+	time.Sleep(100 * time.Millisecond)
+	if ptu.ToFloat64(invalidObservationsReceived) != 1.0 {
+		t.Fatalf("process error: metrics not handled")
+	}
+}
diff --git a/surveyor/surveyor.go b/surveyor/surveyor.go
index 09c5528..0ed6de9 100644
--- a/surveyor/surveyor.go
+++ b/surveyor/surveyor.go
@@ -24,6 +24,7 @@ import (
 	"net"
 	"net/http"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"sync"
@@ -49,27 +50,35 @@ var (
 
 // Options are used to configure the NATS collector
 type Options struct {
-	URLs            string
-	Credentials     string
-	PollTimeout     time.Duration
-	ExpectedServers int
-	ListenAddress   string
-	ListenPort      int
-	CertFile        string
-	KeyFile         string
-	CaFile          string
-	HTTPCertFile    string
-	HTTPKeyFile     string
-	HTTPCaFile      string
-	NATSServerURL   string
-	HTTPUser        string // User in metrics scrape by Prometheus.
-	HTTPPassword    string
-	Prefix          string // TODO
+	Name                 string
+	URLs                 string
+	Credentials          string
+	PollTimeout          time.Duration
+	ExpectedServers      int
+	ListenAddress        string
+	ListenPort           int
+	CertFile             string
+	KeyFile              string
+	CaFile               string
+	HTTPCertFile         string
+	HTTPKeyFile          string
+	HTTPCaFile           string
+	NATSServerURL        string
+	HTTPUser             string // User in metrics scrape by Prometheus.
+	HTTPPassword         string
+	Prefix               string // TODO
+	ObservationConfigDir string
 }
 
 // GetDefaultOptions returns the default set of options
 func GetDefaultOptions() *Options {
+	hostname, err := os.Hostname()
+	if err != nil {
+		hostname = "127.0.0.1"
+	}
+
 	opts := &Options{
+		Name:            fmt.Sprintf("NATS_Surveyor - %s", hostname),
 		ListenAddress:   DefaultListenAddress,
 		ListenPort:      DefaultListenPort,
 		URLs:            DefaultURL,
@@ -82,10 +91,11 @@ func GetDefaultOptions() *Options {
 // A Surveyor instance
 type Surveyor struct {
 	sync.Mutex
-	opts   Options
-	nc     *nats.Conn
-	http   net.Listener
-	statzC *StatzCollector
+	opts         Options
+	nc           *nats.Conn
+	http         net.Listener
+	statzC       *StatzCollector
+	observations []*ServiceObsListener
 }
 
 func connect(opts *Options) (*nats.Conn, error) {
@@ -95,11 +105,7 @@ func connect(opts *Options) (*nats.Conn, error) {
 	})
 	prometheus.Register(reconnCtr)
 
-	hostname, err := os.Hostname()
-	if err != nil {
-		hostname = "127.0.0.1"
-	}
-	nopts := []nats.Option{nats.Name(fmt.Sprintf("NATS_Surveyor - %s", hostname))}
+	nopts := []nats.Option{nats.Name(opts.Name)}
 	if opts.Credentials != "" {
 		nopts = append(nopts, nats.UserCredentials(opts.Credentials))
 	}
@@ -147,8 +153,9 @@ func NewSurveyor(opts *Options) (*Surveyor, error) {
 		return nil, err
 	}
 	return &Surveyor{
-		nc:   nc,
-		opts: *opts,
+		nc:           nc,
+		opts:         *opts,
+		observations: []*ServiceObsListener{},
 	}, nil
 }
 
@@ -347,11 +354,64 @@ func (s *Surveyor) startHTTP() error {
 	return nil
 }
 
+// startObservations loads every .json file found under
+// opts.ObservationConfigDir as a service observation and starts it.
+// An empty directory setting skips the feature; any failure aborts the
+// walk and is returned to the caller.
+func (s *Surveyor) startObservations() error {
+	observationsGauge.Set(0)
+
+	dir := s.opts.ObservationConfigDir
+	if dir == "" {
+		log.Printf("Skipping observation startup, no directory configured")
+		return nil
+	}
+
+	fs, err := os.Stat(dir)
+	if err != nil {
+		return fmt.Errorf("could not start observations, %s does not exist", dir)
+	}
+
+	if !fs.IsDir() {
+		return fmt.Errorf("observations dir %s is not a directory", dir)
+	}
+
+	err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+
+		if filepath.Ext(info.Name()) != ".json" {
+			return nil
+		}
+
+		obs, err := NewServiceObservation(path, s.opts)
+		if err != nil {
+			return fmt.Errorf("could not create observation from %s: %s", path, err)
+		}
+
+		err = obs.Start()
+		if err != nil {
+			obs.nc.Close() // release the connection made by NewServiceObservation
+			return fmt.Errorf("could not start observation from %s: %s", path, err)
+		}
+
+		s.observations = append(s.observations, obs)
+
+		return nil
+	})
+
+	return err
+}
+
 // Start starts the surveyor
 func (s *Surveyor) Start() error {
 	if err := s.startHTTP(); err != nil {
 		return err
 	}
+	if err := s.startObservations(); err != nil {
+		return err
+	}
 	if err := s.createCollector(); err != nil {
 		return err
 	}
@@ -361,6 +421,11 @@ func (s *Surveyor) Start() error {
 // Stop stops the surveyor
 func (s *Surveyor) Stop() {
 	s.Lock()
+
+	for _, o := range s.observations {
+		o.nc.Close()
+	}
+
 	prometheus.Unregister(s.statzC)
 	s.http.Close()
 	s.nc.Drain()
diff --git a/surveyor/surveyor_test.go b/surveyor/surveyor_test.go
index 047f976..f6b01ae 100644
--- a/surveyor/surveyor_test.go
+++ b/surveyor/surveyor_test.go
@@ -26,6 +26,7 @@ import (
 	"time"
 
 	st "github.com/nats-io/nats-surveyor/test"
+	ptu "github.com/prometheus/client_golang/prometheus/testutil"
 )
 
 // Testing constants
@@ -376,6 +377,28 @@ func TestSurveyor_MissingResponses(t *testing.T) {
 	}
 }
 
+func TestSurveyor_Observations(t *testing.T) {
+	sc := st.NewSuperCluster(t)
+	defer sc.Shutdown()
+
+	opt := getTestOptions()
+	opt.ObservationConfigDir = "testdata/observations"
+
+	s, err := NewSurveyor(opt)
+	if err != nil {
+		t.Fatalf("couldn't create surveyor: %v", err)
+	}
+	defer s.Stop()
+
+	if err = s.Start(); err != nil {
+		t.Fatalf("start error: %v", err)
+	}
+
+	if ptu.ToFloat64(observationsGauge) != 1 {
+		t.Fatalf("process error: observations not started")
+	}
+}
+
 func TestSurveyor_ConcurrentBlock(t *testing.T) {
 	sc := st.NewSuperCluster(t)
 	defer sc.Shutdown()
diff --git a/surveyor/testdata/badobs/bad.json b/surveyor/testdata/badobs/bad.json
new file mode 100644
index 0000000..9e26dfe
--- /dev/null
+++ b/surveyor/testdata/badobs/bad.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/surveyor/testdata/badobs/badauth.creds b/surveyor/testdata/badobs/badauth.creds
new file mode 100644
index 0000000..e69de29
diff --git a/surveyor/testdata/badobs/badauth.json b/surveyor/testdata/badobs/badauth.json
new file mode 100644
index 0000000..d967db3
--- /dev/null
+++ b/surveyor/testdata/badobs/badauth.json
@@ -0,0 +1,5 @@
+{
+    "name": "testing",
+    "topic": "testing.topic",
+    "credential": "testdata/badobs/badauth.creds"
+}
\ No newline at end of file
diff --git a/surveyor/testdata/goodobs/good.json b/surveyor/testdata/goodobs/good.json
new file mode 100644
index 0000000..bea44a6
--- /dev/null
+++ b/surveyor/testdata/goodobs/good.json
@@ -0,0 +1,5 @@
+{
+    "name": "testing",
+    "topic": "testing.topic",
+    "credential": "../test/myuser.creds"
+}
\ No newline at end of file