cache refactoring
This commit is contained in:
parent
718c39de00
commit
6299a6c7be
|
@ -0,0 +1,91 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/dgraph-io/badger/v4"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BadgerHandler struct {
|
||||||
|
db *badger.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBadgerHandler(cfg *viper.Viper) (*BadgerHandler, error) {
|
||||||
|
dbPath := cfg.GetString("storage.kv.badger.dbPath")
|
||||||
|
|
||||||
|
opts := badger.DefaultOptions(dbPath)
|
||||||
|
db, err := badger.Open(opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &BadgerHandler{
|
||||||
|
db: db,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BadgerHandler) Put(k string, v any) error {
|
||||||
|
err := bh.db.Update(func(txn *badger.Txn) error {
|
||||||
|
bytes, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return txn.Set([]byte(k), bytes)
|
||||||
|
})
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BadgerHandler) PutWithTTL(k string, v any, duration time.Duration) error {
|
||||||
|
err := bh.db.Update(func(txn *badger.Txn) error {
|
||||||
|
bytes, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
entry := badger.NewEntry([]byte(k), bytes).WithTTL(duration)
|
||||||
|
return txn.SetEntry(entry)
|
||||||
|
})
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BadgerHandler) Get(k string) (any, error) {
|
||||||
|
var value []byte
|
||||||
|
|
||||||
|
err := bh.db.View(func(txn *badger.Txn) error {
|
||||||
|
item, err := txn.Get([]byte(k))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = item.Value(func(val []byte) error {
|
||||||
|
value = make([]byte, len(val))
|
||||||
|
copy(value, val)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result map[string]interface{}
|
||||||
|
err = json.Unmarshal(value, &result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *BadgerHandler) Delete(k string) error {
|
||||||
|
err := bh.db.Update(func(txn *badger.Txn) error {
|
||||||
|
err := txn.Delete([]byte(k))
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
|
@ -0,0 +1,179 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
"gopkg.in/square/go-jose.v2/json"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mailgun/groupcache/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GroupCacheHandler struct {
|
||||||
|
KVHandler KVHandler
|
||||||
|
Group *groupcache.Group
|
||||||
|
cacheTTL int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGroupCacheHandler(cfg *viper.Viper) (*GroupCacheHandler, error) {
|
||||||
|
var (
|
||||||
|
endpoint = cfg.GetString("storage.kv.groupcache.endpoint")
|
||||||
|
ports = cfg.GetStringSlice("storage.kv.groupcache.ports")
|
||||||
|
cacheTTL = cfg.GetInt64("storage.kv.groupcache.cacheTTL")
|
||||||
|
cacheServers []*http.Server
|
||||||
|
)
|
||||||
|
|
||||||
|
var kvHandler KVHandler
|
||||||
|
var getterFunc groupcache.GetterFunc
|
||||||
|
kvType := cfg.GetString("storage.kv.groupcache.db")
|
||||||
|
getterFunc = func(ctx context.Context, key string, dest groupcache.Sink) error {
|
||||||
|
resp, err := kvHandler.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp == nil {
|
||||||
|
return fmt.Errorf("key not found in etcd: %s", key)
|
||||||
|
}
|
||||||
|
jsonResp, err := json.Marshal(resp.(map[string]interface{}))
|
||||||
|
if err != nil {
|
||||||
|
// Handle the error
|
||||||
|
}
|
||||||
|
dest.SetBytes(jsonResp, time.Now().Add(time.Duration(cacheTTL)*time.Minute))
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
switch kvType {
|
||||||
|
case "etcd":
|
||||||
|
etcdHandler, err := NewEtcdHandler(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
kvHandler = etcdHandler
|
||||||
|
case "badger":
|
||||||
|
badgerHandler, err := NewBadgerHandler(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
kvHandler = badgerHandler
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported kvType: %s", kvType)
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := groupcache.NewHTTPPoolOpts("", &groupcache.HTTPPoolOptions{})
|
||||||
|
for _, port := range ports {
|
||||||
|
pool.Set("http://" + endpoint + ":" + port)
|
||||||
|
server := &http.Server{
|
||||||
|
Addr: endpoint + ":" + port,
|
||||||
|
Handler: pool,
|
||||||
|
}
|
||||||
|
cacheServers = append(cacheServers, server)
|
||||||
|
go func(srv *http.Server) {
|
||||||
|
log.Printf("Serving cache server %s \n", srv.Addr)
|
||||||
|
if err := srv.ListenAndServe(); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}(server)
|
||||||
|
}
|
||||||
|
|
||||||
|
group := groupcache.NewGroup("data", 3000000, getterFunc)
|
||||||
|
handler := &GroupCacheHandler{
|
||||||
|
KVHandler: kvHandler,
|
||||||
|
Group: group,
|
||||||
|
cacheTTL: cacheTTL,
|
||||||
|
}
|
||||||
|
|
||||||
|
return handler, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *GroupCacheHandler) Put(k string, v any) error {
|
||||||
|
// Update the value in the underlying key-value store
|
||||||
|
err := h.KVHandler.Put(k, v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the key exists in the cache
|
||||||
|
var value []byte
|
||||||
|
err = h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
||||||
|
if err == nil {
|
||||||
|
// Key exists in the cache, compare with the new value
|
||||||
|
if !reflect.DeepEqual(value, v) {
|
||||||
|
// Values are different, update the cache
|
||||||
|
jsonValue, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
expireTime := time.Now().Add(time.Duration(h.cacheTTL) * time.Minute)
|
||||||
|
err = h.Group.Set(context.Background(), k, jsonValue, expireTime, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *GroupCacheHandler) PutWithTTL(k string, v any, duration time.Duration) error {
|
||||||
|
|
||||||
|
err := h.KVHandler.PutWithTTL(k, v, duration)
|
||||||
|
// Check if the key exists in the cache
|
||||||
|
var value []byte
|
||||||
|
err = h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
||||||
|
if err == nil {
|
||||||
|
// Key exists in the cache, compare with the new value
|
||||||
|
if !reflect.DeepEqual(value, v) {
|
||||||
|
// Values are different, update the cache
|
||||||
|
jsonValue, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
expireTime := time.Now().Add(time.Duration(h.cacheTTL) * time.Minute)
|
||||||
|
err = h.Group.Set(context.Background(), k, jsonValue, expireTime, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *GroupCacheHandler) Get(k string) (any, error) {
|
||||||
|
var value []byte
|
||||||
|
err := h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("error", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the value is empty, return an empty map
|
||||||
|
if len(value) == 0 {
|
||||||
|
return make(map[string]interface{}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var result map[string]interface{}
|
||||||
|
err = json.Unmarshal(value, &result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *GroupCacheHandler) Delete(k string) error {
|
||||||
|
err := h.KVHandler.Delete(k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = h.Group.Remove(context.Background(), k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -1,108 +0,0 @@
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/mailgun/groupcache/v2"
|
|
||||||
"github.com/spf13/viper"
|
|
||||||
)
|
|
||||||
|
|
||||||
type EtcdGroupCacheHandler struct {
|
|
||||||
EtcdHandler *EtcdHandler
|
|
||||||
Group *groupcache.Group
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewEtcdGroupCacheHandler(cfg *viper.Viper) (*EtcdGroupCacheHandler, error) {
|
|
||||||
var (
|
|
||||||
endpoint = cfg.GetString("storage.kv.groupcache.endpoint")
|
|
||||||
ports = cfg.GetStringSlice("storage.kv.groupcache.ports")
|
|
||||||
cacheTTL = cfg.GetInt64("storage.kv.groupcache.cacheTTL")
|
|
||||||
cacheServers []*http.Server
|
|
||||||
)
|
|
||||||
etcdHandler, err := NewEtcdHandler(cfg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
pool := groupcache.NewHTTPPoolOpts("", &groupcache.HTTPPoolOptions{})
|
|
||||||
for _, port := range ports {
|
|
||||||
pool.Set("http://" + endpoint + ":" + port)
|
|
||||||
server := &http.Server{
|
|
||||||
Addr: endpoint + ":" + port,
|
|
||||||
Handler: pool,
|
|
||||||
}
|
|
||||||
cacheServers = append(cacheServers, server)
|
|
||||||
go func(srv *http.Server) {
|
|
||||||
log.Printf("Serving cache server %s \n", srv.Addr)
|
|
||||||
if err := srv.ListenAndServe(); err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
}(server)
|
|
||||||
}
|
|
||||||
group := groupcache.NewGroup("data", 3000000, groupcache.GetterFunc(
|
|
||||||
func(ctx context.Context, key string, dest groupcache.Sink) error {
|
|
||||||
|
|
||||||
resp, err := etcdHandler.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if resp == nil {
|
|
||||||
return errors.New("key not found in ETCD")
|
|
||||||
}
|
|
||||||
decoded, err := base64.StdEncoding.DecodeString(resp.(string))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to decode value for key %s: %v", key, err)
|
|
||||||
}
|
|
||||||
dest.SetString(string(decoded), time.Now().Add(time.Duration(cacheTTL)*time.Minute))
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
))
|
|
||||||
|
|
||||||
return &EtcdGroupCacheHandler{
|
|
||||||
EtcdHandler: etcdHandler,
|
|
||||||
Group: group,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *EtcdGroupCacheHandler) Put(k string, v interface{}) error {
|
|
||||||
value, err := json.Marshal(v)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return h.EtcdHandler.Put(k, value)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *EtcdGroupCacheHandler) PutWithTTL(k string, v interface{}, duration time.Duration) error {
|
|
||||||
value, err := json.Marshal(v)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return h.EtcdHandler.PutWithTTL(k, value, duration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *EtcdGroupCacheHandler) Get(k string) (interface{}, error) {
|
|
||||||
var value []byte
|
|
||||||
err := h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *EtcdGroupCacheHandler) Delete(k string) error {
|
|
||||||
err := h.EtcdHandler.Delete(k)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = h.Group.Remove(context.Background(), k)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,77 +0,0 @@
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"github.com/spf13/viper"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestEtcdGroupCacheHandler(t *testing.T) {
|
|
||||||
cfg := createConfig()
|
|
||||||
handler, err := NewEtcdGroupCacheHandler(cfg)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to create EtcdGroupCacheHandler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
key := "test-key"
|
|
||||||
value := map[string]string{"key1": "value1", "key2": "value2"}
|
|
||||||
|
|
||||||
// test Put and Get
|
|
||||||
err = handler.Put(key, value)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to put key: %v", err)
|
|
||||||
}
|
|
||||||
resp, err := handler.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to get key: %v", err)
|
|
||||||
}
|
|
||||||
var result map[string]string
|
|
||||||
err = json.Unmarshal(resp.([]byte), &result)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to unmarshal response: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(result) != len(value) || result["key1"] != value["key1"] || result["key2"] != value["key2"] {
|
|
||||||
t.Fatalf("unexpected response: %v", result)
|
|
||||||
}
|
|
||||||
|
|
||||||
// test PutWithTTL and Get
|
|
||||||
err = handler.PutWithTTL(key, value, time.Second*2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to put key with TTL: %v", err)
|
|
||||||
}
|
|
||||||
resp, err = handler.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to get key: %v", err)
|
|
||||||
}
|
|
||||||
err = json.Unmarshal(resp.([]byte), &result)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to unmarshal response: %v", err)
|
|
||||||
}
|
|
||||||
if len(result) != len(value) || result["key1"] != value["key1"] || result["key2"] != value["key2"] {
|
|
||||||
t.Fatalf("unexpected response: %v", result)
|
|
||||||
}
|
|
||||||
|
|
||||||
// test Delete
|
|
||||||
err = handler.Delete(key)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to delete key: %v", err)
|
|
||||||
}
|
|
||||||
resp, err = handler.Get(key)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("expected key to be deleted")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func createConfig() *viper.Viper {
|
|
||||||
cfg := viper.New()
|
|
||||||
cfg.Set("storage.kv.etcd.endpoints", []string{"localhost:2379"})
|
|
||||||
cfg.Set("storage.kv.etcd.username", "")
|
|
||||||
cfg.Set("storage.kv.etcd.prefix", "parcoursmob/cache/")
|
|
||||||
cfg.Set("storage.kv.etcd.password", "")
|
|
||||||
cfg.Set("storage.kv.groupcache.endpoint", "localhost")
|
|
||||||
cfg.Set("storage.kv.groupcache.ports", []string{"36000", "36500"})
|
|
||||||
cfg.Set("storage.kv.groupcache.cacheTTL", 10)
|
|
||||||
return cfg
|
|
||||||
}
|
|
|
@ -1,6 +1,7 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
@ -14,5 +15,18 @@ type KVHandler interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKVHandler(cfg *viper.Viper) (KVHandler, error) {
|
func NewKVHandler(cfg *viper.Viper) (KVHandler, error) {
|
||||||
return NewEtcdHandler(cfg)
|
cacheType := cfg.GetString("storage.kv.dbType")
|
||||||
|
switch cacheType {
|
||||||
|
case "etcd":
|
||||||
|
return NewEtcdHandler(cfg)
|
||||||
|
case "badger":
|
||||||
|
return NewBadgerHandler(cfg)
|
||||||
|
case "etcdGroupcache":
|
||||||
|
return NewGroupCacheHandler(cfg)
|
||||||
|
fmt.Println("here")
|
||||||
|
case "badgerGroupcache":
|
||||||
|
return NewGroupCacheHandler(cfg)
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue