format source with scalafmt, #26511
This commit is contained in:
parent
2ba9b988df
commit
75579bed17
779 changed files with 15729 additions and 13096 deletions
|
|
@ -44,41 +44,46 @@ import akka.util.OptionVal
|
|||
val Debug = false
|
||||
|
||||
val DefaultPhase: Phase[Any] = new Phase[Any] {
|
||||
override def apply(settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
override def apply(
|
||||
settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
new GraphStageIsland(settings, effectiveAttributes, materializer, islandName, subflowFuser = OptionVal.None)
|
||||
.asInstanceOf[PhaseIsland[Any]]
|
||||
}
|
||||
|
||||
val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]](
|
||||
SinkModuleIslandTag -> new Phase[Any] {
|
||||
override def apply(settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
override def apply(
|
||||
settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
new SinkModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
|
||||
},
|
||||
SourceModuleIslandTag -> new Phase[Any] {
|
||||
override def apply(settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
override def apply(
|
||||
settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
new SourceModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
|
||||
},
|
||||
ProcessorModuleIslandTag -> new Phase[Any] {
|
||||
override def apply(settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
override def apply(
|
||||
settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
new ProcessorModulePhase().asInstanceOf[PhaseIsland[Any]]
|
||||
},
|
||||
TlsModuleIslandTag -> new Phase[Any] {
|
||||
def apply(settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
def apply(
|
||||
settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[Any] =
|
||||
new TlsModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
|
||||
},
|
||||
GraphStageTag -> DefaultPhase)
|
||||
|
|
@ -91,12 +96,13 @@ import akka.util.OptionVal
|
|||
val streamSupervisor =
|
||||
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown), StreamSupervisor.nextName())
|
||||
|
||||
PhasedFusingActorMaterializer(system,
|
||||
materializerSettings,
|
||||
system.dispatchers,
|
||||
streamSupervisor,
|
||||
haveShutDown,
|
||||
FlowNames(system).name.copy("flow"))
|
||||
PhasedFusingActorMaterializer(
|
||||
system,
|
||||
materializerSettings,
|
||||
system.dispatchers,
|
||||
streamSupervisor,
|
||||
haveShutDown,
|
||||
FlowNames(system).name.copy("flow"))
|
||||
}
|
||||
|
||||
private def actorSystemOf(context: ActorRefFactory): ActorSystem = {
|
||||
|
|
@ -113,11 +119,12 @@ import akka.util.OptionVal
|
|||
|
||||
}
|
||||
|
||||
private final case class SegmentInfo(globalislandOffset: Int, // The island to which the segment belongs
|
||||
length: Int, // How many slots are contained by the segment
|
||||
globalBaseOffset: Int, // The global slot where this segment starts
|
||||
relativeBaseOffset: Int, // the local offset of the slot where this segment starts
|
||||
phase: PhaseIsland[Any]) {
|
||||
private final case class SegmentInfo(
|
||||
globalislandOffset: Int, // The island to which the segment belongs
|
||||
length: Int, // How many slots are contained by the segment
|
||||
globalBaseOffset: Int, // The global slot where this segment starts
|
||||
relativeBaseOffset: Int, // the local offset of the slot where this segment starts
|
||||
phase: PhaseIsland[Any]) {
|
||||
|
||||
override def toString: String =
|
||||
s"""
|
||||
|
|
@ -130,27 +137,30 @@ private final case class SegmentInfo(globalislandOffset: Int, // The island to w
|
|||
""".stripMargin
|
||||
}
|
||||
|
||||
private final case class ForwardWire(islandGlobalOffset: Int,
|
||||
from: OutPort,
|
||||
toGlobalOffset: Int,
|
||||
outStage: Any,
|
||||
phase: PhaseIsland[Any]) {
|
||||
private final case class ForwardWire(
|
||||
islandGlobalOffset: Int,
|
||||
from: OutPort,
|
||||
toGlobalOffset: Int,
|
||||
outStage: Any,
|
||||
phase: PhaseIsland[Any]) {
|
||||
|
||||
override def toString: String =
|
||||
s"ForwardWire(islandId = $islandGlobalOffset, from = $from, toGlobal = $toGlobalOffset, phase = $phase)"
|
||||
}
|
||||
|
||||
private final case class SavedIslandData(islandGlobalOffset: Int,
|
||||
lastVisitedOffset: Int,
|
||||
skippedSlots: Int,
|
||||
phase: PhaseIsland[Any])
|
||||
private final case class SavedIslandData(
|
||||
islandGlobalOffset: Int,
|
||||
lastVisitedOffset: Int,
|
||||
skippedSlots: Int,
|
||||
phase: PhaseIsland[Any])
|
||||
|
||||
@InternalApi private[akka] class IslandTracking(val phases: Map[IslandTag, Phase[Any]],
|
||||
val settings: ActorMaterializerSettings,
|
||||
attributes: Attributes,
|
||||
defaultPhase: Phase[Any],
|
||||
val materializer: PhasedFusingActorMaterializer,
|
||||
islandNamePrefix: String) {
|
||||
@InternalApi private[akka] class IslandTracking(
|
||||
val phases: Map[IslandTag, Phase[Any]],
|
||||
val settings: ActorMaterializerSettings,
|
||||
attributes: Attributes,
|
||||
defaultPhase: Phase[Any],
|
||||
val materializer: PhasedFusingActorMaterializer,
|
||||
islandNamePrefix: String) {
|
||||
|
||||
import PhasedFusingActorMaterializer.Debug
|
||||
|
||||
|
|
@ -188,11 +198,12 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
|
||||
if (length > 0) {
|
||||
// We just finished a segment by entering an island.
|
||||
val previousSegment = SegmentInfo(globalislandOffset = currentIslandGlobalOffset,
|
||||
length = currentGlobalOffset - currentSegmentGlobalOffset,
|
||||
globalBaseOffset = currentSegmentGlobalOffset,
|
||||
relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippedSlots,
|
||||
currentPhase)
|
||||
val previousSegment = SegmentInfo(
|
||||
globalislandOffset = currentIslandGlobalOffset,
|
||||
length = currentGlobalOffset - currentSegmentGlobalOffset,
|
||||
globalBaseOffset = currentSegmentGlobalOffset,
|
||||
relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippedSlots,
|
||||
currentPhase)
|
||||
|
||||
// Segment tracking is by demand, we only allocate this list if it is used.
|
||||
// If there are no islands, then there is no need to track segments
|
||||
|
|
@ -336,11 +347,12 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
forwardWires = new java.util.ArrayList[ForwardWire](8)
|
||||
}
|
||||
|
||||
val forwardWire = ForwardWire(islandGlobalOffset = currentIslandGlobalOffset,
|
||||
from = out,
|
||||
toGlobalOffset = absoluteOffset,
|
||||
logic,
|
||||
currentPhase)
|
||||
val forwardWire = ForwardWire(
|
||||
islandGlobalOffset = currentIslandGlobalOffset,
|
||||
from = out,
|
||||
toGlobalOffset = absoluteOffset,
|
||||
logic,
|
||||
currentPhase)
|
||||
|
||||
if (Debug) println(s" wiring is forward, recording $forwardWire")
|
||||
forwardWires.add(forwardWire)
|
||||
|
|
@ -363,12 +375,13 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] case class PhasedFusingActorMaterializer(system: ActorSystem,
|
||||
override val settings: ActorMaterializerSettings,
|
||||
dispatchers: Dispatchers,
|
||||
supervisor: ActorRef,
|
||||
haveShutDown: AtomicBoolean,
|
||||
flowNames: SeqActorName)
|
||||
@InternalApi private[akka] case class PhasedFusingActorMaterializer(
|
||||
system: ActorSystem,
|
||||
override val settings: ActorMaterializerSettings,
|
||||
dispatchers: Dispatchers,
|
||||
supervisor: ActorRef,
|
||||
haveShutDown: AtomicBoolean,
|
||||
flowNames: SeqActorName)
|
||||
extends ExtendedActorMaterializer {
|
||||
import PhasedFusingActorMaterializer._
|
||||
|
||||
|
|
@ -417,9 +430,10 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
case other => other
|
||||
})
|
||||
|
||||
override def schedulePeriodically(initialDelay: FiniteDuration,
|
||||
interval: FiniteDuration,
|
||||
task: Runnable): Cancellable =
|
||||
override def schedulePeriodically(
|
||||
initialDelay: FiniteDuration,
|
||||
interval: FiniteDuration,
|
||||
task: Runnable): Cancellable =
|
||||
system.scheduler.schedule(initialDelay, interval, task)(executionContext)
|
||||
|
||||
override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
|
||||
|
|
@ -429,22 +443,25 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
materialize(_runnableGraph, defaultAttributes)
|
||||
|
||||
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat =
|
||||
materialize(_runnableGraph,
|
||||
defaultAttributes,
|
||||
PhasedFusingActorMaterializer.DefaultPhase,
|
||||
PhasedFusingActorMaterializer.DefaultPhases)
|
||||
materialize(
|
||||
_runnableGraph,
|
||||
defaultAttributes,
|
||||
PhasedFusingActorMaterializer.DefaultPhase,
|
||||
PhasedFusingActorMaterializer.DefaultPhases)
|
||||
|
||||
override def materialize[Mat](graph: Graph[ClosedShape, Mat],
|
||||
defaultAttributes: Attributes,
|
||||
defaultPhase: Phase[Any],
|
||||
phases: Map[IslandTag, Phase[Any]]): Mat = {
|
||||
override def materialize[Mat](
|
||||
graph: Graph[ClosedShape, Mat],
|
||||
defaultAttributes: Attributes,
|
||||
defaultPhase: Phase[Any],
|
||||
phases: Map[IslandTag, Phase[Any]]): Mat = {
|
||||
if (isShutdown) throw new IllegalStateException("Trying to materialize stream after materializer has been shutdown")
|
||||
val islandTracking = new IslandTracking(phases,
|
||||
settings,
|
||||
defaultAttributes,
|
||||
defaultPhase,
|
||||
this,
|
||||
islandNamePrefix = createFlowName() + "-")
|
||||
val islandTracking = new IslandTracking(
|
||||
phases,
|
||||
settings,
|
||||
defaultAttributes,
|
||||
defaultPhase,
|
||||
this,
|
||||
islandNamePrefix = createFlowName() + "-")
|
||||
|
||||
var current: Traversal = graph.traversalBuilder.traversal
|
||||
|
||||
|
|
@ -535,9 +552,10 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
|
||||
}
|
||||
|
||||
private def wireInlets(islandTracking: IslandTracking,
|
||||
mod: StreamLayout.AtomicModule[Shape, Any],
|
||||
logic: Any): Unit = {
|
||||
private def wireInlets(
|
||||
islandTracking: IslandTracking,
|
||||
mod: StreamLayout.AtomicModule[Shape, Any],
|
||||
logic: Any): Unit = {
|
||||
val inlets = mod.shape.inlets
|
||||
if (inlets.nonEmpty) {
|
||||
if (Shape.hasOnePort(inlets)) {
|
||||
|
|
@ -553,11 +571,12 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
}
|
||||
}
|
||||
|
||||
private def wireOutlets(islandTracking: IslandTracking,
|
||||
mod: StreamLayout.AtomicModule[Shape, Any],
|
||||
logic: Any,
|
||||
stageGlobalOffset: Int,
|
||||
outToSlot: Array[Int]): Unit = {
|
||||
private def wireOutlets(
|
||||
islandTracking: IslandTracking,
|
||||
mod: StreamLayout.AtomicModule[Shape, Any],
|
||||
logic: Any,
|
||||
stageGlobalOffset: Int,
|
||||
outToSlot: Array[Int]): Unit = {
|
||||
val outlets = mod.shape.outlets
|
||||
if (outlets.nonEmpty) {
|
||||
if (Shape.hasOnePort(outlets)) {
|
||||
|
|
@ -592,10 +611,11 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
* INTERNAL API
|
||||
*/
|
||||
@DoNotInherit private[akka] trait Phase[M] {
|
||||
def apply(settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[M]
|
||||
def apply(
|
||||
settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String): PhaseIsland[M]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -627,11 +647,12 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] final class GraphStageIsland(settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String,
|
||||
subflowFuser: OptionVal[GraphInterpreterShell => ActorRef])
|
||||
@InternalApi private[akka] final class GraphStageIsland(
|
||||
settings: ActorMaterializerSettings,
|
||||
effectiveAttributes: Attributes,
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String,
|
||||
subflowFuser: OptionVal[GraphInterpreterShell => ActorRef])
|
||||
extends PhaseIsland[GraphStageLogic] {
|
||||
// TODO: remove these
|
||||
private val logicArrayType = Array.empty[GraphStageLogic]
|
||||
|
|
@ -802,8 +823,9 @@ private final case class SavedIslandData(islandGlobalOffset: Int,
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] final class SourceModulePhase(materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String)
|
||||
@InternalApi private[akka] final class SourceModulePhase(
|
||||
materializer: PhasedFusingActorMaterializer,
|
||||
islandName: String)
|
||||
extends PhaseIsland[Publisher[Any]] {
|
||||
override def name: String = s"SourceModule phase"
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue