From 36abbb4234d5490cf84d9a9166e5a21fe9aeb3e0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 14 Jun 2015 03:12:30 -0400 Subject: [PATCH] Introduces `fold` as a Flow transformation and generalizes Sink.fold to be Flow.fold + Sink.head Conflicts: akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala akka-stream/src/main/scala/akka/stream/impl/Stages.scala akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala --- akka-docs-dev/rst/stages-overview.rst | 1 + .../stream/tck/HeadSinkSubscriberTest.scala | 3 +- .../akka/stream/ActorMaterializerSpec.scala | 7 +- .../fusing/InterpreterSupervisionSpec.scala | 2 +- .../test/scala/akka/stream/io/TcpSpec.scala | 19 +++-- .../akka/stream/scaladsl/FlowFoldSpec.scala | 35 ++++++-- .../stream/impl/ActorMaterializerImpl.scala | 77 +++++++++--------- .../stream/impl/BlackholeSubscriber.scala | 20 ++--- .../stream/impl/CompletedPublishers.scala | 7 ++ .../main/scala/akka/stream/impl/Sinks.scala | 50 ++++++------ .../main/scala/akka/stream/impl/Stages.scala | 6 ++ .../scala/akka/stream/impl/StreamLayout.scala | 43 ++++------ .../stream/impl/fusing/ActorInterpreter.scala | 5 +- .../scala/akka/stream/impl/fusing/Ops.scala | 6 +- .../main/scala/akka/stream/javadsl/Flow.scala | 20 +++++ .../scala/akka/stream/javadsl/Source.scala | 9 +++ .../scala/akka/stream/scaladsl/Flow.scala | 23 +++++- .../scala/akka/stream/scaladsl/Sink.scala | 80 ++----------------- 18 files changed, 204 insertions(+), 209 deletions(-) diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index 7a319fc2f9..312d66c906 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -35,6 +35,7 @@ filter the given predicate returns true for the element collect the provided partial function is defined for the element the partial function is defined for the element and downstream backpressures upstream completes grouped the specified number of elements has been accumulated or upstream completed a group has been assembled and downstream backpressures upstream completes scan the function scanning the element returns a new element downstream backpressures upstream completes +fold upstream completes downstream backpressures upstream completes drop the specified number of elements has been dropped already the specified number of elements has been dropped and downstream backpressures upstream completes take the specified number of elements to take has not yet been reached downstream backpressures the defined number of elements has been taken or upstream completes takeWhile the predicate is true and until the first false result downstream backpressures predicate returned false or upstream completes diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala index 8e83821c28..d56527cfb3 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala @@ -12,8 +12,7 @@ import scala.concurrent.Promise class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { import HeadSink._ - override def createSubscriber(): Subscriber[Int] = - new HeadSinkSubscriber[Int](Promise[Int]()) + override def createSubscriber(): Subscriber[Int] = new HeadSinkSubscriber[Int] override def createElement(element: Int): Int = element } diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index 7d61333187..b006971a77 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -22,13 +22,15 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender { } "properly shut down actors associated with it" in { + pending // FIXME disabled due to https://github.com/akka/akka/issues/17849 + val m = ActorMaterializer.create(system) val f = Source.lazyEmpty[Int].runFold(0)(_ + _)(m) + m.shutdown() - an[AbruptTerminationException] should be thrownBy - Await.result(f, 3.seconds) + an[AbruptTerminationException] should be thrownBy Await.result(f, 3.seconds) } "refuse materialization after shutdown" in { @@ -65,7 +67,6 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender { sys.awaitTermination() m.isShutdown should ===(true) } - } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 2ce04baa81..2438588ad7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -324,7 +324,7 @@ class InterpreterSupervisionSpec extends InterpreterSpecKit { val pf: PartialFunction[Int, Int] = { case x: Int ⇒ if (x == 0) throw TE else x } new TestSetup(Seq( - Collect(restartingDecider)(pf))) { + Collect(pf, restartingDecider))) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) upstream.onNext(2) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 2eb65db124..cca24bb8d5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -346,17 +346,17 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] = Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right) - val binding = Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ - conn.flow.join(writeButIgnoreRead).run() - })(Keep.left).run() + val binding = Tcp() + .bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false) + .toMat(Sink.foreach(_.flow.join(writeButIgnoreRead).run()))(Keep.left).run() - val result = Source(() ⇒ Iterator.continually(ByteString("client data"))) + val result = Source.repeat(ByteString("client data")) .via(Tcp().outgoingConnection(serverAddress.getHostName, serverAddress.getPort)) .runFold(ByteString.empty)(_ ++ _) - Await.result(result, 3.seconds) should ===(ByteString("Early response")) - - binding.map(_.unbind()) + val r: ByteString = Await.result(result, 3.seconds) + r should ===(ByteString("Early response")) + binding.foreach(_.unbind()) } "Echo should work even if server is in full close mode" in { @@ -374,7 +374,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- Await.result(result, 3.seconds) should ===(10000) - binding.map(_.unbind()) + binding.foreach(_.unbind()) } "handle when connection actor terminates unexpectedly" in { @@ -510,7 +510,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- val folder = Source(immutable.Iterable.fill(1000)(ByteString(0))) .via(Tcp().outgoingConnection(address)) - .toMat(Sink.fold(0)(_ + _.size))(Keep.right) + .fold(0)(_ + _.size).toMat(Sink.head)(Keep.right) val total = folder.run() val rejected = folder.run() @@ -521,7 +521,6 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- Await.result(rejected, 5.seconds) should ===(1000) } } - } def validateServerClientCommunication(testData: ByteString, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index 2768247760..c20718261f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -15,23 +15,42 @@ class FlowFoldSpec extends AkkaSpec { implicit val mat = ActorMaterializer() "A Fold" must { + val input = 1 to 100 + val expected = input.fold(0)(_ + _) + val inputSource = Source(input).filter(_ ⇒ true).map(identity) + val foldSource = inputSource.fold[Int](0)(_ + _).filter(_ ⇒ true).map(identity) + val foldFlow = Flow[Int].filter(_ ⇒ true).map(identity).fold(0)(_ + _).filter(_ ⇒ true).map(identity) + val foldSink = Sink.fold[Int, Int](0)(_ + _) - "fold" in assertAllStagesStopped { - val input = 1 to 100 - val future = Source(input).runFold(0)(_ + _) - val expected = input.fold(0)(_ + _) - Await.result(future, 3.seconds) should be(expected) + "work when using Source.runFold" in assertAllStagesStopped { + Await.result(inputSource.runFold(0)(_ + _), 3.seconds) should be(expected) + } + + "work when using Source.fold" in assertAllStagesStopped { + Await.result(foldSource runWith Sink.head, 3.seconds) should be(expected) + } + + "work when using Sink.fold" in assertAllStagesStopped { + Await.result(inputSource runWith foldSink, 3.seconds) should be(expected) + } + + "work when using Flow.fold" in assertAllStagesStopped { + Await.result(inputSource via foldFlow runWith Sink.head, 3.seconds) should be(expected) + } + + "work when using Source.fold + Flow.fold + Sink.fold" in assertAllStagesStopped { + Await.result(foldSource via foldFlow runWith foldSink, 3.seconds) should be(expected) } "propagate an error" in assertAllStagesStopped { val error = new Exception with NoStackTrace - val future = Source[Unit](() ⇒ throw error).runFold(())(Keep.none) + val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFold(())(Keep.none) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } - "complete future with failure when function throws" in assertAllStagesStopped { + "complete future with failure when folding function throws" in assertAllStagesStopped { val error = new Exception with NoStackTrace - val future = Source.single(1).runFold(0)((_, _) ⇒ throw error) + val future = inputSource.runFold(0)((x, y) ⇒ if (x > 50) throw error else x + y) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 7fb4825dd2..7af228309b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -18,6 +18,7 @@ import akka.stream.scaladsl._ import akka.stream._ import akka.stream.io._ import akka.stream.io.SslTls.TlsModule +import akka.stream.stage.Stage import akka.util.ByteString import org.reactivestreams._ @@ -80,8 +81,8 @@ private[akka] case class ActorMaterializerImpl( override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes): Any = { - def newMaterializationContext() = new MaterializationContext(ActorMaterializerImpl.this, - effectiveAttributes, stageName(effectiveAttributes)) + def newMaterializationContext() = + new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes)) atomic match { case sink: SinkModule[_, _] ⇒ val (sub, mat) = sink.create(newMaterializationContext()) @@ -98,9 +99,10 @@ private[akka] case class ActorMaterializerImpl( assignPort(stage.outPort, processor) mat - case tls: TlsModule ⇒ + case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here val es = effectiveSettings(effectiveAttributes) - val props = SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing) + val props = + SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing) val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) @@ -114,7 +116,8 @@ private[akka] case class ActorMaterializerImpl( assignPort(tls.plainIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn)) assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn)) - case junction: JunctionModule ⇒ materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes)) + case junction: JunctionModule ⇒ + materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes)) } } @@ -125,10 +128,8 @@ private[akka] case class ActorMaterializerImpl( case Identity(attr) ⇒ (new VirtualProcessor, ()) case _ ⇒ val (opprops, mat) = ActorProcessorFactory.props(ActorMaterializerImpl.this, op, effectiveAttributes) - val processor = ActorProcessorFactory[Any, Any](actorOf( - opprops, - stageName(effectiveAttributes), - effectiveSettings.dispatcher)) + val processor = ActorProcessorFactory[Any, Any]( + actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) processor -> mat } @@ -183,18 +184,15 @@ private[akka] case class ActorMaterializerImpl( } val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val size = outs.size - def factory(id: Int) = new ActorPublisher[Any](impl) { - override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) - } + def factory(id: Int) = + new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) } val publishers = if (outs.size < 8) Vector.tabulate(size)(factory) else List.tabulate(size)(factory) + impl ! FanOut.ExposedPublishers(publishers) - - publishers.zip(outs).foreach { case (pub, out) ⇒ assignPort(out, pub) } - val subscriber = ActorSubscriber[Any](impl) - assignPort(in, subscriber) - + publishers.iterator.zip(outs.iterator).foreach { case (pub, out) ⇒ assignPort(out, pub) } + assignPort(in, ActorSubscriber[Any](impl)) } } @@ -257,7 +255,8 @@ private[akka] object StreamSupervisor { def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) - final case class Materialize(props: Props, name: String) extends DeadLetterSuppression with NoSerializationVerificationNeeded + final case class Materialize(props: Props, name: String) + extends DeadLetterSuppression with NoSerializationVerificationNeeded /** Testing purpose */ final case object GetChildren @@ -294,41 +293,41 @@ private[akka] object ActorProcessorFactory { import akka.stream.impl.Stages._ import ActorMaterializerImpl._ - private val _identity = (x: Any) ⇒ x - def props(materializer: ActorMaterializer, op: StageModule, parentAttributes: Attributes): (Props, Any) = { val att = parentAttributes and op.attributes // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW // Also, otherwise the attributes will not affect the settings properly! val settings = materializer.effectiveSettings(att) + def interp(s: Stage[_, _]): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, att), ()) op match { case Identity(_) ⇒ throw new AssertionError("Identity cannot end up in ActorProcessorFactory") case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer, att), ()) - case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer, att), ()) - case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer, att), ()) - case TakeWhile(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.TakeWhile(p, settings.supervisionDecider)), materializer, att), ()) - case DropWhile(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.DropWhile(p, settings.supervisionDecider)), materializer, att), ()) - case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer, att), ()) - case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer, att), ()) - case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer, att), ()) - case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ()) - case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ()) - case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ()) - case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ()) - case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ()) - case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ()) - case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ()) - case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ()) - case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ()) + case Map(f, _) ⇒ interp(fusing.Map(f, settings.supervisionDecider)) + case Filter(p, _) ⇒ interp(fusing.Filter(p, settings.supervisionDecider)) + case Drop(n, _) ⇒ interp(fusing.Drop(n)) + case Take(n, _) ⇒ interp(fusing.Take(n)) + case TakeWhile(p, _) ⇒ interp(fusing.TakeWhile(p, settings.supervisionDecider)) + case DropWhile(p, _) ⇒ interp(fusing.DropWhile(p, settings.supervisionDecider)) + case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider)) + case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider)) + case Fold(z, f, _) ⇒ interp(fusing.Fold(z, f, settings.supervisionDecider)) + case Expand(s, f, _) ⇒ interp(fusing.Expand(s, f)) + case Conflate(s, f, _) ⇒ interp(fusing.Conflate(s, f, settings.supervisionDecider)) + case Buffer(n, s, _) ⇒ interp(fusing.Buffer(n, s)) + case MapConcat(f, _) ⇒ interp(fusing.MapConcat(f, settings.supervisionDecider)) + case MapAsync(p, f, _) ⇒ interp(fusing.MapAsync(p, f, settings.supervisionDecider)) + case MapAsyncUnordered(p, f, _) ⇒ interp(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)) + case Grouped(n, _) ⇒ interp(fusing.Grouped(n)) + case Log(n, e, l, _) ⇒ interp(fusing.Log(n, e, l)) case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ()) case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) - case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer, att), ()) + case StageFactory(mkStage, _) ⇒ interp(mkStage()) case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ()) case MaterializingStageFactory(mkStageAndMat, _) ⇒ - val sm = mkStageAndMat() - (ActorInterpreter.props(settings, List(sm._1), materializer, att), sm._2) + val s_m = mkStageAndMat() + (ActorInterpreter.props(settings, List(s_m._1), materializer, att), s_m._2) case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala index 8eed98db97..5ed8595903 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala @@ -13,24 +13,27 @@ import org.reactivestreams.{ Subscriber, Subscription } private[akka] class BlackholeSubscriber[T](highWatermark: Int, onComplete: Promise[Unit]) extends Subscriber[T] { - private val lowWatermark = Math.max(1, highWatermark / 2) + private val lowWatermark: Int = Math.max(1, highWatermark / 2) private var requested = 0L - - private val subscription: AtomicReference[Subscription] = new AtomicReference(null) + private var subscription: Subscription = null override def onSubscribe(sub: Subscription): Unit = { ReactiveStreamsCompliance.requireNonNullSubscription(sub) - if (subscription.compareAndSet(null, sub)) requestMore() - else sub.cancel() + if (subscription ne null) sub.cancel() + else { + subscription = sub + requestMore() + } } override def onError(cause: Throwable): Unit = { ReactiveStreamsCompliance.requireNonNullException(cause) onComplete.tryFailure(cause) - () } - override def onComplete(): Unit = onComplete.trySuccess(()) + override def onComplete(): Unit = { + onComplete.trySuccess(()) + } override def onNext(element: T): Unit = { ReactiveStreamsCompliance.requireNonNullElement(element) @@ -41,8 +44,7 @@ private[akka] class BlackholeSubscriber[T](highWatermark: Int, onComplete: Promi protected def requestMore(): Unit = if (requested < lowWatermark) { val amount = highWatermark - requested - subscription.get().request(amount) requested += amount + subscription.request(amount) } - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 87ab5d6f59..22555807aa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -79,6 +79,13 @@ private[akka] case object CancelledSubscription extends Subscription { override def cancel(): Unit = () } +private[akka] final class CancellingSubscriber[T] extends Subscriber[T] { + override def onError(t: Throwable): Unit = () + override def onSubscribe(s: Subscription): Unit = s.cancel() + override def onComplete(): Unit = () + override def onNext(t: T): Unit = () +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index c78b0363b5..924ce74e4b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -98,26 +98,33 @@ private[akka] final class FanoutPublisherSink[In]( * INTERNAL API */ private[akka] object HeadSink { - class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] { - private val sub = new AtomicReference[Subscription] + final class HeadSinkSubscriber[In] extends Subscriber[In] { + private[this] var subscription: Subscription = null + private[this] val promise: Promise[In] = Promise[In]() + def future: Future[In] = promise.future override def onSubscribe(s: Subscription): Unit = { ReactiveStreamsCompliance.requireNonNullSubscription(s) - if (!sub.compareAndSet(null, s)) s.cancel() - else s.request(1) + if (subscription ne null) s.cancel() + else { + subscription = s + s.request(1) + } } override def onNext(elem: In): Unit = { ReactiveStreamsCompliance.requireNonNullElement(elem) - p.trySuccess(elem) - sub.get.cancel() + promise.trySuccess(elem) + subscription.cancel() + subscription = null } override def onError(t: Throwable): Unit = { ReactiveStreamsCompliance.requireNonNullException(t) - p.tryFailure(t) + promise.tryFailure(t) } - override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream")) + override def onComplete(): Unit = + promise.tryFailure(new NoSuchElementException("empty stream")) } } @@ -130,17 +137,13 @@ private[akka] object HeadSink { * the Future into the corresponding failed state) or the end-of-stream * (failing the Future with a NoSuchElementException). */ -private[akka] class HeadSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) { - +private[akka] final class HeadSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) { override def create(context: MaterializationContext) = { - val p = Promise[In]() - val sub = new HeadSink.HeadSinkSubscriber[In](p) - (sub, p.future) + val sub = new HeadSink.HeadSinkSubscriber[In] + (sub, sub.future) } - override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[In]] = new HeadSink[In](attributes, shape) override def withAttributes(attr: Attributes): Module = new HeadSink[In](attr, amendShape(attr)) - override def toString: String = "HeadSink" } @@ -159,6 +162,7 @@ private[akka] final class BlackholeSink(val attributes: Attributes, shape: SinkS override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] = new BlackholeSink(attributes, shape) override def withAttributes(attr: Attributes): Module = new BlackholeSink(attr, amendShape(attr)) + override def toString: String = "BlackholeSink" } /** @@ -171,6 +175,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] = new SubscriberSink[In](subscriber, attributes, shape) override def withAttributes(attr: Attributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr)) + override def toString: String = "SubscriberSink" } /** @@ -178,19 +183,10 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att * A sink that immediately cancels its upstream upon materialization. */ private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { - - override def create(context: MaterializationContext): (Subscriber[Any], Unit) = { - val subscriber = new Subscriber[Any] { - override def onError(t: Throwable): Unit = () - override def onSubscribe(s: Subscription): Unit = s.cancel() - override def onComplete(): Unit = () - override def onNext(t: Any): Unit = () - } - (subscriber, ()) - } - + override def create(context: MaterializationContext): (Subscriber[Any], Unit) = (new CancellingSubscriber[Any], ()) override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new CancelSink(attributes, shape) override def withAttributes(attr: Attributes): Module = new CancelSink(attr, amendShape(attr)) + override def toString: String = "CancelSink" } /** @@ -207,6 +203,7 @@ private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape) override def withAttributes(attr: Attributes): Module = new ActorSubscriberSink[In](props, attr, amendShape(attr)) + override def toString: String = "ActorSubscriberSink" } /** @@ -228,5 +225,6 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any new ActorRefSink[In](ref, onCompleteMessage, attributes, shape) override def withAttributes(attr: Attributes): Module = new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) + override def toString: String = "ActorRefSink" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index d0373c475f..611b88cd4e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -35,6 +35,7 @@ private[stream] object Stages { val takeWhile = name("takeWhile") val dropWhile = name("dropWhile") val scan = name("scan") + val fold = name("fold") val buffer = name("buffer") val conflate = name("conflate") val expand = name("expand") @@ -192,6 +193,11 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } + final case class Fold(zero: Any, f: (Any, Any) ⇒ Any, attributes: Attributes = fold) extends StageModule { + def withAttributes(attributes: Attributes) = copy(attributes = attributes) + override protected def newInstance: StageModule = this.copy() + } + final case class Buffer(size: Int, overflowStrategy: OverflowStrategy, attributes: Attributes = buffer) extends StageModule { require(size > 0, s"Buffer size must be larger than zero but was [$size]") diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index e89f8688e3..e20f1d753b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -596,35 +596,11 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of // the original module and assign them to the copy ports in the outer scope that we will return to enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach { - case (original, exposed) ⇒ - assignPort(exposed, scopeSubscribers(original)) + case (original, exposed) ⇒ assignPort(exposed, scopeSubscribers(original)) } enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach { - case (original, exposed) ⇒ - assignPort(exposed, scopePublishers(original)) - } - - } - - // Cancels all intermediate Publishers and fails all intermediate Subscribers. - // (This is an attempt to clean up after an exception during materialization) - private def panic(cause: Throwable): Unit = { - val panicError = new MaterializationPanic(cause) - for (subMap ← subscribersStack; sub ← subMap.valuesIterator) { - sub.onSubscribe(new Subscription { - override def cancel(): Unit = () - override def request(n: Long): Unit = sub.onError(panicError) - }) - } - - for (pubMap ← publishersStack; pub ← pubMap.valuesIterator) { - pub.subscribe(new Subscriber[Any] { - override def onSubscribe(s: Subscription): Unit = s.cancel() - override def onComplete(): Unit = () - override def onError(t: Throwable): Unit = () - override def onNext(t: Any): Unit = () - }) + case (original, exposed) ⇒ assignPort(exposed, scopePublishers(original)) } } @@ -635,9 +611,18 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo s"The top level module cannot be materialized because it has unconnected ports: ${(topLevel.inPorts ++ topLevel.outPorts).mkString(", ")}") try materializeModule(topLevel, topLevel.attributes) catch { - case NonFatal(e) ⇒ - panic(e) - throw e + case NonFatal(cause) ⇒ + // PANIC!!! THE END OF THE MATERIALIZATION IS NEAR! + // Cancels all intermediate Publishers and fails all intermediate Subscribers. + // (This is an attempt to clean up after an exception during materialization) + val errorPublisher = new ErrorPublisher(new MaterializationPanic(cause), "") + for (subMap ← subscribersStack; sub ← subMap.valuesIterator) + errorPublisher.subscribe(sub) + + for (pubMap ← publishersStack; pub ← pubMap.valuesIterator) + pub.subscribe(new CancellingSubscriber) + + throw cause } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 760ca0a0d2..8481c9d1fc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -374,11 +374,12 @@ private[akka] class ActorInterpreter(val settings: ActorMaterializerSettings, va override def postStop(): Unit = { // This should handle termination while interpreter is running. If the upstream have been closed already this // call has no effect and therefore do the right thing: nothing. - try upstream.onInternalError(AbruptTerminationException(self)) + val ex = AbruptTerminationException(self) + try upstream.onInternalError(ex) // Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream // otherwise this will have no effect finally { - downstream.fail(AbruptTerminationException(self)) + downstream.fail(ex) upstream.cancel() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index eea818dd1d..bd8b6be08e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -61,7 +61,7 @@ private[akka] final case class DropWhile[T](p: T ⇒ Boolean, decider: Supervisi taking = true ctx.push(elem) } else { - ctx.pull + ctx.pull() } override def decide(t: Throwable): Supervision.Directive = decider(t) @@ -74,7 +74,7 @@ private[akka] final object Collect { final val NotApplied: Any ⇒ Any = _ ⇒ Collect.NotApplied } -private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf: PartialFunction[In, Out]) extends PushStage[In, Out] { +private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out], decider: Supervision.Decider) extends PushStage[In, Out] { import Collect.NotApplied @@ -183,7 +183,7 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, de * INTERNAL API */ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] { - private var aggregator = zero + private[this] var aggregator: Out = zero override def onPush(elem: In, ctx: Context[Out]): SyncDirective = { aggregator = f(aggregator, elem) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 9b7fc45b00..645f1ae89a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -331,6 +331,26 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.scan(zero)(f.apply)) + /** + * Similar to `scan` but only emits its result when the upstream completes, + * after which it also completes. Applies the given function `f` towards its current and next value, + * yielding the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision#restart]] current value starts at `zero` again + * the stream will continue. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.fold(zero)(f.apply)) + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 68ea778c01..fd0e3153fe 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -389,6 +389,15 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.scan(zero)(f.apply)) + /** + * Similar to `scan` but only emits the current value once, when completing. + * Its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * yielding the next current value. + */ + def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = + new Source(delegate.fold(zero)(f.apply)) + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index fa558ffe3a..dac2cc548b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -96,9 +96,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * value of the current flow (ignoring the given Sink’s value), use * [[Flow#toMat[Mat2* toMat]] if a different strategy is needed. */ - def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat] = { - toMat(sink)(Keep.left) - } + def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat] = toMat(sink)(Keep.left) /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. @@ -541,6 +539,25 @@ trait FlowOps[+Out, +Mat] { */ def scan[T](zero: T)(f: (T, Out) ⇒ T): Repr[T, Mat] = andThen(Scan(zero, f.asInstanceOf[(Any, Any) ⇒ Any])) + /** + * Similar to `scan` but only emits its result when the upstream completes, + * after which it also completes. Applies the given function towards its current and next value, + * yielding the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[T, Mat] = andThen(Fold(zero, f.asInstanceOf[(Any, Any) ⇒ Any])) + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index bd6957bac7..6a64662cf3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -9,12 +9,8 @@ import akka.stream._ import akka.stream.impl.Stages.{ MapAsyncUnordered, DefaultAttributes } import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ -import akka.stream.stage.Context -import akka.stream.stage.PushStage -import akka.stream.stage.SyncDirective -import akka.stream.{ SinkShape, Inlet, Outlet, Graph, Attributes } import akka.stream.Attributes._ -import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage } +import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage, SyncDirective } import org.reactivestreams.{ Publisher, Subscriber } import scala.concurrent.{ ExecutionContext, Future, Promise } @@ -107,37 +103,8 @@ object Sink extends SinkApply { * normal end of the stream, or completed with `Failure` if there is a failure signaled in * the stream.. */ - def foreach[T](f: T ⇒ Unit): Sink[T, Future[Unit]] = { - - def newForeachStage(): (PushStage[T, Unit], Future[Unit]) = { - val promise = Promise[Unit]() - - val stage = new PushStage[T, Unit] { - override def onPush(elem: T, ctx: Context[Unit]): SyncDirective = { - f(elem) - ctx.pull() - } - override def onUpstreamFailure(cause: Throwable, ctx: Context[Unit]): TerminationDirective = { - promise.failure(cause) - ctx.fail(cause) - } - override def onUpstreamFinish(ctx: Context[Unit]): TerminationDirective = { - promise.success(()) - ctx.finish() - } - - override def decide(cause: Throwable): Supervision.Directive = { - // supervision will be implemented by #16916 - promise.tryFailure(cause) - super.decide(cause) - } - } - - (stage, promise.future) - } - - Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).named("foreachSink") - } + def foreach[T](f: T ⇒ Unit): Sink[T, Future[Unit]] = + Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink") /** * A `Sink` that will invoke the given function to each of the elements @@ -153,9 +120,7 @@ object Sink extends SinkApply { * @see [[#mapAsyncUnordered]] */ def foreachParallel[T](parallelism: Int)(f: T ⇒ Unit)(implicit ec: ExecutionContext): Sink[T, Future[Unit]] = - Flow[T].andThen( - MapAsyncUnordered(parallelism, - { out: T ⇒ Future(f(out)) }.asInstanceOf[Any ⇒ Future[Unit]])).toMat(Sink.ignore)(Keep.right) + Flow[T].mapAsyncUnordered(parallelism)(t ⇒ Future(f(t))).toMat(Sink.ignore)(Keep.right) /** * A `Sink` that will invoke the given function for every received element, giving it its previous @@ -164,41 +129,8 @@ object Sink extends SinkApply { * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure signaled in the stream. */ - def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] = { - - def newFoldStage(): (PushStage[T, U], Future[U]) = { - val promise = Promise[U]() - - val stage = new PushStage[T, U] { - private var aggregator = zero - - override def onPush(elem: T, ctx: Context[U]): SyncDirective = { - aggregator = f(aggregator, elem) - ctx.pull() - } - - override def onUpstreamFailure(cause: Throwable, ctx: Context[U]): TerminationDirective = { - promise.failure(cause) - ctx.fail(cause) - } - - override def onUpstreamFinish(ctx: Context[U]): TerminationDirective = { - promise.success(aggregator) - ctx.finish() - } - - override def decide(cause: Throwable): Supervision.Directive = { - // supervision will be implemented by #16916 - promise.tryFailure(cause) - super.decide(cause) - } - } - - (stage, promise.future) - } - - Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).named("foldSink") - } + def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] = + Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink") /** * A `Sink` that when the flow is completed, either through a failure or normal