164 lines
3.8 KiB
Go
164 lines
3.8 KiB
Go
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/gob"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/spf13/viper"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.etcd.io/etcd/client/v3/namespace"
|
|
)
|
|
|
|
// EtcdSerializer converts values to and from the byte payloads that
// EtcdHandler writes to and reads from etcd.
type EtcdSerializer interface {
	// Serialize encodes m and returns the encoded bytes.
	Serialize(m any) ([]byte, error)

	// Deserialize decodes d and stores the decoded value through m.
	Deserialize(d []byte, m *any) error
}
|
|
|
|
// JSONEtcdSerializer is an EtcdSerializer that encodes values as JSON.
type JSONEtcdSerializer struct{}

// Serialize encodes m with json.Marshal and returns its result unchanged,
// including the error reported for values JSON cannot represent.
func (s JSONEtcdSerializer) Serialize(m any) ([]byte, error) {
	return json.Marshal(m)
}

// Deserialize decodes the JSON document d and stores the result in *m.
//
// m must be non-nil. When *m is a nil interface the decoded value uses
// encoding/json's generic types (for example map[string]any for objects).
// Decoding errors are returned wrapped, so errors.Is/errors.As still reach
// the underlying encoding/json error.
func (s JSONEtcdSerializer) Deserialize(d []byte, m *any) error {
	// A nil destination would otherwise be replaced by a throwaway value
	// inside json.Unmarshal and the decoded data would be lost silently.
	if m == nil {
		return fmt.Errorf("json deserialize: nil destination")
	}
	// m is already a pointer; taking its address again is unnecessary.
	if err := json.Unmarshal(d, m); err != nil {
		return fmt.Errorf("json deserialize: %w", err)
	}
	return nil
}
|
|
|
|
// GobEtcdSerializer encodes and decodes values with encoding/gob.
//
// NOTE(review): Deserialize takes `m any`, whereas EtcdSerializer declares
// `m *any`, so this type does not satisfy EtcdSerializer as written. The
// signature is kept unchanged here for existing callers.
type GobEtcdSerializer struct{}

// Serialize gob-encodes m and returns the encoded bytes, or the encoder's
// error.
func (s GobEtcdSerializer) Serialize(m any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(m); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Deserialize gob-decodes d into the value that m points to.
//
// m must be a non-nil pointer to a value compatible with what was passed to
// Serialize (for example *map[string]string). A nil m is rejected with an
// error; any other failure is the gob decoder's error, returned unchanged.
func (s GobEtcdSerializer) Deserialize(d []byte, m any) error {
	if m == nil {
		return fmt.Errorf("gob deserialize: nil destination")
	}
	// Decode into m itself. Decoding into &m would target a *interface{},
	// which gob refuses for data that was encoded from a concrete type.
	return gob.NewDecoder(bytes.NewReader(d)).Decode(m)
}
|
|
|
|
type EtcdHandler struct {
|
|
*clientv3.Client
|
|
serializer EtcdSerializer
|
|
}
|
|
|
|
func NewEtcdHandler(cfg *viper.Viper) (*EtcdHandler, error) {
|
|
var (
|
|
endpoints = cfg.GetStringSlice("storage.kv.etcd.endpoints")
|
|
prefix = cfg.GetString("storage.kv.etcd.prefix")
|
|
username = cfg.GetString("storage.kv.etcd.username")
|
|
password = cfg.GetString("storage.kv.etcd.password")
|
|
)
|
|
|
|
cli, err := clientv3.New(clientv3.Config{
|
|
Endpoints: endpoints,
|
|
Username: username,
|
|
Password: password,
|
|
DialTimeout: 5 * time.Second,
|
|
})
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return nil, err
|
|
}
|
|
|
|
cli.KV = namespace.NewKV(cli.KV, prefix)
|
|
cli.Watcher = namespace.NewWatcher(cli.Watcher, prefix)
|
|
cli.Lease = namespace.NewLease(cli.Lease, prefix)
|
|
|
|
return &EtcdHandler{
|
|
Client: cli,
|
|
serializer: JSONEtcdSerializer{},
|
|
}, nil
|
|
}
|
|
|
|
func (s *EtcdHandler) Put(k string, v any) error {
|
|
data, err := s.serializer.Serialize(v)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return err
|
|
}
|
|
// _, err = s.Client.KV.Put(context.TODO(), k, data.String())
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
_, err = s.Client.KV.Put(ctx, k, string(data))
|
|
cancel()
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *EtcdHandler) PutWithTTL(k string, v any, duration time.Duration) error {
|
|
lease, err := s.Client.Lease.Grant(context.TODO(), int64(duration.Seconds()))
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return err
|
|
}
|
|
|
|
data, err := s.serializer.Serialize(v)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return err
|
|
}
|
|
// _, err = s.Client.KV.Put(context.TODO(), k, data.String(), clientv3.WithLease(lease.ID))
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
_, err = s.Client.KV.Put(ctx, k, string(data), clientv3.WithLease(lease.ID))
|
|
cancel()
|
|
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *EtcdHandler) Get(k string) (any, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
resp, err := s.Client.KV.Get(ctx, k)
|
|
cancel()
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return nil, err
|
|
}
|
|
for _, v := range resp.Kvs {
|
|
var data any
|
|
err := s.serializer.Deserialize([]byte(v.Value), &data)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return nil, err
|
|
}
|
|
// We return directly as we want to last revision of value
|
|
return data, nil
|
|
}
|
|
return nil, fmt.Errorf("no value %v", k)
|
|
}
|
|
|
|
func (s *EtcdHandler) Delete(k string) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
_, err := s.Client.KV.Delete(ctx, k)
|
|
cancel()
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|