mobility-accounts/storage/etcd.go

107 lines
2.3 KiB
Go
Executable File

package storage
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/spf13/viper"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
)
type EtcdKVStore struct {
Client *clientv3.Client
}
func NewEtcdKVStore(cfg *viper.Viper) (EtcdKVStore, error) {
var (
endpoints = cfg.GetStringSlice("storage.kv.etcd.endpoints")
prefix = cfg.GetString("storage.kv.etcd.prefix")
username = cfg.GetString("storage.kv.etcd.username")
password = cfg.GetString("storage.kv.etcd.password")
)
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Username: username,
Password: password,
DialTimeout: 5 * time.Second,
})
if err != nil {
return EtcdKVStore{}, err
}
cli.KV = namespace.NewKV(cli.KV, prefix)
cli.Watcher = namespace.NewWatcher(cli.Watcher, prefix)
cli.Lease = namespace.NewLease(cli.Lease, prefix)
return EtcdKVStore{
Client: cli,
}, nil
}
func (s EtcdKVStore) Put(k string, v any) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = s.Client.KV.Put(ctx, k, string(data))
cancel()
if err != nil {
return err
}
return nil
}
func (s EtcdKVStore) PutWithTTL(k string, v any, duration time.Duration) error {
lease, err := s.Client.Lease.Grant(context.TODO(), int64(duration.Seconds()))
if err != nil {
return err
}
data, err := json.Marshal(v)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = s.Client.KV.Put(ctx, k, string(data), clientv3.WithLease(lease.ID))
cancel()
if err != nil {
return err
}
return nil
}
func (s EtcdKVStore) Get(k string) (any, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := s.Client.KV.Get(ctx, k)
cancel()
if err != nil {
return nil, err
}
for _, v := range resp.Kvs {
var data any
err := json.Unmarshal([]byte(v.Value), &data)
if err != nil {
return nil, err
}
// We return directly as we want to last revision of value
return data, nil
}
return nil, errors.New(fmt.Sprintf("no value %v", k))
}
func (s EtcdKVStore) Delete(k string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err := s.Client.KV.Delete(ctx, k)
cancel()
if err != nil {
return err
}
return nil
}