initial commit with admin index, working with the in-memory RTree index for now
handlers/admin/bleve.go (new file, 152 lines)
@@ -0,0 +1,152 @@
package admin

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/geo"
	"github.com/paulmach/orb/geojson"
	"github.com/rs/zerolog/log"
	"github.com/spf13/viper"
)

// AdminBleveIndex stores admin features in a Bleve geo-shape index and keeps
// the original GeoJSON features in memory for lookups by id.
type AdminBleveIndex struct {
	Index bleve.Index
	Data  map[string]*geojson.Feature

	mu sync.Mutex // protects Data, which is written from concurrent indexing goroutines
}

func NewAdminBleveIndex(cfg *viper.Viper) (*AdminBleveIndex, error) {
	file := cfg.GetString("storage.index.bleve.file")
	if file == "" {
		return nil, errors.New("issue in config : storage.index.bleve.file missing")
	}
	index, err := bleve.Open(file)
	if err != nil {
		log.Info().Msg("Bleve index does not exist : creating")
		mapping := bleve.NewIndexMapping()
		docMapping := bleve.NewDocumentMapping()
		docMapping.AddFieldMappingsAt("geometry", bleve.NewGeoShapeFieldMapping())
		// docMapping.AddFieldMappingsAt("properties.nom", bleve.NewTextFieldMapping())
		mapping.DefaultMapping = docMapping
		index, err := bleve.New(file, mapping)
		if err != nil {
			return nil, fmt.Errorf("issue creating index : %v", err)
		}
		indexstorage := &AdminBleveIndex{
			Index: index,
			Data:  map[string]*geojson.Feature{},
		}
		if err := indexstorage.IndexData(cfg.GetStringMapString("data.layers")); err != nil {
			return nil, fmt.Errorf("issue indexing data : %v", err)
		}
		return indexstorage, nil
	}

	return &AdminBleveIndex{
		Index: index,
		Data:  map[string]*geojson.Feature{},
	}, nil
}

// IndexData downloads each configured layer (a GeoJSON FeatureCollection) and
// indexes its features, one goroutine per layer.
func (b *AdminBleveIndex) IndexData(datalayers map[string]string) error {
	log.Info().Msg("indexing data")
	var wg sync.WaitGroup
	for layer, url := range datalayers {
		wg.Add(1)
		go func(layer, url string) {
			defer wg.Done()
			resp, err := http.Get(url)
			if err != nil {
				log.Error().Err(err).Msg("get request failed")
				return
			}
			defer resp.Body.Close()

			body, err := io.ReadAll(resp.Body)
			if err != nil {
				log.Error().Err(err).Msg("could not read response")
				return
			}

			data, err := geojson.UnmarshalFeatureCollection(body)
			if err != nil {
				log.Error().Err(err).Msg("could not read input data")
				return
			}
			for _, feature := range data.Features {
				if err = b.Store(layer, feature); err != nil {
					log.Error().Err(err).Msg("could not store data")
				}
			}
		}(layer, url)
	}

	wg.Wait()

	if nb, err := b.Index.DocCount(); err == nil {
		log.Info().Uint64("documents", nb).Msg("indexed all data")
	}

	return nil
}

// Store indexes a single feature under the id "<layer>/<code>" and keeps the
// original feature for later retrieval.
func (b *AdminBleveIndex) Store(layer string, feature *geojson.Feature) error {
	id := fmt.Sprintf("%s/%s", layer, feature.Properties.MustString("code"))
	log.Debug().Str("id", id).Str("name", feature.Properties.MustString("nom", "")).Msg("store data")
	f, err := feature.MarshalJSON()
	if err != nil {
		log.Error().Err(err).Msg("geojson to json bytes issue")
		return err
	}
	var doc map[string]any

	if err = json.Unmarshal(f, &doc); err != nil {
		log.Error().Err(err).Msg("json reading issue")
		return err
	}
	if err = b.Index.Index(id, doc); err != nil {
		log.Error().Err(err).Msg("error adding to index")
		return err
	}

	b.mu.Lock()
	b.Data[id] = feature
	b.mu.Unlock()
	return nil
}

// GeoSearch runs a geo-shape "contains" query with the feature's point and
// groups the hits by layer (the part of the document id before the "/").
func (b *AdminBleveIndex) GeoSearch(f *geojson.Feature) (map[string][]*geojson.Feature, error) {
	coordinates := []float64{f.Point().Lon(), f.Point().Lat()}
	query, err := bleve.NewGeoShapeQuery(
		[][][][]float64{{{coordinates}}},
		geo.PointType,
		"contains",
	)
	if err != nil {
		return nil, fmt.Errorf("could not create query : %v", err)
	}
	query.SetField("geometry")
	search := bleve.NewSearchRequest(query)
	log.Debug().Any("query", query).Msg("query")
	res, err := b.Index.Search(search)
	if err != nil {
		return nil, fmt.Errorf("issue in search : %v", err)
	}
	log.Debug().Any("result", res).Msg("got result")

	results := map[string][]*geojson.Feature{}
	for _, r := range res.Hits {
		feature, ok := b.Data[r.ID]
		if !ok || feature == nil {
			// the index was opened from disk and the in-memory map is not repopulated yet
			continue
		}
		log.Debug().Str("region", feature.Properties.MustString("nom")).Str("type", feature.Geometry.GeoJSONType()).Msg("hit")
		layer, _, _ := strings.Cut(r.ID, "/")
		results[layer] = append(results[layer], feature)
	}

	return results, nil
}

func (s *AdminBleveIndex) Find(layer, id string) (*geojson.Feature, error) {
	result, ok := s.Data[fmt.Sprintf("%s/%s", layer, id)]
	if !ok || result == nil {
		return nil, errors.New("admin not found")
	}
	return result, nil
}
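A minimal usage sketch for this backend, assuming the package is imported under a hypothetical module path and that the config carries the two keys read above (storage.index.bleve.file and data.layers); the index file path, layer URL, and coordinates are placeholders:

package main

import (
	"fmt"

	"github.com/paulmach/orb"
	"github.com/paulmach/orb/geojson"
	"github.com/spf13/viper"

	"example.org/project/handlers/admin" // hypothetical module path
)

func main() {
	cfg := viper.New()
	cfg.Set("storage.index.bleve.file", "./admin.bleve") // placeholder path
	cfg.Set("data.layers", map[string]string{
		"regions": "https://example.org/regions.geojson", // placeholder URL
	})

	idx, err := admin.NewAdminBleveIndex(cfg)
	if err != nil {
		panic(err)
	}

	// Reverse geocode a point: which admin areas contain it?
	pt := geojson.NewFeature(orb.Point{2.35, 48.85})
	hits, err := idx.GeoSearch(pt)
	if err != nil {
		panic(err)
	}
	for layer, features := range hits {
		for _, f := range features {
			fmt.Println(layer, f.Properties.MustString("nom"))
		}
	}
}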
handlers/admin/memory_rtree.go (new file, 102 lines)
@@ -0,0 +1,102 @@
package admin

import (
	"errors"
	"io"
	"net/http"

	"github.com/paulmach/orb"
	"github.com/paulmach/orb/geojson"
	"github.com/paulmach/orb/planar"
	"github.com/rs/zerolog/log"
	"github.com/spf13/viper"
	"github.com/tidwall/rtree"
)

// AdminMemoryRTreeIndex keeps one in-memory R-tree per layer, indexed by the
// bounding box of each feature, plus the features keyed by their "code" property.
type AdminMemoryRTreeIndex struct {
	Layers    []string
	Indexes   map[string]*rtree.RTree
	Documents map[string]map[string]*geojson.Feature
}

func NewAdminMemoryRTreeIndex(cfg *viper.Viper) (*AdminMemoryRTreeIndex, error) {
	storage := &AdminMemoryRTreeIndex{
		Layers:    []string{},
		Indexes:   map[string]*rtree.RTree{},
		Documents: map[string]map[string]*geojson.Feature{},
	}

	layers := cfg.GetStringMapString("data.layers")
	for layer, url := range layers {
		if err := storage.IndexLayer(layer, url); err != nil {
			return nil, err
		}
	}

	return storage, nil
}

// IndexLayer downloads a GeoJSON FeatureCollection and inserts every feature
// into the layer's R-tree using its bounding box, keyed by the "code" property.
func (s *AdminMemoryRTreeIndex) IndexLayer(layer, url string) error {
	log.Info().Str("layer", layer).Msg("indexing layer")
	resp, err := http.Get(url)
	if err != nil {
		log.Error().Err(err).Msg("get request failed")
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Error().Err(err).Msg("could not read response")
		return err
	}

	data, err := geojson.UnmarshalFeatureCollection(body)
	if err != nil {
		log.Error().Err(err).Msg("could not read input data")
		return err
	}
	s.Indexes[layer] = &rtree.RTree{}
	s.Documents[layer] = map[string]*geojson.Feature{}
	s.Layers = append(s.Layers, layer)
	for _, feature := range data.Features {
		id := feature.Properties.MustString("code")
		bound := feature.Geometry.Bound()
		s.Indexes[layer].Insert(bound.Min, bound.Max, id)
		s.Documents[layer][id] = feature
	}
	return nil
}

// GeoSearch finds, for each layer, the features whose geometry contains the
// point: the R-tree narrows the candidates to features whose bounding box
// contains the point, then an exact planar point-in-polygon test filters them.
func (s *AdminMemoryRTreeIndex) GeoSearch(feature *geojson.Feature) (map[string][]*geojson.Feature, error) {
	results := map[string][]*geojson.Feature{}
	point := feature.Point()
	for _, layer := range s.Layers {
		results[layer] = []*geojson.Feature{}
		rt := s.Indexes[layer]
		rt.Search([2]float64{point.Lon(), point.Lat()}, [2]float64{point.Lon(), point.Lat()}, func(min, max [2]float64, id any) bool {
			resp, found := s.Documents[layer][id.(string)]
			if !found {
				return true // unknown id, keep scanning the remaining candidates
			}
			if poly, ok := resp.Geometry.(orb.Polygon); ok {
				if planar.PolygonContains(poly, point) {
					results[layer] = append(results[layer], resp)
					return true
				}
			} else if multipoly, ok := resp.Geometry.(orb.MultiPolygon); ok {
				if planar.MultiPolygonContains(multipoly, point) {
					results[layer] = append(results[layer], resp)
					return true
				}
			}
			// bounding box matched but the geometry does not contain the point:
			// keep scanning the remaining candidates
			return true
		})
	}
	return results, nil
}

func (s *AdminMemoryRTreeIndex) Find(layer, id string) (*geojson.Feature, error) {
	result, ok := s.Documents[layer][id]
	if !ok || result == nil {
		return nil, errors.New("admin not found")
	}
	return result, nil
}
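The GeoSearch above is a two-step filter: the R-tree only returns features whose bounding box contains the query point, and planar.PolygonContains then runs the exact point-in-ring test on those candidates. A minimal standalone illustration of that exact test (the square and points are made up):

package main

import (
	"fmt"

	"github.com/paulmach/orb"
	"github.com/paulmach/orb/planar"
)

func main() {
	// A unit square. Its bounding box is the square itself, so any point in
	// the box is also in the polygon; for an L-shaped or rotated polygon the
	// bounding box over-approximates, which is why the exact test is needed.
	square := orb.Polygon{{{0, 0}, {1, 0}, {1, 1}, {0, 1}, {0, 0}}}

	inside := orb.Point{0.5, 0.5}
	outside := orb.Point{2, 2}

	fmt.Println(planar.PolygonContains(square, inside))  // true
	fmt.Println(planar.PolygonContains(square, outside)) // false
}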
handlers/admin/memory_s2.go (new file, 119 lines)
@@ -0,0 +1,119 @@
package admin

import (
	"errors"
	"io"
	"net/http"

	"github.com/blevesearch/geo/s2"
	"github.com/paulmach/orb"
	"github.com/paulmach/orb/geojson"
	"github.com/rs/zerolog/log"
	"github.com/spf13/viper"
)

// AdminMemoryS2Index keeps one in-memory S2 shape index per layer, plus the
// features themselves keyed by their "code" property.
type AdminMemoryS2Index struct {
	Layers    []string
	Indexes   map[string]*s2.ShapeIndex
	Documents map[string]map[string]*geojson.Feature
}

func NewAdminMemoryS2Index(cfg *viper.Viper) (*AdminMemoryS2Index, error) {
	storage := &AdminMemoryS2Index{
		Layers:    []string{},
		Indexes:   map[string]*s2.ShapeIndex{},
		Documents: map[string]map[string]*geojson.Feature{},
	}

	layers := cfg.GetStringMapString("data.layers")
	for layer, url := range layers {
		if err := storage.IndexLayer(layer, url); err != nil {
			return nil, err
		}
	}

	return storage, nil
}

// IndexLayer downloads a GeoJSON FeatureCollection and adds each Polygon
// feature to the layer's S2 shape index. MultiPolygon features are not
// handled yet.
func (s *AdminMemoryS2Index) IndexLayer(layer, url string) error {
	resp, err := http.Get(url)
	if err != nil {
		log.Error().Err(err).Msg("get request failed")
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Error().Err(err).Msg("could not read response")
		return err
	}

	data, err := geojson.UnmarshalFeatureCollection(body)
	if err != nil {
		log.Error().Err(err).Msg("could not read input data")
		return err
	}
	s.Indexes[layer] = s2.NewShapeIndex()
	s.Documents[layer] = map[string]*geojson.Feature{}
	s.Layers = append(s.Layers, layer)
	for _, feature := range data.Features {
		if feature.Geometry.GeoJSONType() == "Polygon" {
			poly, ok := feature.Geometry.(orb.Polygon)
			if !ok {
				log.Error().Str("name", feature.Properties.MustString("nom")).Msg("could not index layer")
				break
			}
			s2Poly, err := orbToS2Polygon(&poly)
			if err != nil {
				log.Error().Err(err).Str("featurename", feature.Properties.MustString("nom")).Msg("could not convert polygon to S2")
				break
			}

			s.Indexes[layer].Add(s2Poly)

			s.Documents[layer][feature.Properties.MustString("code")] = feature
		}
	}
	s.Indexes[layer].Build()
	return nil
}

// GeoSearch is not implemented yet. The intent is, for each layer, to run an
// s2.ContainsPointQuery against the layer's ShapeIndex with the feature's
// point and to collect the features whose shapes contain it (see the sketch
// after this file).
func (s *AdminMemoryS2Index) GeoSearch(feature *geojson.Feature) (map[string][]*geojson.Feature, error) {
	return nil, errors.New("unimplemented")
}

// orbToS2Polygon converts an orb.Polygon (lon/lat rings) into an s2.Polygon.
// Ring orientation is taken as-is: an S2 loop treats the region to its left as
// its interior, so rings may need normalizing depending on the source data.
func orbToS2Polygon(poly *orb.Polygon) (*s2.Polygon, error) {
	if poly == nil {
		return nil, errors.New("polygon is nil")
	}
	loops := []*s2.Loop{}
	for _, ring := range *poly {
		points := []s2.Point{}
		for _, point := range ring {
			points = append(points, s2.PointFromLatLng(s2.LatLngFromDegrees(point.Lat(), point.Lon())))
		}
		loop := s2.LoopFromPoints(points)
		loops = append(loops, loop)
	}
	polygon := s2.PolygonFromLoops(loops)

	return polygon, nil
}

func (s *AdminMemoryS2Index) Find(layer, id string) (*geojson.Feature, error) {
	result, ok := s.Documents[layer][id]
	if !ok || result == nil {
		return nil, errors.New("admin not found")
	}
	return result, nil
}
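As noted in the GeoSearch comment above, the missing piece is running an s2.ContainsPointQuery per layer and mapping the returned shapes back to features, for example with a map keyed by the added shape. A small self-contained sketch of that query pattern, assuming the blevesearch/geo fork exposes the same ContainsPointQuery API as golang/geo/s2; the rectangles, names, and coordinates are made up:

package main

import (
	"fmt"

	"github.com/blevesearch/geo/s2"
)

func main() {
	// Two rough counter-clockwise rectangles around Paris and Lyon, added to
	// one shape index, with a side map from each shape back to its name.
	paris := s2.PolygonFromLoops([]*s2.Loop{s2.LoopFromPoints([]s2.Point{
		s2.PointFromLatLng(s2.LatLngFromDegrees(48.7, 2.2)),
		s2.PointFromLatLng(s2.LatLngFromDegrees(48.7, 2.5)),
		s2.PointFromLatLng(s2.LatLngFromDegrees(49.0, 2.5)),
		s2.PointFromLatLng(s2.LatLngFromDegrees(49.0, 2.2)),
	})})
	lyon := s2.PolygonFromLoops([]*s2.Loop{s2.LoopFromPoints([]s2.Point{
		s2.PointFromLatLng(s2.LatLngFromDegrees(45.6, 4.7)),
		s2.PointFromLatLng(s2.LatLngFromDegrees(45.6, 5.0)),
		s2.PointFromLatLng(s2.LatLngFromDegrees(45.9, 5.0)),
		s2.PointFromLatLng(s2.LatLngFromDegrees(45.9, 4.7)),
	})})

	index := s2.NewShapeIndex()
	names := map[s2.Shape]string{}
	index.Add(paris)
	names[paris] = "paris"
	index.Add(lyon)
	names[lyon] = "lyon"
	index.Build()

	// Which shapes contain this point?
	p := s2.PointFromLatLng(s2.LatLngFromDegrees(48.85, 2.35))
	query := s2.NewContainsPointQuery(index, s2.VertexModelClosed)
	for _, shape := range query.ContainingShapes(p) {
		fmt.Println(names[shape]) // prints "paris"
	}
}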
handlers/admin/storage.go (new file, 25 lines)
@@ -0,0 +1,25 @@
package admin

import (
	"fmt"

	"github.com/paulmach/orb/geojson"
	"github.com/spf13/viper"
)

// AdminIndex is the common interface of the admin storage backends: reverse
// geocoding of a point feature and lookup of a feature by layer and id.
type AdminIndex interface {
	GeoSearch(*geojson.Feature) (map[string][]*geojson.Feature, error)
	Find(layer, id string) (*geojson.Feature, error)
}

// NewAdminIndex builds the backend selected by the storage.index.type
// configuration key.
func NewAdminIndex(cfg *viper.Viper) (AdminIndex, error) {
	storageType := cfg.GetString("storage.index.type")

	switch storageType {
	case "bleve":
		return NewAdminBleveIndex(cfg)
	case "memory_rtree":
		return NewAdminMemoryRTreeIndex(cfg)
	}

	return nil, fmt.Errorf("unknown storage type : %v", storageType)
}
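A short sketch of how the factory is meant to be driven from configuration, here with the in-memory R-tree backend; the module path, layer URL, and feature code are placeholders:

package main

import (
	"fmt"

	"github.com/spf13/viper"

	"example.org/project/handlers/admin" // hypothetical module path
)

func main() {
	cfg := viper.New()
	// storage.index.type selects the backend: "bleve" or "memory_rtree".
	cfg.Set("storage.index.type", "memory_rtree")
	// data.layers maps a layer name to a GeoJSON FeatureCollection URL (placeholder).
	cfg.Set("data.layers", map[string]string{
		"regions": "https://example.org/regions.geojson",
	})

	idx, err := admin.NewAdminIndex(cfg)
	if err != nil {
		panic(err)
	}

	// Look a feature up by layer and "code" property (placeholder code).
	f, err := idx.Find("regions", "11")
	if err != nil {
		fmt.Println("not found:", err)
		return
	}
	fmt.Println(f.Properties.MustString("nom"))
}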