I got to this, which is actually looking pretty simple/good and is the closest to chaining interface I could get.
Working code:
let ob = Observable<Nat>( func (subscriber) {
subscriber.next(3);
subscriber.next(5);
subscriber.next(12);
});
pipe3(
ob,
map( func (val : Nat) : Nat16 {
Nat16.fromNat(val + 10)
}),
map( func (val : Nat16) : Nat {
Nat16.toNat(val + 30)
})
).subscribe( {
next = func (v) {
Debug.print("next " # debug_show(v));
}
});
Output:
next 43
next 45
next 52
Module here Attempt to implement ReactiveX in Motoko with "chaining" · GitHub
Pipe function is outside of the class and you need to change its name whenever you add more parameters pipe2, pipe3, pipe4, etc.