An event bus for projects that use RX and Kotlin
An event bus is a useful thing: when a UI is decoupled from the business logic, it is often used to signal state changes. The thing is that I try really hard not to take dependencies when I don't have to; experience has taught me that they come back to bite me in the years that follow.
In Android-land I could have used Otto or Event Bus. However, when I looked, any number of articles recommended that if you already had a dependency on RX, it was so easy to implement an event bus that you should not take another dependency. The article that inspired me was here:
http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/
It's a shame, but like so many dependencies, the link no longer works.
The thing is, I was writing in Kotlin and this example was in Java, so I needed to adapt it. I was also inspired by this article and eventually settled on this interface for my event bus.
interface IEventBusSubscription {
    fun unsubscribe()
}

interface IEventBus {
    fun publish(event: Any)
    fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit): IEventBusSubscription
    fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit, errorReceiver: (Throwable) -> Unit): IEventBusSubscription
}
This describes a logical publish/subscribe event bus without tying my app to an implementation.
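Because the interface takes a `Class<EVENT_TYPE>`, every caller has to write `SomeEvent::class.java`. A small extension function with a reified type parameter can hide that. This is only a sketch: `subscribeTo` is a hypothetical helper name that is not part of the interface above, and the interface is repeated here (with one overload only) so the snippet stands alone.

```kotlin
interface IEventBusSubscription {
    fun unsubscribe()
}

interface IEventBus {
    fun publish(event: Any)
    fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit): IEventBusSubscription
}

// Hypothetical convenience helper (an assumption, not in the original interface).
// The reified type parameter lets the compiler supply the Class object,
// so callers name the event type once, as a type argument.
inline fun <reified EVENT_TYPE : Any> IEventBus.subscribeTo(
    noinline receiver: (EVENT_TYPE) -> Unit
): IEventBusSubscription = subscribe(EVENT_TYPE::class.java, receiver)
```

With this in place a call site could read `eventBus.subscribeTo<SomeEvent> { event -> ... }`, while the interface itself stays free of inline functions and remains easy to implement or fake.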
Implementing it using RX was pretty straightforward:
// Wraps the RX Disposable so callers only ever see IEventBusSubscription.
class RxEventBusSubscription constructor(private val rxSubscription: Disposable) : IEventBusSubscription {
    override fun unsubscribe() = rxSubscription.dispose()
}

class RxEventBus
@Inject constructor(
    private val loggerFactory: ILoggerFactory,
    private val crashReporter: ICrashReporter,
    private val schedulerProvider: ISchedulerProvider
) : IEventBus {

    // A PublishSubject does not replay: subscribers only see events published after they subscribe.
    private val publisher: PublishSubject<Any> = PublishSubject.create()

    override fun publish(event: Any) {
        // Skip the call entirely when nobody is listening.
        if (publisher.hasObservers()) {
            publisher.onNext(event)
        }
    }

    // Convenience overload: errors are reported as non-fatal exceptions.
    override fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit): IEventBusSubscription {
        return subscribe(eventType, receiver) { error: Throwable ->
            crashReporter.logNonFatalException(error)
        }
    }

    override fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit, errorReceiver: (Throwable) -> Unit): IEventBusSubscription {
        val subscriber = publisher
            .toFlowable(BackpressureStrategy.BUFFER)
            .observeOn(schedulerProvider.androidMainThread())
            // Only events of the requested type reach the receiver.
            .ofType(eventType)
            .subscribe(
                { event ->
                    receiver(event)
                },
                { error: Throwable ->
                    errorReceiver(error)
                }
            )
        return RxEventBusSubscription(subscriber)
    }
}
To publish an event I just do this:
class RecordingStatusChangedEvent(val statusChangedTo: String)
eventBus.publish(RecordingStatusChangedEvent(TrackRecordingControls.ACTION_STOP))
And subscribing to it goes like this:
override fun bind(theView: IMainView, savedInstanceState: Bundle?) {
    eventBusSubscriber = eventBus.subscribe(RecordingStatusChangedEvent::class.java) { event ->
        // We don't actually care what the new status is, we just need to update the UI.
        theView.updateAvailableActions()
        refreshTitle()
    }
}
Now maybe this isn’t a great implementation, but if that proves to be true I can reimplement the interface without touching the code that uses it.
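To give an idea of what swapping the implementation could look like, here is a minimal sketch of the same interface without RX. `SimpleEventBus` is a hypothetical name, and the behaviour is deliberately simpler than the RX version: events are delivered synchronously on the publishing thread (no hop to the Android main thread), a failing receiver stays subscribed, and the default error handler just prints to stderr instead of using a crash reporter.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList

interface IEventBusSubscription {
    fun unsubscribe()
}

interface IEventBus {
    fun publish(event: Any)
    fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit): IEventBusSubscription
    fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit, errorReceiver: (Throwable) -> Unit): IEventBusSubscription
}

// Hypothetical stand-in for RxEventBus (an assumption, not the original code).
class SimpleEventBus : IEventBus {
    // Every handler receives every event and ignores the types it did not ask for.
    private val handlers = CopyOnWriteArrayList<(Any) -> Unit>()

    override fun publish(event: Any) {
        handlers.forEach { handler -> handler(event) }
    }

    override fun <EVENT_TYPE> subscribe(eventType: Class<EVENT_TYPE>, receiver: (EVENT_TYPE) -> Unit): IEventBusSubscription =
        subscribe(eventType, receiver) { error ->
            System.err.println("Unhandled event bus error: $error")
        }

    override fun <EVENT_TYPE> subscribe(
        eventType: Class<EVENT_TYPE>,
        receiver: (EVENT_TYPE) -> Unit,
        errorReceiver: (Throwable) -> Unit
    ): IEventBusSubscription {
        val handler: (Any) -> Unit = { event ->
            if (eventType.isInstance(event)) {
                try {
                    receiver(eventType.cast(event))
                } catch (error: Throwable) {
                    errorReceiver(error)
                }
            }
        }
        handlers.add(handler)
        return object : IEventBusSubscription {
            override fun unsubscribe() {
                handlers.remove(handler)
            }
        }
    }
}

class RecordingStatusChangedEvent(val statusChangedTo: String)

fun main() {
    val bus: IEventBus = SimpleEventBus()
    val received = mutableListOf<String>()

    val subscription = bus.subscribe(RecordingStatusChangedEvent::class.java) { event ->
        received.add(event.statusChangedTo)
    }

    bus.publish(RecordingStatusChangedEvent("stop"))
    bus.publish("some other event")                    // different type, filtered out
    subscription.unsubscribe()
    bus.publish(RecordingStatusChangedEvent("start"))  // no longer delivered

    println(received) // prints [stop]
}
```

The calling code is identical to the RX-backed version, which is the point of hiding the bus behind `IEventBus`. The usage also shows the half of the lifecycle that is easy to forget: keeping the returned subscription and calling `unsubscribe()` when the listener goes away.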