system.reactive - Merging multiple streams, keeping ordering and avoiding duplicates


I have a problem that I do not know how to handle beautifully with Rx. I have multiple streams that all supposedly contain the same elements. However, each stream may lose messages (UDP is involved) or be late or early compared to the others. Each of these messages has a sequence number.

Now what I want to achieve is a single stream out of all those streams, without duplicates and keeping the message order. In other words, the same sequence number should not appear twice, and the values have to increase, never decrease. When a message is lost on all the streams, I'm OK with losing it (there is another TCP mechanism involved that allows me to ask explicitly for missing messages).
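To make the requirement concrete, here is a minimal plain-Java sketch of the property the merged output should satisfy. The class and method names (`OrderCheck`, `isStrictlyIncreasing`) are hypothetical and only for illustration; the check allows gaps, since lost messages are acceptable.

```java
import java.util.List;

final class OrderCheck {
    // True when each sequence number is strictly greater than the previous one,
    // which rules out both duplicates and out-of-order values. Gaps are allowed.
    static boolean isStrictlyIncreasing(List<Integer> sequenceNumbers) {
        for (int i = 1; i < sequenceNumbers.size(); i++) {
            if (sequenceNumbers.get(i) <= sequenceNumbers.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

For instance, the sequence 1, 2, 3, 4, 6 passes (5 was lost), while 1, 3, 2 and 1, 1 do not.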

I am looking to do this in RxJava, but I guess my problem is not specific to Java.

Here's a marble diagram to help visualize what I want to achieve: [marble diagram]

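You can see in the diagram that we are waiting for 2 on the first stream before outputting 3 from the second stream. Likewise, 6 is only output once we receive 6 from the second stream, because only at that point can we know for sure that 5 will never be received on any stream.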

This is browser code (written here, not compiled), but I think it should give you a good idea of how you could solve this.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;

    // The class name is arbitrary; extension methods only need a static class.
    public static class SequencedExtensions
    {
        public static IObservable<T> Sequenced<T>(
            this IObservable<T> source,
            Func<T, int> getSequenceNumber,
            int sequenceBegin,
            int sequenceRedundancy)
        {
            return Observable.Create<T>(observer =>
            {
                // The next sequence number in order.
                var sequenceNext = sequenceBegin;

                // The key is the sequence number.
                // The value is (T, count).
                var counts = new SortedDictionary<int, Tuple<T, int>>();
                return source.Subscribe(
                    value =>
                    {
                        var sequenceNumber = getSequenceNumber(value);

                        // If the sequence number of the current value is
                        // earlier in the sequence, throw away this value.
                        if (sequenceNumber < sequenceNext)
                        {
                            return;
                        }

                        // Update counts based on the current value.
                        Tuple<T, int> count;
                        if (!counts.TryGetValue(sequenceNumber, out count))
                        {
                            count = Tuple.Create(value, 0);
                        }
                        count = Tuple.Create(count.Item1, count.Item2 + 1);
                        counts[sequenceNumber] = count;

                        // If the current count has reached sequenceRedundancy,
                        // any sequence value S such that
                        // sequenceNext <= S < sequenceNumber that has not been
                        // seen yet will never be seen. So emit everything
                        // seen up to this point, in order.
                        if (count.Item2 >= sequenceRedundancy)
                        {
                            var removal = counts.Keys
                                .TakeWhile(seq => seq <= sequenceNumber)
                                .ToList();
                            foreach (var seq in removal)
                            {
                                count = counts[seq];
                                observer.OnNext(count.Item1);
                                counts.Remove(seq);
                            }
                            // Everything up to sequenceNumber is now emitted or
                            // known lost, so skip past it (a plain increment
                            // would leave sequenceNext behind the flushed value).
                            sequenceNext = sequenceNumber + 1;
                        }

                        // Emit stored values as long as we keep having the
                        // next sequence value.
                        while (counts.TryGetValue(sequenceNext, out count))
                        {
                            observer.OnNext(count.Item1);
                            counts.Remove(sequenceNext);
                            sequenceNext++;
                        }
                    },
                    observer.OnError,
                    () =>
                    {
                        // Emit any remaining values, in order.
                        foreach (var count in counts.Values)
                        {
                            observer.OnNext(count.Item1);
                        }
                        observer.OnCompleted();
                    });
            });
        }
    }

If you have two streams, IObservable<Message> A and IObservable<Message> B, you would use this by doing Observable.Merge(A, B).Sequenced(msg => msg.SequenceNumber, 1, 2).

For your example marble diagram, this would look like the following, where the source column shows the values emitted by Observable.Merge(A, B) and the counts column shows the contents of the SortedDictionary after each step of the algorithm. I am assuming that the "messages" of the original source sequence (without any lost values) are (a,1), (b,2), (c,3), (d,4), (e,5), (f,6), where the second component of each message is its sequence number.

source | counts
-------|-----------
 (a,1) | --> emit a
 (a,1) | --> skip
 (c,3) | (3,(c,1))
 (b,2) | (3,(c,1)) --> emit b,c and remove c
 (d,4) | --> emit d
 (f,6) | (6,(f,1))
 (f,6) | (6,(f,2)) --> emit f and remove it
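Since the question mentions RxJava, here is a rough plain-Java sketch of the same buffering logic that replays the trace above. It deliberately avoids any RxJava API: `Sequencer`, `Msg`, and `SequencerDemo` are hypothetical names, and the downstream is just a `Consumer`. One assumption of this sketch is that, after a redundancy flush, the next expected sequence number is set to one past the flushed value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;

// Hypothetical helper, not part of RxJava: reorders and de-duplicates by sequence number.
final class Sequencer<T> {
    // A buffered value plus the number of times its sequence number was seen.
    private static final class Seen<V> {
        final V value;
        int count;
        Seen(V value) { this.value = value; }
    }

    private final ToIntFunction<T> getSequenceNumber;
    private final int redundancy;          // number of merged source streams
    private final Consumer<T> downstream;  // receives values in order
    private final TreeMap<Integer, Seen<T>> pending = new TreeMap<>();
    private int next;                      // next expected sequence number

    Sequencer(ToIntFunction<T> getSequenceNumber, int begin, int redundancy,
              Consumer<T> downstream) {
        this.getSequenceNumber = getSequenceNumber;
        this.next = begin;
        this.redundancy = redundancy;
        this.downstream = downstream;
    }

    void onNext(T value) {
        int seq = getSequenceNumber.applyAsInt(value);
        if (seq < next) {
            return; // already emitted (duplicate) or arrived too late
        }
        Seen<T> seen = pending.computeIfAbsent(seq, k -> new Seen<>(value));
        seen.count++;
        if (seen.count >= redundancy) {
            // Every stream has reached seq, so earlier gaps are lost for good:
            // flush everything buffered up to and including seq.
            while (!pending.isEmpty() && pending.firstKey() <= seq) {
                downstream.accept(pending.pollFirstEntry().getValue().value);
            }
            next = seq + 1;
        }
        // Emit buffered values for as long as they are consecutive.
        Seen<T> ready;
        while ((ready = pending.remove(next)) != null) {
            downstream.accept(ready.value);
            next++;
        }
    }

    void onComplete() {
        // Emit whatever is still buffered, in sequence order.
        for (Seen<T> s : pending.values()) {
            downstream.accept(s.value);
        }
        pending.clear();
    }
}

// Hypothetical message type: a payload and its sequence number.
final class Msg {
    final String payload;
    final int seq;
    Msg(String payload, int seq) { this.payload = payload; this.seq = seq; }
}

final class SequencerDemo {
    // Replays the merged source column from the table and returns the emitted payloads.
    static List<String> replayTrace() {
        List<String> out = new ArrayList<>();
        Sequencer<Msg> sequencer = new Sequencer<Msg>(m -> m.seq, 1, 2, m -> out.add(m.payload));
        Msg[] merged = {
            new Msg("a", 1), new Msg("a", 1), new Msg("c", 3), new Msg("b", 2),
            new Msg("d", 4), new Msg("f", 6), new Msg("f", 6)
        };
        for (Msg m : merged) {
            sequencer.onNext(m);
        }
        sequencer.onComplete();
        return out;
    }
}
```

Calling `SequencerDemo.replayTrace()` returns a, b, c, d, f: the duplicate a is skipped, c waits for b, and f is released only when it has been seen twice, at which point 5 is known to be lost. In RxJava, the same object could presumably be driven from a subscriber on the merged stream, but that wiring is left out here.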
