Cleanup various warnings in akka-stream #26395
This commit is contained in:
parent
c3806c9135
commit
51add23b2a
20 changed files with 44 additions and 60 deletions
|
|
@ -463,7 +463,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
||||||
//TODO: Dummy test cases just for smoke-testing. Should be removed.
|
//TODO: Dummy test cases just for smoke-testing. Should be removed.
|
||||||
|
|
||||||
"foo" in {
|
"foo" in {
|
||||||
implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000))
|
implicit val mat = PhasedFusingActorMaterializer()
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
val graph = Source.repeat(1).take(10).toMat(Sink.fold(0)(_ + _))(Keep.right)
|
val graph = Source.repeat(1).take(10).toMat(Sink.fold(0)(_ + _))(Keep.right)
|
||||||
|
|
@ -472,7 +472,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"islands 1" in {
|
"islands 1" in {
|
||||||
implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000))
|
implicit val mat = PhasedFusingActorMaterializer()
|
||||||
val sub = TestSubscriber.probe[Int]()
|
val sub = TestSubscriber.probe[Int]()
|
||||||
val graph = Source.repeat(1).take(10).toMat(Sink.asPublisher(false))(Keep.right)
|
val graph = Source.repeat(1).take(10).toMat(Sink.asPublisher(false))(Keep.right)
|
||||||
|
|
||||||
|
|
@ -484,7 +484,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"islands 2" in {
|
"islands 2" in {
|
||||||
implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000))
|
implicit val mat = PhasedFusingActorMaterializer()
|
||||||
val pub = TestPublisher.probe[Int]()
|
val pub = TestPublisher.probe[Int]()
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
|
@ -503,7 +503,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"islands 3" in {
|
"islands 3" in {
|
||||||
implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000))
|
implicit val mat = PhasedFusingActorMaterializer()
|
||||||
val sub = TestSubscriber.probe[Int]()
|
val sub = TestSubscriber.probe[Int]()
|
||||||
Source
|
Source
|
||||||
.repeat(1)
|
.repeat(1)
|
||||||
|
|
@ -516,7 +516,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"islands 4" in {
|
"islands 4" in {
|
||||||
implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000))
|
implicit val mat = PhasedFusingActorMaterializer()
|
||||||
val pub = TestPublisher.probe[Int]()
|
val pub = TestPublisher.probe[Int]()
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
|
@ -531,9 +531,9 @@ class TraversalBuilderSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"bidiflow1" in {
|
"bidiflow1" in {
|
||||||
implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000))
|
implicit val mat = PhasedFusingActorMaterializer()
|
||||||
val flow1 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1))
|
val flow1 = Flow.fromGraph(fusing.Map((x: Int) ⇒ x + 1))
|
||||||
val flow2 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1))
|
val flow2 = Flow.fromGraph(fusing.Map((x: Int) ⇒ x + 1))
|
||||||
|
|
||||||
val bidi = BidiFlow.fromFlowsMat(flow1, flow2)(Keep.none)
|
val bidi = BidiFlow.fromFlowsMat(flow1, flow2)(Keep.none)
|
||||||
|
|
||||||
|
|
@ -543,7 +543,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"bidiflow reverse" in {
|
"bidiflow reverse" in {
|
||||||
implicit val mat = PhasedFusingActorMaterializer(ActorMaterializerSettings(system).withSyncProcessingLimit(5000))
|
implicit val mat = PhasedFusingActorMaterializer()
|
||||||
val flow1 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1))
|
val flow1 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1))
|
||||||
val flow2 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1))
|
val flow2 = Flow.fromGraph(new fusing.Map((x: Int) ⇒ x + 1))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,3 +2,12 @@
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log")
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2")
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$3")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$3")
|
||||||
|
# Various compiler warnings in streams #26399
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.PhasedFusingActorMaterializer.apply")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ProcessorModulePhase.this")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.BoundedBuffer#DynamicQueue.this")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor.this")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.TlsModulePhase.this")
|
||||||
|
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$TickSourceCancellable")
|
||||||
|
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.snapshot.MaterializerState.requestFromChild")
|
||||||
|
|
|
||||||
|
|
@ -626,7 +626,7 @@ final class IOSettings private (val tcpWriteBufferSize: Int) {
|
||||||
|
|
||||||
def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value)
|
def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value)
|
||||||
|
|
||||||
private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize): IOSettings = new IOSettings(
|
private def copy(tcpWriteBufferSize: Int): IOSettings = new IOSettings(
|
||||||
tcpWriteBufferSize = tcpWriteBufferSize)
|
tcpWriteBufferSize = tcpWriteBufferSize)
|
||||||
|
|
||||||
override def equals(other: Any): Boolean = other match {
|
override def equals(other: Any): Boolean = other match {
|
||||||
|
|
|
||||||
|
|
@ -166,7 +166,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
|
||||||
override def onPush(): Unit = {
|
override def onPush(): Unit = {
|
||||||
val elem = grab(in)
|
val elem = grab(in)
|
||||||
if (matching(elem)) {
|
if (matching(elem)) {
|
||||||
val d = updateInterval(elem)
|
val d = updateInterval()
|
||||||
|
|
||||||
if (matched > 1)
|
if (matched > 1)
|
||||||
onInterval(d)
|
onInterval(d)
|
||||||
|
|
@ -176,7 +176,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
|
||||||
|
|
||||||
override def onPull(): Unit = pull(in)
|
override def onPull(): Unit = pull(in)
|
||||||
|
|
||||||
private def updateInterval(in: T): FiniteDuration = {
|
private def updateInterval(): FiniteDuration = {
|
||||||
matched += 1
|
matched += 1
|
||||||
val nowNanos = System.nanoTime()
|
val nowNanos = System.nanoTime()
|
||||||
val d = nowNanos - prevNanos
|
val d = nowNanos - prevNanos
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] object StreamSupervisor {
|
@InternalApi private[akka] object StreamSupervisor {
|
||||||
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
|
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
|
||||||
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
|
Props(new StreamSupervisor(haveShutDown)).withDeploy(Deploy.local)
|
||||||
.withDispatcher(settings.dispatcher)
|
.withDispatcher(settings.dispatcher)
|
||||||
private[stream] val baseName = "StreamSupervisor"
|
private[stream] val baseName = "StreamSupervisor"
|
||||||
private val actorName = SeqActorName(baseName)
|
private val actorName = SeqActorName(baseName)
|
||||||
|
|
@ -173,7 +173,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
|
@InternalApi private[akka] class StreamSupervisor(haveShutDown: AtomicBoolean) extends Actor {
|
||||||
import akka.stream.impl.StreamSupervisor._
|
import akka.stream.impl.StreamSupervisor._
|
||||||
|
|
||||||
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
|
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
|
||||||
|
|
|
||||||
|
|
@ -148,7 +148,7 @@ import akka.event.Logging
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def completed: Actor.Receive = {
|
protected def completed: Actor.Receive = {
|
||||||
case OnSubscribe(subscription) ⇒ throw new IllegalStateException("onSubscribe called after onError or onComplete")
|
case OnSubscribe(_) ⇒ throw new IllegalStateException("onSubscribe called after onError or onComplete")
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def inputOnError(e: Throwable): Unit = {
|
protected def inputOnError(e: Throwable): Unit = {
|
||||||
|
|
@ -229,7 +229,7 @@ import akka.event.Logging
|
||||||
protected def downstreamRunning: Actor.Receive = {
|
protected def downstreamRunning: Actor.Receive = {
|
||||||
case SubscribePending ⇒
|
case SubscribePending ⇒
|
||||||
subscribePending(exposedPublisher.takePendingSubscribers())
|
subscribePending(exposedPublisher.takePendingSubscribers())
|
||||||
case RequestMore(subscription, elements) ⇒
|
case RequestMore(_, elements) ⇒
|
||||||
if (elements < 1) {
|
if (elements < 1) {
|
||||||
error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
|
error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -238,7 +238,7 @@ import akka.event.Logging
|
||||||
downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
|
downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
|
||||||
pump.pump()
|
pump.pump()
|
||||||
}
|
}
|
||||||
case Cancel(subscription) ⇒
|
case Cancel(_) ⇒
|
||||||
downstreamCompleted = true
|
downstreamCompleted = true
|
||||||
exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException))
|
exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException))
|
||||||
pump.pump()
|
pump.pump()
|
||||||
|
|
|
||||||
|
|
@ -84,7 +84,7 @@ import org.reactivestreams.Subscription
|
||||||
|
|
||||||
private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit =
|
private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit =
|
||||||
try shutdownReason match {
|
try shutdownReason match {
|
||||||
case Some(e: SpecViolation) ⇒ // ok, not allowed to call onError
|
case Some(_: SpecViolation) ⇒ // ok, not allowed to call onError
|
||||||
case Some(e) ⇒
|
case Some(e) ⇒
|
||||||
tryOnSubscribe(subscriber, CancelledSubscription)
|
tryOnSubscribe(subscriber, CancelledSubscription)
|
||||||
tryOnError(subscriber, e)
|
tryOnError(subscriber, e)
|
||||||
|
|
|
||||||
|
|
@ -176,7 +176,7 @@ private[akka] object Buffer {
|
||||||
|
|
||||||
override def enqueue(elem: T): Unit =
|
override def enqueue(elem: T): Unit =
|
||||||
if (tail - head == FixedQueueSize) {
|
if (tail - head == FixedQueueSize) {
|
||||||
val queue = new DynamicQueue(head)
|
val queue = new DynamicQueue()
|
||||||
while (nonEmpty) {
|
while (nonEmpty) {
|
||||||
queue.enqueue(dequeue())
|
queue.enqueue(dequeue())
|
||||||
}
|
}
|
||||||
|
|
@ -208,7 +208,7 @@ private[akka] object Buffer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class DynamicQueue(startIdx: Int) extends ju.LinkedList[T] with Buffer[T] {
|
private final class DynamicQueue() extends ju.LinkedList[T] with Buffer[T] {
|
||||||
override def capacity = BoundedBuffer.this.capacity
|
override def capacity = BoundedBuffer.this.capacity
|
||||||
override def used = size
|
override def used = size
|
||||||
override def isFull = size == capacity
|
override def isFull = size == capacity
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,6 @@ import scala.annotation.switch
|
||||||
private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc)
|
private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc)
|
||||||
private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted
|
private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted
|
||||||
|
|
||||||
private var charsInObject = 0
|
|
||||||
private var completedObject = false
|
private var completedObject = false
|
||||||
private var inStringExpression = false
|
private var inStringExpression = false
|
||||||
private var isStartOfEscapeSequence = false
|
private var isStartOfEscapeSequence = false
|
||||||
|
|
@ -140,7 +139,6 @@ import scala.annotation.switch
|
||||||
depth -= 1
|
depth -= 1
|
||||||
pos += 1
|
pos += 1
|
||||||
if (depth == 0) {
|
if (depth == 0) {
|
||||||
charsInObject = 0
|
|
||||||
completedObject = true
|
completedObject = true
|
||||||
}
|
}
|
||||||
} else if (isWhitespace(input) && !inStringExpression) {
|
} else if (isWhitespace(input) && !inStringExpression) {
|
||||||
|
|
|
||||||
|
|
@ -57,16 +57,16 @@ import akka.util.OptionVal
|
||||||
override def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes,
|
override def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes,
|
||||||
materializer: PhasedFusingActorMaterializer,
|
materializer: PhasedFusingActorMaterializer,
|
||||||
islandName: String): PhaseIsland[Any] =
|
islandName: String): PhaseIsland[Any] =
|
||||||
new ProcessorModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
|
new ProcessorModulePhase().asInstanceOf[PhaseIsland[Any]]
|
||||||
},
|
},
|
||||||
TlsModuleIslandTag → new Phase[Any] {
|
TlsModuleIslandTag → new Phase[Any] {
|
||||||
def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes,
|
def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes,
|
||||||
materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] =
|
materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] =
|
||||||
new TlsModulePhase(effectiveAttributes, materializer, islandName).asInstanceOf[PhaseIsland[Any]]
|
new TlsModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
|
||||||
},
|
},
|
||||||
GraphStageTag → DefaultPhase)
|
GraphStageTag → DefaultPhase)
|
||||||
|
|
||||||
@InternalApi private[akka] def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = {
|
@InternalApi private[akka] def apply()(implicit context: ActorRefFactory): ActorMaterializer = {
|
||||||
val haveShutDown = new AtomicBoolean(false)
|
val haveShutDown = new AtomicBoolean(false)
|
||||||
val system = actorSystemOf(context)
|
val system = actorSystemOf(context)
|
||||||
val materializerSettings = ActorMaterializerSettings(system)
|
val materializerSettings = ActorMaterializerSettings(system)
|
||||||
|
|
@ -848,7 +848,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
|
@InternalApi private[akka] final class ProcessorModulePhase()
|
||||||
extends PhaseIsland[Processor[Any, Any]] {
|
extends PhaseIsland[Processor[Any, Any]] {
|
||||||
override def name: String = "ProcessorModulePhase"
|
override def name: String = "ProcessorModulePhase"
|
||||||
private[this] var processor: Processor[Any, Any] = _
|
private[this] var processor: Processor[Any, Any] = _
|
||||||
|
|
@ -876,7 +876,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] final class TlsModulePhase(attributes: Attributes, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] {
|
@InternalApi private[akka] final class TlsModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] {
|
||||||
def name: String = "TlsModulePhase"
|
def name: String = "TlsModulePhase"
|
||||||
|
|
||||||
var tlsActor: ActorRef = _
|
var tlsActor: ActorRef = _
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,6 @@ import akka.stream.stage._
|
||||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||||
import scala.compat.java8.FutureConverters._
|
import scala.compat.java8.FutureConverters._
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
import scala.util.control.NonFatal
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
|
||||||
|
|
@ -275,7 +275,7 @@ import scala.util.control.NonFatal
|
||||||
case s: Subscription ⇒
|
case s: Subscription ⇒
|
||||||
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> EmptyPublisher")
|
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> EmptyPublisher")
|
||||||
if (!compareAndSet(s, EmptyPublisher)) onComplete()
|
if (!compareAndSet(s, EmptyPublisher)) onComplete()
|
||||||
case b @ Both(s) ⇒
|
case _@ Both(s) ⇒
|
||||||
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert")
|
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert")
|
||||||
set(Inert)
|
set(Inert)
|
||||||
tryOnComplete(s)
|
tryOnComplete(s)
|
||||||
|
|
|
||||||
|
|
@ -10,9 +10,9 @@ import akka.stream.impl.StreamLayout.AtomicModule
|
||||||
import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
|
import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
|
||||||
import akka.stream.scaladsl.Keep
|
import akka.stream.scaladsl.Keep
|
||||||
import akka.util.OptionVal
|
import akka.util.OptionVal
|
||||||
|
|
||||||
import scala.language.existentials
|
import scala.language.existentials
|
||||||
import scala.collection.immutable.Map.Map1
|
import scala.collection.immutable.Map.Map1
|
||||||
|
|
||||||
import akka.stream.impl.fusing.GraphStageModule
|
import akka.stream.impl.fusing.GraphStageModule
|
||||||
import akka.stream.impl.fusing.GraphStages.SingleSource
|
import akka.stream.impl.fusing.GraphStages.SingleSource
|
||||||
|
|
||||||
|
|
@ -277,7 +277,6 @@ import akka.stream.impl.fusing.GraphStages.SingleSource
|
||||||
*/
|
*/
|
||||||
@InternalApi private[impl] def printTraversal(t: Traversal, indent: Int = 0): Unit = {
|
@InternalApi private[impl] def printTraversal(t: Traversal, indent: Int = 0): Unit = {
|
||||||
var current: Traversal = t
|
var current: Traversal = t
|
||||||
var slot = 0
|
|
||||||
|
|
||||||
def prindent(s: String): Unit = println(" | " * indent + s)
|
def prindent(s: String): Unit = println(" | " * indent + s)
|
||||||
|
|
||||||
|
|
@ -1019,7 +1018,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource
|
||||||
islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
|
islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
|
||||||
)
|
)
|
||||||
|
|
||||||
case OptionVal.Some(composite) ⇒
|
case OptionVal.Some(_) ⇒
|
||||||
/*
|
/*
|
||||||
* In this case we need to assemble as much as we can, and create a new "sandwich" of
|
* In this case we need to assemble as much as we can, and create a new "sandwich" of
|
||||||
* beforeBuilder ~ pendingBuilder ~ traversalSoFar
|
* beforeBuilder ~ pendingBuilder ~ traversalSoFar
|
||||||
|
|
@ -1089,8 +1088,8 @@ import akka.stream.impl.fusing.GraphStages.SingleSource
|
||||||
*/
|
*/
|
||||||
override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder =
|
override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder =
|
||||||
this.islandTag match {
|
this.islandTag match {
|
||||||
case OptionVal.Some(tag) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
|
case OptionVal.Some(_) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
|
||||||
case OptionVal.None ⇒ copy(islandTag = OptionVal.Some(islandTag))
|
case OptionVal.None ⇒ copy(islandTag = OptionVal.Some(islandTag))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -316,7 +316,7 @@ import scala.util.control.NonFatal
|
||||||
|
|
||||||
private def reportSubscribeFailure(subscriber: Subscriber[Any]): Unit =
|
private def reportSubscribeFailure(subscriber: Subscriber[Any]): Unit =
|
||||||
try shutdownReason match {
|
try shutdownReason match {
|
||||||
case OptionVal.Some(e: SpecViolation) ⇒ // ok, not allowed to call onError
|
case OptionVal.Some(_: SpecViolation) ⇒ // ok, not allowed to call onError
|
||||||
case OptionVal.Some(e) ⇒
|
case OptionVal.Some(e) ⇒
|
||||||
tryOnSubscribe(subscriber, CancelledSubscription)
|
tryOnSubscribe(subscriber, CancelledSubscription)
|
||||||
tryOnError(subscriber, e)
|
tryOnError(subscriber, e)
|
||||||
|
|
@ -348,7 +348,6 @@ import scala.util.control.NonFatal
|
||||||
// interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked)
|
// interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked)
|
||||||
private var downstreamCompleted = false
|
private var downstreamCompleted = false
|
||||||
// when upstream failed before we got the exposed publisher
|
// when upstream failed before we got the exposed publisher
|
||||||
private var upstreamFailed: OptionVal[Throwable] = OptionVal.None
|
|
||||||
private var upstreamCompleted: Boolean = false
|
private var upstreamCompleted: Boolean = false
|
||||||
|
|
||||||
private def onNext(elem: Any): Unit = {
|
private def onNext(elem: Any): Unit = {
|
||||||
|
|
@ -369,7 +368,6 @@ import scala.util.control.NonFatal
|
||||||
// No need to fail if had already been cancelled, or we closed earlier
|
// No need to fail if had already been cancelled, or we closed earlier
|
||||||
if (!(downstreamCompleted || upstreamCompleted)) {
|
if (!(downstreamCompleted || upstreamCompleted)) {
|
||||||
upstreamCompleted = true
|
upstreamCompleted = true
|
||||||
upstreamFailed = OptionVal.Some(e)
|
|
||||||
publisher.shutdown(Some(e))
|
publisher.shutdown(Some(e))
|
||||||
if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e)
|
if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -215,21 +215,6 @@ import scala.concurrent.{ Future, Promise }
|
||||||
def monitor[T]: GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] =
|
def monitor[T]: GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] =
|
||||||
new MonitorFlow[T]
|
new MonitorFlow[T]
|
||||||
|
|
||||||
private object TickSource {
|
|
||||||
class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable {
|
|
||||||
private val cancelPromise = Promise[Done]()
|
|
||||||
|
|
||||||
def cancelFuture: Future[Done] = cancelPromise.future
|
|
||||||
|
|
||||||
override def cancel(): Boolean = {
|
|
||||||
if (!isCancelled) cancelPromise.trySuccess(Done)
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
override def isCancelled: Boolean = cancelled.get()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final class TickSource[T](val initialDelay: FiniteDuration, val interval: FiniteDuration, val tick: T)
|
final class TickSource[T](val initialDelay: FiniteDuration, val interval: FiniteDuration, val tick: T)
|
||||||
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
|
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
|
||||||
override val shape = SourceShape(Outlet[T]("TickSource.out"))
|
override val shape = SourceShape(Outlet[T]("TickSource.out"))
|
||||||
|
|
@ -440,7 +425,6 @@ import scala.concurrent.{ Future, Promise }
|
||||||
|
|
||||||
(logic, promise.future)
|
(logic, promise.future)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1471,7 +1471,7 @@ private[stream] object Collect {
|
||||||
override def genString(t: Materializer): String = {
|
override def genString(t: Materializer): String = {
|
||||||
try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t).supervisor.path})"
|
try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t).supervisor.path})"
|
||||||
catch {
|
catch {
|
||||||
case ex: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName)
|
case _: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -695,7 +695,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource
|
||||||
else
|
else
|
||||||
setCallback(callback)
|
setCallback(callback)
|
||||||
|
|
||||||
case m: /* Materialized */ AsyncCallback[Command @unchecked] ⇒
|
case _: /* Materialized */ AsyncCallback[Command @unchecked] ⇒
|
||||||
failStage(new IllegalStateException("Substream Source cannot be materialized more than once"))
|
failStage(new IllegalStateException("Substream Source cannot be materialized more than once"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -84,7 +84,7 @@ import scala.util.{ Failure, Success, Try }
|
||||||
} catch {
|
} catch {
|
||||||
case closingException: Exception ⇒ result match {
|
case closingException: Exception ⇒ result match {
|
||||||
case Success(ioResult) ⇒
|
case Success(ioResult) ⇒
|
||||||
val statusWithClosingException = ioResult.status.transform(d ⇒ Failure(closingException), ex ⇒ Failure(closingException.initCause(ex)))
|
val statusWithClosingException = ioResult.status.transform(_ ⇒ Failure(closingException), ex ⇒ Failure(closingException.initCause(ex)))
|
||||||
completionPromise.trySuccess(ioResult.copy(status = statusWithClosingException))
|
completionPromise.trySuccess(ioResult.copy(status = statusWithClosingException))
|
||||||
case Failure(ex) ⇒ completionPromise.tryFailure(closingException.initCause(ex))
|
case Failure(ex) ⇒ completionPromise.tryFailure(closingException.initCause(ex))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ object MaterializerState {
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
@InternalApi
|
@InternalApi
|
||||||
private[akka] def requestFromChild(child: ActorRef)(implicit ec: ExecutionContext): Future[StreamSnapshot] = {
|
private[akka] def requestFromChild(child: ActorRef): Future[StreamSnapshot] = {
|
||||||
// FIXME arbitrary timeout
|
// FIXME arbitrary timeout
|
||||||
implicit val timeout: Timeout = 10.seconds
|
implicit val timeout: Timeout = 10.seconds
|
||||||
(child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot]
|
(child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot]
|
||||||
|
|
|
||||||
|
|
@ -493,7 +493,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
*/
|
*/
|
||||||
final protected def grab[T](in: Inlet[T]): T = {
|
final protected def grab[T](in: Inlet[T]): T = {
|
||||||
val connection = conn(in)
|
val connection = conn(in)
|
||||||
val it = interpreter
|
|
||||||
val elem = connection.slot
|
val elem = connection.slot
|
||||||
|
|
||||||
// Fast path
|
// Fast path
|
||||||
|
|
@ -1146,8 +1145,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
||||||
// stage has stopped to fail incoming async callback invocations by being set to null
|
// stage has stopped to fail incoming async callback invocations by being set to null
|
||||||
private val asyncCallbacksInProgress = new AtomicReference[List[Promise[Done]]](Nil)
|
private val asyncCallbacksInProgress = new AtomicReference[List[Promise[Done]]](Nil)
|
||||||
|
|
||||||
private def stopped = asyncCallbacksInProgress.get() == null
|
|
||||||
|
|
||||||
private var _stageActor: StageActor = _
|
private var _stageActor: StageActor = _
|
||||||
final def stageActor: StageActor = _stageActor match {
|
final def stageActor: StageActor = _stageActor match {
|
||||||
case null ⇒ throw StageActorRefNotInitializedException()
|
case null ⇒ throw StageActorRefNotInitializedException()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue