stream-tools/broadcast/broadcast.go
ari melody 6927d54cbd
first commit! 🎉
port "ari is learning!" assets to live go backend.
some prep work for further developments for ari melody LIVE
2026-06-12 04:50:45 +01:00

74 lines
1.6 KiB
Go

package broadcast
import "context"
type BroadcastChannel[T comparable] interface {
Subscribe() <-chan T
Cancel(<-chan T)
}
type broadcastChannel[T comparable] struct {
source <-chan T
listeners []chan T
addListener chan chan T
removeListener chan (<-chan T)
}
func (b *broadcastChannel[T]) Subscribe() <-chan T {
newListener := make(chan T)
b.addListener <- newListener
return newListener
}
func (b *broadcastChannel[T]) Cancel(channel <-chan T) {
b.removeListener <- channel
}
func NewBroadcastChannel[T comparable](ctx context.Context, source <-chan T) BroadcastChannel[T] {
service := &broadcastChannel[T]{
source: source,
listeners: make([]chan T, 0),
addListener: make(chan chan T),
removeListener: make(chan (<-chan T)),
}
go service.serve(ctx)
return service
}
func (b *broadcastChannel[T]) serve(ctx context.Context) {
defer func() {
for _, listener := range b.listeners {
if listener != nil {
close(listener)
}
}
}()
for {
select {
case <-ctx.Done():
return
case newListener := <-b.addListener:
b.listeners = append(b.listeners, newListener)
case removeListener := <-b.removeListener:
for i, ch := range b.listeners {
if ch == removeListener {
b.listeners[i] = b.listeners[len(b.listeners) - 1]
b.listeners = b.listeners[:len(b.listeners) - 1]
close(ch)
break
}
}
case val, ok := <-b.source:
if !ok { return }
for _, listener := range b.listeners {
if listener == nil { continue }
select {
case listener <- val:
case <-ctx.Done():
return
}
}
}
}
}