From 777a400b122c06c107394625dd518af2a81bcd55 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Fri, 18 Mar 2016 12:28:07 +0100 Subject: [PATCH 1/2] use union/diff operator on Sets (optimization) --- .../src/main/scala/akka/event/EventBus.scala | 4 ++-- .../scala/code/docs/ddata/TwoPhaseSet.scala | 2 +- .../TwitterStreamQuickstartDocSpec.scala | 3 +-- .../osgi/BundleDelegatingClassLoader.scala | 3 +-- .../scala/akka/stream/impl/StreamLayout.scala | 20 +++++++++---------- .../scala/akka/stream/javadsl/Source.scala | 1 - 6 files changed, 15 insertions(+), 18 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index fd3f97ed71..e94dd8ee38 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -186,12 +186,12 @@ trait SubchannelClassification { this: EventBus ⇒ private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = cache = (cache /: changes) { - case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) + case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) diff cs) } private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = cache = (cache /: changes) { - case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs) + case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) union cs) } } diff --git a/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala b/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala index 673b9a2efa..8f331485b1 100644 --- a/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala +++ b/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala @@ -19,7 +19,7 @@ case class TwoPhaseSet( def remove(element: String): TwoPhaseSet = copy(removals = removals.add(element)) - def elements: Set[String] = adds.elements -- removals.elements + def elements: Set[String] = adds.elements diff removals.elements override def merge(that: TwoPhaseSet): TwoPhaseSet = copy( diff --git a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 818368a044..edf4828aa8 100644 --- a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -32,8 +32,7 @@ object TwitterStreamQuickstartDocSpec { //#model //#tweet-source - val tweets: Source[Tweet, NotUsed] - //#tweet-source + val tweets: Source[Tweet, NotUsed] //#tweet-source = Source( Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") :: diff --git a/akka-osgi/src/main/scala/akka/osgi/BundleDelegatingClassLoader.scala b/akka-osgi/src/main/scala/akka/osgi/BundleDelegatingClassLoader.scala index 1925396537..b2e7d1a8da 100644 --- a/akka-osgi/src/main/scala/akka/osgi/BundleDelegatingClassLoader.scala +++ b/akka-osgi/src/main/scala/akka/osgi/BundleDelegatingClassLoader.scala @@ -84,11 +84,10 @@ class BundleDelegatingClassLoader(bundle: Bundle, fallBackClassLoader: ClassLoad wire ⇒ Option(wire.getProviderWiring) map { _.getBundle } }.toSet } - process(processed + b, rest ++ (direct -- processed)) + process(processed + b, rest ++ (direct diff processed)) } } } process(Set.empty, Set(bundle)) } } - diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 8fb58a9b4e..b0f788b3b0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -52,16 +52,16 @@ object StreamLayout { var problems: List[String] = Nil if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets) - if (inset != inPorts) problems ::= s"shape has extra ${ins(inset -- inPorts)}, module has extra ${ins(inPorts -- inset)}" + if (inset != inPorts) problems ::= s"shape has extra ${ins(inset diff inPorts)}, module has extra ${ins(inPorts diff inset)}" if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}" if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets) - if (outset != outPorts) problems ::= s"shape has extra ${outs(outset -- outPorts)}, module has extra ${outs(outPorts -- outset)}" + if (outset != outPorts) problems ::= s"shape has extra ${outs(outset diff outPorts)}, module has extra ${outs(outPorts diff outset)}" if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}" val ups = upstreams.toSet val ups2 = ups.map(_.swap) val downs = downstreams.toSet val inter = ups2.intersect(downs) - if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}" + if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 diff inter)} downs ${pairs(downs diff inter)}" val (allIn, dupIn, allOut, dupOut) = subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) { case ((ai, di, ao, doo), sm) ⇒ @@ -69,11 +69,11 @@ object StreamLayout { } if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}" if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}" - if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}" - if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}" - val unIn = allIn -- inset -- upstreams.keySet + if (!isSealed && (inset diff allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset diff allIn)}" + if (!isSealed && (outset diff allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset diff allOut)}" + val unIn = allIn diff inset diff upstreams.keySet if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}" - val unOut = allOut -- outset -- downstreams.keySet + val unOut = allOut diff outset diff downstreams.keySet if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}" def atomics(n: MaterializedValueNode): Set[Module] = @@ -88,8 +88,8 @@ object StreamLayout { case GraphModule(_, _, _, mvids) ⇒ mvids case _ ⇒ Nil } - if ((atomic -- subModules -- graphValues - m).nonEmpty) - problems ::= s"computation refers to non-existent modules [${atomic -- subModules -- graphValues - m mkString ","}]" + if (((atomic diff subModules diff graphValues) - m).nonEmpty) + problems ::= s"computation refers to non-existent modules [${(atomic diff subModules diff graphValues) - m mkString ","}]" val print = doPrint || problems.nonEmpty @@ -273,7 +273,7 @@ object StreamLayout { } CompositeModule( - modulesLeft ++ modulesRight, + modulesLeft union modulesRight, AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets), downstreams ++ that.downstreams, upstreams ++ that.upstreams, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c929f46095..9a3d4651df 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -24,7 +24,6 @@ import java.util.concurrent.CompletionStage import java.util.concurrent.CompletableFuture import scala.compat.java8.FutureConverters._ import akka.stream.impl.SourceQueueAdapter -import akka.stream.scaladsl.SourceQueueWithComplete /** Java API */ object Source { From 8a3cf8145b12adc5345ccbfbeca2f1b1d682209a Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Fri, 18 Mar 2016 12:28:27 +0100 Subject: [PATCH 2/2] lift mapMaterializedValue to FlowOpsMat --- akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala | 7 ++++++- .../src/main/scala/akka/stream/scaladsl/Source.scala | 4 ++-- project/MiMa.scala | 3 +++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index e041eaa743..755da1ebe6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -114,7 +114,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Transform the materialized value of this Flow, leaving all other properties as they were. */ - def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = + override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = new Flow(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** @@ -1983,6 +1983,11 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def watchTermination[Mat2]()(matF: (Mat, Future[Done]) ⇒ Mat2): ReprMat[Out, Mat2] = viaMat(GraphStages.terminationWatcher)(matF) + /** + * Transform the materialized value of this graph, leaving all other properties as they were. + */ + def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] + /** * INTERNAL API. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7740e27fd6..0d6a7ec994 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -28,7 +28,7 @@ import scala.compat.java8.FutureConverters._ * a Reactive Streams `Publisher` (at least conceptually). */ final class Source[+Out, +Mat](private[stream] override val module: Module) - extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { + extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { override type Repr[+O] = Source[O, Mat @uncheckedVariance] override type ReprMat[+O, +M] = Source[O, M] @@ -71,7 +71,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) /** * Transform only the materialized value of this Source, leaving all other properties as they were. */ - def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = + override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = new Source[Out, Mat2](module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** INTERNAL API */ diff --git a/project/MiMa.scala b/project/MiMa.scala index 2af118b654..aececeaed8 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -659,6 +659,9 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Drop.onPush"), ProblemFilters.exclude[FinalClassProblem]("akka.stream.stage.GraphStageLogic$Reading"), // this class is private + // lifting this method to the type where it belongs + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.mapMaterializedValue"), + // #19908 Take is private ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take$"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take"),