From 0cc8e20a2ef43ed1a5efd2f4eb17390c830ed4db Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 21 Nov 2018 15:00:29 +0100 Subject: [PATCH] Various scala-2.13.0-M5 fixes Clearer imports in TcpDnsClient Accept SVG's when checking if diagrams were generated Fix akka-actor MiMa issues akka-testkit scalafix changes Update scalatest to 2.13-released snapshot akka-stream scalafix changes Update ssl-config To version that's released with -M5 Take elements directly out of buffer Manually fix immutable.Seq / Doc link Make sure the right Concat, Sink is imported akka-stream binary compat akka-stream-testkit scalafix akka-actor-tests scalafix Eta-expensions --- .../scala/akka/actor/SupervisorHierarchySpec.scala | 4 ++-- .../scala/akka/io/TcpIntegrationSpecSupport.scala | 5 +++-- .../src/test/scala/akka/pattern/RetrySpec.scala | 4 ++-- .../src/test/scala/akka/util/ByteStringSpec.scala | 2 +- .../akka/util/ccompat/imm/package.scala | 11 +++++++++++ .../akka/util/ccompat/imm/package.scala | 11 +++++++++++ .../akka/actor/dungeon/ChildrenContainer.scala | 5 +++-- akka-actor/src/main/scala/akka/io/Tcp.scala | 14 +++++++------- .../src/main/scala/akka/io/TcpConnection.scala | 2 +- .../main/scala/akka/io/TcpIncomingConnection.scala | 2 +- akka-actor/src/main/scala/akka/io/Udp.scala | 4 ++-- .../src/main/scala/akka/io/UdpConnected.scala | 4 ++-- akka-actor/src/main/scala/akka/io/UdpSender.scala | 2 +- .../scala/akka/io/dns/internal/TcpDnsClient.scala | 13 ++++++------- .../protobuf/ClusterMessageSerializer.scala | 8 ++++---- .../scala/akka/stream/testkit/StreamTestKit.scala | 5 +++-- .../main/scala/akka/stream/impl/QueueSource.scala | 5 ++++- .../src/main/scala/akka/stream/impl/Sinks.scala | 14 +++++++++----- .../scala/akka/stream/impl/TraversalBuilder.scala | 2 +- .../main/scala/akka/stream/impl/io/TcpStages.scala | 1 + .../src/main/scala/akka/stream/javadsl/Graph.scala | 4 +++- .../src/main/scala/akka/stream/javadsl/Sink.scala | 4 ++-- .../src/main/scala/akka/stream/scaladsl/Flow.scala | 2 +- .../src/main/scala/akka/stream/scaladsl/Hub.scala | 5 +++-- .../src/main/scala/akka/stream/scaladsl/Sink.scala | 7 ++++--- .../src/main/scala/akka/stream/scaladsl/Tcp.scala | 1 + .../akka/testkit/CallingThreadDispatcher.scala | 2 +- .../scala/akka/testkit/TestEventListener.scala | 5 +++-- .../src/main/scala/akka/testkit/package.scala | 5 +++-- project/Dependencies.scala | 4 ++-- project/Doc.scala | 5 ++++- 31 files changed, 102 insertions(+), 60 deletions(-) create mode 100644 akka-actor/src/main/scala-2.13+/akka/util/ccompat/imm/package.scala create mode 100644 akka-actor/src/main/scala-2.13-/akka/util/ccompat/imm/package.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 8526a58dc1..d68a21556f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -160,11 +160,11 @@ object SupervisorHierarchySpec { val sizes = s / kids var rest = s % kids val propsTemplate = Props.empty.withDispatcher("hierarchy") - (1 to kids).map { (id) ⇒ + (1 to kids).iterator.map { (id) ⇒ val kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes val props = Props(new Hierarchy(kidSize, breadth, listener, myLevel + 1, random)).withDeploy(propsTemplate.deploy) (context.watch(context.actorOf(props, id.toString)).path, kidSize) - }(collection.breakOut) + }.toMap } else Map() stateCache.put(self.path, HierarchyState(log, kidInfo, null)) } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala index b1fc3d4df2..98ed9d9df7 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala @@ -13,6 +13,7 @@ import akka.testkit.SocketUtil._ import Tcp._ import akka.actor.ActorSystem import akka.dispatch.ExecutionContexts +import scala.collection.immutable trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒ @@ -56,10 +57,10 @@ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒ } /** allow overriding socket options for server side channel */ - def bindOptions: immutable.Traversable[SocketOption] = Nil + def bindOptions: immutable.Iterable[SocketOption] = Nil /** allow overriding socket options for client side channel */ - def connectOptions: immutable.Traversable[SocketOption] = Nil + def connectOptions: immutable.Iterable[SocketOption] = Nil } } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala index 846a6df9a6..9e541f5427 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala @@ -68,7 +68,7 @@ class RetrySpec extends AkkaSpec with RetrySupport { } val retried = retry( - attempt, + () ⇒ attempt, 10, 100 milliseconds ) @@ -89,7 +89,7 @@ class RetrySpec extends AkkaSpec with RetrySupport { } val retried = retry( - attempt, + () ⇒ attempt, 5, 100 milliseconds ) diff --git a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala index b2973d78e4..ae22684176 100644 --- a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala @@ -35,7 +35,7 @@ class ByteStringSpec extends WordSpec with Matchers with Checkers { for { chunks ← Gen.choose(0, s) bytes ← Gen.listOfN(chunks, genSimpleByteString(1, 1 max (s / (chunks max 1)))) - } yield (ByteString.empty /: bytes)(_ ++ _) + } yield bytes.foldLeft(ByteString.empty)(_ ++ _) } } diff --git a/akka-actor/src/main/scala-2.13+/akka/util/ccompat/imm/package.scala b/akka-actor/src/main/scala-2.13+/akka/util/ccompat/imm/package.scala new file mode 100644 index 0000000000..7750df8b46 --- /dev/null +++ b/akka-actor/src/main/scala-2.13+/akka/util/ccompat/imm/package.scala @@ -0,0 +1,11 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.util.ccompat + +import scala.collection.immutable + +package object imm { + type MapLike[K, +V, +This <: immutable.Map[K, V]] = immutable.Map[K, V] +} diff --git a/akka-actor/src/main/scala-2.13-/akka/util/ccompat/imm/package.scala b/akka-actor/src/main/scala-2.13-/akka/util/ccompat/imm/package.scala new file mode 100644 index 0000000000..730c8cf224 --- /dev/null +++ b/akka-actor/src/main/scala-2.13-/akka/util/ccompat/imm/package.scala @@ -0,0 +1,11 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.util.ccompat + +import scala.collection.immutable + +package object imm { + type MapLike[K, +V, +This <: MapLike[K, V, This] with immutable.Map[K, V]] = immutable.MapLike[K, V, This] +} diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala index a4b125e2ec..f8808fd116 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala @@ -8,6 +8,7 @@ import scala.collection.immutable import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef } import akka.util.Collections.{ EmptyImmutableSeq, PartialImmutableValuesIterable } +import akka.util.ccompat /** * INTERNAL API @@ -49,13 +50,13 @@ private[akka] object ChildrenContainer { final case class Creation() extends SuspendReason with WaitingForChildren case object Termination extends SuspendReason - class ChildRestartsIterable(stats: immutable.Map[_, ChildStats]) extends PartialImmutableValuesIterable[ChildStats, ChildRestartStats] { + class ChildRestartsIterable(stats: ccompat.imm.MapLike[_, ChildStats, _]) extends PartialImmutableValuesIterable[ChildStats, ChildRestartStats] { override final def apply(c: ChildStats) = c.asInstanceOf[ChildRestartStats] override final def isDefinedAt(c: ChildStats) = c.isInstanceOf[ChildRestartStats] override final def valuesIterator = stats.valuesIterator } - class ChildrenIterable(stats: immutable.Map[_, ChildStats]) extends PartialImmutableValuesIterable[ChildStats, ActorRef] { + class ChildrenIterable(stats: ccompat.imm.MapLike[_, ChildStats, _]) extends PartialImmutableValuesIterable[ChildStats, ActorRef] { override final def apply(c: ChildStats) = c.asInstanceOf[ChildRestartStats].child override final def isDefinedAt(c: ChildStats) = c.isInstanceOf[ChildRestartStats] override final def valuesIterator = stats.valuesIterator diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 996604d683..20ffda2d6a 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -119,10 +119,10 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { */ final case class Connect( remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress] = None, - options: immutable.Iterable[SocketOption] = Nil, - timeout: Option[FiniteDuration] = None, - pullMode: Boolean = false) extends Command + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[SocketOption] = Nil, + timeout: Option[FiniteDuration] = None, + pullMode: Boolean = false) extends Command /** * The Bind message is send to the TCP manager actor, which is obtained via @@ -146,9 +146,9 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { final case class Bind( handler: ActorRef, localAddress: InetSocketAddress, - backlog: Int = 100, - options: immutable.Iterable[SocketOption] = Nil, - pullMode: Boolean = false) extends Command + backlog: Int = 100, + options: immutable.Traversable[SocketOption] = Nil, + pullMode: Boolean = false) extends Command /** * This message must be sent to a TCP connection actor after receiving the diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 0b2f1951a8..0c347821ca 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -201,7 +201,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha /** used in subclasses to start the common machinery above once a channel is connected */ def completeConnect(registration: ChannelRegistration, commander: ActorRef, - options: immutable.Iterable[SocketOption]): Unit = { + options: immutable.Traversable[SocketOption]): Unit = { this.registration = Some(registration) // Turn off Nagle's algorithm by default diff --git a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala index 89348f8880..0140039d62 100644 --- a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -21,7 +21,7 @@ private[io] class TcpIncomingConnection( _channel: SocketChannel, registry: ChannelRegistry, bindHandler: ActorRef, - options: immutable.Iterable[SocketOption], + options: immutable.Traversable[SocketOption], readThrottling: Boolean) extends TcpConnection(_tcp, _channel, readThrottling) { diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index d01823c91d..66f1b2e851 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -98,7 +98,7 @@ object Udp extends ExtensionId[UdpExt] with ExtensionIdProvider { final case class Bind( handler: ActorRef, localAddress: InetSocketAddress, - options: immutable.Iterable[SocketOption] = Nil) extends Command + options: immutable.Traversable[SocketOption] = Nil) extends Command /** * Send this message to the listener actor that previously sent a [[Bound]] @@ -117,7 +117,7 @@ object Udp extends ExtensionId[UdpExt] with ExtensionIdProvider { * The “simple sender” will not stop itself, you will have to send it a [[akka.actor.PoisonPill]] * when you want to close the socket. */ - case class SimpleSender(options: immutable.Iterable[SocketOption] = Nil) extends Command + case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command object SimpleSender extends SimpleSender(Nil) /** diff --git a/akka-actor/src/main/scala/akka/io/UdpConnected.scala b/akka-actor/src/main/scala/akka/io/UdpConnected.scala index 6d920e1b78..ae249e22f1 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnected.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnected.scala @@ -90,8 +90,8 @@ object UdpConnected extends ExtensionId[UdpConnectedExt] with ExtensionIdProvide final case class Connect( handler: ActorRef, remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress] = None, - options: immutable.Iterable[SocketOption] = Nil) extends Command + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[SocketOption] = Nil) extends Command /** * Send this message to a connection actor (which had previously sent the diff --git a/akka-actor/src/main/scala/akka/io/UdpSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala index 7a33ededa8..c43e894902 100644 --- a/akka-actor/src/main/scala/akka/io/UdpSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala @@ -20,7 +20,7 @@ private[io] class UdpSender( val udp: UdpExt, channelRegistry: ChannelRegistry, commander: ActorRef, - options: immutable.Iterable[SocketOption]) + options: immutable.Traversable[SocketOption]) extends Actor with ActorLogging with WithUdpSend with RequiresMessageQueue[UnboundedMessageQueueSemantics] { val channel = { diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/TcpDnsClient.scala b/akka-actor/src/main/scala/akka/io/dns/internal/TcpDnsClient.scala index 725c97240d..8bfed95cc4 100644 --- a/akka-actor/src/main/scala/akka/io/dns/internal/TcpDnsClient.scala +++ b/akka-actor/src/main/scala/akka/io/dns/internal/TcpDnsClient.scala @@ -9,7 +9,6 @@ import java.net.InetSocketAddress import akka.AkkaException import akka.actor.{ Actor, ActorLogging, ActorRef, Stash } import akka.annotation.InternalApi -import akka.io.Tcp._ import akka.io.dns.internal.DnsClient.Answer import akka.io.Tcp import akka.util.ByteString @@ -31,13 +30,13 @@ import akka.util.ByteString } val connecting: Receive = { - case failure @ CommandFailed(_: Connect) ⇒ + case failure @ Tcp.CommandFailed(_: Tcp.Connect) ⇒ throwFailure(s"Failed to connect to TCP DNS server at [$ns]", failure.cause) case _: Tcp.Connected ⇒ log.debug("Connected to TCP address [{}]", ns) val connection = sender() context.become(ready(connection)) - connection ! Register(self) + connection ! Tcp.Register(self) unstashAll() case _: Message ⇒ stash() @@ -47,9 +46,9 @@ import akka.util.ByteString case msg: Message ⇒ val bytes = msg.write() connection ! Tcp.Write(encodeLength(bytes.length) ++ bytes) - case failure @ CommandFailed(_: Write) ⇒ + case failure @ Tcp.CommandFailed(_: Tcp.Write) ⇒ throwFailure("Write failed", failure.cause) - case Received(newData) ⇒ + case Tcp.Received(newData) ⇒ val data = buffer ++ newData // TCP DNS responses are prefixed by 2 bytes encoding the length of the response val prefixSize = 2 @@ -63,11 +62,11 @@ import akka.util.ByteString answerRecipient ! parseResponse(data.drop(prefixSize)) context.become(ready(connection)) if (data.length > prefixSize + expectedPayloadLength) { - self ! Received(data.drop(prefixSize + expectedPayloadLength)) + self ! Tcp.Received(data.drop(prefixSize + expectedPayloadLength)) } } } - case PeerClosed ⇒ + case Tcp.PeerClosed ⇒ context.become(idle) } diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 32808ab7cb..7c86bb3e99 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -403,7 +403,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se .build() val reachability = reachabilityToProto(gossip.overview.reachability) - val members = gossip.members.map(memberToProto) + val members = (gossip.members: Set[Member] /* 2.13.0-M5 change cast to .unsorted */ ).map(memberToProto _) val seen = gossip.overview.seen.map(mapUniqueAddress) val overview = cm.GossipOverview.newBuilder.addAllSeen(seen.asJava). @@ -415,7 +415,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se .addAllMembers(members.map(_.build).asJava) .setOverview(overview) .setVersion(vectorClockToProto(gossip.version, hashMapping)) - .addAllTombstones(gossip.tombstones.map(tombstoneToProto).asJava) + .addAllTombstones(gossip.tombstones.map(tombstoneToProto _).asJava) } private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): cm.VectorClock.Builder = { @@ -455,7 +455,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se def reachabilityFromProto(observerReachability: Iterable[cm.ObserverReachability]): Reachability = { val recordBuilder = new immutable.VectorBuilder[Reachability.Record] - val versionsBuilder = new scala.collection.mutable.MapBuilder[UniqueAddress, Long, Map[UniqueAddress, Long]](Map.empty) + val versionsBuilder = Map.newBuilder[UniqueAddress, Long] for (o ← observerReachability) { val observer = addressMapping(o.getAddressIndex) versionsBuilder += ((observer, o.getVersion)) @@ -471,7 +471,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Se def memberFromProto(member: cm.Member) = new Member(addressMapping(member.getAddressIndex), member.getUpNumber, memberStatusFromInt(member.getStatus.getNumber), - rolesFromProto(member.getRolesIndexesList.asScala)) + rolesFromProto(member.getRolesIndexesList.asScala.toSeq)) def rolesFromProto(roleIndexes: Seq[Integer]): Set[String] = { var containsDc = false diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 1a8c7c3c18..58c36b125a 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch import akka.testkit.TestActor.AutoPilot import akka.util.JavaDurationConverters +import scala.collection.compat._ /** * Provides factory methods for various Publishers. @@ -363,7 +364,7 @@ object TestSubscriber { * Expect multiple stream elements. */ @annotation.varargs def expectNext(e1: I, e2: I, es: I*): Self = - expectNextN((e1 +: e2 +: es).map(identity)(collection.breakOut)) + expectNextN((e1 +: e2 +: es).iterator.map(identity).to(scala.collection.immutable.IndexedSeq)) /** * Fluent DSL @@ -371,7 +372,7 @@ object TestSubscriber { * Expect multiple stream elements in arbitrary order. */ @annotation.varargs def expectNextUnordered(e1: I, e2: I, es: I*): Self = - expectNextUnorderedN((e1 +: e2 +: es).map(identity)(collection.breakOut)) + expectNextUnorderedN((e1 +: e2 +: es).iterator.map(identity).to(scala.collection.immutable.IndexedSeq)) /** * Expect and return the next `n` stream elements. diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index c7840b1db0..c051002fbe 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -186,7 +186,10 @@ import scala.util.control.NonFatal override def offer(element: T): Future[QueueOfferResult] = { val p = Promise[QueueOfferResult] callback.invokeWithFeedback(Offer(element, p)) - .onFailure { case NonFatal(e) ⇒ p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + .onComplete { + case scala.util.Failure(NonFatal(e)) ⇒ p.tryFailure(e) + case _ ⇒ () + }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) p.future } override def complete(): Unit = callback.invoke(Completion) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index b2a9b70669..ea91b14a01 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -22,13 +22,14 @@ import akka.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance -import scala.collection.generic.CanBuildFrom import scala.collection.{ immutable, mutable } import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } +import scala.collection.immutable +import scala.collection.compat._ /** * INTERNAL API @@ -204,7 +205,7 @@ import scala.util.{ Failure, Success, Try } } override def onUpstreamFinish(): Unit = { - val elements = buffer.result().toList + val elements = buffer.toList buffer.clear() p.trySuccess(elements) completeStage() @@ -265,7 +266,7 @@ import scala.util.{ Failure, Success, Try } /** * INTERNAL API */ -@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] { +@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] { val in = Inlet[T]("seq.in") override def toString: String = "SeqStage" @@ -277,7 +278,7 @@ import scala.util.{ Failure, Success, Try } override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val p: Promise[That] = Promise() val logic = new GraphStageLogic(shape) with InHandler { - val buf = cbf() + val buf = cbf.newBuilder override def preStart(): Unit = pull(in) @@ -395,7 +396,10 @@ import scala.util.{ Failure, Success, Try } override def pull(): Future[Option[T]] = { val p = Promise[Option[T]] callback.invokeWithFeedback(Pull(p)) - .onFailure { case NonFatal(e) ⇒ p.tryFailure(e) }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + .onComplete { + case scala.util.Failure(NonFatal(e)) ⇒ p.tryFailure(e) + case _ ⇒ () + }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) p.future } override def cancel(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 59a5c50d0a..0ee0109fc4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -589,7 +589,7 @@ import akka.stream.impl.fusing.GraphStages.SingleSource val inlets = module.shape.inlets if (inlets.isEmpty) Map.empty else if (Shape.hasOnePort(inlets)) new Map1(inlets.head, inlets.head.id) - else inlets.map(in ⇒ in.asInstanceOf[InPort] → in.id)(collection.breakOut) + else inlets.iterator.map(in ⇒ in.asInstanceOf[InPort] → in.id).toMap } CompletedTraversalBuilder( traversalSoFar = MaterializeAtomic(module, newOutToSlot), diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 157562ccb3..a2a9b64eab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -26,6 +26,7 @@ import akka.util.ByteString import scala.collection.immutable import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.{ Future, Promise } +import scala.collection.immutable /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 1173b8f3e8..c8a05c42a3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -13,7 +13,9 @@ import akka.util.ConstantFun import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ -import akka.stream.scaladsl.GenericGraph +import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes } +import akka.stream.Attributes +import akka.stream.impl.TraversalBuilder /** * Merge several streams, taking elements as they arrive from input streams diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 42316cfe7e..2b300f9506 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -18,7 +18,7 @@ import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionContext import scala.util.Try import java.util.concurrent.CompletionStage - +import scala.collection.immutable import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ @@ -262,7 +262,7 @@ object Sink { */ def combine[T, U](output1: Sink[U, _], output2: Sink[U, _], rest: java.util.List[Sink[U, _]], strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = { import scala.collection.JavaConverters._ - val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq() + val seq = if (rest != null) rest.asScala.map(_.asScala).toSeq else immutable.Seq() new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num ⇒ strategy.apply(num))) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8ceb70835c..2c5a13088e 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -7,7 +7,7 @@ package akka.stream.scaladsl import akka.event.LoggingAdapter import akka.stream._ import akka.Done -import akka.stream.impl._ +import akka.stream.impl.{ LinearTraversalBuilder, ProcessorModule, Timers, SubFlowImpl, TraversalBuilder, Throttle, fusing } import akka.stream.impl.fusing._ import akka.stream.stage._ import akka.util.{ ConstantFun, Timeout } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala index da46765b7a..098ebc6133 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala @@ -431,11 +431,12 @@ private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMater addConsumer(consumer, startFrom) // in case the consumer is already stopped we need to undo registration implicit val ec = materializer.executionContext - consumer.callback.invokeWithFeedback(Initialize(startFrom)).onFailure { - case _: StreamDetachedException ⇒ + consumer.callback.invokeWithFeedback(Initialize(startFrom)).onComplete { + case scala.util.Failure(_: StreamDetachedException) ⇒ callbackPromise.future.foreach(callback ⇒ callback.invoke(UnRegister(consumer.id, startFrom, startFrom)) ) + case _ ⇒ () } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 518852cad1..cca88d8878 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -17,10 +17,11 @@ import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec -import scala.collection.generic.CanBuildFrom import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } +import scala.collection.immutable +import scala.collection.compat._ /** * A `Sink` is a set of stream processing steps that has one open input. @@ -239,12 +240,12 @@ object Sink { * may be used to ensure boundedness. * Materializes into a `Future` of `That[T]` containing all the collected elements. * `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to - * `Int.MaxValue` elements. See [The Architecture of Scala Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info. + * `Int.MaxValue` elements. See [The Architecture of Scala 2.13's Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-213-collections.html) for more info. * This Sink will cancel the stream after having received that many elements. * * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def collection[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] = + def collection[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]): Sink[T, Future[That]] = Sink.fromGraph(new SeqStage[T, That]) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 30a6624d4d..9ed696f3e0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -25,6 +25,7 @@ import scala.concurrent.Future import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.util.Try import scala.util.control.NoStackTrace +import scala.collection.immutable object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index ac8c12e62c..660bd9e308 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -50,7 +50,7 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { // we have to forget about long-gone threads sometime private def gc(): Unit = { - queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[MessageQueue]]] /: queues) { + queues = queues.foldLeft(Map.newBuilder[CallingThreadMailbox, Set[WeakReference[MessageQueue]]]) { case (m, (k, v)) ⇒ val nv = v filter (_.get ne null) if (nv.isEmpty) m else m += (k → nv) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 33db28b44a..1579140b72 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -16,6 +16,7 @@ import akka.actor.NoSerializationVerificationNeeded import akka.japi.Util.immutableSeq import java.lang.{ Iterable ⇒ JIterable } import akka.util.BoxedType +import scala.collection.compat._ /** * Implementation helpers of the EventFilter facilities: send `Mute` @@ -39,7 +40,7 @@ sealed trait TestEvent */ object TestEvent { object Mute { - def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.to[immutable.Seq]) + def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.to(immutable.Seq)) } final case class Mute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** @@ -48,7 +49,7 @@ object TestEvent { def this(filters: JIterable[EventFilter]) = this(immutableSeq(filters)) } object UnMute { - def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to[immutable.Seq]) + def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.to(immutable.Seq)) } final case class UnMute(filters: immutable.Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index 6ed264328c..684392cb77 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -9,12 +9,13 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.reflect.ClassTag import scala.collection.immutable import java.util.concurrent.TimeUnit.MILLISECONDS +import scala.collection.compat._ package object testkit { def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit system: ActorSystem): T = { def now = System.currentTimeMillis - system.eventStream.publish(TestEvent.Mute(eventFilters.to[immutable.Seq])) + system.eventStream.publish(TestEvent.Mute(eventFilters.to(immutable.Seq))) try { val result = block @@ -27,7 +28,7 @@ package object testkit { result } finally { - system.eventStream.publish(TestEvent.UnMute(eventFilters.to[immutable.Seq])) + system.eventStream.publish(TestEvent.UnMute(eventFilters.to(immutable.Seq))) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index cbcdbef282..6928228dee 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,7 +15,7 @@ object Dependencies { lazy val scalaCheckVersion = settingKey[String]("The version of ScalaCheck to use.") lazy val java8CompatVersion = settingKey[String]("The version of scala-java8-compat to use.") val junitVersion = "4.12" - val sslConfigVersion = "0.3.6" + val sslConfigVersion = "0.3.7" val slf4jVersion = "1.7.25" val scalaXmlVersion = "1.0.6" val aeronVersion = "1.12.0" @@ -29,7 +29,7 @@ object Dependencies { case Some((2, n)) if n >= 12 ⇒ "1.14.0" // does not work for 2.11 case _ ⇒ "1.13.2" }), - scalaTestVersion := "3.0.5", + scalaTestVersion := "3.0.6-SNAP5", java8CompatVersion := { CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, n)) if n >= 13 ⇒ "0.9.0" diff --git a/project/Doc.scala b/project/Doc.scala index 00e19dc3c2..003940ce09 100644 --- a/project/Doc.scala +++ b/project/Doc.scala @@ -61,7 +61,10 @@ object Scaladoc extends AutoPlugin { if (name.endsWith(".html") && !name.startsWith("index-") && !name.equals("index.html") && !name.equals("package.html")) { val source = scala.io.Source.fromFile(f)(scala.io.Codec.UTF8) - val hd = try source.getLines().exists(_.contains("
")) + val hd = try source.getLines().exists(lines => + lines.contains("
") || + lines.contains("