From 0d1861cd2fba194ea33bb4ccf7982f4d24dd9465 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Tue, 8 Sep 2015 23:57:17 -0400 Subject: [PATCH] +str #18072 fix concat with empty source --- .../scala/akka/stream/scaladsl/FlowSpec.scala | 77 +++++++++++++++---- .../main/scala/akka/stream/impl/FanIn.scala | 22 ++++-- 2 files changed, 81 insertions(+), 18 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 17c5eac49a..8040714ea1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -3,24 +3,22 @@ */ package akka.stream.scaladsl -import java.util.concurrent.atomic.AtomicLong -import akka.dispatch.Dispatchers +import akka.actor._ import akka.stream.Supervision._ -import akka.stream.impl.Stages.StageModule +import akka.stream.impl._ +import akka.stream.impl.fusing.ActorInterpreter import akka.stream.stage.Stage +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings, Attributes } +import akka.testkit.TestEvent.{ Mute, UnMute } +import akka.testkit.{ EventFilter, TestDuration } +import com.typesafe.config.ConfigFactory +import org.reactivestreams.{ Publisher, Subscriber } import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ -import akka.actor._ -import akka.stream.{ AbruptTerminationException, Attributes, ActorMaterializerSettings, ActorMaterializer } -import akka.stream.impl._ -import akka.stream.testkit._ -import akka.stream.testkit.Utils._ -import akka.testkit.{ TestDuration, EventFilter } -import akka.testkit.TestEvent.{ UnMute, Mute } -import com.typesafe.config.ConfigFactory -import org.reactivestreams.{ Subscription, Processor, Subscriber, Publisher } -import akka.stream.impl.fusing.ActorInterpreter import scala.util.control.NoStackTrace object FlowSpec { @@ -313,6 +311,59 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece subs.expectComplete() } + "be able to concat with empty source" in { + val probe = Source.single(1).concat(Source.empty) + .runWith(TestSink.probe[Int]) + probe.request(1) + probe.expectNext(1) + probe.expectComplete() + } + + "be able to concat empty source" in { + val probe = Source.empty.concat(Source.single(1)) + .runWith(TestSink.probe[Int]) + probe.request(1) + probe.expectNext(1) + probe.expectComplete() + } + + "be able to concat two empty sources" in { + val probe = Source.empty.concat(Source.empty) + .runWith(TestSink.probe[Int]) + probe.expectSubscription() + probe.expectComplete() + } + + "be able to concat source with error" in { + val probe = Source.single(1).concat(Source.failed(TestException)) + .runWith(TestSink.probe[Int]) + probe.expectSubscription() + probe.expectError(TestException) + } + + "subscribe at once to initial source and to one that it's concat to" in { + val publisher1 = TestPublisher.probe[Int]() + val publisher2 = TestPublisher.probe[Int]() + val probeSink = Source.apply(publisher1).concat(Source.apply(publisher2)) + .runWith(TestSink.probe[Int]) + + val sub1 = publisher1.expectSubscription() + val sub2 = publisher2.expectSubscription() + val subSink = probeSink.expectSubscription() + + sub1.sendNext(1) + subSink.request(1) + probeSink.expectNext(1) + sub1.sendComplete() + + sub2.sendNext(2) + subSink.request(1) + probeSink.expectNext(2) + sub2.sendComplete() + + probeSink.expectComplete() + } + "be possible to convert to a processor, and should be able to take a Processor" in { val identity1 = Flow[Int].toProcessor val identity2 = Flow(() ⇒ identity1.run()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 70feb471ec..54f0c6c4b9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -56,6 +56,9 @@ private[akka] object FanIn { private var markedPending = 0 private var markedDepleted = 0 + private var receivedInput = false + private var completedCounter = 0 + private[this] final def hasState(index: Int, flag: Int): Boolean = (states(index) & flag) != 0 private[this] final def setState(index: Int, flag: Int, on: Boolean): Unit = @@ -65,7 +68,10 @@ private[akka] object FanIn { private[this] final def cancelled(index: Int, on: Boolean): Unit = setState(index, Cancelled, on) private[this] final def completed(index: Int): Boolean = hasState(index, Completed) - private[this] final def completed(index: Int, on: Boolean): Unit = setState(index, Completed, on) + private[this] final def registerCompleted(index: Int): Unit = { + completedCounter += 1 + setState(index, Completed, true) + } private[this] final def depleted(index: Int): Boolean = hasState(index, Depleted) private[this] final def depleted(index: Int, on: Boolean): Unit = setState(index, Depleted, on) @@ -111,6 +117,8 @@ private[akka] object FanIn { def onDepleted(input: Int): Unit = () + def onCompleteWhenNoInput(): Unit = () + def markInput(input: Int): Unit = { if (!marked(input)) { if (depleted(input)) markedDepleted += 1 @@ -151,6 +159,8 @@ private[akka] object FanIn { def isCancelled(input: Int): Boolean = cancelled(input) + def isAllCompleted(): Boolean = inputCount == completedCounter + def idToDequeue(): Int = { var id = preferredId while (!(marked(id) && pending(id))) { @@ -205,7 +215,7 @@ private[akka] object FanIn { } def inputsAvailableFor(id: Int) = new TransferState { - override def isCompleted: Boolean = depleted(id) || cancelled(id) + override def isCompleted: Boolean = depleted(id) || cancelled(id) || (!pending(id) && completed(id)) override def isReady: Boolean = pending(id) } @@ -221,6 +231,7 @@ private[akka] object FanIn { case OnNext(id, elem) ⇒ if (marked(id) && !pending(id)) markedPending += 1 pending(id, on = true) + receivedInput = true inputs(id).subreceive(ActorSubscriberMessage.OnNext(elem)) case OnComplete(id) ⇒ if (!pending(id)) { @@ -228,8 +239,9 @@ private[akka] object FanIn { depleted(id, on = true) onDepleted(id) } - completed(id, on = true) + registerCompleted(id) inputs(id).subreceive(ActorSubscriberMessage.OnComplete) + if (!receivedInput && isAllCompleted) onCompleteWhenNoInput() case OnError(id, e) ⇒ onError(id, e) }) @@ -247,6 +259,7 @@ private[akka] abstract class FanIn(val settings: ActorMaterializerSettings, val protected val primaryOutputs: Outputs = new SimpleOutputs(self, this) protected val inputBunch = new InputBunch(inputCount, settings.maxInputBufferSize, this) { override def onError(input: Int, e: Throwable): Unit = fail(e) + override def onCompleteWhenNoInput(): Unit = pumpFinished() } override def pumpFinished(): Unit = { @@ -350,9 +363,8 @@ private[akka] final class Concat(_settings: ActorMaterializerSettings) extends F if (!inputBunch.isDepleted(First)) { val elem = inputBunch.dequeue(First) primaryOutputs.enqueueOutputElement(elem) - } else { - nextPhase(drainSecond) } + if (inputBunch.isDepleted(First)) nextPhase(drainSecond) } def drainSecond = TransferPhase(inputBunch.inputsAvailableFor(Second) && primaryOutputs.NeedsDemand) { () ⇒