package storage import ( "bytes" "context" "encoding/gob" "encoding/json" "fmt" "time" "github.com/spf13/viper" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/namespace" ) type EtcdSerializer interface { Deserialize(d []byte, m *any) error Serialize(m any) ([]byte, error) } type JSONEtcdSerializer struct{} // Serialize to JSON. Will err if there are unmarshalable key values func (s JSONEtcdSerializer) Serialize(m any) ([]byte, error) { return json.Marshal(m) } // Deserialize back to map[string]interface{} func (s JSONEtcdSerializer) Deserialize(d []byte, m *any) (err error) { err = json.Unmarshal(d, &m) if err != nil { fmt.Printf("JSONSerializer.deserialize() Error: %v", err) return err } return } // GobEtcdSerializer uses gob package to encode the session map type GobEtcdSerializer struct{} // Serialize using gob func (s GobEtcdSerializer) Serialize(m any) ([]byte, error) { buf := new(bytes.Buffer) enc := gob.NewEncoder(buf) err := enc.Encode(m) if err == nil { return buf.Bytes(), nil } return nil, err } // Deserialize back to map[interface{}]interface{} func (s GobEtcdSerializer) Deserialize(d []byte, m any) error { dec := gob.NewDecoder(bytes.NewBuffer(d)) return dec.Decode(&m) } type EtcdHandler struct { *clientv3.Client serializer EtcdSerializer } func NewEtcdHandler(cfg *viper.Viper) (*EtcdHandler, error) { var ( endpoints = cfg.GetStringSlice("storage.kv.etcd.endpoints") prefix = cfg.GetString("storage.kv.etcd.prefix") ) cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { return nil, err } cli.KV = namespace.NewKV(cli.KV, prefix) cli.Watcher = namespace.NewWatcher(cli.Watcher, prefix) cli.Lease = namespace.NewLease(cli.Lease, prefix) return &EtcdHandler{ Client: cli, serializer: JSONEtcdSerializer{}, }, nil } func (s *EtcdHandler) Put(k string, v any) error { data, err := s.serializer.Serialize(v) if err != nil { return err } // _, err = s.Client.KV.Put(context.TODO(), k, data.String()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err = s.Client.KV.Put(ctx, k, string(data)) cancel() if err != nil { return err } return nil } func (s *EtcdHandler) PutWithTTL(k string, v any, duration time.Duration) error { lease, err := s.Client.Lease.Grant(context.TODO(), int64(duration.Seconds())) if err != nil { return err } data, err := s.serializer.Serialize(v) if err != nil { return err } // _, err = s.Client.KV.Put(context.TODO(), k, data.String(), clientv3.WithLease(lease.ID)) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err = s.Client.KV.Put(ctx, k, string(data), clientv3.WithLease(lease.ID)) cancel() if err != nil { return err } return nil } func (s *EtcdHandler) Get(k string) (any, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) resp, err := s.Client.KV.Get(ctx, k) cancel() if err != nil { return nil, err } for _, v := range resp.Kvs { var data any err := s.serializer.Deserialize([]byte(v.Value), &data) if err != nil { return nil, err } // We return directly as we want to last revision of value return data, nil } return nil, fmt.Errorf("no value %v", k) } func (s *EtcdHandler) Delete(k string) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err := s.Client.KV.Delete(ctx, k) cancel() if err != nil { return err } return nil }