The workaround published in https://github.com/Kotlin/kotlinx.coroutines/issues/2886#issuecomment-901201125 works OK. It still supports only a single observer, but that makes sense, since by definition these events are supposed to be observed once and only once.

import kotlinx.coroutines.sync.*
/**
 * Unbounded FIFO hand-off of events from non-suspending senders to a suspending receiver.
 *
 * Every [send] stores one event and releases one semaphore permit; every [receive]
 * acquires one permit and removes one event, so each stored event is handed to
 * exactly one [receive] call.
 *
 * @param T the type of event carried by the bus.
 */
class ExactlyOnceEventBus<T> {
    // Events that have been sent but not yet received, oldest first.
    // The deque instance doubles as the monitor that guards access to it.
    private val pending = ArrayDeque<T>()

    // One available permit per queued event. The semaphore is created with all of
    // its permits already acquired, so nothing is available until send() releases one.
    private val available = Semaphore(permits = Int.MAX_VALUE, acquiredPermits = Int.MAX_VALUE)

    /**
     * Queues [event] and makes it available to a receiver.
     *
     * Does not suspend. The event is stored before the permit is released, so a
     * receiver that obtains the permit always finds an element to remove.
     *
     * @param event the event to enqueue.
     */
    fun send(event: T) {
        synchronized(pending) {
            pending.addLast(event)
        }
        available.release()
    }

    /**
     * Suspends until an event is available, then removes and returns the oldest one.
     *
     * @return the event at the head of the queue.
     */
    suspend fun receive(): T {
        available.acquire()
        val next = synchronized(pending) { pending.removeFirst() }
        return next
    }
}