diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala index 11281b9ebd..ed6a56d670 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala @@ -105,15 +105,15 @@ class FlowIterableSpec extends AkkaSpec { c.expectComplete() } - "produce elements with two transformation steps" ignore { - // val p = FlowFrom(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher() - // val c = StreamTestKit.SubscriberProbe[Int]() - // p.subscribe(c) - // val sub = c.expectSubscription() - // sub.request(10) - // c.expectNext(4) - // c.expectNext(8) - // c.expectComplete() + "produce elements with two transformation steps" in { + val p = FlowFrom(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(4) + c.expectNext(8) + c.expectComplete() } "allow cancel before receiving all elements" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala index 6db382b711..c9b9d4837a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala @@ -106,16 +106,15 @@ class FlowIteratorSpec extends AkkaSpec { c.expectComplete() } - // FIXME enable test when filter is implemented - "produce elements with two transformation steps" ignore { - // val p = FlowFrom(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher() - // val c = StreamTestKit.SubscriberProbe[Int]() - // p.subscribe(c) - // val sub = c.expectSubscription() - // sub.request(10) - // c.expectNext(4) - // c.expectNext(8) - // c.expectComplete() + "produce elements with two transformation steps" in { + val p = FlowFrom(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(4) + c.expectNext(8) + c.expectComplete() } "allow cancel before receiving all elements" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowThunkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowThunkSpec.scala new file mode 100644 index 0000000000..daca41400f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowThunkSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.OnComplete +import akka.stream.testkit.StreamTestKit.OnError +import akka.stream.testkit.StreamTestKit.OnNext + +class FlowThunkSpec extends AkkaSpec { + + implicit val materializer = FlowMaterializer() + + "A Flow based on a thunk generator" must { + "produce elements" in { + + val iter = List(1, 2, 3).iterator + val p = FlowFrom(() ⇒ if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNext(11) + c.expectNoMsg(100.millis) + sub.request(3) + c.expectNext(12) + c.expectNext(13) + c.expectComplete() + } + + "complete empty" in { + val p = FlowFrom(() ⇒ None).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectComplete() + c.expectNoMsg(100.millis) + } + + "allow cancel before receiving all elements" in { + val count = 100000 + val iter = (1 to count).iterator + val p = FlowFrom(() ⇒ if (iter.hasNext) Some(iter.next()) else None).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(count) + c.expectNext(1) + sub.cancel() + val got = c.probe.receiveWhile(3.seconds) { + case _: OnNext[_] ⇒ + case OnComplete ⇒ fail("Cancel expected before OnComplete") + case OnError(e) ⇒ fail(e) + } + got.size should be < (count - 1) + } + + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index bc6a24c9de..656b0de1a2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -11,7 +11,7 @@ import scala.util.{ Failure, Success } import org.reactivestreams.{ Publisher, Subscriber } -import akka.stream.impl.{ ActorPublisher, EmptyPublisher, ErrorPublisher, FuturePublisher, IterablePublisher, IteratorPublisher, SimpleCallbackPublisher, TickPublisher } +import akka.stream.impl.{ ActorPublisher, EmptyPublisher, ErrorPublisher, FuturePublisher, IterablePublisher, IteratorPublisher, SimpleCallbackPublisher, TickPublisher, Stop } import akka.stream.impl2.ActorBasedFlowMaterializer object FlowFrom { @@ -230,8 +230,8 @@ final case class ThunkSource[In](f: () ⇒ Option[In]) extends SimpleSource[In] create(materializer, flowName).subscribe(flowSubscriber) override def isActive: Boolean = true override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = - ActorPublisher[In](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f), - name = s"$flowName-0-thunk")) + ActorPublisher[In](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, + () ⇒ f().getOrElse(throw Stop)), name = s"$flowName-0-thunk")) } /**