Merge pull request #20225 from akka/wip-17037-stash-patriknw

str: Support stash in ActorPublisher, #17037
This commit is contained in:
Johan Andrén 2016-04-06 18:46:43 +02:00
commit 825f49eae2
2 changed files with 67 additions and 12 deletions

View file

@ -10,10 +10,10 @@ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.Mute
import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender, TestProbe } import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender, TestProbe }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.actor.Stash
object ActorPublisherSpec { object ActorPublisherSpec {
@ -29,6 +29,12 @@ object ActorPublisherSpec {
else p else p
} }
def testPublisherWithStashProps(probe: ActorRef, useTestDispatcher: Boolean = true): Props = {
val p = Props(new TestPublisherWithStash(probe))
if (useTestDispatcher) p.withDispatcher("akka.test.stream-dispatcher")
else p
}
case class TotalDemand(elements: Long) case class TotalDemand(elements: Long)
case class Produce(elem: String) case class Produce(elem: String)
case class Err(reason: String) case class Err(reason: String)
@ -53,6 +59,19 @@ object ActorPublisherSpec {
} }
} }
class TestPublisherWithStash(probe: ActorRef) extends TestPublisher(probe) with Stash {
override def receive = stashing
def stashing: Receive = {
case "unstash"
unstashAll()
context.become(super.receive)
case _ stash()
}
}
def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher") def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher")
class Sender extends ActorPublisher[Int] { class Sender extends ActorPublisher[Int] {
@ -445,6 +464,22 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
expectMsgType[String] should include("my-dispatcher1") expectMsgType[String] should include("my-dispatcher1")
} }
"handle stash" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherWithStashProps(probe.ref))
val p = ActorPublisher[String](ref)
val s = TestSubscriber.probe[String]()
p.subscribe(s)
s.request(2)
s.request(3)
ref ! "unstash"
probe.expectMsg(TotalDemand(5))
probe.expectMsg(TotalDemand(5))
s.request(4)
probe.expectMsg(TotalDemand(9))
s.cancel()
}
} }
} }

View file

@ -45,7 +45,18 @@ object ActorPublisherMessage {
* more elements. * more elements.
* @param n number of requested elements * @param n number of requested elements
*/ */
final case class Request(n: Long) extends ActorPublisherMessage with NoSerializationVerificationNeeded final case class Request(n: Long) extends ActorPublisherMessage with NoSerializationVerificationNeeded {
private var processed = false
/**
* INTERNAL API: needed for stash support
*/
private[akka] def markProcessed(): Unit = processed = true
/**
* INTERNAL API: needed for stash support
*/
private[akka] def isProcessed(): Boolean = processed
}
/** /**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the
@ -256,15 +267,21 @@ trait ActorPublisher[T] extends Actor {
* INTERNAL API * INTERNAL API
*/ */
protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match {
case Request(n) case req @ Request(n)
if (n < 1) { if (req.isProcessed()) {
if (lifecycleState == Active) // it's an unstashed Request, demand is already handled
onError(numberOfElementsInRequestMustBePositiveException) super.aroundReceive(receive, req)
} else { } else {
demand += n if (n < 1) {
if (demand < 0) if (lifecycleState == Active)
demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded onError(numberOfElementsInRequestMustBePositiveException)
super.aroundReceive(receive, msg) } else {
demand += n
if (demand < 0)
demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
req.markProcessed()
super.aroundReceive(receive, req)
}
} }
case Subscribe(sub: Subscriber[_]) case Subscribe(sub: Subscriber[_])
@ -293,8 +310,11 @@ trait ActorPublisher[T] extends Actor {
} }
case Cancel case Cancel
cancelSelf() if (lifecycleState != Canceled) {
super.aroundReceive(receive, msg) // possible to receive again in case of stash
cancelSelf()
super.aroundReceive(receive, msg)
}
case SubscriptionTimeoutExceeded case SubscriptionTimeoutExceeded
if (!scheduledSubscriptionTimeout.isCancelled) { if (!scheduledSubscriptionTimeout.isCancelled) {