Based on the presentation at React 2014, 'Everything is a stream', and after looking at the Reactive Trader source code, I thought I'd try to re-work some old code of mine to follow this pattern, but I'm stumped.
I have two methods with the following signatures:

    public static IObservable<OrderDto> GetOrderStream(string name)
    public static IObservable<PriceDto> GetPriceStream(string exchange, string security)

Both methods use Observable.Create to wrap events, and use Publish() and RefCount() on the created observable.
Each OrderDto contains the fields Exchange and Security. I want to group the orders by exchange and security so that I can request pricing info from a separate stream. As the final result, I want to print each order along with the current pricing for the exchange/security in that order.
For the orders I have the following:

    var orders = Observable.Defer(() => GetOrderStream("fnzctest"))
        .GroupBy(o => new { o.Exchange, o.Security })
        .Publish()
        .RefCount();

If I use:

    var j = from order in orders
            from o in order
            from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
            select new { Order = o, Price = price };

    IDisposable disposable = j.Subscribe(x =>
        Console.WriteLine("{0} : {1}", x.Order, x.Price.HasValue ? x.Price.Value : new PriceDto()));

I get the desired output, but GetPriceStream is called repeatedly for the same exchange/security (i.e. once per order rather than once per group).
If I change it to:

    var j = from order in orders
            from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
            select new { Price = price };

    IDisposable disposable = j.Subscribe(x =>
        Console.WriteLine("{0} : {1}", "", x.Price.HasValue ? x.Price.Value : new PriceDto()));

then GetPriceStream is called once for each group, as I expect. The problem is: how do I get that behaviour and still gain access to each OrderDto in the group, so that I can output the order and price together?

There are a few things going on here that don't make sense to me. I don't understand why you are doing .Defer(...) and why you're doing .Publish().RefCount().
Also, you're grouping the results and then flattening them again. Why not leave them as they are?
Finally, it appears that you're using .Materialize() to somehow handle the case when GetPriceStream doesn't produce an actual value.
So, given all of that, it seems that this is the query you need:

    var query = from order in GetOrderStream("fnzctest")
                from price in GetPriceStream(order.Exchange, order.Security)
                    .DefaultIfEmpty(new PriceDto())
                select new { Order = order, Price = price };

    IDisposable disposable = query.Subscribe(x =>
        Console.WriteLine("{0} : {1}", x.Order, x.Price));

Now, it's possible that your implementation of Observable.Create is what is causing you grief. So, it would be great if you could add your implementations at the end of your question.
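
If the aim is still to call GetPriceStream only once per exchange/security pair while keeping access to each order, one option is to keep the grouping and combine each group with its own price stream. The following is only a minimal sketch under assumptions, not a drop-in fix: the DTO properties `Quantity` and `Price`, the `Subject`-based order source, the call counter, and the never-ticking price stream are hypothetical stand-ins for your real implementations, and using `CombineLatest` with `StartWith` is my substitution for your `Materialize()` approach.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical DTOs; only Exchange and Security come from the question.
public class OrderDto
{
    public string Exchange { get; set; }
    public string Security { get; set; }
    public int Quantity { get; set; }
    public override string ToString() => $"{Exchange}/{Security} x{Quantity}";
}

public class PriceDto
{
    public decimal Price { get; set; }
    public override string ToString() => Price.ToString();
}

public static class Demo
{
    // Stand-in for the event-backed order stream (assumption).
    static readonly Subject<OrderDto> OrderSource = new Subject<OrderDto>();

    // Counts how often a price stream is requested.
    static int priceStreamCalls;

    public static IObservable<OrderDto> GetOrderStream(string name) => OrderSource;

    public static IObservable<PriceDto> GetPriceStream(string exchange, string security)
    {
        priceStreamCalls++;
        // Stand-in: a price stream that never ticks (assumption).
        return Observable.Never<PriceDto>();
    }

    public static void Main()
    {
        var query =
            GetOrderStream("fnzctest")
                .GroupBy(o => new { o.Exchange, o.Security })
                // The selector runs once per group, so GetPriceStream
                // is requested once per exchange/security pair.
                .SelectMany(g =>
                    g.CombineLatest(
                        // StartWith supplies a placeholder until a real price arrives.
                        GetPriceStream(g.Key.Exchange, g.Key.Security)
                            .StartWith(new PriceDto()),
                        (order, price) => new { Order = order, Price = price }));

        using (query.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price)))
        {
            OrderSource.OnNext(new OrderDto { Exchange = "LSE", Security = "VOD", Quantity = 10 });
            OrderSource.OnNext(new OrderDto { Exchange = "LSE", Security = "VOD", Quantity = 20 });
            OrderSource.OnNext(new OrderDto { Exchange = "NYSE", Security = "IBM", Quantity = 5 });
        }

        Console.WriteLine("GetPriceStream calls: " + priceStreamCalls);
    }
}
```

Be aware that `CombineLatest` also re-emits the most recent order of a group whenever a new price arrives, which may or may not be what you want. If you need exactly one output per order, `WithLatestFrom` may be a closer fit, assuming your version of Rx provides it.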