diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterLifecycleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterLifecycleSpec.scala index 65e27f8646..37e3e3ce29 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterLifecycleSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterLifecycleSpec.scala @@ -3,14 +3,13 @@ */ package akka.stream.impl -import java.util.concurrent.atomic.AtomicBoolean - import akka.stream.Supervision._ import akka.stream._ -import akka.stream.impl.fusing.{ InterpreterLifecycleSpecKit, ActorInterpreter } +import akka.stream.impl.fusing.{ ActorInterpreter, InterpreterLifecycleSpecKit } import akka.stream.stage.Stage import akka.stream.testkit.Utils.TE import akka.stream.testkit.{ AkkaSpec, _ } + import scala.concurrent.duration._ class ActorInterpreterLifecycleSpec extends AkkaSpec with InterpreterLifecycleSpecKit { @@ -113,7 +112,7 @@ class ActorInterpreterLifecycleSpec extends AkkaSpec with InterpreterLifecycleSp up.subscribe(processor) val upsub = up.expectSubscription() upsub.sendComplete() - down.expectComplete() // failures in postStop are logged, but not propagated // TODO Future features? make this a setting? + down.expectComplete() } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala index fbe2d19976..252dd28130 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -16,7 +16,7 @@ trait InterpreterLifecycleSpecKit { onUpstreamCompleted: () ⇒ Unit = () ⇒ (), onUpstreamFailed: Throwable ⇒ Unit = ex ⇒ ()) extends PushStage[T, T] { - override def preStart(ctx: Context[T]) = onStart(ctx) + override def preStart(ctx: LifecycleContext) = onStart(ctx) override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) @@ -34,7 +34,9 @@ trait InterpreterLifecycleSpecKit { } private[akka] case class PreStartFailer[T](pleaseThrow: () ⇒ Unit) extends PushStage[T, T] { - override def preStart(ctx: Context[T]) = pleaseThrow() + + override def preStart(ctx: LifecycleContext) = + pleaseThrow() override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 2879cceb66..c0e275e32a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -161,7 +161,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], import AbstractStage._ import OneBoundedInterpreter._ - type UntypedOp = AbstractStage[Any, Any, Directive, Directive, Context[Any]] + type UntypedOp = AbstractStage[Any, Any, Directive, Directive, Context[Any], LifecycleContext] require(ops.nonEmpty, "OneBoundedInterpreter cannot be created without at least one Op") private final val pipeline: Array[UntypedOp] = ops.map(_.asInstanceOf[UntypedOp])(breakOut) @@ -197,7 +197,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], private var lastOpFailing: Int = -1 private def pipeName(op: UntypedOp): String = { - val o = (op: AbstractStage[_, _, _, _, _]) + val o = (op: AbstractStage[_, _, _, _, _, _]) (o match { case Finished ⇒ "finished" case _: BoundaryStage ⇒ "boundary" @@ -692,13 +692,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], activeOpIndex = op a.preStart(a.context) - case a: AbstractStage[Any, Any, Any, Any, Any] @unchecked ⇒ + case a: AbstractStage[Any, Any, Any, Any, Any, Any] @unchecked ⇒ val state = new EntryState("stage", op) a.context = state try a.preStart(state) catch { case NonFatal(ex) ⇒ failures ::= InitializationFailure(op, ex) // not logging here as 'most downstream' exception will be signalled via onError - // TODO could use decider here, but semantics become a bit iffy (Resume => ignore error in prestart? Doesn't sound like a good idea). } } op += 1 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 504e5e3569..ed4eb52f2d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -528,7 +528,7 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt // TODO more optimisations can be done here - prepare logOnPush function etc - override def preStart(ctx: Context[T]): Unit = { + override def preStart(ctx: LifecycleContext): Unit = { logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels) log = logAdapter match { case Some(l) ⇒ l diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index f4af139595..1fee01963b 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -38,7 +38,7 @@ private[stream] object AbstractStage { final val TerminationPending = 0x8000 } -abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out]] extends Stage[In, Out] { +abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] { /** * INTERNAL API */ @@ -97,8 +97,7 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C * Empty default implementation. */ @throws(classOf[Exception]) - def preStart(ctx: Ctx): Unit = () // TODO or hide as LifecycleContext... then AsyncStage cannot do anything about it - // TODO hide here and make Async Stage final def preStart + def asyncPreStart ??? + def preStart(ctx: LifeCtx): Unit = () /** * `onPush` is called when an element from upstream is available and there is demand from downstream, i.e. @@ -231,7 +230,7 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C * @see [[StatefulStage]] * @see [[PushStage]] */ -abstract class PushPullStage[In, Out] extends AbstractStage[In, Out, SyncDirective, SyncDirective, Context[Out]] +abstract class PushPullStage[In, Out] extends AbstractStage[In, Out, SyncDirective, SyncDirective, Context[Out], LifecycleContext] /** * `PushStage` is a [[PushPullStage]] that always perform transitive pull by calling `ctx.pull` from `onPull`. @@ -266,7 +265,7 @@ abstract class PushStage[In, Out] extends PushPullStage[In, Out] { * @see [[PushPullStage]] */ abstract class DetachedStage[In, Out] - extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]] { + extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out], LifecycleContext] { private[stream] override def isDetached = true /** @@ -291,7 +290,7 @@ abstract class DetachedStage[In, Out] * with the provided data item. */ abstract class AsyncStage[In, Out, Ext] - extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, AsyncContext[Out, Ext]] { + extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, AsyncContext[Out, Ext], AsyncContext[Out, Ext]] { private[stream] override def isDetached = true /** @@ -512,7 +511,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] { * BoundaryStages are the elements that make the interpreter *tick*, there is no other way to start the interpreter * than using a BoundaryStage. */ -private[akka] abstract class BoundaryStage extends AbstractStage[Any, Any, Directive, Directive, BoundaryContext] { +private[akka] abstract class BoundaryStage extends AbstractStage[Any, Any, Directive, Directive, BoundaryContext, LifecycleContext] { final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop final override def restart(): BoundaryStage =