// Source: readysite/website/internal/content/events.go (7.1 KB)

package content

import (
	"strconv"
	"sync"
)

// EventType identifies what kind of change an Event describes.
type EventType string

// The change kinds carried in Event.Type.
const (
	EventCreate = EventType("create")
	EventUpdate = EventType("update")
	EventDelete = EventType("delete")
)

// EntityType identifies which kind of entity an Event refers to.
type EntityType string

// The entity kinds carried in Event.EntityType.
const (
	EntityPage       = EntityType("page")
	EntityCollection = EntityType("collection")
	EntityDocument   = EntityType("document")
)

// Event describes a single content change delivered to subscribers.
//
// Type says what happened and EntityType says what it happened to. Which of
// the ID fields are populated depends on the entity kind; every field except
// Type is omitted from the JSON encoding when empty.
type Event struct {
	Type         EventType      `json:"type"`
	EntityType   EntityType     `json:"entityType,omitempty"`   // page, collection, or document
	CollectionID string         `json:"collectionId,omitempty"` // Set for collection and document events
	RecordID     string         `json:"recordId,omitempty"`     // Set for document events
	PageID       string         `json:"pageId,omitempty"`       // Set for page events
	Record       map[string]any `json:"record,omitempty"`       // Full record for create/update, nil for delete
}

// Subscriber is a single subscription handle; published events are delivered
// on its Events channel.
//
// closed records whether Events has already been closed and is guarded by mu.
// Use Close and IsClosed instead of closing or inspecting the channel
// directly. A Subscriber contains a mutex and must not be copied.
type Subscriber struct {
	ID           string
	CollectionID string // Empty string means subscribe to all events (admin subscription)
	Events       chan Event
	closed       bool
	mu           sync.Mutex
}

// Close marks the subscriber as closed and closes its Events channel.
// Calling Close more than once is safe; only the first call closes the channel.
func (s *Subscriber) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		// Already closed; closing the channel again would panic.
		return
	}
	s.closed = true
	close(s.Events)
}

// IsClosed reports whether Close has been called on the subscriber.
func (s *Subscriber) IsClosed() bool {
	s.mu.Lock()
	closed := s.closed
	s.mu.Unlock()
	return closed
}

// Bus fans content events out to registered subscribers.
//
// mu guards every other field; the methods on Bus acquire it before reading
// or modifying the subscriber lists or the ID counter.
type Bus struct {
	mu          sync.RWMutex
	subscribers map[string][]*Subscriber // per-collection subscribers, keyed by collectionID
	adminSubs   []*Subscriber            // subscribers that receive every event
	nextID      int                      // counter used to derive subscriber IDs
}

// NewBus returns an empty, ready-to-use Bus with its subscriber map and
// admin subscriber list initialized.
func NewBus() *Bus {
	bus := new(Bus)
	bus.subscribers = map[string][]*Subscriber{}
	bus.adminSubs = []*Subscriber{}
	return bus
}

// Subscribe registers and returns a new subscriber for collectionID.
//
// The subscriber's ID is the decimal form of a per-bus counter, so IDs are
// unique within a Bus. Its Events channel is buffered (100 events) so that
// publishers do not block on a momentarily slow consumer.
func (b *Bus) Subscribe(collectionID string) *Subscriber {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.nextID++
	sub := &Subscriber{
		// strconv.Itoa gives a readable decimal ID. Converting the counter
		// through rune would yield a single (often unprintable) code point
		// and map every invalid code point to U+FFFD, producing duplicates.
		ID:           strconv.Itoa(b.nextID),
		CollectionID: collectionID,
		Events:       make(chan Event, 100), // Buffered to prevent blocking
	}

	b.subscribers[collectionID] = append(b.subscribers[collectionID], sub)
	return sub
}

