From d3a343579c43cef8b0550486618d86c0d00e9f6f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 31 Oct 2014 08:53:27 +0100 Subject: [PATCH] !str #16188 Remove value equality of iterable flow --- ...SynchronousPublisherFromIterableSpec.scala | 15 ------------- .../stream/scaladsl/FlowIterableSpec.scala | 19 +---------------- .../impl/ActorBasedFlowMaterializer.scala | 4 ++-- .../akka/stream/impl/ActorProcessor.scala | 2 +- .../akka/stream/impl/ActorPublisher.scala | 21 ++++--------------- .../SynchronousPublisherFromIterable.scala | 7 ------- .../stream/scaladsl/ActorFlowSource.scala | 6 +++--- 7 files changed, 11 insertions(+), 63 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala index 9232db7846..f08cbf31ee 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala @@ -172,21 +172,6 @@ class SynchronousPublisherFromIterableSpec extends AkkaSpec { probe.expectMsg("complete") } - "have value equality of publisher" in { - val p1 = SynchronousPublisherFromIterable(List(1, 2, 3)) - val p2 = SynchronousPublisherFromIterable(List(1, 2, 3)) - p1 should be(p2) - p2 should be(p1) - val p3 = SynchronousPublisherFromIterable(List(1, 2, 3, 4)) - p1 should not be (p3) - p3 should not be (p1) - val p4 = SynchronousPublisherFromIterable(Vector.empty[String]) - val p5 = SynchronousPublisherFromIterable(Set.empty[String]) - p1 should not be (p4) - p4 should be(p5) - p5 should be(p4) - } - "have nice toString" in { SynchronousPublisherFromIterable(List(1, 2, 3)).toString should be("SynchronousPublisherFromIterable(1, 2, 3)") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala index 1142978c84..d94a04d304 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala @@ -136,22 +136,5 @@ class FlowIterableSpec extends AkkaSpec { got.size should be < (count - 1) } - "have value equality of publisher" in { - val p1 = Source(List(1, 2, 3)).runWith(Sink.publisher) - val p2 = Source(List(1, 2, 3)).runWith(Sink.publisher) - p1 should be(p2) - p2 should be(p1) - val p3 = Source(List(1, 2, 3, 4)).runWith(Sink.publisher) - p1 should not be (p3) - p3 should not be (p1) - val p4 = Source(Vector.empty[String]).runWith(Sink.publisher) - val p5 = Source(Set.empty[String]).runWith(Sink.publisher) - p1 should not be (p4) - p4 should be(p5) - p5 should be(p4) - val p6 = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) - p1 should not be (p6) - p6 should not be (p1) - } } -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 56d3dc2dd4..8bf8e87f0e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -246,7 +246,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting withDispatcher(settings.dispatcher), actorName) } - val publisher = new ActorPublisher[Out](impl, equalityValue = None) + val publisher = new ActorPublisher[Out](impl) impl ! ExposedPublisher(publisher.asInstanceOf[ActorPublisher[Any]]) val subscribers = Vector.tabulate(inputCount)(FanIn.SubInput[In](impl, _)) (subscribers, List(publisher)) @@ -261,7 +261,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName) } - val publishers = Vector.tabulate(outputCount)(id ⇒ new ActorPublisher[Out](impl, equalityValue = None) { + val publishers = Vector.tabulate(outputCount)(id ⇒ new ActorPublisher[Out](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) }) impl ! FanOut.ExposedPublishers(publishers.asInstanceOf[immutable.Seq[ActorPublisher[Any]]]) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 9c5cf0ff54..82570589f8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -26,7 +26,7 @@ private[akka] object ActorProcessor { /** * INTERNAL API */ -private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl, None) +private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl) with Processor[I, O] { override def onSubscribe(s: Subscription): Unit = impl ! OnSubscribe(s) override def onError(t: Throwable): Unit = impl ! OnError(t) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 40c8867361..03294555c3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -29,17 +29,13 @@ private[akka] object ActorPublisher { class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down spi.Publisher") with NoStackTrace val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException) - def apply[T](impl: ActorRef, equalityValue: Option[AnyRef] = None): ActorPublisher[T] = { - val a = new ActorPublisher[T](impl, equalityValue) + def apply[T](impl: ActorRef): ActorPublisher[T] = { + val a = new ActorPublisher[T](impl) // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(a.asInstanceOf[ActorPublisher[Any]]) a } - def unapply(o: Any): Option[(ActorRef, Option[AnyRef])] = o match { - case other: ActorPublisher[_] ⇒ Some((other.impl, other.equalityValue)) - case _ ⇒ None - } } /** @@ -48,10 +44,10 @@ private[akka] object ActorPublisher { * When you instantiate this class, or its subclasses, you MUST send an ExposedPublisher message to the wrapped * ActorRef! If you don't need to subclass, prefer the apply() method on the companion object which takes care of this. */ -private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Option[AnyRef]) extends Publisher[T] { +private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { // The subscriber of an subscription attempt is first placed in this list of pending subscribers. - // The actor will call takePendingSubscribers to remove it from the list when it has received the + // The actor will call takePendingSubscribers to remove it from the list when it has received the // SubscribePending message. The AtomicReference is set to null by the shutdown method, which is // called by the actor from postStop. Pending (unregistered) subscription attempts are denied by // the shutdown method. Subscription attempts after shutdown can be denied immediately. @@ -97,15 +93,6 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt case None ⇒ subscriber.onComplete() } - override def equals(o: Any): Boolean = (equalityValue, o) match { - case (Some(v), ActorPublisher(_, Some(otherValue))) ⇒ v.equals(otherValue) - case _ ⇒ super.equals(o) - } - - override def hashCode: Int = equalityValue match { - case Some(v) ⇒ v.hashCode - case None ⇒ super.hashCode - } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala index b58976d3b4..c6bb590c42 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala @@ -76,12 +76,5 @@ private[akka] class SynchronousPublisherFromIterable[T](private val iterable: im override def subscribe(subscriber: Subscriber[_ >: T]): Unit = subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator)) - override def equals(o: Any): Boolean = o match { - case other: SynchronousPublisherFromIterable[T] ⇒ iterable == other.iterable - case _ ⇒ false - } - - override def hashCode: Int = iterable.hashCode - override def toString: String = s"SynchronousPublisherFromIterable(${iterable.mkString(", ")})" } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index ecc257160e..d7853eddb6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -133,7 +133,7 @@ private[scaladsl] final case class IterableSource[Out](iterable: immutable.Itera override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = if (iterable.isEmpty) (EmptyPublisher[Out], ()) else (ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), - name = s"$flowName-0-iterable"), Some(iterable)), ()) + name = s"$flowName-0-iterable")), ()) } /** @@ -167,12 +167,12 @@ private[scaladsl] final case class FutureSource[Out](future: Future[Out]) extend future.value match { case Some(Success(element)) ⇒ (ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings), - name = s"$flowName-0-future"), Some(future)), ()) + name = s"$flowName-0-future")), ()) case Some(Failure(t)) ⇒ (ErrorPublisher(t).asInstanceOf[Publisher[Out]], ()) case None ⇒ (ActorPublisher[Out](materializer.actorOf(FuturePublisher.props(future, materializer.settings), - name = s"$flowName-0-future"), Some(future)), ()) + name = s"$flowName-0-future")), ()) } }