/** * Copyright (C) 2014 Typesafe Inc. */ package docs.stream import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec import scala.collection.immutable.IndexedSeq import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace class FlexiDocSpec extends AkkaSpec { implicit val ec = system.dispatcher implicit val mat = ActorFlowMaterializer() "implement zip using readall" in { //#fleximerge-zip-readall class Zip[A, B] extends FlexiMerge[(A, B)] { import FlexiMerge._ val left = createInputPort[A]() val right = createInputPort[B]() def createMergeLogic = new MergeLogic[(A, B)] { override def inputHandles(inputCount: Int) = { require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount") Vector(left, right) } override def initialState: State[_] = State[ReadAllInputs](ReadAll(left, right)) { (ctx, _, inputs) => val a: A = inputs(left) val b: B = inputs(right) ctx.emit((a, b)) SameState } override def initialCompletionHandling = eagerClose } } //#fleximerge-zip-readall //format: OFF //#fleximerge-zip-connecting val head = Sink.head[(Int, String)] //#fleximerge-zip-connecting val map = //#fleximerge-zip-connecting FlowGraph { implicit b => import FlowGraphImplicits._ val zip = Zip[Int, String] Source.single(1) ~> zip.left Source.single("A") ~> zip.right zip.out ~> head } //#fleximerge-zip-connecting .run() //format: ON Await.result(map.get(head), remaining) should equal((1, "A")) } "implement zip using two states" in { //#fleximerge-zip-states class Zip[A, B] extends FlexiMerge[(A, B)] { import FlexiMerge._ val left = createInputPort[A]() val right = createInputPort[B]() def createMergeLogic = new MergeLogic[(A, B)] { var lastInA: A = _ override def inputHandles(inputCount: Int) = { require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount") Vector(left, right) } val readA: State[A] = State[A](Read(left)) { (ctx, input, element) => lastInA = element readB } val readB: State[B] = State[B](Read(right)) { (ctx, input, element) => ctx.emit((lastInA, element)) readA } override def initialState: State[_] = readA override def initialCompletionHandling = eagerClose } } //#fleximerge-zip-states val head = Sink.head[(Int, String)] val map = FlowGraph { implicit b => import akka.stream.scaladsl.FlowGraphImplicits._ val zip = new Zip[Int, String] Source(1 to 2) ~> zip.left Source(List("A", "B")) ~> zip.right zip.out ~> head }.run() Await.result(map.get(head), remaining) should equal((1, "A")) } "fleximerge completion handling" in { //#fleximerge-completion class ImportantWithBackups[A] extends FlexiMerge[A] { import FlexiMerge._ val important = createInputPort[A]() val replica1 = createInputPort[A]() val replica2 = createInputPort[A]() def createMergeLogic = new MergeLogic[A] { val inputs = Vector(important, replica1, replica2) override def inputHandles(inputCount: Int) = { require(inputCount == 3, s"Must connect 3 inputs, connected only $inputCount") inputs } override def initialCompletionHandling = CompletionHandling( onUpstreamFinish = (ctx, input) => input match { case `important` => log.info("Important input completed, shutting down.") ctx.finish() SameState case replica => log.info("Replica {} completed, " + "no more replicas available, " + "applying eagerClose completion handling.", replica) ctx.changeCompletionHandling(eagerClose) SameState }, onUpstreamFailure = (ctx, input, cause) => input match { case `important` => ctx.fail(cause) SameState case replica => log.error(cause, "Replica {} failed, " + "no more replicas available, " + "applying eagerClose completion handling.", replica) ctx.changeCompletionHandling(eagerClose) SameState }) override def initialState = State[A](ReadAny(inputs)) { (ctx, input, element) => ctx.emit(element) SameState } } } //#fleximerge-completion FlowGraph { implicit b => import FlowGraphImplicits._ val importantWithBackups = new ImportantWithBackups[Int] Source.single(1) ~> importantWithBackups.important Source.single(2) ~> importantWithBackups.replica1 Source.failed[Int](new Exception("Boom!") with NoStackTrace) ~> importantWithBackups.replica2 importantWithBackups.out ~> Sink.ignore }.run() } "flexi preferring merge" in { //#flexi-preferring-merge class PreferringMerge extends FlexiMerge[Int] { import akka.stream.scaladsl.FlexiMerge._ val preferred = createInputPort[Int]() val secondary1 = createInputPort[Int]() val secondary2 = createInputPort[Int]() def createMergeLogic = new MergeLogic[Int] { override def inputHandles(inputCount: Int) = { require(inputCount == 3, s"PreferringMerge must have 3 connected inputs, was $inputCount") Vector(preferred, secondary1, secondary2) } override def initialState = State[Int](ReadPreferred(preferred)(secondary1, secondary2)) { (ctx, input, element) => ctx.emit(element) SameState } } } //#flexi-preferring-merge } "flexi read conditions" in { class X extends FlexiMerge[Int] { import FlexiMerge._ override def createMergeLogic(): MergeLogic[Int] = new MergeLogic[Int] { //#read-conditions val first = createInputPort[Int]() val second = createInputPort[Int]() val third = createInputPort[Int]() //#read-conditions //#read-conditions val onlyFirst = Read(first) val firstOrThird = ReadAny(first, third) val firstAndSecond = ReadAll(first, second) val firstAndThird = ReadAll(first, third) val mostlyFirst = ReadPreferred(first)(second, third) //#read-conditions override def inputHandles(inputCount: Int): IndexedSeq[InputHandle] = Vector() override def initialState: State[_] = State[ReadAllInputs](firstAndSecond) { (ctx, input, inputs) => val in1: Int = inputs(first) SameState } } } } "flexi route" in { //#flexiroute-unzip class Unzip[A, B] extends FlexiRoute[(A, B)] { import FlexiRoute._ val outA = createOutputPort[A]() val outB = createOutputPort[B]() override def createRouteLogic() = new RouteLogic[(A, B)] { override def outputHandles(outputCount: Int) = { require(outputCount == 2, s"Unzip must have two connected outputs, was $outputCount") Vector(outA, outB) } override def initialState = State[Any](DemandFromAll(outA, outB)) { (ctx, _, element) => val (a, b) = element ctx.emit(outA, a) ctx.emit(outB, b) SameState } override def initialCompletionHandling = eagerClose } } //#flexiroute-unzip } "flexi route completion handling" in { //#flexiroute-completion class ImportantRoute[A] extends FlexiRoute[A] { import FlexiRoute._ val important = createOutputPort[A]() val additional1 = createOutputPort[A]() val additional2 = createOutputPort[A]() override def createRouteLogic() = new RouteLogic[A] { val outputs = Vector(important, additional1, additional2) override def outputHandles(outputCount: Int) = { require(outputCount == 3, s"Must have three connected outputs, was $outputCount") outputs } override def initialCompletionHandling = CompletionHandling( // upstream: onUpstreamFinish = (ctx) => (), onUpstreamFailure = (ctx, thr) => (), // downstream: onDownstreamFinish = (ctx, output) => output match { case `important` => // finish all downstreams, and cancel the upstream ctx.finish() SameState case _ => SameState }) override def initialState = State[A](DemandFromAny(outputs)) { (ctx, output, element) => ctx.emit(output, element) SameState } } } //#flexiroute-completion FlowGraph { implicit b => import FlowGraphImplicits._ val route = new ImportantRoute[Int] Source.single(1) ~> route.in route.important ~> Sink.ignore route.additional1 ~> Sink.ignore route.additional2 ~> Sink.ignore }.run() } }