diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Directive.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Directive.scala index 87e86c1e04..1acd651801 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Directive.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Directive.scala @@ -23,7 +23,7 @@ abstract class Directive[L](implicit val ev: Tuple[L]) { * which is added by an implicit conversion (see `Directive.addDirectiveApply`). */ def tapply(f: L ⇒ Route): Route -//# + //# /** * Joins two directives into one which runs the second directive if the first one rejects. */ @@ -90,7 +90,7 @@ abstract class Directive[L](implicit val ev: Tuple[L]) { def recoverPF[R >: L: Tuple](recovery: PartialFunction[immutable.Seq[Rejection], Directive[R]]): Directive[R] = recover { rejections ⇒ recovery.applyOrElse(rejections, (rejs: Seq[Rejection]) ⇒ RouteDirectives.reject(rejs: _*)) } -//#basic + //#basic } //# diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index e87e093818..c3a420bdb9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -87,7 +87,7 @@ private[stream] object GraphInterpreter { * corresponding segments of these arrays matches the exact same order of the ports in the [[Shape]]. * */ - final class GraphAssembly(val stages: Array[GraphStageWithMaterializedValue[_, _]], + final class GraphAssembly(val stages: Array[GraphStageWithMaterializedValue[Shape, Any]], val ins: Array[Inlet[_]], val inOwners: Array[Int], val outs: Array[Outlet[_]], @@ -113,7 +113,7 @@ private[stream] object GraphInterpreter { var i = 0 while (i < stages.length) { // Port initialization loops, these must come first - val shape = stages(i).asInstanceOf[GraphStageWithMaterializedValue[Shape, _]].shape + val shape = stages(i).shape var idx = 0 val inletItr = shape.inlets.iterator @@ -185,7 +185,7 @@ private[stream] object GraphInterpreter { */ final def apply(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]], - stages: GraphStageWithMaterializedValue[_, _]*): GraphAssembly = { + stages: GraphStageWithMaterializedValue[Shape, _]*): GraphAssembly = { // add the contents of an iterator to an array starting at idx @tailrec def add[T](i: Iterator[T], a: Array[T], idx: Int): Array[T] = if (i.hasNext) { @@ -317,7 +317,7 @@ private[stream] final class GraphInterpreter( // Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped. private[this] val shutdownCounter = Array.tabulate(assembly.stages.length) { i ⇒ - val shape = assembly.stages(i).shape.asInstanceOf[Shape] + val shape = assembly.stages(i).shape shape.inlets.size + shape.outlets.size } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index fdc3bc5e7c..0da78ee2bf 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -12,13 +12,12 @@ import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration import scala.collection.mutable.ArrayBuffer -abstract class GraphStageWithMaterializedValue[S <: Shape, M] extends Graph[S, M] { - def shape: S +abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] { def createLogicAndMaterializedValue: (GraphStageLogic, M) final override private[stream] lazy val module: Module = GraphModule( - GraphAssembly(shape.inlets, shape.outlets, Array(this): _*), + GraphAssembly(shape.inlets, shape.outlets, this), shape, Attributes.none)