Merge pull request #15877 from drewhk/wip-15764-graph-concat-drewhk

+str #15764: Add concat combinator
This commit is contained in:
drewhk 2014-09-12 16:28:16 +02:00
commit f73bfc6da1
7 changed files with 307 additions and 74 deletions

View file

@ -89,6 +89,10 @@ private[akka] object Ast {
override def name = "zip"
}
case object Concat extends FanInAstNode {
override def name = "concat"
}
}
/**
@ -209,6 +213,8 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
actorOf(Props(new FairMerge(settings, inputCount)).withDispatcher(settings.dispatcher), actorName)
case Ast.Zip
actorOf(Props(new Zip(settings)).withDispatcher(settings.dispatcher), actorName)
case Ast.Concat
actorOf(Props(new Concat(settings)).withDispatcher(settings.dispatcher), actorName)
}
val publisher = new ActorPublisher[Out](impl, equalityValue = None)

View file

@ -37,8 +37,9 @@ private[akka] object FanIn {
private var markCount = 0
private val pending = Array.ofDim[Boolean](inputCount)
private var markedPending = 0
private val depleted = Array.ofDim[Boolean](inputCount)
private val completed = Array.ofDim[Boolean](inputCount)
private var markedCompleted = 0
private var markedDepleted = 0
private var preferredId = 0
@ -52,7 +53,7 @@ private[akka] object FanIn {
def markInput(input: Int): Unit = {
if (!marked(input)) {
if (completed(input)) markedCompleted += 1
if (depleted(input)) markedDepleted += 1
if (pending(input)) markedPending += 1
marked(input) = true
markCount += 1
@ -61,13 +62,15 @@ private[akka] object FanIn {
def unmarkInput(input: Int): Unit = {
if (marked(input)) {
if (completed(input)) markedCompleted -= 1
if (depleted(input)) markedDepleted -= 1
if (pending(input)) markedPending -= 1
marked(input) = false
markCount -= 1
}
}
def isDepleted(input: Int): Boolean = depleted(input)
private def idToDequeue(): Int = {
var id = preferredId
while (!(marked(id) && pending(id))) {
@ -82,9 +85,13 @@ private[akka] object FanIn {
val input = inputs(id)
val elem = input.dequeueInputElement()
if (!input.inputsAvailable) {
markedPending -= 1
if (marked(id)) markedPending -= 1
pending(id) = false
}
if (input.inputsDepleted) {
depleted(id) = true
if (marked(id)) markedDepleted += 1
}
elem
}
@ -101,15 +108,25 @@ private[akka] object FanIn {
}
val AllOfMarkedInputs = new TransferState {
override def isCompleted: Boolean = markedCompleted == markCount && markedPending < markCount
override def isCompleted: Boolean = markedDepleted > 0
override def isReady: Boolean = markedPending == markCount
}
val AnyOfMarkedInputs = new TransferState {
override def isCompleted: Boolean = markedCompleted == markCount && markedPending == 0
override def isCompleted: Boolean = markedDepleted == markCount && markedPending == 0
override def isReady: Boolean = markedPending > 0
}
def inputsAvailableFor(id: Int) = new TransferState {
override def isCompleted: Boolean = depleted(id)
override def isReady: Boolean = pending(id)
}
def inputsOrCompleteAvailableFor(id: Int) = new TransferState {
override def isCompleted: Boolean = false
override def isReady: Boolean = pending(id) || depleted(id)
}
// FIXME: Eliminate re-wraps
def subreceive: SubReceive = new SubReceive({
case OnSubscribe(id, subscription)
@ -119,7 +136,10 @@ private[akka] object FanIn {
pending(id) = true
inputs(id).subreceive(ActorSubscriberMessage.OnNext(elem))
case OnComplete(id)
if (marked(id) && !completed(id)) markedCompleted += 1
if (!pending(id)) {
if (marked(id) && !depleted(id)) markedDepleted += 1
depleted(id) = true
}
completed(id) = true
inputs(id).subreceive(ActorSubscriberMessage.OnComplete)
case OnError(id, e) onError(e)
@ -206,4 +226,28 @@ private[akka] class Zip(_settings: MaterializerSettings) extends FanIn(_settings
val elem1 = inputBunch.dequeue(1)
primaryOutputs.enqueueOutputElement((elem0, elem1))
})
}
/**
* INTERNAL API
*/
private[akka] class Concat(_settings: MaterializerSettings) extends FanIn(_settings, inputPorts = 2) {
val First = 0
val Second = 1
def drainFirst = TransferPhase(inputBunch.inputsOrCompleteAvailableFor(First) && primaryOutputs.NeedsDemand) { ()
if (!inputBunch.isDepleted(First)) {
val elem = inputBunch.dequeue(First)
primaryOutputs.enqueueOutputElement(elem)
} else {
nextPhase(drainSecond)
}
}
def drainSecond = TransferPhase(inputBunch.inputsAvailableFor(Second) && primaryOutputs.NeedsDemand) { ()
val elem = inputBunch.dequeue(Second)
primaryOutputs.enqueueOutputElement(elem)
}
nextPhase(drainFirst)
}

