scala3: show scala3-compatible code in most of akka-docs (#30974)
* scala3: show scala3-compatible code in most of akka-docs There are some Streams samples that don't compile with Scala 3 yet. * Fix warnings/errors * scalafmtSbt
This commit is contained in:
parent
8015a0c0c8
commit
602eb45dcc
25 changed files with 81 additions and 70 deletions
|
|
@ -485,7 +485,7 @@ class ActorDocSpec extends AkkaSpec("""
|
|||
import scala.concurrent.duration._
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
val future = myActor ? "hello"
|
||||
//#using-implicit-timeout
|
||||
Await.result(future, timeout.duration) should be("hello")
|
||||
|
|
@ -683,7 +683,7 @@ class ActorDocSpec extends AkkaSpec("""
|
|||
final case class Result(x: Int, s: String, d: Double)
|
||||
case object Request
|
||||
|
||||
implicit val timeout = Timeout(5 seconds) // needed for `?` below
|
||||
implicit val timeout: Timeout = 5.seconds // needed for `?` below
|
||||
|
||||
val f: Future[Result] =
|
||||
for {
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ class SharedMutableStateDocSpec {
|
|||
def receive = {
|
||||
case _ =>
|
||||
implicit val ec = context.dispatcher
|
||||
implicit val timeout = Timeout(5 seconds) // needed for `?` below
|
||||
implicit val timeout: Timeout = 5.seconds // needed for `?` below
|
||||
|
||||
// Example of incorrect approach
|
||||
// Very bad: shared mutable state will cause your
|
||||
|
|
|
|||
|
|
@ -42,8 +42,8 @@ class CoordinatedActorShutdownSpec {
|
|||
val myActor = context.spawn(MyActor.behavior, "my-actor")
|
||||
//#coordinated-shutdown-addTask
|
||||
CoordinatedShutdown(context.system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () =>
|
||||
implicit val timeout = Timeout(5.seconds)
|
||||
myActor.ask(MyActor.Stop)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
myActor.ask(MyActor.Stop(_))
|
||||
}
|
||||
//#coordinated-shutdown-addTask
|
||||
|
||||
|
|
|
|||
|
|
@ -68,18 +68,18 @@ class SharedMutableStateDocSpec {
|
|||
// Another example of incorrect approach
|
||||
// mutating actor state from ask future callback
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
implicit val timeout = Timeout(5.seconds) // needed for `ask` below
|
||||
implicit val timeout: Timeout = 5.seconds // needed for `ask` below
|
||||
implicit val scheduler = context.system.scheduler
|
||||
val future: Future[String] = otherActor.ask(Query)
|
||||
val future: Future[String] = otherActor.ask(Query(_))
|
||||
future.foreach { result =>
|
||||
state = result
|
||||
}
|
||||
|
||||
// use context.ask instead, turns the completion
|
||||
// into a message sent to self
|
||||
context.ask(otherActor, Query) {
|
||||
case Success(result) => UpdateState(result)
|
||||
case Failure(ex) => throw ex
|
||||
context.ask(otherActor, Query(_)) {
|
||||
case Success(result: String) => UpdateState(result)
|
||||
case Failure(ex) => throw ex
|
||||
}
|
||||
this
|
||||
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ object TransformationFrontend {
|
|||
val counter = new AtomicInteger
|
||||
import system.dispatcher
|
||||
system.scheduler.scheduleWithFixedDelay(2.seconds, 2.seconds) { () =>
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
(frontend ? TransformationJob("hello-" + counter.incrementAndGet())).foreach { result =>
|
||||
println(result)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -104,10 +104,10 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
|
|||
|
||||
"demonstrate update" in {
|
||||
val probe = TestProbe()
|
||||
implicit val self = probe.ref
|
||||
implicit val self: ActorRef = probe.ref
|
||||
|
||||
//#update
|
||||
implicit val node = DistributedData(system).selfUniqueAddress
|
||||
implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
|
||||
val replicator = DistributedData(system).replicator
|
||||
|
||||
val Counter1Key = PNCounterKey("counter1")
|
||||
|
|
@ -148,7 +148,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
|
|||
"demonstrate update with request context" in {
|
||||
import Actor.Receive
|
||||
val probe = TestProbe()
|
||||
implicit val self = probe.ref
|
||||
implicit val self: ActorRef = probe.ref
|
||||
def sender() = self
|
||||
|
||||
//#update-request-context
|
||||
|
|
@ -173,7 +173,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
|
|||
|
||||
"demonstrate get" in {
|
||||
val probe = TestProbe()
|
||||
implicit val self = probe.ref
|
||||
implicit val self: ActorRef = probe.ref
|
||||
|
||||
//#get
|
||||
val replicator = DistributedData(system).replicator
|
||||
|
|
@ -220,7 +220,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
|
|||
"demonstrate get with request context" in {
|
||||
import Actor.Receive
|
||||
val probe = TestProbe()
|
||||
implicit val self = probe.ref
|
||||
implicit val self: ActorRef = probe.ref
|
||||
def sender() = self
|
||||
|
||||
//#get-request-context
|
||||
|
|
@ -248,7 +248,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
|
|||
"demonstrate subscribe" in {
|
||||
import Actor.Receive
|
||||
val probe = TestProbe()
|
||||
implicit val self = probe.ref
|
||||
implicit val self: ActorRef = probe.ref
|
||||
def sender() = self
|
||||
|
||||
//#subscribe
|
||||
|
|
@ -270,7 +270,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
|
|||
|
||||
"demonstrate delete" in {
|
||||
val probe = TestProbe()
|
||||
implicit val self = probe.ref
|
||||
implicit val self: ActorRef = probe.ref
|
||||
|
||||
//#delete
|
||||
val replicator = DistributedData(system).replicator
|
||||
|
|
@ -372,7 +372,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
|
|||
case class Record(version: Int, name: String, address: String)
|
||||
|
||||
implicit val node = DistributedData(system).selfUniqueAddress
|
||||
implicit val recordClock = new LWWRegister.Clock[Record] {
|
||||
implicit val recordClock: LWWRegister.Clock[Record] = new LWWRegister.Clock[Record] {
|
||||
override def apply(currentTimestamp: Long, value: Record): Long =
|
||||
value.version
|
||||
}
|
||||
|
|
|
|||
|
|
@ -158,7 +158,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
import akka.util.Timeout
|
||||
import scala.concurrent.duration._
|
||||
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
val future = actor ? msg // enabled by the “ask” import
|
||||
val result = Await.result(future, timeout.duration).asInstanceOf[String]
|
||||
//#ask-blocking
|
||||
|
|
@ -169,7 +169,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
"demonstrate usage of mapTo" in {
|
||||
val actor = system.actorOf(Props[MyActor]())
|
||||
val msg = "hello"
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
//#map-to
|
||||
import scala.concurrent.Future
|
||||
import akka.pattern.ask
|
||||
|
|
@ -285,7 +285,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val actor3 = system.actorOf(Props[MyActor]())
|
||||
val msg1 = 1
|
||||
val msg2 = 2
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
import scala.concurrent.Await
|
||||
import akka.pattern.ask
|
||||
//#composing-wrong
|
||||
|
|
@ -309,7 +309,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
val actor3 = system.actorOf(Props[MyActor]())
|
||||
val msg1 = 1
|
||||
val msg2 = 2
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
import scala.concurrent.Await
|
||||
import akka.pattern.ask
|
||||
//#composing
|
||||
|
|
@ -330,7 +330,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"demonstrate usage of sequence with actors" in {
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
val oddActor = system.actorOf(Props[OddActor]())
|
||||
//#sequence-ask
|
||||
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
|
||||
|
|
@ -385,7 +385,7 @@ class FutureDocSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"demonstrate usage of recover" in {
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
val actor = system.actorOf(Props[MyActor]())
|
||||
val msg1 = -1
|
||||
//#recover
|
||||
|
|
@ -394,11 +394,11 @@ class FutureDocSpec extends AkkaSpec {
|
|||
}
|
||||
future.foreach(println)
|
||||
//#recover
|
||||
Await.result(future, 3 seconds) should be(0)
|
||||
Await.result(future, 3.seconds) should be(0)
|
||||
}
|
||||
|
||||
"demonstrate usage of recoverWith" in {
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit val timeout: Timeout = 5.seconds
|
||||
val actor = system.actorOf(Props[MyActor]())
|
||||
val msg1 = -1
|
||||
//#try-recover
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ object PersistencePluginDocSpec {
|
|||
}
|
||||
|
||||
class PersistencePluginDocSpec extends AnyWordSpec {
|
||||
new AnyRef {
|
||||
{
|
||||
val providerConfig =
|
||||
"""
|
||||
//#journal-plugin-config
|
||||
|
|
@ -142,7 +142,7 @@ object SharedLeveldbPluginDocSpec {
|
|||
trait SharedLeveldbPluginDocSpec {
|
||||
val system: ActorSystem
|
||||
|
||||
new AnyRef {
|
||||
{
|
||||
import akka.actor._
|
||||
//#shared-store-creation
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbStore
|
||||
|
|
@ -181,7 +181,7 @@ class MySnapshotStore extends SnapshotStore {
|
|||
}
|
||||
|
||||
object PersistenceTCKDoc {
|
||||
new AnyRef {
|
||||
object example1 {
|
||||
import akka.persistence.journal.JournalSpec
|
||||
|
||||
//#journal-tck-scala
|
||||
|
|
@ -197,7 +197,7 @@ object PersistenceTCKDoc {
|
|||
}
|
||||
//#journal-tck-scala
|
||||
}
|
||||
new AnyRef {
|
||||
object example2 {
|
||||
import akka.persistence.snapshot.SnapshotStoreSpec
|
||||
|
||||
//#snapshot-store-tck-scala
|
||||
|
|
@ -212,7 +212,7 @@ object PersistenceTCKDoc {
|
|||
}
|
||||
//#snapshot-store-tck-scala
|
||||
}
|
||||
new AnyRef {
|
||||
object example3 {
|
||||
import java.io.File
|
||||
|
||||
import akka.persistence.journal.JournalSpec
|
||||
|
|
|
|||
|
|
@ -168,23 +168,23 @@ class BidiFlowDocSpec extends AkkaSpec {
|
|||
// test it by plugging it into its own inverse and closing the right end
|
||||
val pingpong = Flow[Message].collect { case Ping(id) => Pong(id) }
|
||||
val flow = stack.atop(stack.reversed).join(pingpong)
|
||||
val result = Source((0 to 9).map(Ping)).via(flow).limit(20).runWith(Sink.seq)
|
||||
Await.result(result, 1.second) should ===((0 to 9).map(Pong))
|
||||
val result = Source((0 to 9).map(Ping(_))).via(flow).limit(20).runWith(Sink.seq)
|
||||
Await.result(result, 1.second) should ===((0 to 9).map(Pong(_)))
|
||||
//#compose
|
||||
}
|
||||
|
||||
"work when chopped up" in {
|
||||
val stack = codec.atop(framing)
|
||||
val flow = stack.atop(chopUp).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) })
|
||||
val f = Source((0 to 9).map(Ping)).via(flow).limit(20).runWith(Sink.seq)
|
||||
Await.result(f, 1.second) should ===((0 to 9).map(Pong))
|
||||
val f = Source((0 to 9).map(Ping(_))).via(flow).limit(20).runWith(Sink.seq)
|
||||
Await.result(f, 1.second) should ===((0 to 9).map(Pong(_)))
|
||||
}
|
||||
|
||||
"work when accumulated" in {
|
||||
val stack = codec.atop(framing)
|
||||
val flow = stack.atop(accumulate).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) })
|
||||
val f = Source((0 to 9).map(Ping)).via(flow).limit(20).runWith(Sink.seq)
|
||||
Await.result(f, 1.second) should ===((0 to 9).map(Pong))
|
||||
val f = Source((0 to 9).map(Ping(_))).via(flow).limit(20).runWith(Sink.seq)
|
||||
Await.result(f, 1.second) should ===((0 to 9).map(Pong(_)))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -200,7 +200,7 @@ class CompositionDocSpec extends AkkaSpec {
|
|||
|
||||
// Materializes to Future[OutgoingConnection] (yellow)
|
||||
val flow3: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
|
||||
Tcp().outgoingConnection("localhost", 8080)
|
||||
Tcp(system).outgoingConnection("localhost", 8080)
|
||||
|
||||
// Materializes to Future[OutgoingConnection] (yellow)
|
||||
val nestedFlow: Flow[Int, ByteString, Future[OutgoingConnection]] =
|
||||
|
|
|
|||
|
|
@ -144,7 +144,7 @@ class FlowDocSpec extends AkkaSpec with CompileOnlySpec {
|
|||
"various ways of transforming materialized values" in {
|
||||
import scala.concurrent.duration._
|
||||
|
||||
val throttler = Flow.fromGraph(GraphDSL.create(Source.tick(1.second, 1.second, "test")) {
|
||||
val throttler = Flow.fromGraph(GraphDSL.createGraph(Source.tick(1.second, 1.second, "test")) {
|
||||
implicit builder => tickSource =>
|
||||
import GraphDSL.Implicits._
|
||||
val zip = builder.add(ZipWith[String, Int, Int](Keep.right))
|
||||
|
|
@ -207,7 +207,7 @@ class FlowDocSpec extends AkkaSpec with CompileOnlySpec {
|
|||
|
||||
// The result of r11 can be also achieved by using the Graph API
|
||||
val r12: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
|
||||
RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)((_, _, _)) { implicit builder => (src, f, dst) =>
|
||||
RunnableGraph.fromGraph(GraphDSL.createGraph(source, flow, sink)((_, _, _)) { implicit builder => (src, f, dst) =>
|
||||
import GraphDSL.Implicits._
|
||||
src ~> f ~> dst
|
||||
ClosedShape
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ class GraphDSLDocSpec extends AkkaSpec {
|
|||
// format: OFF
|
||||
val g =
|
||||
//#graph-dsl-reusing-a-flow
|
||||
RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
|
||||
RunnableGraph.fromGraph(GraphDSL.createGraph(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
|
||||
(topHS, bottomHS) =>
|
||||
import GraphDSL.Implicits._
|
||||
val broadcast = builder.add(Broadcast[Int](2))
|
||||
|
|
@ -192,7 +192,7 @@ class GraphDSLDocSpec extends AkkaSpec {
|
|||
"access to materialized value" in {
|
||||
//#graph-dsl-matvalue
|
||||
import GraphDSL.Implicits._
|
||||
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
|
||||
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.createGraph(Sink.fold[Int, Int](0)(_ + _)) {
|
||||
implicit builder => fold =>
|
||||
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
|
||||
})
|
||||
|
|
@ -204,7 +204,7 @@ class GraphDSLDocSpec extends AkkaSpec {
|
|||
import GraphDSL.Implicits._
|
||||
// This cannot produce any value:
|
||||
val cyclicFold: Source[Int, Future[Int]] =
|
||||
Source.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
|
||||
Source.fromGraph(GraphDSL.createGraph(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
|
||||
// - Fold cannot complete until its upstream mapAsync completes
|
||||
// - mapAsync cannot complete until the materialized Future produced by
|
||||
// fold completes
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
|
||||
"ask" in {
|
||||
//#ask
|
||||
implicit val askTimeout = Timeout(5.seconds)
|
||||
implicit val askTimeout: Timeout = 5.seconds
|
||||
val words: Source[String, NotUsed] =
|
||||
Source(List("hello", "hi"))
|
||||
|
||||
|
|
@ -384,7 +384,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
|
||||
val akkaTweets: Source[Tweet, NotUsed] = tweets.filter(_.hashtags.contains(akkaTag))
|
||||
|
||||
implicit val timeout = Timeout(3.seconds)
|
||||
implicit val timeout: Timeout = 3.seconds
|
||||
val saveTweets: RunnableGraph[NotUsed] =
|
||||
akkaTweets.mapAsync(4)(tweet => database ? Save(tweet)).to(Sink.ignore)
|
||||
//#save-tweets
|
||||
|
|
|
|||
|
|
@ -41,9 +41,9 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
|
|||
override def tweets: Publisher[Tweet] =
|
||||
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.asPublisher(fanout = false))
|
||||
|
||||
override def storage = TestSubscriber.manualProbe[Author]()
|
||||
override def storage: TestSubscriber.ManualProbe[Author] = TestSubscriber.manualProbe[Author]()
|
||||
|
||||
override def alert = TestSubscriber.manualProbe[Author]()
|
||||
override def alert: TestSubscriber.ManualProbe[Author] = TestSubscriber.manualProbe[Author]()
|
||||
}
|
||||
|
||||
def assertResult(storage: TestSubscriber.ManualProbe[Author]): Unit = {
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class StreamPartialGraphDSLDocSpec extends AkkaSpec {
|
|||
|
||||
val resultSink = Sink.head[Int]
|
||||
|
||||
val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b => sink =>
|
||||
val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink =>
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
// importing the partial graph will return its shape (inlets & outlets)
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ class RecipeDroppyBroadcast extends RecipeSpec {
|
|||
val mySink3 = Sink.fromSubscriber(sub3)
|
||||
|
||||
//#droppy-bcast
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) {
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(mySink1, mySink2, mySink3)((_, _, _)) {
|
||||
implicit b => (sink1, sink2, sink3) =>
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package docs.stream.cookbook
|
||||
|
||||
import akka.event.Logging
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.scaladsl.{ Sink, Source }
|
||||
import akka.testkit.{ EventFilter, TestProbe }
|
||||
|
|
@ -29,10 +30,9 @@ class RecipeLoggingElements extends RecipeSpec {
|
|||
printProbe.expectMsgAllOf("1", "2", "3")
|
||||
}
|
||||
|
||||
val mySource = Source(List("1", "2", "3"))
|
||||
def analyse(s: String) = s
|
||||
"use log()" in {
|
||||
val mySource = Source(List("1", "2", "3"))
|
||||
def analyse(s: String) = s
|
||||
|
||||
//#log-custom
|
||||
// customise log levels
|
||||
mySource
|
||||
|
|
@ -40,9 +40,13 @@ class RecipeLoggingElements extends RecipeSpec {
|
|||
.withAttributes(Attributes
|
||||
.logLevels(onElement = Logging.WarningLevel, onFinish = Logging.InfoLevel, onFailure = Logging.DebugLevel))
|
||||
.map(analyse)
|
||||
//#log-custom
|
||||
}
|
||||
|
||||
"use log() with custom adapter" in {
|
||||
//#log-custom
|
||||
// or provide custom logging adapter
|
||||
implicit val adapter = Logging(system, "customLogger")
|
||||
implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
|
||||
mySource.log("custom")
|
||||
//#log-custom
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
{
|
||||
//#echo-server-simple-bind
|
||||
val binding: Future[ServerBinding] =
|
||||
Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run()
|
||||
Tcp(system).bind("127.0.0.1", 8888).to(Sink.ignore).run()
|
||||
|
||||
binding.map { b =>
|
||||
b.unbind().onComplete {
|
||||
|
|
@ -42,7 +42,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
import akka.stream.scaladsl.Framing
|
||||
|
||||
val connections: Source[IncomingConnection, Future[ServerBinding]] =
|
||||
Tcp().bind(host, port)
|
||||
Tcp(system).bind(host, port)
|
||||
connections.runForeach { connection =>
|
||||
println(s"New connection from: ${connection.remoteAddress}")
|
||||
|
||||
|
|
@ -61,7 +61,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
"initial server banner echo server" in {
|
||||
val localhost = SocketUtil.temporaryServerAddress()
|
||||
|
||||
val connections = Tcp().bind(localhost.getHostString, localhost.getPort)
|
||||
val connections = Tcp(system).bind(localhost.getHostString, localhost.getPort)
|
||||
val serverProbe = TestProbe()
|
||||
|
||||
import akka.stream.scaladsl.Framing
|
||||
|
|
@ -111,12 +111,12 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
{
|
||||
// just for docs, never actually used
|
||||
//#repl-client
|
||||
val connection = Tcp().outgoingConnection("127.0.0.1", 8888)
|
||||
val connection = Tcp(system).outgoingConnection("127.0.0.1", 8888)
|
||||
//#repl-client
|
||||
}
|
||||
|
||||
{
|
||||
val connection = Tcp().outgoingConnection(localhost)
|
||||
val connection = Tcp(system).outgoingConnection(localhost)
|
||||
//#repl-client
|
||||
|
||||
val replParser =
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ object BroadcastDocExample {
|
|||
|
||||
val (count: Future[Int], min: Future[Int], max: Future[Int]) =
|
||||
RunnableGraph
|
||||
.fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
|
||||
.fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
|
||||
implicit builder => (countS, minS, maxS) =>
|
||||
import GraphDSL.Implicits._
|
||||
val broadcast = builder.add(Broadcast[Int](3))
|
||||
|
|
@ -48,7 +48,7 @@ object BroadcastDocExample {
|
|||
//#broadcast
|
||||
|
||||
//#broadcast-async
|
||||
RunnableGraph.fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
|
||||
RunnableGraph.fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
|
||||
implicit builder => (countS, minS, maxS) =>
|
||||
import GraphDSL.Implicits._
|
||||
val broadcast = builder.add(Broadcast[Int](3))
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ object FromSinkAndSource {
|
|||
|
||||
val serverFlow = Flow.fromSinkAndSource(sink, source)
|
||||
|
||||
Tcp().bind("127.0.0.1", 9999, halfClose = true).runForeach { incomingConnection =>
|
||||
Tcp(system).bind("127.0.0.1", 9999, halfClose = true).runForeach { incomingConnection =>
|
||||
incomingConnection.handleWith(serverFlow)
|
||||
}
|
||||
// #halfClosedTcpServer
|
||||
|
|
@ -51,7 +51,7 @@ object FromSinkAndSource {
|
|||
|
||||
val serverFlow = Flow.fromSinkAndSource(sinkWithFraming, sourceWithFraming)
|
||||
|
||||
Tcp().bind("127.0.0.1", 9999).runForeach { incomingConnection =>
|
||||
Tcp(system).bind("127.0.0.1", 9999).runForeach { incomingConnection =>
|
||||
incomingConnection.handleWith(serverFlow)
|
||||
}
|
||||
// #chat
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class FutureFlow {
|
|||
|
||||
val source: Source[String, NotUsed] =
|
||||
Source(1 to 10).prefixAndTail(1).flatMapConcat {
|
||||
case (List(id), tail) =>
|
||||
case (List(id: Int), tail) =>
|
||||
// base the Future flow creation on the first element
|
||||
tail.via(Flow.futureFlow(processingFlow(id)))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ object Tick {
|
|||
.tick(1.second, 1.second, "tick")
|
||||
.mapAsync(1) { _ =>
|
||||
implicit val timeout: Timeout = 3.seconds
|
||||
val response: Future[MyActor.Response] = myActor.ask(MyActor.Query)
|
||||
val response: Future[MyActor.Response] = myActor.ask(MyActor.Query(_))
|
||||
response
|
||||
}
|
||||
.map(_.text);
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import scala.concurrent.duration._
|
|||
|
||||
class Monitor {
|
||||
|
||||
implicit val system = ActorSystem("monitor-sample-sys2")
|
||||
implicit val system: ActorSystem = ActorSystem("monitor-sample-sys2")
|
||||
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
|
||||
|
||||
// #monitor
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import akka.testkit.TestProbe
|
|||
//#imports-test-probe
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
object TestKitDocSpec {
|
||||
|
|
@ -169,7 +170,7 @@ class TestKitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
|
||||
val actorRef = TestActorRef(new MyActor)
|
||||
// hypothetical message stimulating a '42' answer
|
||||
val future = actorRef ? Say42
|
||||
val future: Future[Any] = actorRef ? Say42
|
||||
future.futureValue should be(42)
|
||||
//#test-behavior
|
||||
}
|
||||
|
|
@ -321,7 +322,7 @@ class TestKitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
import akka.testkit.EventFilter
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
implicit val system = ActorSystem(
|
||||
implicit val system: ActorSystem = ActorSystem(
|
||||
"testsystem",
|
||||
ConfigFactory.parseString("""
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
|
|
|
|||
|
|
@ -84,10 +84,16 @@ object AkkaDisciplinePlugin extends AutoPlugin {
|
|||
val docs =
|
||||
Seq(
|
||||
Compile / scalacOptions -= defaultScalaOptions,
|
||||
Compile / scalacOptions += "-Wconf:cat=unused:s,cat=deprecation:s,cat=unchecked:s,any:e",
|
||||
Compile / scalacOptions ++= (
|
||||
if (scalaVersion.value.startsWith("3.")) Nil
|
||||
else Seq("-Wconf:cat=unused:s,cat=deprecation:s,cat=unchecked:s,any:e")
|
||||
),
|
||||
Test / scalacOptions --= Seq("-Xlint", "-unchecked", "-deprecation"),
|
||||
Test / scalacOptions -= defaultScalaOptions,
|
||||
Test / scalacOptions += "-Wconf:cat=unused:s,cat=deprecation:s,cat=unchecked:s,any:e",
|
||||
Test / scalacOptions ++= (
|
||||
if (scalaVersion.value.startsWith("3.")) Nil
|
||||
else Seq("-Wconf:cat=unused:s,cat=deprecation:s,cat=unchecked:s,any:e")
|
||||
),
|
||||
Compile / doc / scalacOptions := Seq())
|
||||
|
||||
lazy val disciplineSettings =
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue