use union/diff operator on Sets (optimization)
This commit is contained in:
parent
b7fecaff37
commit
777a400b12
6 changed files with 15 additions and 18 deletions
|
|
@ -186,12 +186,12 @@ trait SubchannelClassification { this: EventBus ⇒
|
||||||
|
|
||||||
private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
|
private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
|
||||||
cache = (cache /: changes) {
|
cache = (cache /: changes) {
|
||||||
case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs)
|
case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) diff cs)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
|
private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
|
||||||
cache = (cache /: changes) {
|
cache = (cache /: changes) {
|
||||||
case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs)
|
case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) union cs)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ case class TwoPhaseSet(
|
||||||
def remove(element: String): TwoPhaseSet =
|
def remove(element: String): TwoPhaseSet =
|
||||||
copy(removals = removals.add(element))
|
copy(removals = removals.add(element))
|
||||||
|
|
||||||
def elements: Set[String] = adds.elements -- removals.elements
|
def elements: Set[String] = adds.elements diff removals.elements
|
||||||
|
|
||||||
override def merge(that: TwoPhaseSet): TwoPhaseSet =
|
override def merge(that: TwoPhaseSet): TwoPhaseSet =
|
||||||
copy(
|
copy(
|
||||||
|
|
|
||||||
|
|
@ -32,8 +32,7 @@ object TwitterStreamQuickstartDocSpec {
|
||||||
//#model
|
//#model
|
||||||
|
|
||||||
//#tweet-source
|
//#tweet-source
|
||||||
val tweets: Source[Tweet, NotUsed]
|
val tweets: Source[Tweet, NotUsed] //#tweet-source
|
||||||
//#tweet-source
|
|
||||||
= Source(
|
= Source(
|
||||||
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
|
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
|
||||||
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
|
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
|
||||||
|
|
|
||||||
|
|
@ -84,11 +84,10 @@ class BundleDelegatingClassLoader(bundle: Bundle, fallBackClassLoader: ClassLoad
|
||||||
wire ⇒ Option(wire.getProviderWiring) map { _.getBundle }
|
wire ⇒ Option(wire.getProviderWiring) map { _.getBundle }
|
||||||
}.toSet
|
}.toSet
|
||||||
}
|
}
|
||||||
process(processed + b, rest ++ (direct -- processed))
|
process(processed + b, rest ++ (direct diff processed))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
process(Set.empty, Set(bundle))
|
process(Set.empty, Set(bundle))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -52,16 +52,16 @@ object StreamLayout {
|
||||||
var problems: List[String] = Nil
|
var problems: List[String] = Nil
|
||||||
|
|
||||||
if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets)
|
if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets)
|
||||||
if (inset != inPorts) problems ::= s"shape has extra ${ins(inset -- inPorts)}, module has extra ${ins(inPorts -- inset)}"
|
if (inset != inPorts) problems ::= s"shape has extra ${ins(inset diff inPorts)}, module has extra ${ins(inPorts diff inset)}"
|
||||||
if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}"
|
if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}"
|
||||||
if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets)
|
if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets)
|
||||||
if (outset != outPorts) problems ::= s"shape has extra ${outs(outset -- outPorts)}, module has extra ${outs(outPorts -- outset)}"
|
if (outset != outPorts) problems ::= s"shape has extra ${outs(outset diff outPorts)}, module has extra ${outs(outPorts diff outset)}"
|
||||||
if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}"
|
if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}"
|
||||||
val ups = upstreams.toSet
|
val ups = upstreams.toSet
|
||||||
val ups2 = ups.map(_.swap)
|
val ups2 = ups.map(_.swap)
|
||||||
val downs = downstreams.toSet
|
val downs = downstreams.toSet
|
||||||
val inter = ups2.intersect(downs)
|
val inter = ups2.intersect(downs)
|
||||||
if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}"
|
if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 diff inter)} downs ${pairs(downs diff inter)}"
|
||||||
val (allIn, dupIn, allOut, dupOut) =
|
val (allIn, dupIn, allOut, dupOut) =
|
||||||
subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) {
|
subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) {
|
||||||
case ((ai, di, ao, doo), sm) ⇒
|
case ((ai, di, ao, doo), sm) ⇒
|
||||||
|
|
@ -69,11 +69,11 @@ object StreamLayout {
|
||||||
}
|
}
|
||||||
if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}"
|
if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}"
|
||||||
if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}"
|
if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}"
|
||||||
if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}"
|
if (!isSealed && (inset diff allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset diff allIn)}"
|
||||||
if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}"
|
if (!isSealed && (outset diff allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset diff allOut)}"
|
||||||
val unIn = allIn -- inset -- upstreams.keySet
|
val unIn = allIn diff inset diff upstreams.keySet
|
||||||
if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}"
|
if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}"
|
||||||
val unOut = allOut -- outset -- downstreams.keySet
|
val unOut = allOut diff outset diff downstreams.keySet
|
||||||
if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}"
|
if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}"
|
||||||
|
|
||||||
def atomics(n: MaterializedValueNode): Set[Module] =
|
def atomics(n: MaterializedValueNode): Set[Module] =
|
||||||
|
|
@ -88,8 +88,8 @@ object StreamLayout {
|
||||||
case GraphModule(_, _, _, mvids) ⇒ mvids
|
case GraphModule(_, _, _, mvids) ⇒ mvids
|
||||||
case _ ⇒ Nil
|
case _ ⇒ Nil
|
||||||
}
|
}
|
||||||
if ((atomic -- subModules -- graphValues - m).nonEmpty)
|
if (((atomic diff subModules diff graphValues) - m).nonEmpty)
|
||||||
problems ::= s"computation refers to non-existent modules [${atomic -- subModules -- graphValues - m mkString ","}]"
|
problems ::= s"computation refers to non-existent modules [${(atomic diff subModules diff graphValues) - m mkString ","}]"
|
||||||
|
|
||||||
val print = doPrint || problems.nonEmpty
|
val print = doPrint || problems.nonEmpty
|
||||||
|
|
||||||
|
|
@ -273,7 +273,7 @@ object StreamLayout {
|
||||||
}
|
}
|
||||||
|
|
||||||
CompositeModule(
|
CompositeModule(
|
||||||
modulesLeft ++ modulesRight,
|
modulesLeft union modulesRight,
|
||||||
AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets),
|
AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets),
|
||||||
downstreams ++ that.downstreams,
|
downstreams ++ that.downstreams,
|
||||||
upstreams ++ that.upstreams,
|
upstreams ++ that.upstreams,
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import java.util.concurrent.CompletionStage
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import scala.compat.java8.FutureConverters._
|
import scala.compat.java8.FutureConverters._
|
||||||
import akka.stream.impl.SourceQueueAdapter
|
import akka.stream.impl.SourceQueueAdapter
|
||||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
|
||||||
|
|
||||||
/** Java API */
|
/** Java API */
|
||||||
object Source {
|
object Source {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue