Merge pull request #20070 from akka/wip-random-findings-RK

some random findings
This commit is contained in:
Konrad Malawski 2016-03-24 20:24:42 +01:00
commit aff158cb98
9 changed files with 26 additions and 21 deletions

View file

@ -186,12 +186,12 @@ trait SubchannelClassification { this: EventBus ⇒
private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
cache = (cache /: changes) { cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) diff cs)
} }
private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
cache = (cache /: changes) { cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs) case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) union cs)
} }
} }

View file

@ -19,7 +19,7 @@ case class TwoPhaseSet(
def remove(element: String): TwoPhaseSet = def remove(element: String): TwoPhaseSet =
copy(removals = removals.add(element)) copy(removals = removals.add(element))
def elements: Set[String] = adds.elements -- removals.elements def elements: Set[String] = adds.elements diff removals.elements
override def merge(that: TwoPhaseSet): TwoPhaseSet = override def merge(that: TwoPhaseSet): TwoPhaseSet =
copy( copy(

View file

@ -32,8 +32,7 @@ object TwitterStreamQuickstartDocSpec {
//#model //#model
//#tweet-source //#tweet-source
val tweets: Source[Tweet, NotUsed] val tweets: Source[Tweet, NotUsed] //#tweet-source
//#tweet-source
= Source( = Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") :: Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::

View file

@ -84,11 +84,10 @@ class BundleDelegatingClassLoader(bundle: Bundle, fallBackClassLoader: ClassLoad
wire Option(wire.getProviderWiring) map { _.getBundle } wire Option(wire.getProviderWiring) map { _.getBundle }
}.toSet }.toSet
} }
process(processed + b, rest ++ (direct -- processed)) process(processed + b, rest ++ (direct diff processed))
} }
} }
} }
process(Set.empty, Set(bundle)) process(Set.empty, Set(bundle))
} }
} }

View file

@ -52,16 +52,16 @@ object StreamLayout {
var problems: List[String] = Nil var problems: List[String] = Nil
if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets) if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets)
if (inset != inPorts) problems ::= s"shape has extra ${ins(inset -- inPorts)}, module has extra ${ins(inPorts -- inset)}" if (inset != inPorts) problems ::= s"shape has extra ${ins(inset diff inPorts)}, module has extra ${ins(inPorts diff inset)}"
if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}" if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}"
if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets) if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets)
if (outset != outPorts) problems ::= s"shape has extra ${outs(outset -- outPorts)}, module has extra ${outs(outPorts -- outset)}" if (outset != outPorts) problems ::= s"shape has extra ${outs(outset diff outPorts)}, module has extra ${outs(outPorts diff outset)}"
if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}" if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}"
val ups = upstreams.toSet val ups = upstreams.toSet
val ups2 = ups.map(_.swap) val ups2 = ups.map(_.swap)
val downs = downstreams.toSet val downs = downstreams.toSet
val inter = ups2.intersect(downs) val inter = ups2.intersect(downs)
if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}" if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 diff inter)} downs ${pairs(downs diff inter)}"
val (allIn, dupIn, allOut, dupOut) = val (allIn, dupIn, allOut, dupOut) =
subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) { subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) {
case ((ai, di, ao, doo), sm) case ((ai, di, ao, doo), sm)
@ -69,11 +69,11 @@ object StreamLayout {
} }
if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}" if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}"
if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}" if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}"
if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}" if (!isSealed && (inset diff allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset diff allIn)}"
if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}" if (!isSealed && (outset diff allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset diff allOut)}"
val unIn = allIn -- inset -- upstreams.keySet val unIn = allIn diff inset diff upstreams.keySet
if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}" if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}"
val unOut = allOut -- outset -- downstreams.keySet val unOut = allOut diff outset diff downstreams.keySet
if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}" if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}"
def atomics(n: MaterializedValueNode): Set[Module] = def atomics(n: MaterializedValueNode): Set[Module] =
@ -88,8 +88,8 @@ object StreamLayout {
case GraphModule(_, _, _, mvids) mvids case GraphModule(_, _, _, mvids) mvids
case _ Nil case _ Nil
} }
if ((atomic -- subModules -- graphValues - m).nonEmpty) if (((atomic diff subModules diff graphValues) - m).nonEmpty)
problems ::= s"computation refers to non-existent modules [${atomic -- subModules -- graphValues - m mkString ","}]" problems ::= s"computation refers to non-existent modules [${(atomic diff subModules diff graphValues) - m mkString ","}]"
val print = doPrint || problems.nonEmpty val print = doPrint || problems.nonEmpty
@ -273,7 +273,7 @@ object StreamLayout {
} }
CompositeModule( CompositeModule(
modulesLeft ++ modulesRight, modulesLeft union modulesRight,
AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets), AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets),
downstreams ++ that.downstreams, downstreams ++ that.downstreams,
upstreams ++ that.upstreams, upstreams ++ that.upstreams,

View file

@ -24,7 +24,6 @@ import java.util.concurrent.CompletionStage
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import akka.stream.impl.SourceQueueAdapter import akka.stream.impl.SourceQueueAdapter
import akka.stream.scaladsl.SourceQueueWithComplete
/** Java API */ /** Java API */
object Source { object Source {

View file

@ -114,7 +114,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
/** /**
* Transform the materialized value of this Flow, leaving all other properties as they were. * Transform the materialized value of this Flow, leaving all other properties as they were.
*/ */
def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2] = override def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2] =
new Flow(module.transformMaterializedValue(f.asInstanceOf[Any Any])) new Flow(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/** /**
@ -1983,6 +1983,11 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) Mat2): ReprMat[Out, Mat2] = def watchTermination[Mat2]()(matF: (Mat, Future[Done]) Mat2): ReprMat[Out, Mat2] =
viaMat(GraphStages.terminationWatcher)(matF) viaMat(GraphStages.terminationWatcher)(matF)
/**
* Transform the materialized value of this graph, leaving all other properties as they were.
*/
def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2]
/** /**
* INTERNAL API. * INTERNAL API.
*/ */

View file

@ -28,7 +28,7 @@ import scala.compat.java8.FutureConverters._
* a Reactive Streams `Publisher` (at least conceptually). * a Reactive Streams `Publisher` (at least conceptually).
*/ */
final class Source[+Out, +Mat](private[stream] override val module: Module) final class Source[+Out, +Mat](private[stream] override val module: Module)
extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] {
override type Repr[+O] = Source[O, Mat @uncheckedVariance] override type Repr[+O] = Source[O, Mat @uncheckedVariance]
override type ReprMat[+O, +M] = Source[O, M] override type ReprMat[+O, +M] = Source[O, M]
@ -71,7 +71,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
/** /**
* Transform only the materialized value of this Source, leaving all other properties as they were. * Transform only the materialized value of this Source, leaving all other properties as they were.
*/ */
def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2] = override def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2] =
new Source[Out, Mat2](module.transformMaterializedValue(f.asInstanceOf[Any Any])) new Source[Out, Mat2](module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/** INTERNAL API */ /** INTERNAL API */

View file

@ -659,6 +659,9 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Drop.onPush"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Drop.onPush"),
ProblemFilters.exclude[FinalClassProblem]("akka.stream.stage.GraphStageLogic$Reading"), // this class is private ProblemFilters.exclude[FinalClassProblem]("akka.stream.stage.GraphStageLogic$Reading"), // this class is private
// lifting this method to the type where it belongs
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.mapMaterializedValue"),
// #19908 Take is private // #19908 Take is private
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take$"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take"),