diff --git a/rag/persistency.go b/rag/persistency.go index 9cabda9..9462fe8 100644 --- a/rag/persistency.go +++ b/rag/persistency.go @@ -19,7 +19,7 @@ import ( // CollectionState represents the persistent state of a collection type CollectionState struct { - ExternalSources []ExternalSource `json:"external_sources"` + ExternalSources []*ExternalSource `json:"external_sources"` Index map[string][]engine.Result `json:"index"` } @@ -29,7 +29,7 @@ type PersistentKB struct { path string assetDir string maxChunkSize int - sources []ExternalSource + sources []*ExternalSource index map[string][]engine.Result } @@ -48,7 +48,7 @@ func loadDB(path string) (*CollectionState, error) { if err := json.Unmarshal(data, &legacyFiles); err != nil { return nil, err } - state.ExternalSources = []ExternalSource{} + state.ExternalSources = []*ExternalSource{} state.Index = map[string][]engine.Result{} } @@ -68,7 +68,7 @@ func NewPersistentCollectionKB(stateFile, assetDir string, store Engine, maxChun Engine: store, assetDir: assetDir, maxChunkSize: maxChunkSize, - sources: []ExternalSource{}, + sources: []*ExternalSource{}, index: map[string][]engine.Result{}, } persistentKB.Lock() @@ -104,7 +104,7 @@ func (db *PersistentKB) Reset() error { for f := range db.index { os.Remove(filepath.Join(db.assetDir, f)) } - db.sources = []ExternalSource{} + db.sources = []*ExternalSource{} db.index = map[string][]engine.Result{} db.save() db.Unlock() @@ -368,14 +368,14 @@ func chunkFile(fpath string, maxchunksize int) ([]string, error) { } // GetExternalSources returns the list of external sources for this collection -func (db *PersistentKB) GetExternalSources() []ExternalSource { +func (db *PersistentKB) GetExternalSources() []*ExternalSource { db.Lock() defer db.Unlock() return db.sources } // AddExternalSource adds an external source to the collection -func (db *PersistentKB) AddExternalSource(source ExternalSource) error { +func (db *PersistentKB) AddExternalSource(source *ExternalSource) error { db.Lock() defer db.Unlock() diff --git a/rag/source_manager.go b/rag/source_manager.go index 6bf014d..a201937 100644 --- a/rag/source_manager.go +++ b/rag/source_manager.go @@ -22,8 +22,8 @@ type ExternalSource struct { // SourceManager manages external sources for collections type SourceManager struct { - sources map[string][]ExternalSource // collection name -> sources - collections map[string]*PersistentKB // collection name -> collection + sources map[string][]*ExternalSource // collection name -> sources + collections map[string]*PersistentKB // collection name -> collection mu sync.RWMutex ctx context.Context cancel context.CancelFunc @@ -33,7 +33,7 @@ type SourceManager struct { func NewSourceManager() *SourceManager { ctx, cancel := context.WithCancel(context.Background()) return &SourceManager{ - sources: make(map[string][]ExternalSource), + sources: make(map[string][]*ExternalSource), collections: make(map[string]*PersistentKB), ctx: ctx, cancel: cancel, @@ -72,14 +72,14 @@ func (sm *SourceManager) AddSource(collectionName, url string, updateInterval ti } // Add the source to the collection's persistent storage - if err := collection.AddExternalSource(source); err != nil { + if err := collection.AddExternalSource(&source); err != nil { return err } - sm.sources[collectionName] = append(sm.sources[collectionName], source) + sm.sources[collectionName] = append(sm.sources[collectionName], &source) // Trigger an immediate update - go sm.updateSource(collectionName, source, collection) + go sm.updateSource(collectionName, &source, collection) return nil } @@ -116,7 +116,10 @@ func (sm *SourceManager) RemoveSource(collectionName, url string) error { } // updateSource updates a single source -func (sm *SourceManager) updateSource(collectionName string, source ExternalSource, collection *PersistentKB) { +func (sm *SourceManager) updateSource(collectionName string, source *ExternalSource, collection *PersistentKB) { + + // update LastUpdate + source.LastUpdate = time.Now() xlog.Info("Updating source", "url", source.URL) content, err := sources.SourceRouter(source.URL) diff --git a/test/e2e/persistency_test.go b/test/e2e/persistency_test.go index 72a3279..82d8773 100644 --- a/test/e2e/persistency_test.go +++ b/test/e2e/persistency_test.go @@ -123,7 +123,7 @@ var _ = Describe("Persistency", func() { LastUpdate: time.Now(), } - err := kb.AddExternalSource(source) + err := kb.AddExternalSource(&source) Expect(err).To(BeNil()) sources := kb.GetExternalSources() @@ -144,10 +144,10 @@ var _ = Describe("Persistency", func() { LastUpdate: time.Now(), } - err := kb.AddExternalSource(source) + err := kb.AddExternalSource(&source) Expect(err).To(BeNil()) - err = kb.AddExternalSource(source) + err = kb.AddExternalSource(&source) Expect(err).ToNot(BeNil()) }) }) diff --git a/test/e2e/source_manager_test.go b/test/e2e/source_manager_test.go index b97b6dd..52cdf12 100644 --- a/test/e2e/source_manager_test.go +++ b/test/e2e/source_manager_test.go @@ -76,7 +76,7 @@ var _ = Describe("SourceManager", func() { UpdateInterval: DefaultUpdateInterval, LastUpdate: time.Now(), } - err := kb.AddExternalSource(source) + err := kb.AddExternalSource(&source) Expect(err).To(BeNil()) // Register the collection @@ -142,7 +142,7 @@ var _ = Describe("SourceManager", func() { sourceManager.Start() // Wait for at least one update cycle and verify the source was updated - Eventually(func() []rag.ExternalSource { + Eventually(func() []*rag.ExternalSource { return kb.GetExternalSources() }, TestTimeout, TestPollingInterval).Should(HaveLen(1))