Refactor previous COOPGO Identity service - Initial commit
This commit is contained in:
120
storage/etcd.go
Normal file
120
storage/etcd.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/namespace"
|
||||
)
|
||||
|
||||
type EtcdKVStore struct {
|
||||
Client *clientv3.Client
|
||||
}
|
||||
|
||||
func NewEtcdKVStore(cfg *viper.Viper) (EtcdKVStore, error) {
|
||||
var (
|
||||
endpoints = cfg.GetStringSlice("storage.kv.etcd.endpoints")
|
||||
prefix = cfg.GetString("storage.kv.etcd.prefix")
|
||||
)
|
||||
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: endpoints,
|
||||
DialTimeout: 5 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return EtcdKVStore{}, err
|
||||
}
|
||||
|
||||
cli.KV = namespace.NewKV(cli.KV, prefix)
|
||||
cli.Watcher = namespace.NewWatcher(cli.Watcher, prefix)
|
||||
cli.Lease = namespace.NewLease(cli.Lease, prefix)
|
||||
|
||||
return EtcdKVStore{
|
||||
Client: cli,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s EtcdKVStore) Put(k string, v any) error {
|
||||
// var data bytes.Buffer // Stand-in for a network connection
|
||||
// enc := gob.NewEncoder(&data)
|
||||
// err := enc.Encode(v)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
data, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// _, err = s.Client.KV.Put(context.TODO(), k, data.String())
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_, err = s.Client.KV.Put(ctx, k, string(data))
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s EtcdKVStore) PutWithTTL(k string, v any, duration time.Duration) error {
|
||||
lease, err := s.Client.Lease.Grant(context.TODO(), int64(duration.Seconds()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// var data bytes.Buffer // Stand-in for a network connection
|
||||
// enc := gob.NewEncoder(&data)
|
||||
// err = enc.Encode(v)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
data, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// _, err = s.Client.KV.Put(context.TODO(), k, data.String(), clientv3.WithLease(lease.ID))
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_, err = s.Client.KV.Put(ctx, k, string(data), clientv3.WithLease(lease.ID))
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s EtcdKVStore) Get(k string) (any, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
resp, err := s.Client.KV.Get(ctx, k)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, v := range resp.Kvs {
|
||||
var data any
|
||||
// var reader bytes.Buffer
|
||||
// reader.Write(v.Value)
|
||||
// enc := gob.NewDecoder(&reader)
|
||||
// err := enc.Decode(&data)
|
||||
err := json.Unmarshal([]byte(v.Value), &data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// We return directly as we want to last revision of value
|
||||
return data, nil
|
||||
}
|
||||
return nil, errors.New(fmt.Sprintf("no value %v", k))
|
||||
}
|
||||
|
||||
func (s EtcdKVStore) Delete(k string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_, err := s.Client.KV.Delete(ctx, k)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user