=str #16683 fixes NPE in stateful stage example
This commit is contained in:
parent
ed7a81e94d
commit
a5e57736c8
2 changed files with 10 additions and 2 deletions
|
|
@ -4,12 +4,13 @@ import akka.stream.ActorFlowMaterializer
|
|||
import akka.stream.scaladsl.{ RunnableFlow, Sink, Source, Flow }
|
||||
import akka.stream.stage.PushPullStage
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import org.scalatest.concurrent.{ ScalaFutures, Futures }
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class FlowStagesSpec extends AkkaSpec {
|
||||
class FlowStagesSpec extends AkkaSpec with ScalaFutures {
|
||||
//#import-stage
|
||||
import akka.stream.stage._
|
||||
//#import-stage
|
||||
|
|
@ -110,13 +111,18 @@ class FlowStagesSpec extends AkkaSpec {
|
|||
|
||||
//#doubler-stateful
|
||||
class Duplicator[A]() extends StatefulStage[A, A] {
|
||||
override val initial: StageState[A, A] = new StageState[A, A] {
|
||||
override def initial: StageState[A, A] = new StageState[A, A] {
|
||||
override def onPush(elem: A, ctx: Context[A]): Directive =
|
||||
emit(List(elem, elem).iterator, ctx)
|
||||
}
|
||||
}
|
||||
//#doubler-stateful
|
||||
|
||||
val fold = Source(1 to 2).transform(() ⇒ new Duplicator[Int]).runFold("")(_ + _)
|
||||
whenReady(fold) { s ⇒
|
||||
s should be("1122")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue