events.go
package content
import (
	"strconv"
	"sync"
)
// EventType names the kind of change a content event describes.
type EventType string

// The event kinds carried in Event.Type.
const (
	// EventCreate marks the creation of an entity.
	EventCreate EventType = "create"
	// EventUpdate marks a change to an existing entity.
	EventUpdate EventType = "update"
	// EventDelete marks the removal of an entity.
	EventDelete EventType = "delete"
)
// EntityType names the kind of entity a content event refers to.
type EntityType string

// The entity kinds carried in Event.EntityType.
const (
	// EntityPage identifies a page.
	EntityPage EntityType = "page"
	// EntityCollection identifies a collection.
	EntityCollection EntityType = "collection"
	// EntityDocument identifies a document (a record inside a collection).
	EntityDocument EntityType = "document"
)
// Event describes a single content change delivered to subscribers.
type Event struct {
	// Type is the kind of change: create, update or delete.
	Type EventType `json:"type"`
	// EntityType is the kind of entity that changed: page, collection or document.
	EntityType EntityType `json:"entityType,omitempty"`
	// CollectionID is set for collection and document events.
	CollectionID string `json:"collectionId,omitempty"`
	// RecordID is set for document events.
	RecordID string `json:"recordId,omitempty"`
	// PageID is set for page events.
	PageID string `json:"pageId,omitempty"`
	// Record holds the full record for create/update events and is nil for delete.
	Record map[string]any `json:"record,omitempty"`
}
// Subscriber is one registration on a Bus; delivered events arrive on its
// Events channel.
type Subscriber struct {
	// ID is the identifier the Bus assigned when the subscriber was created.
	ID string
	// CollectionID is the collection being watched. The empty string marks an
	// admin subscription that receives all events.
	CollectionID string
	// Events carries the delivered events; Close closes it.
	Events chan Event
	// closed records whether Events has been closed. Guarded by mu.
	closed bool
	mu     sync.Mutex
}
// Close closes the subscriber's Events channel. Only the first call has any
// effect; later calls return without doing anything.
func (s *Subscriber) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		return
	}
	s.closed = true
	close(s.Events)
}
// IsClosed reports whether Close has already been called on the subscriber.
func (s *Subscriber) IsClosed() bool {
	s.mu.Lock()
	closed := s.closed
	s.mu.Unlock()
	return closed
}
// Bus keeps track of subscribers and fans published events out to them.
type Bus struct {
	// mu guards every field below.
	mu sync.RWMutex
	// subscribers holds the per-collection subscribers, keyed by collection ID.
	subscribers map[string][]*Subscriber
	// adminSubs holds the subscribers that receive all events.
	adminSubs []*Subscriber
	// nextID is the counter used to derive subscriber IDs.
	nextID int
}
// NewBus returns an empty Bus that is ready to accept subscriptions.
func NewBus() *Bus {
	bus := new(Bus)
	bus.subscribers = map[string][]*Subscriber{}
	bus.adminSubs = []*Subscriber{}
	return bus
}
// Subscribe registers a new subscriber for collectionID and returns it.
//
// The subscriber is assigned a decimal ID ("1", "2", ...) taken from a counter
// that is unique within this Bus, and a buffered Events channel. It receives
// the events whose CollectionID equals collectionID. To receive every event,
// use SubscribeAll instead.
func (b *Bus) Subscribe(collectionID string) *Subscriber {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.nextID++
	sub := &Subscriber{
		// Format the counter as decimal text. Converting the integer to a rune
		// and then to a string would yield the Unicode character with that
		// code point rather than a readable number.
		ID:           strconv.Itoa(b.nextID),
		CollectionID: collectionID,
		// Buffered so short bursts are absorbed without losing events.
		Events: make(chan Event, 100),
	}
	b.subscribers[collectionID] = append(b.subscribers[collectionID], sub)
	return sub
}
// SubscribeAll registers and returns a subscriber that receives every
// published event regardless of collection (used by the admin dashboard).
//
// The subscriber is assigned a decimal ID ("1", "2", ...) taken from a counter
// that is unique within this Bus, an empty CollectionID, and a buffered Events
// channel.
func (b *Bus) SubscribeAll() *Subscriber {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.nextID++
	sub := &Subscriber{
		// Format the counter as decimal text. Converting the integer to a rune
		// and then to a string would yield the Unicode character with that
		// code point rather than a readable number.
		ID:           strconv.Itoa(b.nextID),
		CollectionID: "", // empty means all events
		// Buffered so short bursts are absorbed without losing events.
		Events: make(chan Event, 100),
	}
	b.adminSubs = append(b.adminSubs, sub)
	return sub
}
// Unsubscribe removes sub from the bus and closes its Events channel.
//
// A nil sub, or one that is not currently registered on this bus, is ignored
// and left untouched.
func (b *Bus) Unsubscribe(sub *Subscriber) {
	if sub == nil {
		return
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	// remove returns list without sub and reports whether sub was present.
	// It builds a new slice instead of shifting elements in place, so a
	// concurrent reader that captured the old slice before this call took the
	// lock never sees its elements change, and no stale pointer is left behind
	// in the old backing array's tail.
	remove := func(list []*Subscriber) ([]*Subscriber, bool) {
		for i, s := range list {
			if s != sub {
				continue
			}
			rest := make([]*Subscriber, 0, len(list)-1)
			rest = append(rest, list[:i]...)
			rest = append(rest, list[i+1:]...)
			return rest, true
		}
		return list, false
	}

	// An empty CollectionID normally marks an admin subscriber.
	if sub.CollectionID == "" {
		if rest, ok := remove(b.adminSubs); ok {
			b.adminSubs = rest
			sub.Close()
			return
		}
		// Not found among the admin subscribers. Subscribe("") stores its
		// subscriber in the collection map under the empty key, so fall
		// through and look there as well.
	}

	// Regular collection subscriber.
	rest, ok := remove(b.subscribers[sub.CollectionID])
	if !ok {
		return
	}
	if len(rest) == 0 {
		// Drop the key so the map does not accumulate entries for collections
		// that no longer have subscribers.
		delete(b.subscribers, sub.CollectionID)
	} else {
		b.subscribers[sub.CollectionID] = rest
	}
	sub.Close()
}
// Publish delivers event to every subscriber registered for
// event.CollectionID and then to every admin subscriber.
//
// Delivery never blocks: if a subscriber's buffer is full the event is dropped
// for that subscriber only. Closed subscribers are skipped.
func (b *Bus) Publish(event Event) {
	// Hold the read lock for the whole delivery so the subscriber slices cannot
	// be modified while they are iterated. Every send below is non-blocking,
	// so the lock is only held briefly.
	b.mu.RLock()
	defer b.mu.RUnlock()

	// Collection-specific subscribers first, then admin subscribers.
	groups := [2][]*Subscriber{b.subscribers[event.CollectionID], b.adminSubs}
	for _, group := range groups {
		for _, sub := range group {
			// Keep the subscriber's own lock across both the closed check and
			// the send. Close takes the same lock before closing the channel,
			// so the channel cannot be closed between the check and the send
			// (which would panic with "send on closed channel").
			sub.mu.Lock()
			if !sub.closed {
				select {
				case sub.Events <- event:
				default:
					// Buffer full: skip this event for this subscriber.
				}
			}
			sub.mu.Unlock()
		}
	}
}
// PublishCreate publishes a create event for the document recordID in
// collection collectionID, carrying record as the event's Record.
func (b *Bus) PublishCreate(collectionID, recordID string, record map[string]any) {
	event := Event{
		Type:         EventCreate,
		EntityType:   EntityDocument,
		CollectionID: collectionID,
		RecordID:     recordID,
		Record:       record,
	}
	b.Publish(event)
}
// PublishUpdate publishes an update event for the document recordID in
// collection collectionID, carrying record as the event's Record.
func (b *Bus) PublishUpdate(collectionID, recordID string, record map[string]any) {
	event := Event{
		Type:         EventUpdate,
		EntityType:   EntityDocument,
		CollectionID: collectionID,
		RecordID:     recordID,
		Record:       record,
	}
	b.Publish(event)
}
// PublishDelete publishes a delete event for the document recordID in
// collection collectionID. The event carries no Record.
func (b *Bus) PublishDelete(collectionID, recordID string) {
	event := Event{
		Type:         EventDelete,
		EntityType:   EntityDocument,
		CollectionID: collectionID,
		RecordID:     recordID,
	}
	b.Publish(event)
}
// PublishPageCreate publishes a create event for the page pageID, carrying
// data as the event's Record.
func (b *Bus) PublishPageCreate(pageID string, data map[string]any) {
	event := Event{
		Type:       EventCreate,
		EntityType: EntityPage,
		PageID:     pageID,
		Record:     data,
	}
	b.Publish(event)
}
// PublishPageUpdate publishes an update event for the page pageID, carrying
// data as the event's Record.
func (b *Bus) PublishPageUpdate(pageID string, data map[string]any) {
	event := Event{
		Type:       EventUpdate,
		EntityType: EntityPage,
		PageID:     pageID,
		Record:     data,
	}
	b.Publish(event)
}
// PublishPageDelete publishes a delete event for the page pageID. The event
// carries no Record.
func (b *Bus) PublishPageDelete(pageID string) {
	event := Event{
		Type:       EventDelete,
		EntityType: EntityPage,
		PageID:     pageID,
	}
	b.Publish(event)
}
// PublishCollectionCreate publishes a create event for the collection
// collectionID, carrying data as the event's Record.
func (b *Bus) PublishCollectionCreate(collectionID string, data map[string]any) {
	event := Event{
		Type:         EventCreate,
		EntityType:   EntityCollection,
		CollectionID: collectionID,
		Record:       data,
	}
	b.Publish(event)
}
// PublishCollectionUpdate publishes an update event for the collection
// collectionID, carrying data as the event's Record.
func (b *Bus) PublishCollectionUpdate(collectionID string, data map[string]any) {
	event := Event{
		Type:         EventUpdate,
		EntityType:   EntityCollection,
		CollectionID: collectionID,
		Record:       data,
	}
	b.Publish(event)
}
// PublishCollectionDelete publishes a delete event for the collection
// collectionID. The event carries no Record.
func (b *Bus) PublishCollectionDelete(collectionID string) {
	event := Event{
		Type:         EventDelete,
		EntityType:   EntityCollection,
		CollectionID: collectionID,
	}
	b.Publish(event)
}
// SubscriberCount reports how many subscribers are registered for
// collectionID. Admin subscribers are not included in the count.
func (b *Bus) SubscriberCount(collectionID string) int {
	b.mu.RLock()
	count := len(b.subscribers[collectionID])
	b.mu.RUnlock()
	return count
}
// Cleanup drops every closed subscriber from the bus, both per-collection and
// admin subscribers. Collections left without any open subscriber are removed
// from the map entirely so their keys do not accumulate over time.
func (b *Bus) Cleanup() {
	b.mu.Lock()
	defer b.mu.Unlock()

	// open returns the subscribers in list that are not closed. The result is
	// a newly allocated slice (nil when none remain); list itself is never
	// modified, so a concurrent reader that captured it earlier is unaffected.
	open := func(list []*Subscriber) []*Subscriber {
		var kept []*Subscriber
		for _, sub := range list {
			if !sub.IsClosed() {
				kept = append(kept, sub)
			}
		}
		return kept
	}

	// Deleting or reassigning the current key while ranging over a map is safe.
	for collectionID, subs := range b.subscribers {
		kept := open(subs)
		if len(kept) == 0 {
			delete(b.subscribers, collectionID)
			continue
		}
		b.subscribers[collectionID] = kept
	}

	// Clean up admin subscribers.
	b.adminSubs = open(b.adminSubs)
}
// CollectionEvents is the package-level event bus for collection changes.
// It is created with NewBus when the package is initialized.
var CollectionEvents = NewBus()