=str - Makes GraphStageWithMaterializedValue covariant its type parameters
This commit is contained in:
parent
fc0ecfebef
commit
f330cfb394
3 changed files with 8 additions and 9 deletions
|
|
@ -23,7 +23,7 @@ abstract class Directive[L](implicit val ev: Tuple[L]) {
|
||||||
* which is added by an implicit conversion (see `Directive.addDirectiveApply`).
|
* which is added by an implicit conversion (see `Directive.addDirectiveApply`).
|
||||||
*/
|
*/
|
||||||
def tapply(f: L ⇒ Route): Route
|
def tapply(f: L ⇒ Route): Route
|
||||||
//#
|
//#
|
||||||
/**
|
/**
|
||||||
* Joins two directives into one which runs the second directive if the first one rejects.
|
* Joins two directives into one which runs the second directive if the first one rejects.
|
||||||
*/
|
*/
|
||||||
|
|
@ -90,7 +90,7 @@ abstract class Directive[L](implicit val ev: Tuple[L]) {
|
||||||
def recoverPF[R >: L: Tuple](recovery: PartialFunction[immutable.Seq[Rejection], Directive[R]]): Directive[R] =
|
def recoverPF[R >: L: Tuple](recovery: PartialFunction[immutable.Seq[Rejection], Directive[R]]): Directive[R] =
|
||||||
recover { rejections ⇒ recovery.applyOrElse(rejections, (rejs: Seq[Rejection]) ⇒ RouteDirectives.reject(rejs: _*)) }
|
recover { rejections ⇒ recovery.applyOrElse(rejections, (rejs: Seq[Rejection]) ⇒ RouteDirectives.reject(rejs: _*)) }
|
||||||
|
|
||||||
//#basic
|
//#basic
|
||||||
}
|
}
|
||||||
//#
|
//#
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,7 @@ private[stream] object GraphInterpreter {
|
||||||
* corresponding segments of these arrays matches the exact same order of the ports in the [[Shape]].
|
* corresponding segments of these arrays matches the exact same order of the ports in the [[Shape]].
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
final class GraphAssembly(val stages: Array[GraphStageWithMaterializedValue[_, _]],
|
final class GraphAssembly(val stages: Array[GraphStageWithMaterializedValue[Shape, Any]],
|
||||||
val ins: Array[Inlet[_]],
|
val ins: Array[Inlet[_]],
|
||||||
val inOwners: Array[Int],
|
val inOwners: Array[Int],
|
||||||
val outs: Array[Outlet[_]],
|
val outs: Array[Outlet[_]],
|
||||||
|
|
@ -113,7 +113,7 @@ private[stream] object GraphInterpreter {
|
||||||
var i = 0
|
var i = 0
|
||||||
while (i < stages.length) {
|
while (i < stages.length) {
|
||||||
// Port initialization loops, these must come first
|
// Port initialization loops, these must come first
|
||||||
val shape = stages(i).asInstanceOf[GraphStageWithMaterializedValue[Shape, _]].shape
|
val shape = stages(i).shape
|
||||||
|
|
||||||
var idx = 0
|
var idx = 0
|
||||||
val inletItr = shape.inlets.iterator
|
val inletItr = shape.inlets.iterator
|
||||||
|
|
@ -185,7 +185,7 @@ private[stream] object GraphInterpreter {
|
||||||
*/
|
*/
|
||||||
final def apply(inlets: immutable.Seq[Inlet[_]],
|
final def apply(inlets: immutable.Seq[Inlet[_]],
|
||||||
outlets: immutable.Seq[Outlet[_]],
|
outlets: immutable.Seq[Outlet[_]],
|
||||||
stages: GraphStageWithMaterializedValue[_, _]*): GraphAssembly = {
|
stages: GraphStageWithMaterializedValue[Shape, _]*): GraphAssembly = {
|
||||||
// add the contents of an iterator to an array starting at idx
|
// add the contents of an iterator to an array starting at idx
|
||||||
@tailrec def add[T](i: Iterator[T], a: Array[T], idx: Int): Array[T] =
|
@tailrec def add[T](i: Iterator[T], a: Array[T], idx: Int): Array[T] =
|
||||||
if (i.hasNext) {
|
if (i.hasNext) {
|
||||||
|
|
@ -317,7 +317,7 @@ private[stream] final class GraphInterpreter(
|
||||||
|
|
||||||
// Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped.
|
// Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped.
|
||||||
private[this] val shutdownCounter = Array.tabulate(assembly.stages.length) { i ⇒
|
private[this] val shutdownCounter = Array.tabulate(assembly.stages.length) { i ⇒
|
||||||
val shape = assembly.stages(i).shape.asInstanceOf[Shape]
|
val shape = assembly.stages(i).shape
|
||||||
shape.inlets.size + shape.outlets.size
|
shape.inlets.size + shape.outlets.size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,13 +12,12 @@ import scala.collection.{ immutable, mutable }
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
||||||
abstract class GraphStageWithMaterializedValue[S <: Shape, M] extends Graph[S, M] {
|
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {
|
||||||
def shape: S
|
|
||||||
def createLogicAndMaterializedValue: (GraphStageLogic, M)
|
def createLogicAndMaterializedValue: (GraphStageLogic, M)
|
||||||
|
|
||||||
final override private[stream] lazy val module: Module =
|
final override private[stream] lazy val module: Module =
|
||||||
GraphModule(
|
GraphModule(
|
||||||
GraphAssembly(shape.inlets, shape.outlets, Array(this): _*),
|
GraphAssembly(shape.inlets, shape.outlets, this),
|
||||||
shape,
|
shape,
|
||||||
Attributes.none)
|
Attributes.none)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue