I'm trying to understand how to use the new akka.http library. I would like to send an HTTP request to a server and read the whole response body as a single String, in order to produce a Source[String, ?].
Here is the best solution I have been able to produce so far:
    def get(
      modelId: String,
      pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool]
    ): Source[String, Unit] = {
      val uri = reactionsUrl(modelId)
      val req = HttpRequest(uri = uri)
      Source.single((req, 0))
        .via(pool)
        .map { case (Success(resp), _) =>
          resp.entity.dataBytes.map(_.decodeString("utf-8"))
        }
        .flatten(FlattenStrategy.concat)
        .grouped(1024)
        .map(_.mkString)
    }

It seems to work (except for the missing error path), but it is a bit clunky for such a simple task. Is there a smarter solution? Can I avoid the grouped/mkString step?
You can use the toStrict method of HttpResponse with a timeout. It gathers the whole answer into a Future.

    def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: Materializer): Future[Strict]

Returns a sharable and serializable copy of this message with a strict entity.

The entity has a toStrict method as well, and that is the one the example calls.
Example:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{HttpResponse, HttpRequest}
    import akka.stream.{Materializer, ActorMaterializer}
    import akka.stream.scaladsl.{Sink, Flow, Source}
    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._
    import scala.util.{Try, Success}

    object Main extends App {
      implicit val system = ActorSystem()
      import system.dispatcher
      implicit val materializer = ActorMaterializer()

      val host = "127.0.0.1"
      lazy val pool = Http().newHostConnectionPool[Int](host, 9000)

      // Each stream element is a Future[String]; print the body once it completes.
      FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run()
    }

    object FlowBuilder {
      def get(modelId: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool])
             (implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = {
        val uri = modelId
        val req = HttpRequest(uri = uri)
        Source.single((req, 0)).via(pool)
          .map {
            // Only the Success case is matched; a failed request is not handled here.
            case (Success(resp), _) =>
              resp.entity.toStrict(5.seconds).map(_.data.decodeString("utf-8"))
          }
      }
    }
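The example above yields a Source[Future[String], Unit], while the question asked for a Source of plain strings. One possible way to close that gap is to replace map with mapAsync, which waits for each Future and emits its result. The following is only a sketch under a few assumptions: the object name BodySource, the 5 second timeout, and the choice to fail the stream on a failed request are illustrative and not from the original answer, and the materialized value is typed as Any so that the signature does not depend on the Akka version.

```scala
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of akka-http.
object BodySource {
  def get(
    path: String,
    // Any flow with this shape works, e.g. a host connection pool.
    pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Any]
  )(implicit ec: ExecutionContext, mat: Materializer): Source[String, Any] =
    Source.single((HttpRequest(uri = path), 0))
      .via(pool)
      // mapAsync unwraps the Future, so downstream sees String elements.
      .mapAsync(1) {
        case (Success(resp), _) =>
          // Assumed timeout of 5 seconds for collecting the whole body.
          resp.entity.toStrict(5.seconds).map(_.data.decodeString("UTF-8"))
        case (Failure(cause), _) =>
          // Error path: a failed request fails the stream with the same cause.
          Future.failed[String](cause)
      }
}
```

With this variant the caller can write BodySource.get("/path", pool).runWith(Sink.head) to obtain a Future[String], and the grouped/mkString step from the question is no longer needed.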