forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstorage.go
138 lines (114 loc) · 3.22 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package session
import (
"context"
"encoding/json"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
)
type Store interface {
Set(key, val string, expireAt time.Time) error
Get(key string) (string, error)
Delete(key string) error
ExpireAt(key string, expireAt time.Time) error
}
var storePrefix = "sessionsv2/"
var storeIndex = "sessionsindexv2/"
// Storage is a store translation layer between the data storage unit and the
// service layer.
type Storage struct {
store Store
}
// NewStorage creates a new storage system
func NewStorage(s Store) *Storage {
return &Storage{s}
}
// FindSessionByKey use a given key to retrieve the stored session
func (s *Storage) FindSessionByKey(ctx context.Context, key string) (*influxdb.Session, error) {
val, err := s.store.Get(sessionIndexKey(key))
if err != nil {
return nil, err
}
if val == "" {
return nil, &errors.Error{
Code: errors.ENotFound,
Msg: influxdb.ErrSessionNotFound,
}
}
id, err := platform.IDFromString(val)
if err != nil {
return nil, err
}
return s.FindSessionByID(ctx, *id)
}
// FindSessionByID use a provided id to retrieve the stored session
func (s *Storage) FindSessionByID(ctx context.Context, id platform.ID) (*influxdb.Session, error) {
val, err := s.store.Get(storePrefix + id.String())
if err != nil {
return nil, err
}
if val == "" {
return nil, &errors.Error{
Code: errors.ENotFound,
Msg: influxdb.ErrSessionNotFound,
}
}
session := &influxdb.Session{}
return session, json.Unmarshal([]byte(val), session)
}
// CreateSession creates a new session
func (s *Storage) CreateSession(ctx context.Context, session *influxdb.Session) error {
// create session
sessionBytes, err := json.Marshal(session)
if err != nil {
return err
}
// use a minute time just so the session will expire if we fail to set the expiration later
sessionID := sessionID(session.ID)
if err := s.store.Set(sessionID, string(sessionBytes), session.ExpiresAt); err != nil {
return err
}
// create index
indexKey := sessionIndexKey(session.Key)
if err := s.store.Set(indexKey, session.ID.String(), session.ExpiresAt); err != nil {
return err
}
return nil
}
// RefreshSession updates the expiration time of a session.
func (s *Storage) RefreshSession(ctx context.Context, id platform.ID, expireAt time.Time) error {
session, err := s.FindSessionByID(ctx, id)
if err != nil {
return err
}
if expireAt.Before(session.ExpiresAt) {
// no need to recreate the session if we aren't extending the expiration
return nil
}
session.ExpiresAt = expireAt
return s.CreateSession(ctx, session)
}
// DeleteSession removes the session and index from storage
func (s *Storage) DeleteSession(ctx context.Context, id platform.ID) error {
session, err := s.FindSessionByID(ctx, id)
if err != nil {
return err
}
if session == nil {
return nil
}
if err := s.store.Delete(sessionID(session.ID)); err != nil {
return err
}
if err := s.store.Delete(sessionIndexKey(session.Key)); err != nil {
return err
}
return nil
}
func sessionID(id platform.ID) string {
return storePrefix + id.String()
}
func sessionIndexKey(key string) string {
return storeIndex + key
}