// SubscribeAll registers and returns a subscriber that receives every
// published event regardless of collection (used by the admin dashboard).
//
// The subscriber's ID is the decimal form of a per-bus counter, and its
// CollectionID is left empty to mark it as an all-events subscription.
func (b *Bus) SubscribeAll() *Subscriber {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.nextID++
	sub := &Subscriber{
		// strconv.Itoa gives a readable decimal ID. Converting the counter
		// through rune would yield a single (often unprintable) code point
		// and map every invalid code point to U+FFFD, producing duplicates.
		ID:           strconv.Itoa(b.nextID),
		CollectionID: "", // Empty means all events
		Events:       make(chan Event, 100),
	}

	b.adminSubs = append(b.adminSubs, sub)
	return sub
}

// Unsubscribe removes sub from the bus and closes it.
//
// A nil sub, or one that is not currently registered, is a no-op. A
// subscriber with an empty CollectionID is looked up among the admin
// subscribers first and then under the empty collection key, because
// Subscribe("") registers there.
func (b *Bus) Unsubscribe(sub *Subscriber) {
	if sub == nil {
		return
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	if sub.CollectionID == "" {
		if rest, ok := withoutSubscriber(b.adminSubs, sub); ok {
			b.adminSubs = rest
			sub.Close()
			return
		}
		// Not an admin subscriber: fall through to the "" collection key.
	}

	rest, ok := withoutSubscriber(b.subscribers[sub.CollectionID], sub)
	if !ok {
		return
	}
	if len(rest) == 0 {
		// Drop the key so collections without subscribers do not pile up.
		delete(b.subscribers, sub.CollectionID)
	} else {
		b.subscribers[sub.CollectionID] = rest
	}
	sub.Close()
}

// withoutSubscriber returns a newly allocated slice holding subs minus the
// first occurrence of target, and whether target was found. The input's
// backing array is never modified, so a slice header captured earlier by
// another goroutine keeps seeing a consistent list.
func withoutSubscriber(subs []*Subscriber, target *Subscriber) ([]*Subscriber, bool) {
	for i, s := range subs {
		if s != target {
			continue
		}
		rest := make([]*Subscriber, 0, len(subs)-1)
		rest = append(rest, subs[:i]...)
		rest = append(rest, subs[i+1:]...)
		return rest, true
	}
	return subs, false
}

// Publish delivers event to every subscriber registered for
// event.CollectionID and then to every admin (all-events) subscriber.
//
// Delivery is best-effort and never blocks: closed subscribers are skipped,
// and if a subscriber's buffer is full the event is dropped for that
// subscriber only.
func (b *Bus) Publish(event Event) {
	// Hold the read lock for the whole fan-out. Every send is non-blocking,
	// so the lock is held only briefly, and it keeps the subscriber slices
	// from being modified while they are being iterated.
	b.mu.RLock()
	defer b.mu.RUnlock()

	// Send to collection-specific subscribers.
	for _, sub := range b.subscribers[event.CollectionID] {
		trySendEvent(sub, event)
	}

	// Send to admin subscribers (all events).
	for _, sub := range b.adminSubs {
		trySendEvent(sub, event)
	}
}

// trySendEvent performs a non-blocking send of event to sub. The closed
// check and the send happen under sub.mu, the same mutex Close takes, so the
// channel cannot be closed between the check and the send (which would panic).
func trySendEvent(sub *Subscriber, event Event) {
	sub.mu.Lock()
	defer sub.mu.Unlock()

	if sub.closed {
		return
	}
	select {
	case sub.Events <- event:
	default:
		// Buffer full, skip this event for this subscriber.
	}
}

// PublishCreate publishes a document "create" event for recordID in
// collectionID, carrying record as the event payload.
func (b *Bus) PublishCreate(collectionID, recordID string, record map[string]any) {
	evt := Event{Type: EventCreate, EntityType: EntityDocument}
	evt.CollectionID = collectionID
	evt.RecordID = recordID
	evt.Record = record
	b.Publish(evt)
}

// PublishUpdate publishes a document "update" event for recordID in
// collectionID, carrying record as the event payload.
func (b *Bus) PublishUpdate(collectionID, recordID string, record map[string]any) {
	evt := Event{Type: EventUpdate, EntityType: EntityDocument}
	evt.CollectionID = collectionID
	evt.RecordID = recordID
	evt.Record = record
	b.Publish(evt)
}

// PublishDelete publishes a document "delete" event for recordID in
// collectionID. The event carries no record payload.
func (b *Bus) PublishDelete(collectionID, recordID string) {
	evt := Event{Type: EventDelete, EntityType: EntityDocument}
	evt.CollectionID = collectionID
	evt.RecordID = recordID
	b.Publish(evt)
}

// PublishPageCreate publishes a page "create" event for pageID, carrying
// data as the event payload. CollectionID is left empty.
func (b *Bus) PublishPageCreate(pageID string, data map[string]any) {
	evt := Event{Type: EventCreate, EntityType: EntityPage}
	evt.PageID = pageID
	evt.Record = data
	b.Publish(evt)
}

// PublishPageUpdate publishes a page "update" event for pageID, carrying
// data as the event payload. CollectionID is left empty.
func (b *Bus) PublishPageUpdate(pageID string, data map[string]any) {
	evt := Event{Type: EventUpdate, EntityType: EntityPage}
	evt.PageID = pageID
	evt.Record = data
	b.Publish(evt)
}

// PublishPageDelete publishes a page "delete" event for pageID. The event
// carries no record payload and CollectionID is left empty.
func (b *Bus) PublishPageDelete(pageID string) {
	evt := Event{Type: EventDelete, EntityType: EntityPage}
	evt.PageID = pageID
	b.Publish(evt)
}

// PublishCollectionCreate publishes a collection "create" event for
// collectionID, carrying data as the event payload.
func (b *Bus) PublishCollectionCreate(collectionID string, data map[string]any) {
	evt := Event{Type: EventCreate, EntityType: EntityCollection}
	evt.CollectionID = collectionID
	evt.Record = data
	b.Publish(evt)
}

// PublishCollectionUpdate publishes a collection "update" event for
// collectionID, carrying data as the event payload.
func (b *Bus) PublishCollectionUpdate(collectionID string, data map[string]any) {
	evt := Event{Type: EventUpdate, EntityType: EntityCollection}
	evt.CollectionID = collectionID
	evt.Record = data
	b.Publish(evt)
}

// PublishCollectionDelete publishes a collection "delete" event for
// collectionID. The event carries no record payload.
func (b *Bus) PublishCollectionDelete(collectionID string) {
	evt := Event{Type: EventDelete, EntityType: EntityCollection}
	evt.CollectionID = collectionID
	b.Publish(evt)
}

// SubscriberCount reports how many subscribers are currently registered for
// collectionID. Admin (all-events) subscribers are not included.
func (b *Bus) SubscriberCount(collectionID string) int {
	b.mu.RLock()
	count := len(b.subscribers[collectionID])
	b.mu.RUnlock()
	return count
}

// Cleanup drops every closed subscriber from the bus, both per-collection
// and admin. Collections left with no open subscribers are removed from the
// map entirely so it does not grow with every collection ID ever subscribed.
func (b *Bus) Cleanup() {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Deleting or reassigning entries while ranging over a map is permitted.
	for collectionID, subs := range b.subscribers {
		open := openSubscribers(subs)
		if len(open) == 0 {
			delete(b.subscribers, collectionID)
			continue
		}
		b.subscribers[collectionID] = open
	}

	// Clean up admin subscribers.
	b.adminSubs = openSubscribers(b.adminSubs)
}

// openSubscribers returns a new slice containing only the subscribers in
// subs that are not closed, in their original order. It returns nil when
// none remain.
func openSubscribers(subs []*Subscriber) []*Subscriber {
	var open []*Subscriber
	for _, sub := range subs {
		if !sub.IsClosed() {
			open = append(open, sub)
		}
	}
	return open
}

// CollectionEvents is the global event bus for collection changes. It is a
// package-level Bus created with NewBus when the package is initialized.
var CollectionEvents = NewBus()
// ← Back