parent
0e11ec2057
commit
3af02c3bf9
2 changed files with 1 additions and 25 deletions
|
|
@ -43,30 +43,6 @@ package object util {
|
|||
private[http] implicit def enhanceByteStringsMat[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] =
|
||||
new EnhancedByteStringSource(byteStrings)
|
||||
|
||||
private[http] def printEvent[T](marker: String): Flow[T, T, NotUsed] =
|
||||
Flow[T].transform(() ⇒ new PushPullStage[T, T] {
|
||||
override def onPush(element: T, ctx: Context[T]): SyncDirective = {
|
||||
println(s"$marker: $element")
|
||||
ctx.push(element)
|
||||
}
|
||||
override def onPull(ctx: Context[T]): SyncDirective = {
|
||||
println(s"$marker: PULL")
|
||||
ctx.pull()
|
||||
}
|
||||
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
|
||||
println(s"$marker: Error $cause")
|
||||
super.onUpstreamFailure(cause, ctx)
|
||||
}
|
||||
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
|
||||
println(s"$marker: Complete")
|
||||
super.onUpstreamFinish(ctx)
|
||||
}
|
||||
override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = {
|
||||
println(s"$marker: Cancel")
|
||||
super.onDownstreamFinish(ctx)
|
||||
}
|
||||
})
|
||||
|
||||
private[this] var eventStreamLogger: ActorRef = _
|
||||
private[http] def installEventStreamLoggerFor(channel: Class[_])(implicit system: ActorSystem): Unit = {
|
||||
synchronized {
|
||||
|
|
|
|||
|
|
@ -975,6 +975,6 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
|||
|
||||
val trace = false // set to `true` for debugging purposes
|
||||
def printEvent[T](marker: String): Flow[T, T, NotUsed] =
|
||||
if (trace) akka.http.impl.util.printEvent(marker)
|
||||
if (trace) Flow[T].log(marker)
|
||||
else Flow[T]
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue