=htc smaller improvements to util helpers

This commit is contained in:
Mathias 2015-04-18 22:23:50 +02:00
parent 5b0af36797
commit 475c073997
3 changed files with 63 additions and 26 deletions

View file

@ -67,7 +67,7 @@ private[http] object OutgoingConnectionBlueprint {
import ParserOutput._
val responsePrep = Flow[List[ResponseOutput]]
.transform(recover { case x: ResponseParsingError x.error :: Nil }) // FIXME after #16565
.transform(StreamUtils.recover { case x: ResponseParsingError x.error :: Nil }) // FIXME after #16565
.mapConcat(identityFunc)
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
@ -227,23 +227,4 @@ private[http] object OutgoingConnectionBlueprint {
}
private class ResponseParsingError(val error: ErrorOutput) extends RuntimeException
// TODO: remove after #16394 is cleared
def recover[A, B >: A](pf: PartialFunction[Throwable, B]): () PushPullStage[A, B] = {
val stage = new PushPullStage[A, B] {
var recovery: Option[B] = None
def onPush(elem: A, ctx: Context[B]): SyncDirective = ctx.push(elem)
def onPull(ctx: Context[B]): SyncDirective = recovery match {
case None ctx.pull()
case Some(x) { recovery = null; ctx.push(x) }
case null ctx.finish()
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[B]): TerminationDirective =
if (pf isDefinedAt cause) {
recovery = Some(pf(cause))
ctx.absorbTermination()
} else super.onUpstreamFailure(cause, ctx)
}
() stage
}
}

View file

@ -23,7 +23,7 @@ import akka.stream.stage._
* INTERNAL API
*/
private[http] object StreamUtils {
import OperationAttributes._
import OperationAttributes.none
/**
* Creates a transformer that will call `f` for each incoming ByteString and output its result. After the complete
@ -159,8 +159,6 @@ private[http] object StreamUtils {
def fromInputStreamSource(inputStream: InputStream,
fileIODispatcher: String,
defaultChunkSize: Int = 65536): Source[ByteString, Unit] = {
import akka.stream.impl._
val onlyOnceFlag = new AtomicBoolean(false)
val iterator = new Iterator[ByteString] {
@ -183,7 +181,6 @@ private[http] object StreamUtils {
}
Source(() iterator).withAttributes(ActorOperationAttributes.dispatcher(fileIODispatcher))
}
/**
@ -270,6 +267,25 @@ private[http] object StreamUtils {
override def initialCompletionHandling: CompletionHandling = eagerClose
}
}
// TODO: remove after #16394 is cleared
def recover[A, B >: A](pf: PartialFunction[Throwable, B]): () PushPullStage[A, B] = {
val stage = new PushPullStage[A, B] {
var recovery: Option[B] = None
def onPush(elem: A, ctx: Context[B]): SyncDirective = ctx.push(elem)
def onPull(ctx: Context[B]): SyncDirective = recovery match {
case None ctx.pull()
case Some(x) { recovery = null; ctx.push(x) }
case null ctx.finish()
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[B]): TerminationDirective =
if (pf isDefinedAt cause) {
recovery = Some(pf(cause))
ctx.absorbTermination()
} else super.onUpstreamFailure(cause, ctx)
}
() stage
}
}
/**
@ -280,4 +296,4 @@ private[http] class EnhancedByteStringSource[Mat](val byteStringStream: Source[B
byteStringStream.runFold(ByteString.empty)(_ ++ _)
def utf8String(implicit materializer: FlowMaterializer, ec: ExecutionContext): Future[String] =
join.map(_.utf8String)
}
}

View file

@ -7,11 +7,13 @@ package akka.http
import language.implicitConversions
import language.higherKinds
import java.nio.charset.Charset
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config
import akka.stream.scaladsl.{ FlattenStrategy, Flow, Source }
import akka.stream.stage._
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.reflect.ClassTag
import scala.util.{ Failure, Success }
import scala.util.matching.Regex
import akka.event.LoggingAdapter
@ -62,11 +64,15 @@ package object util {
}
def printEvent[T](marker: String): Flow[T, T, Unit] =
Flow[T].transform(() new PushStage[T, T] {
Flow[T].transform(() new PushPullStage[T, T] {
override def onPush(element: T, ctx: Context[T]): SyncDirective = {
println(s"$marker: $element")
ctx.push(element)
}
override def onPull(ctx: Context[T]): SyncDirective = {
println(s"$marker: PULL")
ctx.pull()
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
println(s"$marker: Error $cause")
super.onUpstreamFailure(cause, ctx)
@ -81,6 +87,17 @@ package object util {
}
})
private[this] var eventStreamLogger: ActorRef = _
private[http] def installEventStreamLoggerFor(channel: Class[_])(implicit system: ActorSystem): Unit = {
synchronized {
if (eventStreamLogger == null)
eventStreamLogger = system.actorOf(Props[util.EventStreamLogger], name = "event-stream-logger")
}
system.eventStream.subscribe(eventStreamLogger, channel)
}
private[http] def installEventStreamLoggerFor[T](implicit ct: ClassTag[T], system: ActorSystem): Unit =
installEventStreamLoggerFor(ct.runtimeClass)
private[http] implicit class AddFutureAwaitResult[T](future: Future[T]) {
/** "Safe" Await.result that doesn't throw away half of the stacktrace */
def awaitResult(atMost: Duration): T = {
@ -114,3 +131,26 @@ package object util {
} else bytes.toString + " B"
}
}
package util {
private[http] class EventStreamLogger extends Actor with ActorLogging {
def receive = { case x log.warning(x.toString) }
}
// Provisioning of actor names composed of a common prefix + a counter. According to #16613 not in scope as public API.
private[http] final class SeqActorName(prefix: String) extends AtomicInteger {
def next(): String = prefix + '-' + getAndIncrement()
}
private[http] trait LogMessages extends ActorLogging { this: Actor
def logMessages(mark: String = "")(r: Receive): Receive =
new Receive {
def isDefinedAt(x: Any): Boolean = r.isDefinedAt(x)
def apply(x: Any): Unit = {
log.debug(s"[$mark] received: $x")
r(x)
}
}
}
}