Skip to content

Commit 63bd573

Browse files
hmansclaude
andcommitted
feat: add channel-based file watcher with typed events
- Add EventType (Created, Updated, Deleted) and BeanEvent types - Implement Subscribe() returning event channel + unsubscribe func - Fan-out pattern: multiple subscribers each get their own channel - Non-blocking sends prevent slow subscribers from blocking others - Add StartWatching() as preferred API (Watch() kept for compat) - Migrate TUI to use channel-based subscription - Add comprehensive tests for subscription and event detection 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent da583d3 commit 63bd573

File tree

4 files changed

+377
-13
lines changed

4 files changed

+377
-13
lines changed

internal/beancore/core.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ type Core struct {
3939
// File watching (optional)
4040
watching bool
4141
done chan struct{}
42-
onChange func() // callback when beans change
42+
onChange func() // callback when beans change (legacy API)
43+
44+
// Event subscribers (for channel-based API)
45+
subscribers map[uint64]*subscription
46+
subMu sync.RWMutex
47+
nextSubID uint64
4348

4449
// Warning logger for non-fatal errors (defaults to stderr)
4550
warnWriter io.Writer
@@ -48,10 +53,11 @@ type Core struct {
4853
// New creates a new Core with the given root path and configuration.
4954
func New(root string, cfg *config.Config) *Core {
5055
return &Core{
51-
root: root,
52-
config: cfg,
53-
beans: make(map[string]*bean.Bean),
54-
warnWriter: os.Stderr,
56+
root: root,
57+
config: cfg,
58+
beans: make(map[string]*bean.Bean),
59+
subscribers: make(map[uint64]*subscription),
60+
warnWriter: os.Stderr,
5561
}
5662
}
5763

internal/beancore/core_test.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,3 +644,210 @@ func TestClose(t *testing.T) {
644644
t.Errorf("Close() error = %v", err)
645645
}
646646
}
647+
648+
func TestSubscribe(t *testing.T) {
649+
core, beansDir := setupTestCore(t)
650+
651+
// Start watching
652+
if err := core.StartWatching(); err != nil {
653+
t.Fatalf("StartWatching() error = %v", err)
654+
}
655+
defer core.Unwatch()
656+
657+
// Subscribe to events
658+
ch, unsub := core.Subscribe()
659+
defer unsub()
660+
661+
// Give watcher time to start
662+
time.Sleep(50 * time.Millisecond)
663+
664+
// Create a bean file (should trigger EventCreated)
665+
content := `---
666+
title: New Bean
667+
status: todo
668+
---
669+
`
670+
if err := os.WriteFile(filepath.Join(beansDir, "new1--new.md"), []byte(content), 0644); err != nil {
671+
t.Fatalf("failed to write test file: %v", err)
672+
}
673+
674+
// Wait for events
675+
select {
676+
case events := <-ch:
677+
if len(events) == 0 {
678+
t.Error("expected at least one event")
679+
}
680+
found := false
681+
for _, e := range events {
682+
if e.Type == EventCreated && e.BeanID == "new1" {
683+
found = true
684+
if e.Bean == nil {
685+
t.Error("EventCreated should include Bean")
686+
}
687+
}
688+
}
689+
if !found {
690+
t.Errorf("expected EventCreated for new1, got: %+v", events)
691+
}
692+
case <-time.After(500 * time.Millisecond):
693+
t.Error("timeout waiting for events")
694+
}
695+
}
696+
697+
func TestSubscribeMultiple(t *testing.T) {
698+
core, beansDir := setupTestCore(t)
699+
700+
if err := core.StartWatching(); err != nil {
701+
t.Fatalf("StartWatching() error = %v", err)
702+
}
703+
defer core.Unwatch()
704+
705+
// Create two subscribers
706+
ch1, unsub1 := core.Subscribe()
707+
defer unsub1()
708+
ch2, unsub2 := core.Subscribe()
709+
defer unsub2()
710+
711+
// Give watcher time to start
712+
time.Sleep(50 * time.Millisecond)
713+
714+
// Create a bean file
715+
content := `---
716+
title: Multi Test
717+
status: todo
718+
---
719+
`
720+
if err := os.WriteFile(filepath.Join(beansDir, "mult--multi.md"), []byte(content), 0644); err != nil {
721+
t.Fatalf("failed to write test file: %v", err)
722+
}
723+
724+
// Both subscribers should receive events
725+
received1, received2 := false, false
726+
timeout := time.After(500 * time.Millisecond)
727+
728+
for !received1 || !received2 {
729+
select {
730+
case <-ch1:
731+
received1 = true
732+
case <-ch2:
733+
received2 = true
734+
case <-timeout:
735+
t.Fatalf("timeout: received1=%v, received2=%v", received1, received2)
736+
}
737+
}
738+
}
739+
740+
func TestUnsubscribe(t *testing.T) {
741+
core, _ := setupTestCore(t)
742+
743+
if err := core.StartWatching(); err != nil {
744+
t.Fatalf("StartWatching() error = %v", err)
745+
}
746+
defer core.Unwatch()
747+
748+
ch, unsub := core.Subscribe()
749+
unsub()
750+
751+
// Channel should be closed
752+
_, ok := <-ch
753+
if ok {
754+
t.Error("expected channel to be closed after unsubscribe")
755+
}
756+
}
757+
758+
func TestEventTypes(t *testing.T) {
759+
core, beansDir := setupTestCore(t)
760+
761+
// Create an initial bean
762+
createTestBean(t, core, "evt1", "Event Test", "todo")
763+
764+
if err := core.StartWatching(); err != nil {
765+
t.Fatalf("StartWatching() error = %v", err)
766+
}
767+
defer core.Unwatch()
768+
769+
ch, unsub := core.Subscribe()
770+
defer unsub()
771+
772+
// Give watcher time to start
773+
time.Sleep(50 * time.Millisecond)
774+
775+
t.Run("update event", func(t *testing.T) {
776+
// Modify the existing bean file
777+
content := `---
778+
title: Updated Title
779+
status: in-progress
780+
---
781+
`
782+
if err := os.WriteFile(filepath.Join(beansDir, "evt1--event-test.md"), []byte(content), 0644); err != nil {
783+
t.Fatalf("failed to write test file: %v", err)
784+
}
785+
786+
select {
787+
case events := <-ch:
788+
found := false
789+
for _, e := range events {
790+
if e.Type == EventUpdated && e.BeanID == "evt1" {
791+
found = true
792+
if e.Bean == nil {
793+
t.Error("EventUpdated should include Bean")
794+
}
795+
if e.Bean.Title != "Updated Title" {
796+
t.Errorf("expected updated title, got %q", e.Bean.Title)
797+
}
798+
}
799+
}
800+
if !found {
801+
t.Errorf("expected EventUpdated for evt1, got: %+v", events)
802+
}
803+
case <-time.After(500 * time.Millisecond):
804+
t.Error("timeout waiting for update event")
805+
}
806+
})
807+
808+
t.Run("delete event", func(t *testing.T) {
809+
// Delete the bean file
810+
if err := os.Remove(filepath.Join(beansDir, "evt1--event-test.md")); err != nil {
811+
t.Fatalf("failed to delete file: %v", err)
812+
}
813+
814+
select {
815+
case events := <-ch:
816+
found := false
817+
for _, e := range events {
818+
if e.Type == EventDeleted && e.BeanID == "evt1" {
819+
found = true
820+
if e.Bean != nil {
821+
t.Error("EventDeleted should have nil Bean")
822+
}
823+
}
824+
}
825+
if !found {
826+
t.Errorf("expected EventDeleted for evt1, got: %+v", events)
827+
}
828+
case <-time.After(500 * time.Millisecond):
829+
t.Error("timeout waiting for delete event")
830+
}
831+
})
832+
}
833+
834+
func TestSubscribersClosedOnUnwatch(t *testing.T) {
835+
core, _ := setupTestCore(t)
836+
837+
if err := core.StartWatching(); err != nil {
838+
t.Fatalf("StartWatching() error = %v", err)
839+
}
840+
841+
ch, _ := core.Subscribe() // Note: not calling unsub
842+
843+
// Unwatch should close subscriber channels
844+
if err := core.Unwatch(); err != nil {
845+
t.Fatalf("Unwatch() error = %v", err)
846+
}
847+
848+
// Channel should be closed
849+
_, ok := <-ch
850+
if ok {
851+
t.Error("expected channel to be closed after Unwatch")
852+
}
853+
}

0 commit comments

Comments
 (0)