Merge pull request #16189 from akka/wip-16188-rm-equalityValue-patriknw

!str #16188 Remove value equality of iterable flow
This commit is contained in:
Patrik Nordwall 2014-10-31 11:05:26 +01:00
commit 34363374b6
7 changed files with 11 additions and 63 deletions

View file

@ -172,21 +172,6 @@ class SynchronousPublisherFromIterableSpec extends AkkaSpec {
probe.expectMsg("complete")
}
"have value equality of publisher" in {
val p1 = SynchronousPublisherFromIterable(List(1, 2, 3))
val p2 = SynchronousPublisherFromIterable(List(1, 2, 3))
p1 should be(p2)
p2 should be(p1)
val p3 = SynchronousPublisherFromIterable(List(1, 2, 3, 4))
p1 should not be (p3)
p3 should not be (p1)
val p4 = SynchronousPublisherFromIterable(Vector.empty[String])
val p5 = SynchronousPublisherFromIterable(Set.empty[String])
p1 should not be (p4)
p4 should be(p5)
p5 should be(p4)
}
"have nice toString" in {
SynchronousPublisherFromIterable(List(1, 2, 3)).toString should be("SynchronousPublisherFromIterable(1, 2, 3)")
}

View file

@ -136,22 +136,5 @@ class FlowIterableSpec extends AkkaSpec {
got.size should be < (count - 1)
}
"have value equality of publisher" in {
val p1 = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(List(1, 2, 3)).runWith(Sink.publisher)
p1 should be(p2)
p2 should be(p1)
val p3 = Source(List(1, 2, 3, 4)).runWith(Sink.publisher)
p1 should not be (p3)
p3 should not be (p1)
val p4 = Source(Vector.empty[String]).runWith(Sink.publisher)
val p5 = Source(Set.empty[String]).runWith(Sink.publisher)
p1 should not be (p4)
p4 should be(p5)
p5 should be(p4)
val p6 = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
p1 should not be (p6)
p6 should not be (p1)
}
}
}
}

View file

@ -246,7 +246,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
withDispatcher(settings.dispatcher), actorName)
}
val publisher = new ActorPublisher[Out](impl, equalityValue = None)
val publisher = new ActorPublisher[Out](impl)
impl ! ExposedPublisher(publisher.asInstanceOf[ActorPublisher[Any]])
val subscribers = Vector.tabulate(inputCount)(FanIn.SubInput[In](impl, _))
(subscribers, List(publisher))
@ -261,7 +261,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName)
}
val publishers = Vector.tabulate(outputCount)(id new ActorPublisher[Out](impl, equalityValue = None) {
val publishers = Vector.tabulate(outputCount)(id new ActorPublisher[Out](impl) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
})
impl ! FanOut.ExposedPublishers(publishers.asInstanceOf[immutable.Seq[ActorPublisher[Any]]])

View file

@ -26,7 +26,7 @@ private[akka] object ActorProcessor {
/**
* INTERNAL API
*/
private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl, None)
private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl)
with Processor[I, O] {
override def onSubscribe(s: Subscription): Unit = impl ! OnSubscribe(s)
override def onError(t: Throwable): Unit = impl ! OnError(t)

View file

@ -29,17 +29,13 @@ private[akka] object ActorPublisher {
class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down spi.Publisher") with NoStackTrace
val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException)
def apply[T](impl: ActorRef, equalityValue: Option[AnyRef] = None): ActorPublisher[T] = {
val a = new ActorPublisher[T](impl, equalityValue)
def apply[T](impl: ActorRef): ActorPublisher[T] = {
val a = new ActorPublisher[T](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(a.asInstanceOf[ActorPublisher[Any]])
a
}
def unapply(o: Any): Option[(ActorRef, Option[AnyRef])] = o match {
case other: ActorPublisher[_] Some((other.impl, other.equalityValue))
case _ None
}
}
/**
@ -48,10 +44,10 @@ private[akka] object ActorPublisher {
* When you instantiate this class, or its subclasses, you MUST send an ExposedPublisher message to the wrapped
* ActorRef! If you don't need to subclass, prefer the apply() method on the companion object which takes care of this.
*/
private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Option[AnyRef]) extends Publisher[T] {
private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
// The subscriber of an subscription attempt is first placed in this list of pending subscribers.
// The actor will call takePendingSubscribers to remove it from the list when it has received the
// The actor will call takePendingSubscribers to remove it from the list when it has received the
// SubscribePending message. The AtomicReference is set to null by the shutdown method, which is
// called by the actor from postStop. Pending (unregistered) subscription attempts are denied by
// the shutdown method. Subscription attempts after shutdown can be denied immediately.
@ -97,15 +93,6 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt
case None subscriber.onComplete()
}
override def equals(o: Any): Boolean = (equalityValue, o) match {
case (Some(v), ActorPublisher(_, Some(otherValue))) v.equals(otherValue)
case _ super.equals(o)
}
override def hashCode: Int = equalityValue match {
case Some(v) v.hashCode
case None super.hashCode
}
}
/**

View file

@ -76,12 +76,5 @@ private[akka] class SynchronousPublisherFromIterable[T](private val iterable: im
override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator))
override def equals(o: Any): Boolean = o match {
case other: SynchronousPublisherFromIterable[T] iterable == other.iterable
case _ false
}
override def hashCode: Int = iterable.hashCode
override def toString: String = s"SynchronousPublisherFromIterable(${iterable.mkString(", ")})"
}

View file

@ -133,7 +133,7 @@ private[scaladsl] final case class IterableSource[Out](iterable: immutable.Itera
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
if (iterable.isEmpty) (EmptyPublisher[Out], ())
else (ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable"), Some(iterable)), ())
name = s"$flowName-0-iterable")), ())
}
/**
@ -167,12 +167,12 @@ private[scaladsl] final case class FutureSource[Out](future: Future[Out]) extend
future.value match {
case Some(Success(element))
(ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings),
name = s"$flowName-0-future"), Some(future)), ())
name = s"$flowName-0-future")), ())
case Some(Failure(t))
(ErrorPublisher(t).asInstanceOf[Publisher[Out]], ())
case None
(ActorPublisher[Out](materializer.actorOf(FuturePublisher.props(future, materializer.settings),
name = s"$flowName-0-future"), Some(future)), ())
name = s"$flowName-0-future")), ())
}
}