-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreduce_geo.go
139 lines (127 loc) · 3.43 KB
/
reduce_geo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package metrics
import (
"fmt"
"strconv"
"strings"
"github.com/mmcloughlin/geohash"
)
var missingLogFields = coords{
missingLatLon: true,
missingGeo: true,
}
type coords struct {
lat float64
lon float64
rawLat string
rawLon string
missingLatLon bool
missingGeo bool
extractError error
convertError error
}
func (c coords) isZero() bool {
return strings.TrimSpace(c.rawLat) == "" &&
strings.TrimSpace(c.rawLon) == "" &&
c.lat == 0.0 &&
c.lon == 0.0 &&
c.rawLat == "" &&
c.rawLon == "" &&
c.extractError == nil &&
c.convertError == nil
}
func (c coords) isValid() bool {
return !c.missingGeo && !c.missingLatLon &&
c.extractError == nil &&
c.convertError == nil
}
func (c coords) logErrors(logline map[string]interface{}, logf func(f string, args ...interface{})) {
if c.missingGeo {
logf("missing geo object (geo: %+v)", logline[geoHash])
}
if c.missingLatLon {
logf("missing lat/lon (geo: %+v)", logline[geoHash])
}
if c.extractError != nil {
logf("parse error (geo: %+v)(error: %+v)", logline[geoHash], c.extractError)
}
if c.convertError != nil {
logf("converting raw lat/lon error: (error: %+v)", c.convertError)
}
}
func convertLatLon(rawLat, rawLon string) (float64, float64, error) {
lat, latErr := strconv.ParseFloat(rawLat, 64)
lon, lonErr := strconv.ParseFloat(rawLon, 64)
if latErr != nil || lonErr != nil {
err := fmt.Errorf("%+v and %+v", latErr, lonErr)
return lat, lon, err
}
return lat, lon, nil
}
func extractGeoip(logline map[string]interface{}) coords {
rawGeo, ok := logline["geo"]
if !ok {
return missingLogFields
}
geo, isMap := rawGeo.(map[string]interface{})
if !isMap {
return missingLogFields
}
rawLatLon, hasLatLon := geo["latlon"]
latlon, isLatLonString := rawLatLon.(string)
rawLat, rawLon, extractErr := extractLatLon(latlon)
lat, lon, convertErr := convertLatLon(rawLat, rawLon)
return coords{
lat: lat,
lon: lon,
rawLat: rawLat,
rawLon: rawLon,
missingLatLon: !isLatLonString && !hasLatLon,
missingGeo: !isMap,
extractError: extractErr,
convertError: convertErr,
}
}
func extractLatLon(latlon string) (string, string, error) {
parts := strings.Split(latlon, ",")
if len(parts) != 2 {
return "", "", fmt.Errorf("found more extrat lat/lon info: %s", latlon)
}
lat, lon := parts[0], parts[1]
if strings.TrimSpace(lat) == "" {
return lat, lon, fmt.Errorf("did not parse a value for lat")
}
if strings.TrimSpace(lon) == "" {
return lat, lon, fmt.Errorf("did not parse a value for lon")
}
return lat, lon, nil
}
func convertLatLonToHash(labels map[string]string, logline map[string]interface{}) (map[string]string, coords) {
if labels == nil {
return map[string]string{geoHash: geoMissing}, coords{}
}
c := extractGeoip(logline)
if !c.isValid() {
labels[geoHash] = geoMissing
return labels, c
}
hash := geohash.EncodeWithPrecision(c.lat, c.lon, effectiveHashPrecision)
labels[geoHash] = hash
return labels, c
}
// scrubGeoHash must create a new map since the values of the
// provided map could be used at some future time after having been
// previously provided to other calls since passed by pointer
func scrubGeoHash(labels map[string]string) map[string]string {
rv := map[string]string{}
for k, v := range labels {
switch k {
case geoHash: // remove/scrub these keys
continue
case aeeHealthcheckLabel:
continue
default:
rv[k] = v
}
}
return rv
}