saved-search/data/storage/mongodb.go

219 lines
6.2 KiB
Go

package storage
import (
"context"
"fmt"
"time"
"git.coopgo.io/coopgo-platform/saved-search/data/types"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// MongoDBStorage is the MongoDB-backed implementation of the Storage
// interface. Each method resolves its target collection from the database
// and collection names stored here.
type MongoDBStorage struct {
	// client is the driver handle through which all operations are issued.
	client *mongo.Client
	// database and collection name the location of the saved-search documents.
	database, collection string
}
// NewMongoDBStorage binds an existing client to the given database and
// collection and creates the collection's indexes before returning.
//
// If index creation fails, no storage is returned and the underlying error
// is wrapped in the result.
func NewMongoDBStorage(client *mongo.Client, database, collection string) (*MongoDBStorage, error) {
	s := new(MongoDBStorage)
	s.client = client
	s.database = database
	s.collection = collection

	// Index creation happens eagerly so a misconfigured connection surfaces here.
	err := s.createIndexes()
	if err != nil {
		return nil, fmt.Errorf("failed to create indexes: %w", err)
	}
	return s, nil
}
// NewMongoDBStorageFromConfig builds a MongoDB-backed Storage from
// configuration, following the solidarity transport pattern.
//
// Keys read from cfg:
//   - storage.db.mongodb.uri: full connection URI; when empty, one is
//     assembled as mongodb://host:port/db_name from the keys below
//   - storage.db.mongodb.host and storage.db.mongodb.port
//   - storage.db.mongodb.db_name: database that holds the collection
//   - storage.db.mongodb.collections.saved_searches: collection name
//
// On failure the returned Storage is a genuine nil interface value, and a
// client opened by this function is disconnected before returning.
func NewMongoDBStorageFromConfig(cfg *viper.Viper) (Storage, error) {
	var (
		uri        = cfg.GetString("storage.db.mongodb.uri")
		host       = cfg.GetString("storage.db.mongodb.host")
		port       = cfg.GetString("storage.db.mongodb.port")
		dbName     = cfg.GetString("storage.db.mongodb.db_name")
		collection = cfg.GetString("storage.db.mongodb.collections.saved_searches")
	)
	if uri == "" {
		uri = fmt.Sprintf("mongodb://%s:%s/%s", host, port, dbName)
	}

	client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri))
	if err != nil {
		return nil, err
	}

	store, err := NewMongoDBStorage(client, dbName, collection)
	if err != nil {
		// The client was opened here, so release it here; otherwise it
		// stays open with no owner left to close it.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if derr := client.Disconnect(ctx); derr != nil {
			log.Warn().Err(derr).Msg("mongodb disconnect after failed initialization")
		}
		// Return a literal nil: forwarding the nil *MongoDBStorage would
		// produce a non-nil Storage interface wrapping a nil pointer.
		return nil, err
	}

	// Logged only once the storage is actually usable.
	log.Info().Str("collection", collection).Msg("mongodb storage initialized")
	return store, nil
}
// createIndexes creates the indexes on the backing collection in a single
// CreateMany call, bounded by a 10-second timeout. It returns whatever error
// the driver reports, unwrapped.
func (s *MongoDBStorage) createIndexes() error {
	models := []mongo.IndexModel{
		// Single-field index on owner_id for efficient querying.
		{Keys: bson.D{{Key: "owner_id", Value: 1}}},
		// Compound index on owner_id (ascending) and created_at (descending)
		// for pagination.
		{Keys: bson.D{{Key: "owner_id", Value: 1}, {Key: "created_at", Value: -1}}},
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	view := s.client.Database(s.database).Collection(s.collection).Indexes()
	_, err := view.CreateMany(ctx, models)
	return err
}
// CreateSavedSearch inserts search as a new document in the collection.
//
// CreatedAt and UpdatedAt are both set to the current time before insertion.
// Because search is received by value, those assignments are made on the
// local copy only. A driver failure is logged and returned wrapped.
func (s *MongoDBStorage) CreateSavedSearch(ctx context.Context, search types.SavedSearch) error {
	stamp := time.Now()
	search.CreatedAt, search.UpdatedAt = stamp, stamp

	target := s.client.Database(s.database).Collection(s.collection)
	if _, err := target.InsertOne(ctx, search); err != nil {
		log.Error().Err(err).Msg("CreateSavedSearch error")
		return fmt.Errorf("failed to create saved search: %w", err)
	}
	return nil
}
// GetSavedSearch looks up the document whose _id equals id and decodes it.
//
// It returns a plain "saved search not found" error when the driver reports
// mongo.ErrNoDocuments. Any other failure is logged and returned wrapped.
func (s *MongoDBStorage) GetSavedSearch(ctx context.Context, id string) (*types.SavedSearch, error) {
	result := s.client.Database(s.database).Collection(s.collection).FindOne(ctx, bson.M{"_id": id})

	found := new(types.SavedSearch)
	switch err := result.Decode(found); {
	case err == nil:
		return found, nil
	case err == mongo.ErrNoDocuments:
		// A missing document is an expected outcome, so it is not logged.
		return nil, fmt.Errorf("saved search not found")
	default:
		log.Error().Err(err).Msg("GetSavedSearch error")
		return nil, fmt.Errorf("failed to get saved search: %w", err)
	}
}
// GetSavedSearchesByOwner returns the saved searches belonging to ownerID
// whose "datetime" field is later than the current time.
//
// Documents that fail to decode are logged and skipped, so one malformed
// record does not hide the others. A failure of the query or of the cursor
// iteration itself is logged and returned wrapped, with a nil slice. The
// result is a nil slice when nothing matches.
func (s *MongoDBStorage) GetSavedSearchesByOwner(ctx context.Context, ownerID string) ([]*types.SavedSearch, error) {
	coll := s.client.Database(s.database).Collection(s.collection)

	// Filter to only include future searches.
	filter := bson.M{
		"owner_id": ownerID,
		"datetime": bson.M{"$gt": time.Now()},
	}

	cursor, err := coll.Find(ctx, filter)
	if err != nil {
		log.Error().Err(err).Msg("GetSavedSearchesByOwner error")
		return nil, fmt.Errorf("failed to get saved searches: %w", err)
	}
	defer cursor.Close(ctx)

	var searches []*types.SavedSearch
	for cursor.Next(ctx) {
		// Declared inside the loop so each appended pointer refers to its
		// own value.
		var search types.SavedSearch
		if err := cursor.Decode(&search); err != nil {
			log.Error().Err(err).Msg("decode saved search error")
			continue
		}
		searches = append(searches, &search)
	}

	// Next returns false both at end-of-results and on failure; only Err
	// tells the two apart. Without this check a broken iteration would be
	// reported as a successful, silently truncated list.
	if err := cursor.Err(); err != nil {
		log.Error().Err(err).Msg("GetSavedSearchesByOwner cursor error")
		return nil, fmt.Errorf("failed to get saved searches: %w", err)
	}
	return searches, nil
}
// UpdateSavedSearch applies search as a $set update to the document whose
// _id equals search.ID, after setting UpdatedAt to the current time on the
// local copy.
//
// It returns a plain "saved search not found" error when no document
// matches. A driver failure is logged and returned wrapped.
func (s *MongoDBStorage) UpdateSavedSearch(ctx context.Context, search types.SavedSearch) error {
	search.UpdatedAt = time.Now()

	outcome, err := s.client.
		Database(s.database).
		Collection(s.collection).
		UpdateOne(ctx, bson.M{"_id": search.ID}, bson.M{"$set": search})

	switch {
	case err != nil:
		log.Error().Err(err).Msg("UpdateSavedSearch error")
		return fmt.Errorf("failed to update saved search: %w", err)
	case outcome.MatchedCount == 0:
		// MatchedCount is used so that a matched document counts as found.
		return fmt.Errorf("saved search not found")
	}
	return nil
}
// DeleteSavedSearch removes the document whose _id equals id.
//
// It returns a plain "saved search not found" error when nothing was
// deleted. A driver failure is logged and returned wrapped.
func (s *MongoDBStorage) DeleteSavedSearch(ctx context.Context, id string) error {
	byID := bson.M{"_id": id}

	outcome, err := s.client.Database(s.database).Collection(s.collection).DeleteOne(ctx, byID)
	if err != nil {
		log.Error().Err(err).Msg("DeleteSavedSearch error")
		return fmt.Errorf("failed to delete saved search: %w", err)
	}
	if outcome.DeletedCount != 0 {
		return nil
	}
	return fmt.Errorf("saved search not found")
}
// ListSavedSearches returns one page of saved searches, newest first by
// created_at, together with the total number of documents matching the
// filter.
//
// An empty ownerID applies no owner filter; otherwise only documents with
// that owner_id are listed. limit and offset are passed to the driver as the
// find limit and skip. The total is counted with the same filter and is not
// affected by limit or offset.
//
// Documents that fail to decode are logged and skipped. A failure of the
// count, the query, or the cursor iteration is logged and returned wrapped,
// with a nil slice and a zero total.
func (s *MongoDBStorage) ListSavedSearches(ctx context.Context, ownerID string, limit, offset int) ([]*types.SavedSearch, int64, error) {
	coll := s.client.Database(s.database).Collection(s.collection)

	filter := bson.M{}
	if ownerID != "" {
		filter["owner_id"] = ownerID
	}

	// Get total count.
	total, err := coll.CountDocuments(ctx, filter)
	if err != nil {
		log.Error().Err(err).Msg("ListSavedSearches count error")
		return nil, 0, fmt.Errorf("failed to count saved searches: %w", err)
	}

	// Get paginated results.
	opts := options.Find().
		SetSort(bson.D{{Key: "created_at", Value: -1}}).
		SetLimit(int64(limit)).
		SetSkip(int64(offset))

	cursor, err := coll.Find(ctx, filter, opts)
	if err != nil {
		log.Error().Err(err).Msg("ListSavedSearches find error")
		return nil, 0, fmt.Errorf("failed to list saved searches: %w", err)
	}
	defer cursor.Close(ctx)

	var searches []*types.SavedSearch
	for cursor.Next(ctx) {
		// Declared inside the loop so each appended pointer refers to its
		// own value.
		var search types.SavedSearch
		if err := cursor.Decode(&search); err != nil {
			log.Error().Err(err).Msg("decode saved search error")
			continue
		}
		searches = append(searches, &search)
	}

	// Next returns false both at end-of-results and on failure; only Err
	// tells the two apart. Without this check a broken iteration would be
	// reported as a successful, silently truncated page.
	if err := cursor.Err(); err != nil {
		log.Error().Err(err).Msg("ListSavedSearches cursor error")
		return nil, 0, fmt.Errorf("failed to list saved searches: %w", err)
	}
	return searches, total, nil
}