diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index c49014a328..47fb0cbf81 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -74,7 +74,7 @@ private[akka] object Ast { def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] = if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] else new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(iterable, materializer.settings), - name = s"$flowName-0-iterable")) + name = s"$flowName-0-iterable"), Some(iterable)) } final case class ThunkProducerNode[I](f: () ⇒ I) extends ProducerNode[I] { def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] = @@ -86,12 +86,12 @@ private[akka] object Ast { future.value match { case Some(Success(element)) ⇒ new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(List(element), materializer.settings), - name = s"$flowName-0-future")) + name = s"$flowName-0-future"), Some(future)) case Some(Failure(t)) ⇒ - new ErrorProducer(t).asInstanceOf[Producer[I]] + ErrorProducer(t).asInstanceOf[Producer[I]] case None ⇒ new ActorProducer[I](materializer.context.actorOf(FutureProducer.props(future, materializer.settings), - name = s"$flowName-0-future")) + name = s"$flowName-0-future"), Some(future)) } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index 04581a5a63..eda89aeaad 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -37,8 +37,19 @@ private[akka] trait ActorProducerLike[T] extends Producer[T] { /** * INTERNAL API + * If equalityValue is defined it is used for equals and hashCode, otherwise default reference equality. */ -private[akka] class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] +private[akka] class ActorProducer[T]( final val impl: ActorRef, val equalityValue: Option[AnyRef] = None) extends ActorProducerLike[T] { + override def equals(o: Any): Boolean = (equalityValue, o) match { + case (Some(v), ActorProducer(_, Some(otherValue))) ⇒ v.equals(otherValue) + case _ ⇒ super.equals(o) + } + + override def hashCode: Int = equalityValue match { + case Some(v) ⇒ v.hashCode + case None ⇒ super.hashCode + } +} /** * INTERNAL API @@ -46,6 +57,11 @@ private[akka] class ActorProducer[T]( final val impl: ActorRef) extends ActorPro private[akka] object ActorProducer { def props[T](settings: MaterializerSettings, f: () ⇒ T): Props = Props(new ActorProducerImpl(f, settings)) + + def unapply(o: Any): Option[(ActorRef, Option[AnyRef])] = o match { + case other: ActorProducer[_] ⇒ Some((other.impl, other.equalityValue)) + case _ ⇒ None + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala index 26b254393b..d6ffd1631d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala @@ -11,7 +11,7 @@ import org.reactivestreams.spi.Subscriber /** * INTERNAL API */ -private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] { +private[akka] case object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] { def getPublisher: Publisher[Nothing] = this def subscribe(subscriber: Subscriber[Nothing]): Unit = @@ -25,7 +25,7 @@ private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Noth /** * INTERNAL API */ -private[akka] class ErrorProducer(t: Throwable) extends Producer[Nothing] with Publisher[Nothing] { +private[akka] case class ErrorProducer(t: Throwable) extends Producer[Nothing] with Publisher[Nothing] { def getPublisher: Publisher[Nothing] = this def subscribe(subscriber: Subscriber[Nothing]): Unit = diff --git a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala index f298a74e59..ab486f6e70 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala @@ -135,5 +135,23 @@ class FlowIterableSpec extends AkkaSpec { } got.size should be < (count - 1) } + + "have value equality of producer" in { + val p1 = Flow(List(1, 2, 3)).toProducer(materializer) + val p2 = Flow(List(1, 2, 3)).toProducer(materializer) + p1 should be(p2) + p2 should be(p1) + val p3 = Flow(List(1, 2, 3, 4)).toProducer(materializer) + p1 should not be (p3) + p3 should not be (p1) + val p4 = Flow(Vector.empty[String]).toProducer(materializer) + val p5 = Flow(Set.empty[String]).toProducer(materializer) + p1 should not be (p4) + p4 should be(p5) + p5 should be(p4) + val p6 = Flow(List(1, 2, 3).iterator).toProducer(materializer) + p1 should not be (p6) + p6 should not be (p1) + } } } \ No newline at end of file