!str #16069 disallows using one Keyed sink/source ambiguously (many times)

This commit is contained in:
Konrad 'ktoso' Malawski 2014-10-27 14:34:36 +01:00
parent 81bc5c76bc
commit 083436dc91
4 changed files with 148 additions and 15 deletions

View file

@ -3,13 +3,13 @@
*/
package akka.stream.scaladsl
import akka.stream.FlowMaterializer
import akka.stream.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.stream.FlowMaterializer
import akka.stream.testkit.AkkaSpec
class FlowGraphInitSpec extends AkkaSpec {
import system.dispatcher
@ -40,5 +40,86 @@ class FlowGraphInitSpec extends AkkaSpec {
val graphs = Vector.fill(5)(Future(create()))
val result = Await.result(Future.sequence(graphs), 5.seconds).flatten.size should be(5)
}
"fail when the same `KeyedSink` is used in it multiple times" in {
val s = Source(1 to 5)
val b = Broadcast[Int]
val sink: KeyedSink[Int] = Sink.foreach[Int](i i)
val otherSink: KeyedSink[Int] = Sink.foreach[Int](i 2 * i)
FlowGraph { implicit builder
import FlowGraphImplicits._
// format: OFF
s ~> b ~> sink
b ~> otherSink // this is fine
// format: ON
}
val ex1 = intercept[IllegalArgumentException] {
FlowGraph { implicit builder
import FlowGraphImplicits._
// format: OFF
s ~> b ~> sink
b ~> sink // this is not fine
// format: ON
}
}
ex1.getMessage should include(sink.getClass.getSimpleName)
val ex2 = intercept[IllegalArgumentException] {
FlowGraph { implicit builder
import FlowGraphImplicits._
// format: OFF
s ~> b ~> sink
b ~> otherSink // this is fine
b ~> sink // this is not fine
// format: ON
}
}
ex2.getMessage should include(sink.getClass.getSimpleName)
}
"fail when the same `KeyedSource` is used in it multiple times" in {
val s = Sink.ignore
val m = Merge[Int]
val source1: KeyedSource[Int] = Source.subscriber
val source2: KeyedSource[Int] = Source.subscriber
FlowGraph { implicit builder
import FlowGraphImplicits._
// KeyedSources of same type should be fine to be mixed
// format: OFF
source1 ~> m
m ~> s
source2 ~> m
// format: ON
}
val ex1 = intercept[IllegalArgumentException] {
FlowGraph { implicit builder
import FlowGraphImplicits._
// format: OFF
source1 ~> m
m ~> s
source1 ~> m // whoops
// format: ON
}
}
ex1.getMessage should include(source1.getClass.getSimpleName)
val ex2 = intercept[IllegalArgumentException] {
FlowGraph { implicit builder
import FlowGraphImplicits._
// format: OFF
source1 ~> m
source2 ~> m ~> s
source1 ~> m // this is not fine
// format: ON
}
}
ex2.getMessage should include(source1.getClass.getSimpleName)
}
}
}

View file

@ -202,7 +202,12 @@ class GraphFlowSpec extends AkkaSpec {
val out = UndefinedSink[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val source = Source[Int]() { implicit b
val source1 = Source[Int]() { implicit b
import FlowGraphImplicits._
Source(1 to 5) ~> Flow[Int].map(_ * 2) ~> out
out
}
val source2 = Source[Int]() { implicit b
import FlowGraphImplicits._
Source(1 to 5) ~> Flow[Int].map(_ * 2) ~> out
out
@ -211,8 +216,8 @@ class GraphFlowSpec extends AkkaSpec {
FlowGraph { implicit b
import FlowGraphImplicits._
val merge = Merge[Int]("merge")
source ~> merge ~> Sink(probe)
source ~> Flow[Int].map(_ * 10) ~> merge
source1 ~> merge ~> Sink(probe)
source2 ~> Flow[Int].map(_ * 10) ~> merge
}.run()
validateProbe(probe, 10, Set(2, 4, 6, 8, 10, 20, 40, 60, 80, 100))

View file

@ -87,6 +87,7 @@ private[scaladsl] final case class SubscriberSource[Out]() extends KeyedActorFlo
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] =
flowSubscriber
}
/**

View file

@ -5,13 +5,19 @@ package akka.stream.scaladsl
import akka.stream.impl.Ast.FanInAstNode
import akka.stream.impl.Ast
import java.util
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.language.existentials
import scalax.collection.edge.{ LkBase, LkDiEdge }
import scalax.collection.mutable.Graph
import scalax.collection.edge.LkBase
import scalax.collection.edge.LkDiEdge
import scalax.collection.immutable.{ Graph ImmutableGraph }
import org.reactivestreams.Subscriber
import org.reactivestreams.Publisher
import akka.stream.FlowMaterializer
import scalax.collection.mutable.Graph
/**
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
@ -455,18 +461,44 @@ private[akka] object FlowGraphInternal {
case class SourceVertex(source: Source[_]) extends Vertex {
override def toString = source.toString
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
/**
* These are unique keys, case class equality would break them.
* In the case of KeyedSources we MUST compare by object equality, in order to avoid ambigiousities in materialization.
*/
final override def equals(other: Any): Boolean = other match {
case v: SourceVertex (source, v.source) match {
case (k1: KeyedSource[_], k2: KeyedSource[_]) k1 == k2
case _ super.equals(other)
}
case _ false
}
final override def hashCode: Int = source match {
case k: KeyedSource[_] k.hashCode
case _ super.hashCode
}
final override private[scaladsl] def newInstance() = this.copy()
}
case class SinkVertex(sink: Sink[_]) extends Vertex {
override def toString = sink.toString
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
/**
* These are unique keys, case class equality would break them.
* In the case of KeyedSources we MUST compare by object equality, in order to avoid ambigiousities in materialization.
*/
final override def equals(other: Any): Boolean = other match {
case v: SinkVertex (sink, v.sink) match {
case (k1: KeyedSink[_], k2: KeyedSink[_]) k1 == k2
case _ super.equals(other)
}
case _ false
}
final override def hashCode: Int = sink match {
case k: KeyedSink[_] k.hashCode
case _ super.hashCode
}
final override private[scaladsl] def newInstance() = this.copy()
}
@ -873,6 +905,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
}
private def checkAddSourceSinkPrecondition(vertex: Vertex): Unit = {
checkAmbigiousKeyedElement(vertex)
vertex match {
case node @ (_: UndefinedSource[_] | _: UndefinedSink[_])
require(!graph.contains(node), s"[$node] instance is already used in this flow graph")
@ -880,6 +914,18 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
}
}
private def checkAmbigiousKeyedElement(vertex: Vertex): Unit = {
def warningMessage(el: Any): String =
s"An `${el}` instance MUST NOT be used more than once in a `FlowGraph` to avoid ambiguity. " +
s"Use individual instances instead the same one multiple times instead. Nodes are: ${graph.nodes}"
vertex match {
case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_]] require(!graph.contains(v), warningMessage(v.source))
case v: SinkVertex if v.sink.isInstanceOf[KeyedSink[_]] require(!graph.contains(v), warningMessage(v.sink))
case _ // ok
}
}
private def checkAddOrReplaceSourceSinkPrecondition(vertex: Vertex): Unit = {
vertex match {
// it is ok to add or replace edges with new or existing undefined sources or sinks
@ -896,7 +942,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
case Some(node)
require(
(node.inDegree + 1) <= iv.maximumInputCount,
s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, , has ${node.inDegree}\n${graph.edges}")
s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, has ${node.inDegree}\n${graph.edges}")
case _ // ok
}
case _ // ok, no checks here