package storage import ( "context" "encoding/json" "errors" "fmt" "time" "github.com/spf13/viper" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/namespace" ) type EtcdKVStore struct { Client *clientv3.Client } func NewEtcdKVStore(cfg *viper.Viper) (EtcdKVStore, error) { var ( endpoints = cfg.GetStringSlice("storage.kv.etcd.endpoints") prefix = cfg.GetString("storage.kv.etcd.prefix") username = cfg.GetString("storage.kv.etcd.username") password = cfg.GetString("storage.kv.etcd.password") ) cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, Username: username, Password: password, DialTimeout: 5 * time.Second, }) if err != nil { return EtcdKVStore{}, err } cli.KV = namespace.NewKV(cli.KV, prefix) cli.Watcher = namespace.NewWatcher(cli.Watcher, prefix) cli.Lease = namespace.NewLease(cli.Lease, prefix) return EtcdKVStore{ Client: cli, }, nil } func (s EtcdKVStore) Put(k string, v any) error { data, err := json.Marshal(v) if err != nil { return err } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err = s.Client.KV.Put(ctx, k, string(data)) cancel() if err != nil { return err } return nil } func (s EtcdKVStore) PutWithTTL(k string, v any, duration time.Duration) error { lease, err := s.Client.Lease.Grant(context.TODO(), int64(duration.Seconds())) if err != nil { return err } data, err := json.Marshal(v) if err != nil { return err } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err = s.Client.KV.Put(ctx, k, string(data), clientv3.WithLease(lease.ID)) cancel() if err != nil { return err } return nil } func (s EtcdKVStore) Get(k string) (any, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) resp, err := s.Client.KV.Get(ctx, k) cancel() if err != nil { return nil, err } for _, v := range resp.Kvs { var data any err := json.Unmarshal([]byte(v.Value), &data) if err != nil { return nil, err } // We return directly as we want to last revision of value return data, nil } return nil, errors.New(fmt.Sprintf("no value %v", k)) } func (s EtcdKVStore) Delete(k string) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err := s.Client.KV.Delete(ctx, k) cancel() if err != nil { return err } return nil }