View file

@ -97,7 +97,7 @@ private[akka] object FanOut {
val output = outputs(id)
output.enqueueOutputElement(elem)
if (!output.demandAvailable) {
markedPending -= 1
if (marked(id)) markedPending -= 1
pending(id) = false
}
}

View file

@ -74,6 +74,8 @@ final class Merge[T](override val name: Option[String]) extends FlowGraphInterna
override val maximumInputCount: Int = Int.MaxValue
override val minimumOutputCount: Int = 1
override val maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Merge
}
object Broadcast {
@ -103,6 +105,8 @@ final class Broadcast[T](override val name: Option[String]) extends FlowGraphInt
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 2
override def maximumOutputCount: Int = Int.MaxValue
override private[akka] def astNode = Ast.Broadcast
}
object Zip {
@ -149,6 +153,56 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern
override def maximumInputCount: Int = 2
override def minimumOutputCount: Int = 1
override def maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Zip
}
object Concat {
/**
* Create a new anonymous `Concat` vertex with the specified input types.
* Note that a `Concat` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[T]: Concat[T] = new Concat[T](None)
/**
* Create a named `Concat` vertex with the specified input types.
* Note that a `Concat` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[T](name: String): Concat[T] = new Concat[T](Some(name))
class First[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] {
override val port = 0
type NextT = T
override def next = vertex.out
}
class Second[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] {
override val port = 1
type NextT = T
override def next = vertex.out
}
class Out[T] private[akka] (val vertex: Concat[T]) extends JunctionOutPort[T]
}
/**
* Takes two streams and outputs an output stream formed from the two input streams
* by consuming one stream first emitting all of its elements, then consuming the
* second stream emitting all of its elements.
*/
final class Concat[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
val first = new Concat.First(this)
val second = new Concat.Second(this)
val out = new Concat.Out(this)
override def minimumInputCount: Int = 2
override def maximumInputCount: Int = 2
override def minimumOutputCount: Int = 1
override def maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Concat
}
object UndefinedSink {
@ -177,6 +231,8 @@ final class UndefinedSink[T](override val name: Option[String]) extends FlowGrap
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 0
override def maximumOutputCount: Int = 0
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sinks cannot be materialized")
}
object UndefinedSource {
@ -205,6 +261,8 @@ final class UndefinedSource[T](override val name: Option[String]) extends FlowGr
override def maximumInputCount: Int = 0
override def minimumOutputCount: Int = 1
override def maximumOutputCount: Int = 1
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sources cannot be materialized")
}
/**
@ -236,6 +294,8 @@ private[akka] object FlowGraphInternal {
def minimumOutputCount: Int
def maximumOutputCount: Int
private[akka] def astNode: Ast.JunctionAstNode
final override def equals(obj: Any): Boolean =
obj match {
case other: InternalVertex
@ -577,11 +637,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
materializedSinks = memo.materializedSinks ++ materializedSink)
} else {
val op: Ast.JunctionAstNode = v match {
case _: Merge[_] Ast.Merge
case _: Broadcast[_] Ast.Broadcast
case _: Zip[_, _] Ast.Zip
}
val op = v.astNode
val (subscribers, publishers) =
materializer.materializeJunction[Any, Any](op, edge.from.inDegree, edge.from.outDegree)
// TODO: Check for gaps in port numbers

View file

@ -524,7 +524,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
try {
system.eventStream.publish(Mute(filters))
EventFilter[akka.actor.PreRestartException](occurrences = 1) intercept {
EventFilter[akka.actor.PostRestartException](occurrences = 1) intercept {
upstream.expectRequest(upstreamSubscription, 1)
upstreamSubscription.sendNext("a3")
upstreamSubscription.expectCancellation()

View file

@ -0,0 +1,127 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.testkit.StreamTestKit
import akka.stream.scaladsl2._
import akka.stream.scaladsl2.FlowGraphImplicits._
import akka.stream.testkit2.TwoStreamsSetup
import scala.concurrent.Promise
class GraphConcatSpec extends TwoStreamsSetup {
override type Outputs = Int
val op = Concat[Int]
override def operationUnderTestLeft() = op.first
override def operationUnderTestRight() = op.second
"Concat" must {
"work in the happy case" in {
val probe = StreamTestKit.SubscriberProbe[Int]()
FlowGraph { implicit b
val concat1 = Concat[Int]("concat1")
val concat2 = Concat[Int]("concat2")
FlowFrom(List.empty[Int].iterator) ~> concat1.first
FlowFrom((1 to 4).iterator) ~> concat1.second
concat1.out ~> concat2.first
FlowFrom((5 to 10).iterator) ~> concat2.second
concat2.out ~> SubscriberSink(probe)
}.run()
val subscription = probe.expectSubscription()
for (i 1 to 10) {
subscription.request(1)
probe.expectNext(i)
}
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(5)
subscriber1.expectNext(1)
subscriber1.expectNext(2)
subscriber1.expectNext(3)
subscriber1.expectNext(4)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(5)
subscriber2.expectNext(1)
subscriber2.expectNext(2)
subscriber2.expectNext(3)
subscriber2.expectNext(4)
subscriber2.expectComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(5)
subscriber1.expectNext(1)
subscriber1.expectNext(2)
subscriber1.expectNext(3)
subscriber1.expectNext(4)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(5)
subscriber2.expectNext(1)
subscriber2.expectNext(2)
subscriber2.expectNext(3)
subscriber2.expectNext(4)
subscriber2.expectComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}
"correctly handle async errors in secondary upstream" in {
val promise = Promise[Int]()
val subscriber = StreamTestKit.SubscriberProbe[Int]()
FlowGraph { implicit b
val concat = Concat[Int]
FlowFrom(List(1, 2, 3)) ~> concat.first
FlowFrom(promise.future) ~> concat.second
concat.out ~> SubscriberSink(subscriber)
}.run()
val subscription = subscriber.expectSubscription()
subscription.request(4)
subscriber.expectNext(1)
subscriber.expectNext(2)
subscriber.expectNext(3)
promise.failure(TestException)
subscriber.expectError(TestException)
}
}
}

View file

@ -15,66 +15,66 @@ class GraphZipSpec extends TwoStreamsSetup {
override def operationUnderTestRight() = op.right
"Zip" must {
//
// "work in the happy case" in {
// val probe = StreamTestKit.SubscriberProbe[(Int, String)]()
//
// FlowGraph { implicit b
// val zip = Zip[Int, String]
//
// FlowFrom(1 to 4) ~> zip.left
// FlowFrom(List("A", "B", "C", "D", "E", "F")) ~> zip.right
//
// zip.out ~> SubscriberSink(probe)
// }.run()
//
// val subscription = probe.expectSubscription()
//
// subscription.request(2)
// probe.expectNext((1, "A"))
// probe.expectNext((2, "B"))
//
// subscription.request(1)
// probe.expectNext((3, "C"))
// subscription.request(1)
// probe.expectNext((4, "D"))
//
// probe.expectComplete()
// }
//
// commonTests()
//
// "work with one immediately completed and one nonempty publisher" in {
// val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
// subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
//
// val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
// subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
// }
//
// "work with one delayed completed and one nonempty publisher" in {
// val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
// subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
//
// val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
// subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
// }
//
// "work with one immediately failed and one nonempty publisher" in {
// val subscriber1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator))
// subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
//
// val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher)
// subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
// }
//
// "work with one delayed failed and one nonempty publisher" in {
// val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator))
// subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
//
// val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher)
// val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
// }
"work in the happy case" in {
val probe = StreamTestKit.SubscriberProbe[(Int, String)]()
FlowGraph { implicit b
val zip = Zip[Int, String]
FlowFrom(1 to 4) ~> zip.left
FlowFrom(List("A", "B", "C", "D", "E", "F")) ~> zip.right
zip.out ~> SubscriberSink(probe)
}.run()
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext((1, "A"))
probe.expectNext((2, "B"))
subscription.request(1)
probe.expectNext((3, "C"))
subscription.request(1)
probe.expectNext((4, "D"))
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher)
val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}
}