From a5e57736c8621533fc27673ebc3bd2888cd453a2 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Fri, 6 Feb 2015 12:29:45 +0100 Subject: [PATCH] =str #16683 fixes NPE in stateful stage example --- .../rst/scala/code/docs/stream/FlowStagesSpec.scala | 10 ++++++++-- .../src/main/scala/akka/stream/stage/Stage.scala | 2 ++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala index 99bc1fd714..bd789d4e8a 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala @@ -4,12 +4,13 @@ import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl.{ RunnableFlow, Sink, Source, Flow } import akka.stream.stage.PushPullStage import akka.stream.testkit.AkkaSpec +import org.scalatest.concurrent.{ ScalaFutures, Futures } import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ -class FlowStagesSpec extends AkkaSpec { +class FlowStagesSpec extends AkkaSpec with ScalaFutures { //#import-stage import akka.stream.stage._ //#import-stage @@ -110,13 +111,18 @@ class FlowStagesSpec extends AkkaSpec { //#doubler-stateful class Duplicator[A]() extends StatefulStage[A, A] { - override val initial: StageState[A, A] = new StageState[A, A] { + override def initial: StageState[A, A] = new StageState[A, A] { override def onPush(elem: A, ctx: Context[A]): Directive = emit(List(elem, elem).iterator, ctx) } } //#doubler-stateful + val fold = Source(1 to 2).transform(() ⇒ new Duplicator[Int]).runFold("")(_ + _) + whenReady(fold) { s ⇒ + s should be("1122") + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 6def1b0e3b..ec83b84074 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -210,6 +210,8 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] { /** * Concrete subclass must return the initial behavior from this method. + * + * **Warning:** This method must not be implemented as `val`. */ def initial: StageState[In, Out]