agenda dispositifs exports
This commit is contained in:
@@ -1,91 +1 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/spf13/viper"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BadgerHandler struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func NewBadgerHandler(cfg *viper.Viper) (*BadgerHandler, error) {
|
||||
dbPath := cfg.GetString("storage.kv.badger.dbPath")
|
||||
|
||||
opts := badger.DefaultOptions(dbPath)
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BadgerHandler{
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (bh *BadgerHandler) Put(k string, v any) error {
|
||||
err := bh.db.Update(func(txn *badger.Txn) error {
|
||||
bytes, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return txn.Set([]byte(k), bytes)
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (bh *BadgerHandler) PutWithTTL(k string, v any, duration time.Duration) error {
|
||||
err := bh.db.Update(func(txn *badger.Txn) error {
|
||||
bytes, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
entry := badger.NewEntry([]byte(k), bytes).WithTTL(duration)
|
||||
return txn.SetEntry(entry)
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (bh *BadgerHandler) Get(k string) (any, error) {
|
||||
var value []byte
|
||||
|
||||
err := bh.db.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get([]byte(k))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = item.Value(func(val []byte) error {
|
||||
value = make([]byte, len(val))
|
||||
copy(value, val)
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
err = json.Unmarshal(value, &result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (bh *BadgerHandler) Delete(k string) error {
|
||||
err := bh.db.Update(func(txn *badger.Txn) error {
|
||||
err := txn.Delete([]byte(k))
|
||||
return err
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,170 +1 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/spf13/viper"
|
||||
"gopkg.in/square/go-jose.v2/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/mailgun/groupcache/v2"
|
||||
)
|
||||
|
||||
type GroupCacheHandler struct {
|
||||
KVHandler KVHandler
|
||||
Group *groupcache.Group
|
||||
cacheTTL int64
|
||||
}
|
||||
|
||||
func NewGroupCacheHandler(cfg *viper.Viper) (*GroupCacheHandler, error) {
|
||||
var (
|
||||
endpoint = cfg.GetString("storage.kv.groupcache.endpoint")
|
||||
ports = cfg.GetStringSlice("storage.kv.groupcache.ports")
|
||||
cacheTTL = cfg.GetInt64("storage.kv.groupcache.cacheTTL")
|
||||
cacheServers []*http.Server
|
||||
)
|
||||
|
||||
var kvHandler KVHandler
|
||||
var getterFunc groupcache.GetterFunc
|
||||
kvType := cfg.GetString("storage.kv.groupcache.db")
|
||||
getterFunc = func(ctx context.Context, key string, dest groupcache.Sink) error {
|
||||
resp, err := kvHandler.Get(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp == nil {
|
||||
return fmt.Errorf("key not found in etcd: %s", key)
|
||||
}
|
||||
jsonResp, _ := json.Marshal(resp)
|
||||
dest.SetBytes(jsonResp, time.Now().Add(time.Duration(cacheTTL)*time.Minute))
|
||||
return nil
|
||||
|
||||
}
|
||||
switch kvType {
|
||||
case "etcd":
|
||||
etcdHandler, err := NewEtcdHandler(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
kvHandler = etcdHandler
|
||||
case "badger":
|
||||
badgerHandler, err := NewBadgerHandler(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
kvHandler = badgerHandler
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported kvType: %s", kvType)
|
||||
}
|
||||
|
||||
pool := groupcache.NewHTTPPoolOpts("", &groupcache.HTTPPoolOptions{})
|
||||
for _, port := range ports {
|
||||
pool.Set("http://" + endpoint + ":" + port)
|
||||
server := &http.Server{
|
||||
Addr: endpoint + ":" + port,
|
||||
Handler: pool,
|
||||
}
|
||||
cacheServers = append(cacheServers, server)
|
||||
go func(srv *http.Server) {
|
||||
log.Printf("Serving cache server %s \n", srv.Addr)
|
||||
if err := srv.ListenAndServe(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}(server)
|
||||
}
|
||||
|
||||
group := groupcache.NewGroup("data", 3000000, getterFunc)
|
||||
handler := &GroupCacheHandler{
|
||||
KVHandler: kvHandler,
|
||||
Group: group,
|
||||
cacheTTL: cacheTTL,
|
||||
}
|
||||
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
func (h *GroupCacheHandler) Put(k string, v any) error {
|
||||
// Update the value in the underlying key-value store
|
||||
err := h.KVHandler.Put(k, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if the key exists in the cache
|
||||
var value []byte
|
||||
err = h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
||||
if err == nil {
|
||||
// Key exists in the cache, compare with the new value
|
||||
if !reflect.DeepEqual(value, v) {
|
||||
// Values are different, update the cache
|
||||
jsonValue, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expireTime := time.Now().Add(time.Duration(h.cacheTTL) * time.Minute)
|
||||
err = h.Group.Set(context.Background(), k, jsonValue, expireTime, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *GroupCacheHandler) PutWithTTL(k string, v any, duration time.Duration) error {
|
||||
|
||||
err := h.KVHandler.PutWithTTL(k, v, duration)
|
||||
// Check if the key exists in the cache
|
||||
var value []byte
|
||||
err = h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
||||
if err == nil {
|
||||
// Key exists in the cache, compare with the new value
|
||||
if !reflect.DeepEqual(value, v) {
|
||||
// Values are different, update the cache
|
||||
jsonValue, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expireTime := time.Now().Add(time.Duration(h.cacheTTL) * time.Minute)
|
||||
err = h.Group.Set(context.Background(), k, jsonValue, expireTime, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (h *GroupCacheHandler) Get(k string) (any, error) {
|
||||
var value []byte
|
||||
err := h.Group.Get(nil, k, groupcache.AllocatingByteSliceSink(&value))
|
||||
if err != nil {
|
||||
fmt.Println("error", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result any
|
||||
err = json.Unmarshal(value, &result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (h *GroupCacheHandler) Delete(k string) error {
|
||||
err := h.KVHandler.Delete(k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = h.Group.Remove(context.Background(), k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user