Merge pull request #16013 from 2m/wip-hook-balance

=str #15874 Hook-up balance operation
This commit is contained in:
Martynas Mickevičius 2014-10-03 15:57:12 +03:00
commit ba40bfa399
5 changed files with 224 additions and 0 deletions

View file

@ -58,6 +58,16 @@ class FlowGraphCompileSpec extends AkkaSpec {
}.run()
}
"build simple balance" in {
FlowGraph { b
val balance = Balance[String]
b.
addEdge(in1, f1, balance).
addEdge(balance, f2, out1).
addEdge(balance, f3, out2)
}
}
"build simple merge - broadcast" in {
FlowGraph { b
val merge = Merge[String]

View file

@ -0,0 +1,154 @@
package akka.stream.scaladsl2
import akka.stream.MaterializerSettings
import akka.stream.scaladsl2.FlowGraphImplicits._
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import scala.concurrent.Await
import scala.concurrent.duration._
class GraphBalanceSpec extends AkkaSpec {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
.withFanOutBuffer(initialSize = 1, maxSize = 16)
implicit val materializer = FlowMaterializer(settings)
"A balance" must {
"balance between subscribers which signal demand" in {
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
FlowGraph { implicit b
val balance = Balance[Int]("balance")
Source(List(1, 2, 3)) ~> balance
balance ~> SubscriberDrain(c1)
balance ~> SubscriberDrain(c2)
}.run()
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
c1.expectNext(1)
c1.expectNoMsg(100.millis)
sub2.request(2)
c2.expectNext(2)
c2.expectNext(3)
c1.expectComplete()
c2.expectComplete()
}
"work with 5-way balance" in {
val f1 = FutureDrain[Seq[Int]]
val f2 = FutureDrain[Seq[Int]]
val f3 = FutureDrain[Seq[Int]]
val f4 = FutureDrain[Seq[Int]]
val f5 = FutureDrain[Seq[Int]]
val g = FlowGraph { implicit b
val balance = Balance[Int]("balance")
Source(0 to 14) ~> balance
balance ~> Flow[Int].grouped(15) ~> f1
balance ~> Flow[Int].grouped(15) ~> f2
balance ~> Flow[Int].grouped(15) ~> f3
balance ~> Flow[Int].grouped(15) ~> f4
balance ~> Flow[Int].grouped(15) ~> f5
}.run()
Set(f1, f2, f3, f4, f5) flatMap (sink Await.result(sink.future(g), 3.seconds)) should be((0 to 14).toSet)
}
"fairly balance between three outputs" in {
val numElementsForSink = 10000
val f1, f2, f3 = FoldDrain[Int, Int](0)(_ + _)
val g = FlowGraph { implicit b
val balance = Balance[Int]("balance")
Source(Stream.fill(10000 * 3)(1)) ~> balance ~> f1
balance ~> f2
balance ~> f3
}.run()
Seq(f1, f2, f3) map { sink
Await.result(sink.future(g), 3.seconds) should be(numElementsForSink +- 1000)
}
}
"produce to second even though first cancels" in {
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
FlowGraph { implicit b
val balance = Balance[Int]("balance")
Source(List(1, 2, 3)) ~> balance
balance ~> Flow[Int] ~> SubscriberDrain(c1)
balance ~> Flow[Int] ~> SubscriberDrain(c2)
}.run()
val sub1 = c1.expectSubscription()
sub1.cancel()
val sub2 = c2.expectSubscription()
sub2.request(3)
c2.expectNext(1)
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
}
"produce to first even though second cancels" in {
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
FlowGraph { implicit b
val balance = Balance[Int]("balance")
Source(List(1, 2, 3)) ~> balance
balance ~> Flow[Int] ~> SubscriberDrain(c1)
balance ~> Flow[Int] ~> SubscriberDrain(c2)
}.run()
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub2.cancel()
sub1.request(3)
c1.expectNext(1)
c1.expectNext(2)
c1.expectNext(3)
c1.expectComplete()
}
"cancel upstream when downstreams cancel" in {
val p1 = StreamTestKit.PublisherProbe[Int]()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
FlowGraph { implicit b
val balance = Balance[Int]("balance")
Source(p1.getPublisher) ~> balance
balance ~> Flow[Int] ~> SubscriberDrain(c1)
balance ~> Flow[Int] ~> SubscriberDrain(c2)
}.run()
val bsub = p1.expectSubscription()
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
p1.expectRequest(bsub, 16)
bsub.sendNext(1)
c1.expectNext(1)
sub2.request(1)
bsub.sendNext(2)
c2.expectNext(2)
sub1.cancel()
sub2.cancel()
bsub.expectCancellation()
}
}
}

View file

@ -34,6 +34,26 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
Await.result(g.getDrainFor(resultFuture), 3.seconds).sorted should be(List(1, 2, 3, 4, 5, 6))
}
"support balance - merge (parallelization) layouts" in {
val elements = 0 to 10
val in = Source(elements)
val f = Flow[Int]
val out = FutureDrain[Seq[Int]]
val g = FlowGraph { implicit b
val balance = Balance[Int]
val merge = Merge[Int]
in ~> balance ~> f ~> merge
balance ~> f ~> merge
balance ~> f ~> merge
balance ~> f ~> merge
balance ~> f ~> merge ~> Flow[Int].grouped(elements.size * 2) ~> out
}.run()
Await.result(out.future(g), 3.seconds).sorted should be(elements)
}
"support wikipedia Topological_sorting 2" in {
// see https://en.wikipedia.org/wiki/Topological_sorting#mediaviewer/File:Directed_acyclic_graph.png
val resultFuture2 = FutureDrain[Seq[Int]]

View file

@ -89,6 +89,10 @@ private[akka] object Ast {
override def name = "broadcast"
}
case object Balance extends FanOutAstNode {
override def name = "balance"
}
case object Zip extends FanInAstNode {
override def name = "zip"
}
@ -234,6 +238,8 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
val impl = op match {
case Ast.Broadcast
actorOf(Broadcast.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.Balance
actorOf(Balance.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.Unzip
actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName)
}

View file

@ -68,6 +68,7 @@ object Merge {
*/
def apply[T](name: String): Merge[T] = new Merge[T](Some(name))
}
/**
* Merge several streams, taking elements as they arrive from input streams
* (picking randomly when several have elements ready).
@ -101,6 +102,7 @@ object Broadcast {
*/
def apply[T](name: String): Broadcast[T] = new Broadcast[T](Some(name))
}
/**
* Fan-out the stream to several streams. Each element is produced to
* the other streams. It will not shutdown until the subscriptions for at least
@ -116,6 +118,38 @@ final class Broadcast[T](override val name: Option[String]) extends FlowGraphInt
override private[akka] def astNode = Ast.Broadcast
}
object Balance {
/**
* Create a new anonymous `Balance` vertex with the specified input type.
* Note that a `Balance` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: Balance[T] = new Balance[T](None)
/**
* Create a named `Balance` vertex with the specified input type.
* Note that a `Balance` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): Balance[T] = new Balance[T](Some(name))
}
/**
* Fan-out the stream to several streams. Each element is produced to
* one of the other streams. It will not shutdown until the subscriptions for at least
* two downstream subscribers have been established.
*/
final class Balance[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
override private[akka] def vertex = this
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 2
override def maximumOutputCount: Int = Int.MaxValue
override private[akka] def astNode = Ast.Balance
}
object Zip {
/**
* Create a new anonymous `Zip` vertex with the specified input types.