+str #15108 Use value equality on Producers wrapping iterables
* Fixes #15108
This commit is contained in:
parent
71e285027c
commit
f5ac4bb4b1
4 changed files with 41 additions and 7 deletions
|
|
@ -74,7 +74,7 @@ private[akka] object Ast {
|
|||
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
|
||||
if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
|
||||
else new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(iterable, materializer.settings),
|
||||
name = s"$flowName-0-iterable"))
|
||||
name = s"$flowName-0-iterable"), Some(iterable))
|
||||
}
|
||||
final case class ThunkProducerNode[I](f: () ⇒ I) extends ProducerNode[I] {
|
||||
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
|
||||
|
|
@ -86,12 +86,12 @@ private[akka] object Ast {
|
|||
future.value match {
|
||||
case Some(Success(element)) ⇒
|
||||
new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(List(element), materializer.settings),
|
||||
name = s"$flowName-0-future"))
|
||||
name = s"$flowName-0-future"), Some(future))
|
||||
case Some(Failure(t)) ⇒
|
||||
new ErrorProducer(t).asInstanceOf[Producer[I]]
|
||||
ErrorProducer(t).asInstanceOf[Producer[I]]
|
||||
case None ⇒
|
||||
new ActorProducer[I](materializer.context.actorOf(FutureProducer.props(future, materializer.settings),
|
||||
name = s"$flowName-0-future"))
|
||||
name = s"$flowName-0-future"), Some(future))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,8 +37,19 @@ private[akka] trait ActorProducerLike[T] extends Producer[T] {
|
|||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* If equalityValue is defined it is used for equals and hashCode, otherwise default reference equality.
|
||||
*/
|
||||
private[akka] class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T]
|
||||
private[akka] class ActorProducer[T]( final val impl: ActorRef, val equalityValue: Option[AnyRef] = None) extends ActorProducerLike[T] {
|
||||
override def equals(o: Any): Boolean = (equalityValue, o) match {
|
||||
case (Some(v), ActorProducer(_, Some(otherValue))) ⇒ v.equals(otherValue)
|
||||
case _ ⇒ super.equals(o)
|
||||
}
|
||||
|
||||
override def hashCode: Int = equalityValue match {
|
||||
case Some(v) ⇒ v.hashCode
|
||||
case None ⇒ super.hashCode
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -46,6 +57,11 @@ private[akka] class ActorProducer[T]( final val impl: ActorRef) extends ActorPro
|
|||
private[akka] object ActorProducer {
|
||||
def props[T](settings: MaterializerSettings, f: () ⇒ T): Props =
|
||||
Props(new ActorProducerImpl(f, settings))
|
||||
|
||||
def unapply(o: Any): Option[(ActorRef, Option[AnyRef])] = o match {
|
||||
case other: ActorProducer[_] ⇒ Some((other.impl, other.equalityValue))
|
||||
case _ ⇒ None
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import org.reactivestreams.spi.Subscriber
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] {
|
||||
private[akka] case object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] {
|
||||
def getPublisher: Publisher[Nothing] = this
|
||||
|
||||
def subscribe(subscriber: Subscriber[Nothing]): Unit =
|
||||
|
|
@ -25,7 +25,7 @@ private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Noth
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class ErrorProducer(t: Throwable) extends Producer[Nothing] with Publisher[Nothing] {
|
||||
private[akka] case class ErrorProducer(t: Throwable) extends Producer[Nothing] with Publisher[Nothing] {
|
||||
def getPublisher: Publisher[Nothing] = this
|
||||
|
||||
def subscribe(subscriber: Subscriber[Nothing]): Unit =
|
||||
|
|
|
|||
|
|
@ -135,5 +135,23 @@ class FlowIterableSpec extends AkkaSpec {
|
|||
}
|
||||
got.size should be < (count - 1)
|
||||
}
|
||||
|
||||
"have value equality of producer" in {
|
||||
val p1 = Flow(List(1, 2, 3)).toProducer(materializer)
|
||||
val p2 = Flow(List(1, 2, 3)).toProducer(materializer)
|
||||
p1 should be(p2)
|
||||
p2 should be(p1)
|
||||
val p3 = Flow(List(1, 2, 3, 4)).toProducer(materializer)
|
||||
p1 should not be (p3)
|
||||
p3 should not be (p1)
|
||||
val p4 = Flow(Vector.empty[String]).toProducer(materializer)
|
||||
val p5 = Flow(Set.empty[String]).toProducer(materializer)
|
||||
p1 should not be (p4)
|
||||
p4 should be(p5)
|
||||
p5 should be(p4)
|
||||
val p6 = Flow(List(1, 2, 3).iterator).toProducer(materializer)
|
||||
p1 should not be (p6)
|
||||
p6 should not be (p1)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue