package broadcast import "context" type BroadcastChannel[T comparable] interface { Subscribe() <-chan T Cancel(<-chan T) } type broadcastChannel[T comparable] struct { source <-chan T listeners []chan T addListener chan chan T removeListener chan (<-chan T) } func (b *broadcastChannel[T]) Subscribe() <-chan T { newListener := make(chan T) b.addListener <- newListener return newListener } func (b *broadcastChannel[T]) Cancel(channel <-chan T) { b.removeListener <- channel } func NewBroadcastChannel[T comparable](ctx context.Context, source <-chan T) BroadcastChannel[T] { service := &broadcastChannel[T]{ source: source, listeners: make([]chan T, 0), addListener: make(chan chan T), removeListener: make(chan (<-chan T)), } go service.serve(ctx) return service } func (b *broadcastChannel[T]) serve(ctx context.Context) { defer func() { for _, listener := range b.listeners { if listener != nil { close(listener) } } }() for { select { case <-ctx.Done(): return case newListener := <-b.addListener: b.listeners = append(b.listeners, newListener) case removeListener := <-b.removeListener: for i, ch := range b.listeners { if ch == removeListener { b.listeners[i] = b.listeners[len(b.listeners) - 1] b.listeners = b.listeners[:len(b.listeners) - 1] close(ch) break } } case val, ok := <-b.source: if !ok { return } for _, listener := range b.listeners { if listener == nil { continue } select { case listener <- val: case <-ctx.Done(): return } } } } }