no need to rename to operators in internal API
This commit is contained in:
parent
60eee84345
commit
bf8f60f1ff
2 changed files with 12 additions and 12 deletions
|
|
@ -49,7 +49,7 @@ private[remote] trait InboundCompressions {
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*
|
*
|
||||||
* One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand.
|
* One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand.
|
||||||
* All access is via the Decoder operator.
|
* All access is via the Decoder stage.
|
||||||
*/
|
*/
|
||||||
private[remote] final class InboundCompressionsImpl(
|
private[remote] final class InboundCompressionsImpl(
|
||||||
system: ActorSystem,
|
system: ActorSystem,
|
||||||
|
|
|
||||||
|
|
@ -17,27 +17,27 @@ import scala.collection.{ Map ⇒ SMap }
|
||||||
object GraphInterpreterSpecKit {
|
object GraphInterpreterSpecKit {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create logics and enumerate operators and ports
|
* Create logics and enumerate stages and ports
|
||||||
*
|
*
|
||||||
* @param operators Operators to "materialize" into operator logic instances
|
* @param stages Stages to "materialize" into graph stage logic instances
|
||||||
* @param upstreams Upstream boundary logics that are already instances of operator logic and should be
|
* @param upstreams Upstream boundary logics that are already instances of graph stage logic and should be
|
||||||
* part of the graph, is placed before the rest of the operators
|
* part of the graph, is placed before the rest of the stages
|
||||||
* @param downstreams Downstream boundary logics, is placed after the other operators
|
* @param downstreams Downstream boundary logics, is placed after the other stages
|
||||||
* @param attributes Optional set of attributes to pass to the operators when creating the logics
|
* @param attributes Optional set of attributes to pass to the stages when creating the logics
|
||||||
* @return Created logics and the maps of all inlets respective outlets to those logics
|
* @return Created logics and the maps of all inlets respective outlets to those logics
|
||||||
*/
|
*/
|
||||||
private[stream] def createLogics(
|
private[stream] def createLogics(
|
||||||
operators: Array[GraphStageWithMaterializedValue[_ <: Shape, _]],
|
stages: Array[GraphStageWithMaterializedValue[_ <: Shape, _]],
|
||||||
upstreams: Array[UpstreamBoundaryStageLogic[_]],
|
upstreams: Array[UpstreamBoundaryStageLogic[_]],
|
||||||
downstreams: Array[DownstreamBoundaryStageLogic[_]],
|
downstreams: Array[DownstreamBoundaryStageLogic[_]],
|
||||||
attributes: Array[Attributes] = Array.empty): (Array[GraphStageLogic], SMap[Inlet[_], GraphStageLogic], SMap[Outlet[_], GraphStageLogic]) = {
|
attributes: Array[Attributes] = Array.empty): (Array[GraphStageLogic], SMap[Inlet[_], GraphStageLogic], SMap[Outlet[_], GraphStageLogic]) = {
|
||||||
if (attributes.nonEmpty && attributes.length != operators.length)
|
if (attributes.nonEmpty && attributes.length != stages.length)
|
||||||
throw new IllegalArgumentException("Attributes must be either empty or one per stage")
|
throw new IllegalArgumentException("Attributes must be either empty or one per stage")
|
||||||
|
|
||||||
var inOwners = SMap.empty[Inlet[_], GraphStageLogic]
|
var inOwners = SMap.empty[Inlet[_], GraphStageLogic]
|
||||||
var outOwners = SMap.empty[Outlet[_], GraphStageLogic]
|
var outOwners = SMap.empty[Outlet[_], GraphStageLogic]
|
||||||
|
|
||||||
val logics = new Array[GraphStageLogic](upstreams.length + operators.length + downstreams.length)
|
val logics = new Array[GraphStageLogic](upstreams.length + stages.length + downstreams.length)
|
||||||
var idx = 0
|
var idx = 0
|
||||||
|
|
||||||
while (idx < upstreams.length) {
|
while (idx < upstreams.length) {
|
||||||
|
|
@ -50,8 +50,8 @@ object GraphInterpreterSpecKit {
|
||||||
}
|
}
|
||||||
|
|
||||||
var stageIdx = 0
|
var stageIdx = 0
|
||||||
while (stageIdx < operators.length) {
|
while (stageIdx < stages.length) {
|
||||||
val stage = operators(stageIdx)
|
val stage = stages(stageIdx)
|
||||||
setPortIds(stage.shape)
|
setPortIds(stage.shape)
|
||||||
|
|
||||||
val stageAttributes =
|
val stageAttributes =
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue