I have two observables and I want to listen to the one that emits its first value last. Is there an operator for this? Something like this:

let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000); // I want the values from this one
let sloth = Rx.Observable.sloth(obs1,obs2);

where the sloth observable would emit the values from obs2, as it is the one that emits its first value last.

If there is no such operator, is there any other way?

let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`);
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`);
let sloth = Rx.Observable.merge(
  obs1.take(1).mapTo(obs1),
  obs2.take(1).mapTo(obs2)
).takeLast(1).mergeAll()
sloth.subscribe(data=>console.log(data))
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`).publish();
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`).publish();
obs1.connect();
obs2.connect();
let sloth = Rx.Observable.merge(
  obs1.take(1).map((val)=>obs1.startWith(val)),
  obs2.take(1).map((val)=>obs2.startWith(val))
).takeLast(1).mergeAll();
sloth.subscribe(data=>console.log(data));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

I like your solution (though I suspect you might never see the first emitted value if you have a hot stream; if the source is cold, all seems good). Can you make a jsfiddle to check that out? If you don't miss any value, your solution is the best. If you do, it might be possible to correct it by adding the skipped value back to the source: obs1.take(1).map(val => obs1.startWith(val)).

Otherwise, for a generic, lengthier solution, the key is that you have state, so you also need the scan operator. We tag each source with an index and keep a state that records which sources have already started emitting. When all but one have started, we know the index of the one that hasn't, and we pick only the values from that one. Note that this should work whether the sources are hot or cold, because everything is done in one pass, i.e. there are no multiple subscriptions.

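// List every source here; n is derived from the list.
const sources = [obs1, obs2 /* , ..., obsn */];
const n = sources.length;
Rx.Observable.merge(
  // Tag each value with the 1-based index of the source it came from.
  ...sources.map((obs, i) => obs.map(val => ({val, sourceId: i + 1})))
).scan(
  (acc, valueStruct) => {
    acc.valueStruct = valueStruct;
    acc.alreadyEmitted[valueStruct.sourceId - 1] = true;
    // Once all but one source have emitted, the remaining one is the slowest.
    if (acc.alreadyEmitted.filter(Boolean).length === n - 1) {
      acc.lastSourceId = 1 + acc.alreadyEmitted.findIndex(element => element === false);
    }
    return acc;
  }, {alreadyEmitted: new Array(n).fill(false), lastSourceId: 0, valueStruct: null}
)
// Filter on the source id rather than on the value, so falsy values
// (such as the 0 that timer emits first) are not dropped.
.filter(acc => acc.valueStruct.sourceId === acc.lastSourceId)
.map(acc => acc.valueStruct.val)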

Maybe there is a shorter way, I don't know. I'll try to put that in a fiddle to see if it actually works; if you do so before me, let me know.
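To make the state handling easier to follow, here is a minimal sketch of the same reducer logic run over a plain array of tagged events, with no RxJS involved. The names makeSlothReducer and slothValues are hypothetical helpers introduced only for this illustration, and the event list is one assumed interleaving of timer(500, 500) and timer(1000, 1000); real timers may order simultaneous emissions differently.

```javascript
// Hypothetical helper: builds the reducer for n tagged sources.
function makeSlothReducer(n) {
  return (acc, valueStruct) => {
    // Copy the flags so each step returns a fresh state object.
    const alreadyEmitted = acc.alreadyEmitted.slice();
    alreadyEmitted[valueStruct.sourceId - 1] = true;
    let lastSourceId = acc.lastSourceId;
    // When all but one source have emitted, the remaining one is the slowest.
    if (alreadyEmitted.filter(Boolean).length === n - 1) {
      lastSourceId = 1 + alreadyEmitted.indexOf(false);
    }
    return { alreadyEmitted, lastSourceId, valueStruct };
  };
}

// Hypothetical helper: replays events and keeps only the slowest source's values.
function slothValues(events, n) {
  const reducer = makeSlothReducer(n);
  let acc = { alreadyEmitted: new Array(n).fill(false), lastSourceId: 0, valueStruct: null };
  const out = [];
  for (const event of events) {
    acc = reducer(acc, event);
    if (event.sourceId === acc.lastSourceId) out.push(event.val);
  }
  return out;
}

// Assumed interleaving: source 1 ticks every 500 ms, source 2 every 1000 ms.
const events = [
  { val: 'cheetah 0', sourceId: 1 },
  { val: 'cheetah 1', sourceId: 1 },
  { val: 'sloth 0', sourceId: 2 },
  { val: 'cheetah 2', sourceId: 1 },
  { val: 'sloth 1', sourceId: 2 },
];

console.log(slothValues(events, 2)); // [ 'sloth 0', 'sloth 1' ]
```

The decision is made as soon as the first source emits (with two sources, n - 1 is 1), so nothing from the slow source is lost, which is why this approach does not depend on the sources being cold.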

No fiddle, because I was just asking this out of curiosity, but your answer seems interesting. I'll try that tomorrow. Thanks for taking the time to answer. – n00dl3 Apr 24, 2017 at 19:56

I think I'll stick to my solution with the modifications you suggested for hot streams because it is much less verbose. I tried to make a snippet from your solution, but there was a syntax error and I didn't want to dive into it. Thank you for your time anyway. – n00dl3 Apr 25, 2017 at 8:19

observables.length === 2 ? Rx.Observable.race( observables[0].take(1).concat(observables[1]), observables[1].take(1).concat(observables[0]) ).skip(1) : observables.reduce((prev, current) => sloth(prev, current))[0];

Sure, nice question! Multiple observables would be something like this: let sloth = (...observables) => observables.length === 1 ? observables[0] : observables.length === 2 ? Rx.Observable.race( observables[0].take(1).concat(observables[1]), observables[1].take(1).concat(observables[0]) ).skip(1) : observables.reduce((result, current) => sloth(result, current), Observable.of('immediate item is ignored') )[0]; – ZahiC Apr 25, 2017 at 19:33

I had the same issue and was able to solve it using a combination of merge and skipUntil. The pipe(last()) stops you from receiving multiple results if both complete at the same time.

Try pasting the following into https://rxviz.com/:

const { timer, merge } = Rx;
const { mapTo, skipUntil, last } = RxOperators;
let obs1 = timer(500).pipe(mapTo('1'));
let obs2 = timer(1000).pipe(mapTo('2')); // I want the values from this one
let sloth = merge(
    obs1.pipe(skipUntil(obs2)),
    obs2.pipe(skipUntil(obs1))
).pipe(last())
sloth
        
