+str #18072 fix concat with empty source

This commit is contained in:
Alexander Golubev 2015-09-08 23:57:17 -04:00
parent f378e8266e
commit 0d1861cd2f
2 changed files with 81 additions and 18 deletions

View file

@ -3,24 +3,22 @@
*/
package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicLong
import akka.dispatch.Dispatchers
import akka.actor._
import akka.stream.Supervision._
import akka.stream.impl.Stages.StageModule
import akka.stream.impl._
import akka.stream.impl.fusing.ActorInterpreter
import akka.stream.stage.Stage
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings, Attributes }
import akka.testkit.TestEvent.{ Mute, UnMute }
import akka.testkit.{ EventFilter, TestDuration }
import com.typesafe.config.ConfigFactory
import org.reactivestreams.{ Publisher, Subscriber }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
import akka.stream.{ AbruptTerminationException, Attributes, ActorMaterializerSettings, ActorMaterializer }
import akka.stream.impl._
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.testkit.{ TestDuration, EventFilter }
import akka.testkit.TestEvent.{ UnMute, Mute }
import com.typesafe.config.ConfigFactory
import org.reactivestreams.{ Subscription, Processor, Subscriber, Publisher }
import akka.stream.impl.fusing.ActorInterpreter
import scala.util.control.NoStackTrace
object FlowSpec {
@ -313,6 +311,59 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
subs.expectComplete()
}
"be able to concat with empty source" in {
val probe = Source.single(1).concat(Source.empty)
.runWith(TestSink.probe[Int])
probe.request(1)
probe.expectNext(1)
probe.expectComplete()
}
"be able to concat empty source" in {
val probe = Source.empty.concat(Source.single(1))
.runWith(TestSink.probe[Int])
probe.request(1)
probe.expectNext(1)
probe.expectComplete()
}
"be able to concat two empty sources" in {
val probe = Source.empty.concat(Source.empty)
.runWith(TestSink.probe[Int])
probe.expectSubscription()
probe.expectComplete()
}
"be able to concat source with error" in {
val probe = Source.single(1).concat(Source.failed(TestException))
.runWith(TestSink.probe[Int])
probe.expectSubscription()
probe.expectError(TestException)
}
"subscribe at once to initial source and to one that it's concat to" in {
val publisher1 = TestPublisher.probe[Int]()
val publisher2 = TestPublisher.probe[Int]()
val probeSink = Source.apply(publisher1).concat(Source.apply(publisher2))
.runWith(TestSink.probe[Int])
val sub1 = publisher1.expectSubscription()
val sub2 = publisher2.expectSubscription()
val subSink = probeSink.expectSubscription()
sub1.sendNext(1)
subSink.request(1)
probeSink.expectNext(1)
sub1.sendComplete()
sub2.sendNext(2)
subSink.request(1)
probeSink.expectNext(2)
sub2.sendComplete()
probeSink.expectComplete()
}
"be possible to convert to a processor, and should be able to take a Processor" in {
val identity1 = Flow[Int].toProcessor
val identity2 = Flow(() identity1.run())

View file

@ -56,6 +56,9 @@ private[akka] object FanIn {
private var markedPending = 0
private var markedDepleted = 0
private var receivedInput = false
private var completedCounter = 0
private[this] final def hasState(index: Int, flag: Int): Boolean =
(states(index) & flag) != 0
private[this] final def setState(index: Int, flag: Int, on: Boolean): Unit =
@ -65,7 +68,10 @@ private[akka] object FanIn {
private[this] final def cancelled(index: Int, on: Boolean): Unit = setState(index, Cancelled, on)
private[this] final def completed(index: Int): Boolean = hasState(index, Completed)
private[this] final def completed(index: Int, on: Boolean): Unit = setState(index, Completed, on)
private[this] final def registerCompleted(index: Int): Unit = {
completedCounter += 1
setState(index, Completed, true)
}
private[this] final def depleted(index: Int): Boolean = hasState(index, Depleted)
private[this] final def depleted(index: Int, on: Boolean): Unit = setState(index, Depleted, on)
@ -111,6 +117,8 @@ private[akka] object FanIn {
def onDepleted(input: Int): Unit = ()
def onCompleteWhenNoInput(): Unit = ()
def markInput(input: Int): Unit = {
if (!marked(input)) {
if (depleted(input)) markedDepleted += 1
@ -151,6 +159,8 @@ private[akka] object FanIn {
def isCancelled(input: Int): Boolean = cancelled(input)
def isAllCompleted(): Boolean = inputCount == completedCounter
def idToDequeue(): Int = {
var id = preferredId
while (!(marked(id) && pending(id))) {
@ -205,7 +215,7 @@ private[akka] object FanIn {
}
def inputsAvailableFor(id: Int) = new TransferState {
override def isCompleted: Boolean = depleted(id) || cancelled(id)
override def isCompleted: Boolean = depleted(id) || cancelled(id) || (!pending(id) && completed(id))
override def isReady: Boolean = pending(id)
}
@ -221,6 +231,7 @@ private[akka] object FanIn {
case OnNext(id, elem)
if (marked(id) && !pending(id)) markedPending += 1
pending(id, on = true)
receivedInput = true
inputs(id).subreceive(ActorSubscriberMessage.OnNext(elem))
case OnComplete(id)
if (!pending(id)) {
@ -228,8 +239,9 @@ private[akka] object FanIn {
depleted(id, on = true)
onDepleted(id)
}
completed(id, on = true)
registerCompleted(id)
inputs(id).subreceive(ActorSubscriberMessage.OnComplete)
if (!receivedInput && isAllCompleted) onCompleteWhenNoInput()
case OnError(id, e)
onError(id, e)
})
@ -247,6 +259,7 @@ private[akka] abstract class FanIn(val settings: ActorMaterializerSettings, val
protected val primaryOutputs: Outputs = new SimpleOutputs(self, this)
protected val inputBunch = new InputBunch(inputCount, settings.maxInputBufferSize, this) {
override def onError(input: Int, e: Throwable): Unit = fail(e)
override def onCompleteWhenNoInput(): Unit = pumpFinished()
}
override def pumpFinished(): Unit = {
@ -350,9 +363,8 @@ private[akka] final class Concat(_settings: ActorMaterializerSettings) extends F
if (!inputBunch.isDepleted(First)) {
val elem = inputBunch.dequeue(First)
primaryOutputs.enqueueOutputElement(elem)
} else {
nextPhase(drainSecond)
}
if (inputBunch.isDepleted(First)) nextPhase(drainSecond)
}
def drainSecond = TransferPhase(inputBunch.inputsAvailableFor(Second) && primaryOutputs.NeedsDemand) { ()