75 lines
1.6 KiB
Go
75 lines
1.6 KiB
Go
|
|
package broadcast
|
||
|
|
|
||
|
|
import "context"
|
||
|
|
|
||
|
|
// BroadcastChannel fans every value read from a single source channel out to
// all currently registered subscribers.
type BroadcastChannel[T comparable] interface {
	// Subscribe registers a new subscriber and returns the channel on which
	// it receives values. The channel is closed when the subscription is
	// cancelled or when the broadcaster stops. Calling Subscribe after the
	// broadcaster has stopped returns an already-closed channel.
	Subscribe() <-chan T

	// Cancel unregisters a channel previously returned by Subscribe and
	// closes it. Unknown channels are ignored, and Cancel is a no-op once
	// the broadcaster has stopped.
	Cancel(<-chan T)
}

// broadcastChannel is the BroadcastChannel implementation. The listeners
// slice is touched only by the serve goroutine; every other goroutine talks
// to it through the addListener and removeListener channels.
type broadcastChannel[T comparable] struct {
	source         <-chan T
	listeners      []chan T
	addListener    chan chan T
	removeListener chan (<-chan T)
	done           chan struct{} // closed by serve when it exits
}

// Subscribe hands a fresh unbuffered channel to the serve goroutine and
// returns its receive side.
func (b *broadcastChannel[T]) Subscribe() <-chan T {
	newListener := make(chan T)
	select {
	case b.addListener <- newListener:
	case <-b.done:
		// serve has exited and will never pick the listener up, so close it
		// here instead of blocking the caller forever.
		close(newListener)
	}
	return newListener
}

// Cancel asks the serve goroutine to drop and close channel.
func (b *broadcastChannel[T]) Cancel(channel <-chan T) {
	select {
	case b.removeListener <- channel:
	case <-b.done:
		// serve closed every registered listener on its way out.
	}
}

// NewBroadcastChannel starts a goroutine that forwards each value received
// from source to all subscribers. The goroutine stops, closing every
// subscriber channel, when ctx is done or source is closed.
func NewBroadcastChannel[T comparable](ctx context.Context, source <-chan T) BroadcastChannel[T] {
	service := &broadcastChannel[T]{
		source:         source,
		addListener:    make(chan chan T),
		removeListener: make(chan (<-chan T)),
		done:           make(chan struct{}),
	}
	go service.serve(ctx)
	return service
}

// serve is the event loop that owns b.listeners. It returns when ctx is done
// or the source channel is closed.
func (b *broadcastChannel[T]) serve(ctx context.Context) {
	defer func() {
		for _, listener := range b.listeners {
			close(listener)
		}
		b.listeners = nil
		// Closed last so that anyone released through done observes the
		// listeners already closed.
		close(b.done)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case newListener := <-b.addListener:
			b.listeners = append(b.listeners, newListener)
		case target := <-b.removeListener:
			b.remove(target)
		case val, ok := <-b.source:
			if !ok {
				return
			}
			if !b.broadcast(ctx, val) {
				return
			}
		}
	}
}

// broadcast delivers val to every listener that was registered when val
// arrived. It keeps servicing Subscribe and Cancel requests while a send is
// pending, so a consumer that cancels or subscribes instead of receiving
// cannot deadlock against the event loop. It reports false if ctx finished
// before delivery completed.
func (b *broadcastChannel[T]) broadcast(ctx context.Context, val T) bool {
	// Listeners at index >= pending were added during this broadcast and do
	// not receive val.
	pending := len(b.listeners)
	for i := 0; i < pending; {
		select {
		case b.listeners[i] <- val:
			i++
		case newListener := <-b.addListener:
			b.listeners = append(b.listeners, newListener)
		case target := <-b.removeListener:
			idx := b.remove(target)
			if idx >= 0 && idx < pending {
				pending--
				if idx < i {
					// An already-served listener vanished and everything
					// after it shifted left by one.
					i--
				}
			}
		case <-ctx.Done():
			return false
		}
	}
	return true
}

// remove unregisters and closes target, preserving the order of the remaining
// listeners. It returns the index target occupied, or -1 if it was not
// registered.
func (b *broadcastChannel[T]) remove(target <-chan T) int {
	for i, ch := range b.listeners {
		if ch == target {
			last := len(b.listeners) - 1
			copy(b.listeners[i:], b.listeners[i+1:])
			b.listeners[last] = nil // drop the stale reference in the tail slot
			b.listeners = b.listeners[:last]
			close(ch)
			return i
		}
	}
	return -1
}
|