From 9f6685152979650214c297ab3ac1b1aa393c3663 Mon Sep 17 00:00:00 2001 From: sbouaram Date: Mon, 15 May 2023 14:49:00 +0200 Subject: [PATCH] adding groupcache --- utils/storage/groupcache.go | 99 ++++++++++++++++++++++++++++++++ utils/storage/groupcache_test.go | 75 ++++++++++++++++++++++++ 2 files changed, 174 insertions(+) create mode 100644 utils/storage/groupcache.go create mode 100644 utils/storage/groupcache_test.go diff --git a/utils/storage/groupcache.go b/utils/storage/groupcache.go new file mode 100644 index 0000000..0e81c2d --- /dev/null +++ b/utils/storage/groupcache.go @@ -0,0 +1,99 @@ +package storage + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "time" + + "github.com/mailgun/groupcache/v2" + "github.com/spf13/viper" +) + +type EtcdGroupCacheHandler struct { + EtcdHandler *EtcdHandler + Group *groupcache.Group +} + +func NewEtcdGroupCacheHandler(cfg *viper.Viper) (*EtcdGroupCacheHandler, error) { + var endpoint = cfg.GetString("storage.kv.groupcache.endpoint") + etcdHandler, err := NewEtcdHandler(cfg) + if err != nil { + return nil, err + } + pool := groupcache.NewHTTPPoolOpts("http://"+endpoint, &groupcache.HTTPPoolOptions{}) + server := http.Server{ + Addr: endpoint, + Handler: pool, + } + go func() { + log.Printf("Serving....\n") + if err := server.ListenAndServe(); err != nil { + log.Fatal(err) + } + }() + group := groupcache.NewGroup("data", 3000000, groupcache.GetterFunc( + func(ctx context.Context, key string, dest groupcache.Sink) error { + + resp, err := etcdHandler.Get(key) + if err != nil { + return err + } + if resp == nil { + return errors.New("key not found in ETCD") + } + decoded, err := base64.StdEncoding.DecodeString(resp.(string)) + if err != nil { + return fmt.Errorf("failed to decode value for key %s: %v", key, err) + } + dest.SetString(string(decoded), time.Now().Add(time.Minute*5)) + return nil + }, + )) + + return &EtcdGroupCacheHandler{ + EtcdHandler: etcdHandler, + Group: group, + }, nil +} + +func (h *EtcdGroupCacheHandler) Put(k string, v interface{}) error { + value, err := json.Marshal(v) + if err != nil { + return err + } + return h.EtcdHandler.Put(k, value) +} + +func (h *EtcdGroupCacheHandler) PutWithTTL(k string, v interface{}, duration time.Duration) error { + value, err := json.Marshal(v) + if err != nil { + return err + } + return h.EtcdHandler.PutWithTTL(k, value, duration) +} + +func (h *EtcdGroupCacheHandler) Get(k string) (interface{}, error) { + var value []byte + err := h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value)) + if err != nil { + return nil, err + } + return value, nil +} + +func (h *EtcdGroupCacheHandler) Delete(k string) error { + err := h.EtcdHandler.Delete(k) + if err != nil { + return err + } + err = h.Group.Remove(context.Background(), k) + if err != nil { + return err + } + return nil +} diff --git a/utils/storage/groupcache_test.go b/utils/storage/groupcache_test.go new file mode 100644 index 0000000..fb875c1 --- /dev/null +++ b/utils/storage/groupcache_test.go @@ -0,0 +1,75 @@ +package storage + +import ( + "encoding/json" + "github.com/spf13/viper" + "testing" + "time" +) + +func TestEtcdGroupCacheHandler(t *testing.T) { + cfg := createConfig() + handler, err := NewEtcdGroupCacheHandler(cfg) + if err != nil { + t.Fatalf("failed to create EtcdGroupCacheHandler: %v", err) + } + + key := "test-key" + value := map[string]string{"key1": "value1", "key2": "value2"} + + // test Put and Get + err = handler.Put(key, value) + if err != nil { + t.Fatalf("failed to put key: %v", err) + } + resp, err := handler.Get(key) + if err != nil { + t.Fatalf("failed to get key: %v", err) + } + var result map[string]string + err = json.Unmarshal(resp.([]byte), &result) + if err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + + if len(result) != len(value) || result["key1"] != value["key1"] || result["key2"] != value["key2"] { + t.Fatalf("unexpected response: %v", result) + } + + // test PutWithTTL and Get + err = handler.PutWithTTL(key, value, time.Second*2) + if err != nil { + t.Fatalf("failed to put key with TTL: %v", err) + } + resp, err = handler.Get(key) + if err != nil { + t.Fatalf("failed to get key: %v", err) + } + err = json.Unmarshal(resp.([]byte), &result) + if err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + if len(result) != len(value) || result["key1"] != value["key1"] || result["key2"] != value["key2"] { + t.Fatalf("unexpected response: %v", result) + } + + // test Delete + err = handler.Delete(key) + if err != nil { + t.Fatalf("failed to delete key: %v", err) + } + resp, err = handler.Get(key) + if err == nil { + t.Fatalf("expected key to be deleted") + } +} + +func createConfig() *viper.Viper { + cfg := viper.New() + cfg.Set("storage.kv.etcd.endpoints", []string{"localhost:2379"}) + cfg.Set("storage.kv.etcd.username", "") + cfg.Set("storage.kv.etcd.prefix", "parcoursmob/cache/") + cfg.Set("storage.kv.etcd.password", "") + cfg.Set("storage.kv.groupcache.endpoint", "localhost:8080") + return cfg +}