port "ari is learning!" assets to live go backend. some prep work for further developments for ari melody LIVE
74 lines
1.6 KiB
Go
74 lines
1.6 KiB
Go
package broadcast
|
|
|
|
import "context"
|
|
|
|
// BroadcastChannel hands out per-subscriber channels carrying values of type T.
type BroadcastChannel[T comparable] interface {
	// Subscribe registers a new subscriber and returns the receive-only
	// channel that belongs to it.
	Subscribe() <-chan T

	// Cancel unregisters a channel previously obtained from Subscribe.
	Cancel(channel <-chan T)
}
|
|
|
|
type broadcastChannel[T comparable] struct {
|
|
source <-chan T
|
|
listeners []chan T
|
|
addListener chan chan T
|
|
removeListener chan (<-chan T)
|
|
}
|
|
|
|
func (b *broadcastChannel[T]) Subscribe() <-chan T {
|
|
newListener := make(chan T)
|
|
b.addListener <- newListener
|
|
return newListener
|
|
}
|
|
|
|
func (b *broadcastChannel[T]) Cancel(channel <-chan T) {
|
|
b.removeListener <- channel
|
|
}
|
|
|
|
func NewBroadcastChannel[T comparable](ctx context.Context, source <-chan T) BroadcastChannel[T] {
|
|
service := &broadcastChannel[T]{
|
|
source: source,
|
|
listeners: make([]chan T, 0),
|
|
addListener: make(chan chan T),
|
|
removeListener: make(chan (<-chan T)),
|
|
}
|
|
go service.serve(ctx)
|
|
return service
|
|
}
|
|
|
|
func (b *broadcastChannel[T]) serve(ctx context.Context) {
|
|
defer func() {
|
|
for _, listener := range b.listeners {
|
|
if listener != nil {
|
|
close(listener)
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case newListener := <-b.addListener:
|
|
b.listeners = append(b.listeners, newListener)
|
|
case removeListener := <-b.removeListener:
|
|
for i, ch := range b.listeners {
|
|
if ch == removeListener {
|
|
b.listeners[i] = b.listeners[len(b.listeners) - 1]
|
|
b.listeners = b.listeners[:len(b.listeners) - 1]
|
|
close(ch)
|
|
break
|
|
}
|
|
}
|
|
case val, ok := <-b.source:
|
|
if !ok { return }
|
|
for _, listener := range b.listeners {
|
|
if listener == nil { continue }
|
|
select {
|
|
case listener <- val:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|