replace unicode arrows
* ⇒, →, ← * because we don't want to show them in documentation snippets and then it's complicated to avoid that when snippets are located in src/test/scala in individual modules * dont replace object `→` in FSM.scala and PersistentFSM.scala
This commit is contained in:
parent
e4d38f92a4
commit
5c96a5f556
1521 changed files with 18846 additions and 18786 deletions
|
|
@ -21,7 +21,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
@InternalApi
|
||||
private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) extends SourceRef[T] {
|
||||
def source: Source[T, NotUsed] =
|
||||
Source.fromGraph(new SourceRefStageImpl(OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ ⇒ NotUsed)
|
||||
Source.fromGraph(new SourceRefStageImpl(OptionVal.Some(initialPartnerRef))).mapMaterializedValue(_ => NotUsed)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -33,15 +33,15 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e
|
|||
@InternalApi
|
||||
private[stream] final class SourceRefStageImpl[Out](
|
||||
val initialPartnerRef: OptionVal[ActorRef]
|
||||
) extends GraphStageWithMaterializedValue[SourceShape[Out], Future[SinkRef[Out]]] { stage ⇒
|
||||
) extends GraphStageWithMaterializedValue[SourceShape[Out], Future[SinkRef[Out]]] { stage =>
|
||||
|
||||
val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out")
|
||||
override def shape = SourceShape.of(out)
|
||||
|
||||
private def initialRefName =
|
||||
initialPartnerRef match {
|
||||
case OptionVal.Some(ref) ⇒ ref.toString
|
||||
case _ ⇒ "<no-initial-ref>"
|
||||
case OptionVal.Some(ref) => ref.toString
|
||||
case _ => "<no-initial-ref>"
|
||||
}
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[SinkRef[Out]]) = {
|
||||
|
|
@ -127,7 +127,7 @@ private[stream] final class SourceRefStageImpl[Out](
|
|||
scheduleOnce(DemandRedeliveryTimerKey, settings.demandRedeliveryInterval)
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit = timerKey match {
|
||||
case SubscriptionTimeoutTimerKey ⇒
|
||||
case SubscriptionTimeoutTimerKey =>
|
||||
val ex = StreamRefSubscriptionTimeoutException(
|
||||
// we know the future has been competed by now, since it is in preStart
|
||||
s"[$stageActorName] Remote side did not subscribe (materialize) handed out Sink reference [${promise.future.value}]," +
|
||||
|
|
@ -135,25 +135,25 @@ private[stream] final class SourceRefStageImpl[Out](
|
|||
|
||||
throw ex // this will also log the exception, unlike failStage; this should fail rarely, but would be good to have it "loud"
|
||||
|
||||
case DemandRedeliveryTimerKey ⇒
|
||||
case DemandRedeliveryTimerKey =>
|
||||
log.debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName, localCumulativeDemand)
|
||||
getPartnerRef ! StreamRefsProtocol.CumulativeDemand(localCumulativeDemand)
|
||||
scheduleDemandRedelivery()
|
||||
|
||||
case TerminationDeadlineTimerKey ⇒
|
||||
case TerminationDeadlineTimerKey =>
|
||||
failStage(RemoteStreamRefActorTerminatedException(s"Remote partner [$partnerRef] has terminated unexpectedly and no clean completion/failure message was received " +
|
||||
"(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down."))
|
||||
}
|
||||
|
||||
lazy val initialReceive: ((ActorRef, Any)) ⇒ Unit = {
|
||||
case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) ⇒
|
||||
lazy val initialReceive: ((ActorRef, Any)) => Unit = {
|
||||
case (sender, msg @ StreamRefsProtocol.OnSubscribeHandshake(remoteRef)) =>
|
||||
cancelTimer(SubscriptionTimeoutTimerKey)
|
||||
observeAndValidateSender(remoteRef, "Illegal sender in SequencedOnNext")
|
||||
log.debug("[{}] Received handshake {} from {}", stageActorName, msg, sender)
|
||||
|
||||
triggerCumulativeDemand()
|
||||
|
||||
case (sender, msg @ StreamRefsProtocol.SequencedOnNext(seqNr, payload: Out @unchecked)) ⇒
|
||||
case (sender, msg @ StreamRefsProtocol.SequencedOnNext(seqNr, payload: Out @unchecked)) =>
|
||||
observeAndValidateSender(sender, "Illegal sender in SequencedOnNext")
|
||||
observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in SequencedOnNext")
|
||||
log.debug("[{}] Received seq {} from {}", stageActorName, msg, sender)
|
||||
|
|
@ -161,7 +161,7 @@ private[stream] final class SourceRefStageImpl[Out](
|
|||
onReceiveElement(payload)
|
||||
triggerCumulativeDemand()
|
||||
|
||||
case (sender, StreamRefsProtocol.RemoteStreamCompleted(seqNr)) ⇒
|
||||
case (sender, StreamRefsProtocol.RemoteStreamCompleted(seqNr)) =>
|
||||
observeAndValidateSender(sender, "Illegal sender in RemoteSinkCompleted")
|
||||
observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in RemoteSinkCompleted")
|
||||
log.debug("[{}] The remote stream has completed, completing as well...", stageActorName)
|
||||
|
|
@ -170,22 +170,22 @@ private[stream] final class SourceRefStageImpl[Out](
|
|||
completed = true
|
||||
tryPush()
|
||||
|
||||
case (sender, StreamRefsProtocol.RemoteStreamFailure(reason)) ⇒
|
||||
case (sender, StreamRefsProtocol.RemoteStreamFailure(reason)) =>
|
||||
observeAndValidateSender(sender, "Illegal sender in RemoteSinkFailure")
|
||||
log.warning("[{}] The remote stream has failed, failing (reason: {})", stageActorName, reason)
|
||||
|
||||
self.unwatch(sender)
|
||||
failStage(RemoteStreamRefActorTerminatedException(s"Remote stream (${sender.path}) failed, reason: $reason"))
|
||||
|
||||
case (_, Terminated(ref)) ⇒
|
||||
case (_, Terminated(ref)) =>
|
||||
partnerRef match {
|
||||
case OptionVal.Some(`ref`) ⇒
|
||||
case OptionVal.Some(`ref`) =>
|
||||
// we need to start a delayed shutdown in case we were network partitioned and the final signal complete/fail
|
||||
// will never reach us; so after the given timeout we need to forcefully terminate this side of the stream ref
|
||||
// the other (sending) side terminates by default once it gets a Terminated signal so no special handling is needed there.
|
||||
scheduleOnce(TerminationDeadlineTimerKey, settings.finalTerminationSignalDeadline)
|
||||
|
||||
case _ ⇒
|
||||
case _ =>
|
||||
// this should not have happened! It should be impossible that we watched some other actor
|
||||
failStage(RemoteStreamRefActorTerminatedException(s"Received UNEXPECTED Terminated($ref) message! " +
|
||||
s"This actor was NOT our trusted remote partner, which was: $getPartnerRef. Tearing down."))
|
||||
|
|
@ -214,12 +214,12 @@ private[stream] final class SourceRefStageImpl[Out](
|
|||
/** @throws InvalidPartnerActorException when partner ref is invalid */
|
||||
def observeAndValidateSender(partner: ActorRef, msg: String): Unit =
|
||||
partnerRef match {
|
||||
case OptionVal.None ⇒
|
||||
case OptionVal.None =>
|
||||
log.debug("Received first message from {}, assuming it to be the remote partner for this stage", partner)
|
||||
partnerRef = OptionVal(partner)
|
||||
self.watch(partner)
|
||||
|
||||
case OptionVal.Some(ref) ⇒
|
||||
case OptionVal.Some(ref) =>
|
||||
if (partner != ref) {
|
||||
val ex = InvalidPartnerActorException(partner, getPartnerRef, msg)
|
||||
partner ! StreamRefsProtocol.RemoteStreamFailure(ex.getMessage)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue