adding groupcache
This commit is contained in:
parent
aa13d60ca6
commit
9f66851529
|
@ -0,0 +1,99 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mailgun/groupcache/v2"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EtcdGroupCacheHandler struct {
|
||||||
|
EtcdHandler *EtcdHandler
|
||||||
|
Group *groupcache.Group
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEtcdGroupCacheHandler(cfg *viper.Viper) (*EtcdGroupCacheHandler, error) {
|
||||||
|
var endpoint = cfg.GetString("storage.kv.groupcache.endpoint")
|
||||||
|
etcdHandler, err := NewEtcdHandler(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pool := groupcache.NewHTTPPoolOpts("http://"+endpoint, &groupcache.HTTPPoolOptions{})
|
||||||
|
server := http.Server{
|
||||||
|
Addr: endpoint,
|
||||||
|
Handler: pool,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
log.Printf("Serving....\n")
|
||||||
|
if err := server.ListenAndServe(); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
group := groupcache.NewGroup("data", 3000000, groupcache.GetterFunc(
|
||||||
|
func(ctx context.Context, key string, dest groupcache.Sink) error {
|
||||||
|
|
||||||
|
resp, err := etcdHandler.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp == nil {
|
||||||
|
return errors.New("key not found in ETCD")
|
||||||
|
}
|
||||||
|
decoded, err := base64.StdEncoding.DecodeString(resp.(string))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode value for key %s: %v", key, err)
|
||||||
|
}
|
||||||
|
dest.SetString(string(decoded), time.Now().Add(time.Minute*5))
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
return &EtcdGroupCacheHandler{
|
||||||
|
EtcdHandler: etcdHandler,
|
||||||
|
Group: group,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *EtcdGroupCacheHandler) Put(k string, v interface{}) error {
|
||||||
|
value, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return h.EtcdHandler.Put(k, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *EtcdGroupCacheHandler) PutWithTTL(k string, v interface{}, duration time.Duration) error {
|
||||||
|
value, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return h.EtcdHandler.PutWithTTL(k, value, duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *EtcdGroupCacheHandler) Get(k string) (interface{}, error) {
|
||||||
|
var value []byte
|
||||||
|
err := h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *EtcdGroupCacheHandler) Delete(k string) error {
|
||||||
|
err := h.EtcdHandler.Delete(k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = h.Group.Remove(context.Background(), k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEtcdGroupCacheHandler(t *testing.T) {
|
||||||
|
cfg := createConfig()
|
||||||
|
handler, err := NewEtcdGroupCacheHandler(cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create EtcdGroupCacheHandler: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
key := "test-key"
|
||||||
|
value := map[string]string{"key1": "value1", "key2": "value2"}
|
||||||
|
|
||||||
|
// test Put and Get
|
||||||
|
err = handler.Put(key, value)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to put key: %v", err)
|
||||||
|
}
|
||||||
|
resp, err := handler.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to get key: %v", err)
|
||||||
|
}
|
||||||
|
var result map[string]string
|
||||||
|
err = json.Unmarshal(resp.([]byte), &result)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to unmarshal response: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(result) != len(value) || result["key1"] != value["key1"] || result["key2"] != value["key2"] {
|
||||||
|
t.Fatalf("unexpected response: %v", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// test PutWithTTL and Get
|
||||||
|
err = handler.PutWithTTL(key, value, time.Second*2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to put key with TTL: %v", err)
|
||||||
|
}
|
||||||
|
resp, err = handler.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to get key: %v", err)
|
||||||
|
}
|
||||||
|
err = json.Unmarshal(resp.([]byte), &result)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to unmarshal response: %v", err)
|
||||||
|
}
|
||||||
|
if len(result) != len(value) || result["key1"] != value["key1"] || result["key2"] != value["key2"] {
|
||||||
|
t.Fatalf("unexpected response: %v", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// test Delete
|
||||||
|
err = handler.Delete(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to delete key: %v", err)
|
||||||
|
}
|
||||||
|
resp, err = handler.Get(key)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected key to be deleted")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createConfig() *viper.Viper {
|
||||||
|
cfg := viper.New()
|
||||||
|
cfg.Set("storage.kv.etcd.endpoints", []string{"localhost:2379"})
|
||||||
|
cfg.Set("storage.kv.etcd.username", "")
|
||||||
|
cfg.Set("storage.kv.etcd.prefix", "parcoursmob/cache/")
|
||||||
|
cfg.Set("storage.kv.etcd.password", "")
|
||||||
|
cfg.Set("storage.kv.groupcache.endpoint", "localhost:8080")
|
||||||
|
return cfg
|
||||||
|
}
|
Loading…
Reference in New Issue