c# - Rx GroupBy - How to obtain the grouped values?


Based on the presentation at React 2014 ('Everything is a stream') and after looking at the Reactive Trader source code, I thought I would try to rework some old code of mine to follow that pattern, but I am stumped.

I have two methods with the following signatures:

    public static IObservable<OrderDto> GetOrderStream(string name)
    public static IObservable<PriceDto> GetPriceStream(string exchange, string security)

Both methods use Observable.Create to wrap events, and use Publish() and RefCount() on the created observable.

Each OrderDto contains the fields Exchange and Security. I want to group the orders by exchange and security so that I can request the pricing info from a separate stream. The final result I want is to print each order along with the current pricing for the exchange/security in that order.

For the orders I have the following:

    var orders = Observable.Defer(() => GetOrderStream("fnzctest"))
        .GroupBy(o => new { o.Exchange, o.Security })
        .Publish()
        .RefCount();

If I use:

    var j = from order in orders
            from o in order
            from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
            select new { order = o, price = price };

    IDisposable disposable = j.Subscribe(x =>
        Console.WriteLine("{0} : {1}", x.order, x.price.HasValue ? x.price.Value : new PriceDto()));

I get the desired output, but GetPriceStream is called repeatedly for the same exchange/security (i.e. not once for each group).

If I change it to:

    var j = from order in orders
            from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
            select new { price = price };

    IDisposable disposable = j.Subscribe(x =>
        Console.WriteLine("{0} : {1}", "", x.price.HasValue ? x.price.Value : new PriceDto()));

then GetPriceStream is called once for each group, as I expect. So my problem is: how do I get the desired behaviour and also gain access to each OrderDto in the group, so that I can output the order and price together?

There are a few things going on here that don't make sense to me. I don't understand why you are doing the .Defer(...), or why you are doing the .Publish().RefCount().

Also, you are grouping the results and then flattening them. Why not just leave them as they are?

Finally, it appears that you're using .Materialize() to somehow handle the case when GetPriceStream doesn't produce an actual value.

So, given all of that, it seems that this is the query you need:

    var query =
        from order in GetOrderStream("fnzctest")
        from price in GetPriceStream(order.Exchange, order.Security)
            .DefaultIfEmpty(new PriceDto())
        select new
        {
            order = order,
            price = price,
        };

    IDisposable disposable =
        query.Subscribe(x => Console.WriteLine("{0} : {1}", x.order, x.price));

Now, it's possible that your implementation of Observable.Create is causing you grief. So, it would be great if you could add your implementations at the end of your question.
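
If you do still want one price subscription per exchange/security while seeing every order, one option is to keep the GroupBy and do the pairing inside each group. The following is only a minimal sketch, not your actual code: the OrderDto and PriceDto records, the Subject standing in for GetOrderStream, the fixed-price GetPriceStream, and the PriceStreamCalls counter are all made up for illustration (it assumes C# 9 or later and the System.Reactive package). The point is that the SelectMany selector runs once per group, so GetPriceStream is requested once per key, and CombineLatest then pairs each order in that group with the latest price.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical DTOs; the real OrderDto/PriceDto are not shown in the question.
public record OrderDto(string Exchange, string Security, int Quantity);
public record PriceDto(decimal Last);

public static class GroupedPricing
{
    // Counts how often the price stream is requested (illustration only).
    public static int PriceStreamCalls;

    // Stand-in for the real GetPriceStream: emits one fixed price synchronously.
    public static IObservable<PriceDto> GetPriceStream(string exchange, string security)
    {
        PriceStreamCalls++;
        return Observable.Return(new PriceDto(1.23m));
    }

    public static List<string> Run()
    {
        PriceStreamCalls = 0;
        var output = new List<string>();
        var orders = new Subject<OrderDto>(); // stand-in for GetOrderStream(...)

        var query = orders
            .GroupBy(o => new { o.Exchange, o.Security })
            .SelectMany(group =>
            {
                // This selector runs once per group, so the price stream
                // is requested once per exchange/security pair.
                var prices = GetPriceStream(group.Key.Exchange, group.Key.Security);

                // Pair each order in the group with the most recent price.
                return group.CombineLatest(prices, (order, price) => new { order, price });
            });

        using (query.Subscribe(x => output.Add($"{x.order} : {x.price}")))
        {
            orders.OnNext(new OrderDto("NZX", "FBU", 100));
            orders.OnNext(new OrderDto("NZX", "FBU", 250)); // same group as above
            orders.OnNext(new OrderDto("ASX", "BHP", 50));  // new group
        }

        return output;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
        Console.WriteLine($"GetPriceStream calls: {PriceStreamCalls}");
    }
}
```

Be aware that CombineLatest also fires when a new price arrives, re-emitting the most recent order of that group with the updated price. If you only want output when an order arrives, WithLatestFrom is an alternative, with the caveat that orders arriving before the first price are dropped.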

