Various scala-2.13.0-M5 fixes
Clearer imports in TcpDnsClient Accept SVG's when checking if diagrams were generated Fix akka-actor MiMa issues akka-testkit scalafix changes Update scalatest to 2.13-released snapshot akka-stream scalafix changes Update ssl-config To version that's released with -M5 Take elements directly out of buffer Manually fix immutable.Seq / Doc link Make sure the right Concat, Sink is imported akka-stream binary compat akka-stream-testkit scalafix akka-actor-tests scalafix Eta-expensions
This commit is contained in:
parent
10575ce8c5
commit
0cc8e20a2e
31 changed files with 102 additions and 60 deletions
|
|
@ -160,11 +160,11 @@ object SupervisorHierarchySpec {
|
|||
val sizes = s / kids
|
||||
var rest = s % kids
|
||||
val propsTemplate = Props.empty.withDispatcher("hierarchy")
|
||||
(1 to kids).map { (id) ⇒
|
||||
(1 to kids).iterator.map { (id) ⇒
|
||||
val kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes
|
||||
val props = Props(new Hierarchy(kidSize, breadth, listener, myLevel + 1, random)).withDeploy(propsTemplate.deploy)
|
||||
(context.watch(context.actorOf(props, id.toString)).path, kidSize)
|
||||
}(collection.breakOut)
|
||||
}.toMap
|
||||
} else Map()
|
||||
stateCache.put(self.path, HierarchyState(log, kidInfo, null))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import akka.testkit.SocketUtil._
|
|||
import Tcp._
|
||||
import akka.actor.ActorSystem
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import scala.collection.immutable
|
||||
|
||||
trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒
|
||||
|
||||
|
|
@ -56,10 +57,10 @@ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒
|
|||
}
|
||||
|
||||
/** allow overriding socket options for server side channel */
|
||||
def bindOptions: immutable.Traversable[SocketOption] = Nil
|
||||
def bindOptions: immutable.Iterable[SocketOption] = Nil
|
||||
|
||||
/** allow overriding socket options for client side channel */
|
||||
def connectOptions: immutable.Traversable[SocketOption] = Nil
|
||||
def connectOptions: immutable.Iterable[SocketOption] = Nil
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ class RetrySpec extends AkkaSpec with RetrySupport {
|
|||
}
|
||||
|
||||
val retried = retry(
|
||||
attempt,
|
||||
() ⇒ attempt,
|
||||
10,
|
||||
100 milliseconds
|
||||
)
|
||||
|
|
@ -89,7 +89,7 @@ class RetrySpec extends AkkaSpec with RetrySupport {
|
|||
}
|
||||
|
||||
val retried = retry(
|
||||
attempt,
|
||||
() ⇒ attempt,
|
||||
5,
|
||||
100 milliseconds
|
||||
)
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ class ByteStringSpec extends WordSpec with Matchers with Checkers {
|
|||
for {
|
||||
chunks ← Gen.choose(0, s)
|
||||
bytes ← Gen.listOfN(chunks, genSimpleByteString(1, 1 max (s / (chunks max 1))))
|
||||
} yield (ByteString.empty /: bytes)(_ ++ _)
|
||||
} yield bytes.foldLeft(ByteString.empty)(_ ++ _)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
/*
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.util.ccompat
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
package object imm {
|
||||
type MapLike[K, +V, +This <: immutable.Map[K, V]] = immutable.Map[K, V]
|
||||
}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
/*
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.util.ccompat
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
package object imm {
|
||||
type MapLike[K, +V, +This <: MapLike[K, V, This] with immutable.Map[K, V]] = immutable.MapLike[K, V, This]
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@ import scala.collection.immutable
|
|||
|
||||
import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef }
|
||||
import akka.util.Collections.{ EmptyImmutableSeq, PartialImmutableValuesIterable }
|
||||
import akka.util.ccompat
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -49,13 +50,13 @@ private[akka] object ChildrenContainer {
|
|||
final case class Creation() extends SuspendReason with WaitingForChildren
|
||||
case object Termination extends SuspendReason
|
||||
|
||||
class ChildRestartsIterable(stats: immutable.Map[_, ChildStats]) extends PartialImmutableValuesIterable[ChildStats, ChildRestartStats] {
|
||||
class ChildRestartsIterable(stats: ccompat.imm.MapLike[_, ChildStats, _]) extends PartialImmutableValuesIterable[ChildStats, ChildRestartStats] {
|
||||
override final def apply(c: ChildStats) = c.asInstanceOf[ChildRestartStats]
|
||||
override final def isDefinedAt(c: ChildStats) = c.isInstanceOf[ChildRestartStats]
|
||||
override final def valuesIterator = stats.valuesIterator
|
||||
}
|
||||
|
||||
class ChildrenIterable(stats: immutable.Map[_, ChildStats]) extends PartialImmutableValuesIterable[ChildStats, ActorRef] {
|
||||
class ChildrenIterable(stats: ccompat.imm.MapLike[_, ChildStats, _]) extends PartialImmutableValuesIterable[ChildStats, ActorRef] {
|
||||
override final def apply(c: ChildStats) = c.asInstanceOf[ChildRestartStats].child
|
||||
override final def isDefinedAt(c: ChildStats) = c.isInstanceOf[ChildRestartStats]
|
||||
override final def valuesIterator = stats.valuesIterator
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
|
|||
final case class Connect(
|
||||
remoteAddress: InetSocketAddress,
|
||||
localAddress: Option[InetSocketAddress] = None,
|
||||
options: immutable.Iterable[SocketOption] = Nil,
|
||||
options: immutable.Traversable[SocketOption] = Nil,
|
||||
timeout: Option[FiniteDuration] = None,
|
||||
pullMode: Boolean = false) extends Command
|
||||
|
||||
|
|
@ -147,7 +147,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
|
|||
handler: ActorRef,
|
||||
localAddress: InetSocketAddress,
|
||||
backlog: Int = 100,
|
||||
options: immutable.Iterable[SocketOption] = Nil,
|
||||
options: immutable.Traversable[SocketOption] = Nil,
|
||||
pullMode: Boolean = false) extends Command
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -201,7 +201,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
|
||||
/** used in subclasses to start the common machinery above once a channel is connected */
|
||||
def completeConnect(registration: ChannelRegistration, commander: ActorRef,
|
||||
options: immutable.Iterable[SocketOption]): Unit = {
|
||||
options: immutable.Traversable[SocketOption]): Unit = {
|
||||
this.registration = Some(registration)
|
||||
|
||||
// Turn off Nagle's algorithm by default
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ private[io] class TcpIncomingConnection(
|
|||
_channel: SocketChannel,
|
||||
registry: ChannelRegistry,
|
||||
bindHandler: ActorRef,
|
||||
options: immutable.Iterable[SocketOption],
|
||||
options: immutable.Traversable[SocketOption],
|
||||
readThrottling: Boolean)
|
||||
extends TcpConnection(_tcp, _channel, readThrottling) {
|
||||
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ object Udp extends ExtensionId[UdpExt] with ExtensionIdProvider {
|
|||
final case class Bind(
|
||||
handler: ActorRef,
|
||||
localAddress: InetSocketAddress,
|
||||
options: immutable.Iterable[SocketOption] = Nil) extends Command
|
||||
options: immutable.Traversable[SocketOption] = Nil) extends Command
|
||||
|
||||
/**
|
||||
* Send this message to the listener actor that previously sent a [[Bound]]
|
||||
|
|
@ -117,7 +117,7 @@ object Udp extends ExtensionId[UdpExt] with ExtensionIdProvider {
|
|||
* The “simple sender” will not stop itself, you will have to send it a [[akka.actor.PoisonPill]]
|
||||
* when you want to close the socket.
|
||||
*/
|
||||
case class SimpleSender(options: immutable.Iterable[SocketOption] = Nil) extends Command
|
||||
case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command
|
||||
object SimpleSender extends SimpleSender(Nil)
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ object UdpConnected extends ExtensionId[UdpConnectedExt] with ExtensionIdProvide
|
|||
handler: ActorRef,
|
||||
remoteAddress: InetSocketAddress,
|
||||
localAddress: Option[InetSocketAddress] = None,
|
||||
options: immutable.Iterable[SocketOption] = Nil) extends Command
|
||||
options: immutable.Traversable[SocketOption] = Nil) extends Command
|
||||
|
||||
/**
|
||||
* Send this message to a connection actor (which had previously sent the
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ private[io] class UdpSender(
|
|||
val udp: UdpExt,
|
||||
channelRegistry: ChannelRegistry,
|
||||
commander: ActorRef,
|
||||
options: immutable.Iterable[SocketOption])
|
||||
options: immutable.Traversable[SocketOption])
|
||||
extends Actor with ActorLogging with WithUdpSend with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
|
||||
val channel = {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import java.net.InetSocketAddress
|
|||
import akka.AkkaException
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Stash }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.io.Tcp._
|
||||
import akka.io.dns.internal.DnsClient.Answer
|
||||
import akka.io.Tcp
|
||||
import akka.util.ByteString
|
||||
|
|
@ -31,13 +30,13 @@ import akka.util.ByteString
|
|||
}
|
||||
|
||||
val connecting: Receive = {
|
||||
case failure @ CommandFailed(_: Connect) ⇒
|
||||
case failure @ Tcp.CommandFailed(_: Tcp.Connect) ⇒
|
||||
throwFailure(s"Failed to connect to TCP DNS server at [$ns]", failure.cause)
|
||||
case _: Tcp.Connected ⇒
|
||||
log.debug("Connected to TCP address [{}]", ns)
|
||||
val connection = sender()
|
||||
context.become(ready(connection))
|
||||
connection ! Register(self)
|
||||
connection ! Tcp.Register(self)
|
||||
unstashAll()
|
||||
case _: Message ⇒
|
||||
stash()
|
||||
|
|
@ -47,9 +46,9 @@ import akka.util.ByteString
|
|||
case msg: Message ⇒
|
||||
val bytes = msg.write()
|
||||
connection ! Tcp.Write(encodeLength(bytes.length) ++ bytes)
|
||||
case failure @ CommandFailed(_: Write) ⇒
|
||||
case failure @ Tcp.CommandFailed(_: Tcp.Write) ⇒
|
||||
throwFailure("Write failed", failure.cause)
|
||||
case Received(newData) ⇒
|
||||
case Tcp.Received(newData) ⇒
|
||||
val data = buffer ++ newData
|
||||
// TCP DNS responses are prefixed by 2 bytes encoding the length of the response
|
||||
val prefixSize = 2
|
||||
|
|
@ -63,11 +62,11 @@ import akka.util.ByteString
|
|||
answerRecipient ! parseResponse(data.drop(prefixSize))
|
||||
context.become(ready(connection))
|
||||
if (data.length > prefixSize + expectedPayloadLength) {
|
||||
self ! Received(data.drop(prefixSize + expectedPayloadLength))
|
||||
self ! Tcp.Received(data.drop(prefixSize + expectedPayloadLength))
|
||||
}
|
||||
}
|
||||
}
|
||||
case PeerClosed ⇒
|
||||
case Tcp.PeerClosed ⇒
|
||||
context.become(idle)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -403,7 +403,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
|
|||
.build()
|
||||
|
||||
val reachability = reachabilityToProto(gossip.overview.reachability)
|
||||
val members = gossip.members.map(memberToProto)
|
||||
val members = (gossip.members: Set[Member] /* 2.13.0-M5 change cast to .unsorted */ ).map(memberToProto _)
|
||||
val seen = gossip.overview.seen.map(mapUniqueAddress)
|
||||
|
||||
val overview = cm.GossipOverview.newBuilder.addAllSeen(seen.asJava).
|
||||
|
|
@ -415,7 +415,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
|
|||
.addAllMembers(members.map(_.build).asJava)
|
||||
.setOverview(overview)
|
||||
.setVersion(vectorClockToProto(gossip.version, hashMapping))
|
||||
.addAllTombstones(gossip.tombstones.map(tombstoneToProto).asJava)
|
||||
.addAllTombstones(gossip.tombstones.map(tombstoneToProto _).asJava)
|
||||
}
|
||||
|
||||
private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): cm.VectorClock.Builder = {
|
||||
|
|
@ -455,7 +455,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
|
|||
|
||||
def reachabilityFromProto(observerReachability: Iterable[cm.ObserverReachability]): Reachability = {
|
||||
val recordBuilder = new immutable.VectorBuilder[Reachability.Record]
|
||||
val versionsBuilder = new scala.collection.mutable.MapBuilder[UniqueAddress, Long, Map[UniqueAddress, Long]](Map.empty)
|
||||
val versionsBuilder = Map.newBuilder[UniqueAddress, Long]
|
||||
for (o ← observerReachability) {
|
||||
val observer = addressMapping(o.getAddressIndex)
|
||||
versionsBuilder += ((observer, o.getVersion))
|
||||
|
|
@ -471,7 +471,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se
|
|||
|
||||
def memberFromProto(member: cm.Member) =
|
||||
new Member(addressMapping(member.getAddressIndex), member.getUpNumber, memberStatusFromInt(member.getStatus.getNumber),
|
||||
rolesFromProto(member.getRolesIndexesList.asScala))
|
||||
rolesFromProto(member.getRolesIndexesList.asScala.toSeq))
|
||||
|
||||
def rolesFromProto(roleIndexes: Seq[Integer]): Set[String] = {
|
||||
var containsDc = false
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch
|
|||
|
||||
import akka.testkit.TestActor.AutoPilot
|
||||
import akka.util.JavaDurationConverters
|
||||
import scala.collection.compat._
|
||||
|
||||
/**
|
||||
* Provides factory methods for various Publishers.
|
||||
|
|
@ -363,7 +364,7 @@ object TestSubscriber {
|
|||
* Expect multiple stream elements.
|
||||
*/
|
||||
@annotation.varargs def expectNext(e1: I, e2: I, es: I*): Self =
|
||||
expectNextN((e1 +: e2 +: es).map(identity)(collection.breakOut))
|
||||
expectNextN((e1 +: e2 +: es).iterator.map(identity).to(scala.collection.immutable.IndexedSeq))
|
||||
|
||||
/**
|
||||
* Fluent DSL
|
||||
|
|
@ -371,7 +372,7 @@ object TestSubscriber {
|
|||
* Expect multiple stream elements in arbitrary order.
|
||||
*/
|
||||
@annotation.varargs def expectNextUnordered(e1: I, e2: I, es: I*): Self =
|
||||
expectNextUnorderedN((e1 +: e2 +: es).map(identity)(collection.breakOut))
|
||||
expectNextUnorderedN((e1 +: e2 +: es).iterator.map(identity).to(scala.collection.immutable.IndexedSeq))
|
||||
|
||||
/**
|
||||
* Expect and return the next `n` stream elements.
|
||||
|
|
|
|||
|
|
@ -186,7 +186,10 @@ import scala.util.control.NonFatal
|
|||
override def offer(element: T): Future[QueueOfferResult] = {
|
||||
val p = Promise[QueueOfferResult]
|
||||
callback.invokeWithFeedback(Offer(element, p))
|
||||
.onFailure { case NonFatal(e) ⇒ p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
.onComplete {
|
||||
case scala.util.Failure(NonFatal(e)) ⇒ p.tryFailure(e)
|
||||
case _ ⇒ ()
|
||||
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
p.future
|
||||
}
|
||||
override def complete(): Unit = callback.invoke(Completion)
|
||||
|
|
|
|||
|
|
@ -22,13 +22,14 @@ import akka.stream.stage._
|
|||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.collection.generic.CanBuildFrom
|
||||
import scala.collection.{ immutable, mutable }
|
||||
import scala.compat.java8.FutureConverters._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.collection.immutable
|
||||
import scala.collection.compat._
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -204,7 +205,7 @@ import scala.util.{ Failure, Success, Try }
|
|||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
val elements = buffer.result().toList
|
||||
val elements = buffer.toList
|
||||
buffer.clear()
|
||||
p.trySuccess(elements)
|
||||
completeStage()
|
||||
|
|
@ -265,7 +266,7 @@ import scala.util.{ Failure, Success, Try }
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
|
||||
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
|
||||
val in = Inlet[T]("seq.in")
|
||||
|
||||
override def toString: String = "SeqStage"
|
||||
|
|
@ -277,7 +278,7 @@ import scala.util.{ Failure, Success, Try }
|
|||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
||||
val p: Promise[That] = Promise()
|
||||
val logic = new GraphStageLogic(shape) with InHandler {
|
||||
val buf = cbf()
|
||||
val buf = cbf.newBuilder
|
||||
|
||||
override def preStart(): Unit = pull(in)
|
||||
|
||||
|
|
@ -395,7 +396,10 @@ import scala.util.{ Failure, Success, Try }
|
|||
override def pull(): Future[Option[T]] = {
|
||||
val p = Promise[Option[T]]
|
||||
callback.invokeWithFeedback(Pull(p))
|
||||
.onFailure { case NonFatal(e) ⇒ p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
.onComplete {
|
||||
case scala.util.Failure(NonFatal(e)) ⇒ p.tryFailure(e)
|
||||
case _ ⇒ ()
|
||||
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
p.future
|
||||
}
|
||||
override def cancel(): Unit = {
|
||||
|
|
|
|||
|
|
@ -589,7 +589,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource
|
|||
val inlets = module.shape.inlets
|
||||
if (inlets.isEmpty) Map.empty
|
||||
else if (Shape.hasOnePort(inlets)) new Map1(inlets.head, inlets.head.id)
|
||||
else inlets.map(in ⇒ in.asInstanceOf[InPort] → in.id)(collection.breakOut)
|
||||
else inlets.iterator.map(in ⇒ in.asInstanceOf[InPort] → in.id).toMap
|
||||
}
|
||||
CompletedTraversalBuilder(
|
||||
traversalSoFar = MaterializeAtomic(module, newOutToSlot),
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import akka.util.ByteString
|
|||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.collection.immutable
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
|
|||
|
|
@ -13,7 +13,9 @@ import akka.util.ConstantFun
|
|||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.stream.scaladsl.GenericGraph
|
||||
import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes }
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.impl.TraversalBuilder
|
||||
|
||||
/**
|
||||
* Merge several streams, taking elements as they arrive from input streams
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import scala.compat.java8.OptionConverters._
|
|||
import scala.concurrent.ExecutionContext
|
||||
import scala.util.Try
|
||||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.compat.java8.FutureConverters._
|
||||
|
||||
|
|
@ -262,7 +262,7 @@ object Sink {
|
|||
*/
|
||||
def combine[T, U](output1: Sink[U, _], output2: Sink[U, _], rest: java.util.List[Sink[U, _]], strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = {
|
||||
import scala.collection.JavaConverters._
|
||||
val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq()
|
||||
val seq = if (rest != null) rest.asScala.map(_.asScala).toSeq else immutable.Seq()
|
||||
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num ⇒ strategy.apply(num)))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.stream.scaladsl
|
|||
import akka.event.LoggingAdapter
|
||||
import akka.stream._
|
||||
import akka.Done
|
||||
import akka.stream.impl._
|
||||
import akka.stream.impl.{ LinearTraversalBuilder, ProcessorModule, Timers, SubFlowImpl, TraversalBuilder, Throttle, fusing }
|
||||
import akka.stream.impl.fusing._
|
||||
import akka.stream.stage._
|
||||
import akka.util.{ ConstantFun, Timeout }
|
||||
|
|
|
|||
|
|
@ -431,11 +431,12 @@ private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMater
|
|||
addConsumer(consumer, startFrom)
|
||||
// in case the consumer is already stopped we need to undo registration
|
||||
implicit val ec = materializer.executionContext
|
||||
consumer.callback.invokeWithFeedback(Initialize(startFrom)).onFailure {
|
||||
case _: StreamDetachedException ⇒
|
||||
consumer.callback.invokeWithFeedback(Initialize(startFrom)).onComplete {
|
||||
case scala.util.Failure(_: StreamDetachedException) ⇒
|
||||
callbackPromise.future.foreach(callback ⇒
|
||||
callback.invoke(UnRegister(consumer.id, startFrom, startFrom))
|
||||
)
|
||||
case _ ⇒ ()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,10 +17,11 @@ import akka.stream.{ javadsl, _ }
|
|||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.generic.CanBuildFrom
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.collection.immutable
|
||||
import scala.collection.compat._
|
||||
|
||||
/**
|
||||
* A `Sink` is a set of stream processing steps that has one open input.
|
||||
|
|
@ -239,12 +240,12 @@ object Sink {
|
|||
* may be used to ensure boundedness.
|
||||
* Materializes into a `Future` of `That[T]` containing all the collected elements.
|
||||
* `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to
|
||||
* `Int.MaxValue` elements. See [The Architecture of Scala Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info.
|
||||
* `Int.MaxValue` elements. See [The Architecture of Scala 2.13's Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-213-collections.html) for more info.
|
||||
* This Sink will cancel the stream after having received that many elements.
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def collection[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] =
|
||||
def collection[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]): Sink[T, Future[That]] =
|
||||
Sink.fromGraph(new SeqStage[T, That])
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import scala.concurrent.Future
|
|||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.util.Try
|
||||
import scala.util.control.NoStackTrace
|
||||
import scala.collection.immutable
|
||||
|
||||
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
|
||||
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
|
|||
|
||||
// we have to forget about long-gone threads sometime
|
||||
private def gc(): Unit = {
|
||||
queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[MessageQueue]]] /: queues) {
|
||||
queues = queues.foldLeft(Map.newBuilder[CallingThreadMailbox, Set[WeakReference[MessageQueue]]]) {
|
||||
case (m, (k, v)) ⇒
|
||||
val nv = v filter (_.get ne null)
|
||||
if (nv.isEmpty) m else m += (k → nv)
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import akka.actor.NoSerializationVerificationNeeded
|
|||
import akka.japi.Util.immutableSeq
|
||||
import java.lang.{ Iterable ⇒ JIterable }
|
||||
import akka.util.BoxedType
|
||||
import scala.collection.compat._
|
||||
|
||||
/**
|
||||
* Implementation helpers of the EventFilter facilities: send `Mute`
|
||||
|
|
@ -39,7 +40,7 @@ sealed trait TestEvent
|
|||
*/
|
||||
object TestEvent {
|
||||
object Mute {
|
||||
def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.to[immutable.Seq])
|
||||
def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.to(immutable.Seq))
|
||||
}
|
||||
final case class Mute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
|
||||
/**
|
||||
|
|
@ -48,7 +49,7 @@ object TestEvent {
|
|||
def this(filters: JIterable[EventFilter]) = this(immutableSeq(filters))
|
||||
}
|
||||
object UnMute {
|
||||
def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to[immutable.Seq])
|
||||
def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to(immutable.Seq))
|
||||
}
|
||||
final case class UnMute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -9,12 +9,13 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
|
|||
import scala.reflect.ClassTag
|
||||
import scala.collection.immutable
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import scala.collection.compat._
|
||||
|
||||
package object testkit {
|
||||
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit system: ActorSystem): T = {
|
||||
def now = System.currentTimeMillis
|
||||
|
||||
system.eventStream.publish(TestEvent.Mute(eventFilters.to[immutable.Seq]))
|
||||
system.eventStream.publish(TestEvent.Mute(eventFilters.to(immutable.Seq)))
|
||||
|
||||
try {
|
||||
val result = block
|
||||
|
|
@ -27,7 +28,7 @@ package object testkit {
|
|||
|
||||
result
|
||||
} finally {
|
||||
system.eventStream.publish(TestEvent.UnMute(eventFilters.to[immutable.Seq]))
|
||||
system.eventStream.publish(TestEvent.UnMute(eventFilters.to(immutable.Seq)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ object Dependencies {
|
|||
lazy val scalaCheckVersion = settingKey[String]("The version of ScalaCheck to use.")
|
||||
lazy val java8CompatVersion = settingKey[String]("The version of scala-java8-compat to use.")
|
||||
val junitVersion = "4.12"
|
||||
val sslConfigVersion = "0.3.6"
|
||||
val sslConfigVersion = "0.3.7"
|
||||
val slf4jVersion = "1.7.25"
|
||||
val scalaXmlVersion = "1.0.6"
|
||||
val aeronVersion = "1.12.0"
|
||||
|
|
@ -29,7 +29,7 @@ object Dependencies {
|
|||
case Some((2, n)) if n >= 12 ⇒ "1.14.0" // does not work for 2.11
|
||||
case _ ⇒ "1.13.2"
|
||||
}),
|
||||
scalaTestVersion := "3.0.5",
|
||||
scalaTestVersion := "3.0.6-SNAP5",
|
||||
java8CompatVersion := {
|
||||
CrossVersion.partialVersion(scalaVersion.value) match {
|
||||
case Some((2, n)) if n >= 13 ⇒ "0.9.0"
|
||||
|
|
|
|||
|
|
@ -61,7 +61,10 @@ object Scaladoc extends AutoPlugin {
|
|||
if (name.endsWith(".html") && !name.startsWith("index-") &&
|
||||
!name.equals("index.html") && !name.equals("package.html")) {
|
||||
val source = scala.io.Source.fromFile(f)(scala.io.Codec.UTF8)
|
||||
val hd = try source.getLines().exists(_.contains("<div class=\"toggleContainer block diagram-container\" id=\"inheritance-diagram-container\">"))
|
||||
val hd = try source.getLines().exists(lines =>
|
||||
lines.contains("<div class=\"toggleContainer block diagram-container\" id=\"inheritance-diagram-container\">") ||
|
||||
lines.contains("<svg id=\"graph")
|
||||
)
|
||||
catch {
|
||||
case e: Exception ⇒ throw new IllegalStateException("Scaladoc verification failed for file '" + f + "'", e)
|
||||
} finally source.close()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue