2018-10-29 17:19:37 +08:00
|
|
|
/*
 * Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
 */
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
package docs.stream
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.NotUsed
|
|
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
import scala.concurrent.duration._
|
2016-02-25 14:27:45 +01:00
|
|
|
import akka.testkit.AkkaSpec
|
2015-04-02 22:54:32 +02:00
|
|
|
import akka.stream.scaladsl._
|
2018-05-22 18:27:54 +09:00
|
|
|
import akka.stream._
|
2018-04-12 22:06:37 +09:00
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
import scala.concurrent.Future
|
|
|
|
|
import akka.testkit.TestProbe
|
2018-04-12 22:06:37 +09:00
|
|
|
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Status }
|
2014-12-18 10:34:59 +01:00
|
|
|
import com.typesafe.config.ConfigFactory
|
|
|
|
|
import akka.util.Timeout
|
2018-04-12 22:06:37 +09:00
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
import scala.concurrent.ExecutionContext
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2018-04-12 22:06:37 +09:00
|
|
|
|
2015-04-09 15:16:59 +02:00
|
|
|
import akka.stream.scaladsl.Flow
|
2016-10-26 10:24:51 +02:00
|
|
|
import akka.Done
|
2018-04-12 22:06:37 +09:00
|
|
|
import akka.actor.Status.Status
|
2018-05-22 18:27:54 +09:00
|
|
|
import akka.stream.QueueOfferResult.{ Dropped, Enqueued }
|
2014-12-18 10:34:59 +01:00
|
|
|
|
|
|
|
|
object IntegrationDocSpec {
|
|
|
|
|
import TwitterStreamQuickstartDocSpec._
|
|
|
|
|
|
|
|
|
|
/**
 * Actor system configuration handed to `AkkaSpec` by this spec.
 *
 * It declares the `blocking-dispatcher` (a thread-pool executor with a fixed core pool
 * size of 10) that the blocking examples look up by name, and sets the default mailbox
 * type to `akka.dispatch.UnboundedMailbox`.
 *
 * The `#//#blocking-dispatcher-config` lines are documentation snippet markers and must
 * stay inside the literal.
 */
val config: com.typesafe.config.Config = ConfigFactory.parseString("""
    #//#blocking-dispatcher-config
    blocking-dispatcher {
      executor = "thread-pool-executor"
      thread-pool-executor {
        core-pool-size-min = 10
        core-pool-size-max = 10
      }
    }
    #//#blocking-dispatcher-config

    akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox
    """)
|
|
|
|
|
|
|
|
|
|
/**
 * Stand-in for an external address service.
 *
 * Both lookups return an already completed `Future` that always holds a `Some`, derived
 * deterministically from the given handle.
 */
class AddressSystem {
  //#email-address-lookup
  def lookupEmail(handle: String): Future[Option[String]] =
    //#email-address-lookup
    Future.successful {
      // the fake address is simply the handle followed by a fixed domain
      val domain = "@somewhere.com"
      Some(handle + domain)
    }

  //#phone-lookup
  def lookupPhoneNumber(handle: String): Future[Option[String]] =
    //#phone-lookup
    Future.successful {
      // the fake phone number is the decimal rendering of the handle's hash code
      val number = String.valueOf(handle.hashCode)
      Some(number)
    }
}
|
|
|
|
|
|
2015-02-04 09:26:32 +01:00
|
|
|
/**
 * Variant of the address service whose e-mail lookup yields the address directly
 * (`Future[String]`) instead of wrapping it in an `Option`.
 */
class AddressSystem2 {
  //#email-address-lookup2
  def lookupEmail(handle: String): Future[String] =
    //#email-address-lookup2
    Future.successful {
      // the fake address is simply the handle followed by a fixed domain
      val domain = "@somewhere.com"
      handle + domain
    }
}
|
|
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
/**
 * An e-mail handed to `EmailServer.send`.
 *
 * @param to    recipient address
 * @param title subject line
 * @param body  message text
 */
final case class Email(
  to:    String,
  title: String,
  body:  String)

/**
 * A text message handed to `SmsServer.send`.
 *
 * @param to   recipient phone number
 * @param body message text
 */
final case class TextMessage(
  to:   String,
  body: String)
|
|
|
|
|
|
|
|
|
|
/**
 * Fake e-mail server: nothing is delivered; the recipient of every e-mail passed to
 * `send` is forwarded to `probe`, and an already completed `Future` is returned.
 *
 * @param probe actor that receives the `to` field of each e-mail
 */
class EmailServer(probe: ActorRef) {
  //#email-server-send
  def send(email: Email): Future[Unit] = {
    // ...
    //#email-server-send
    val recipient = email.to
    probe ! recipient
    Future.successful(())
    //#email-server-send
  }
  //#email-server-send
}
|
|
|
|
|
|
|
|
|
|
/**
 * Fake SMS server with a synchronous `send`: nothing is delivered; the recipient of
 * every text message is forwarded to `probe`.
 *
 * @param probe actor that receives the `to` field of each text message
 */
class SmsServer(probe: ActorRef) {
  //#sms-server-send
  def send(text: TextMessage): Unit = {
    // ...
    //#sms-server-send
    val recipient = text.to
    probe ! recipient
    //#sms-server-send
  }
  //#sms-server-send
}
|
|
|
|
|
|
|
|
|
|
/** Request handled by `DatabaseService`; carries the tweet to be saved. */
final case class Save(
  tweet: Tweet)

/** Reply that `DatabaseService` sends to the requester after handling a `Save`. */
final case object SaveDone
|
|
|
|
|
|
|
|
|
|
/**
 * Fake database actor.
 *
 * For every `Save` it forwards the handle of the tweet's author to `probe` and then
 * replies `SaveDone` to the sender. Any other message is not handled by `receive`.
 *
 * @param probe actor that receives the author handle of each saved tweet
 */
class DatabaseService(probe: ActorRef) extends Actor {
  override def receive: Receive = {
    case Save(tweet: Tweet) ⇒
      val authorHandle = tweet.author.handle
      probe ! authorHandle
      sender() ! SaveDone
  }
}
|
|
|
|
|
|
|
|
|
|
// Service whose `convert` upper-cases a string inside a `Future` after blocking the
// executing thread: 500 ms when the input starts with a lower-case character, 20 ms
// otherwise. `runningCount` tracks how many conversions are currently in flight and is
// included in the (here silenced) log lines.
//#sometimes-slow-service
class SometimesSlowService(implicit ec: ExecutionContext) {
  //#sometimes-slow-service
  def println(s: String): Unit = ()
  //#sometimes-slow-service

  private val runningCount = new AtomicInteger

  def convert(s: String): Future[String] = {
    println(s"running: $s (${runningCount.incrementAndGet()})")
    Future {
      val startsLowerCase = s.headOption.exists(_.isLower)
      val pauseMillis = if (startsLowerCase) 500L else 20L
      Thread.sleep(pauseMillis)
      println(s"completed: $s (${runningCount.decrementAndGet()})")
      s.toUpperCase
    }
  }
}
//#sometimes-slow-service
|
|
|
|
|
|
2016-10-26 10:24:51 +02:00
|
|
|
// Actor used as the target of the "ask" example: replies to every String it receives
// with the upper-cased version of that String.
//#ask-actor
class Translator extends Actor {
  def receive: Receive = {
    case word: String ⇒
      // ... process message
      val upperCased = word.toUpperCase
      sender() ! upperCased // reply to the ask
  }
}
//#ask-actor
|
|
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|
|
|
|
import TwitterStreamQuickstartDocSpec._
|
|
|
|
|
import IntegrationDocSpec._
|
|
|
|
|
|
2015-12-11 14:45:24 +01:00
|
|
|
// Materializer picked up implicitly by the streams run in this spec. The type is
// spelled out because implicit definitions should not rely on type inference.
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Translator actor that the "ask" example sends its words to.
val ref: ActorRef = system.actorOf(Props[Translator])
|
|
|
|
|
|
2018-01-14 00:21:00 +09:00
|
|
|
"ask" in {
  //#ask
  implicit val askTimeout: Timeout = Timeout(5.seconds)
  val words: Source[String, NotUsed] = Source(List("hello", "hi"))

  // every word is sent to the Translator actor and replaced by its String reply
  val replies: Source[String, NotUsed] =
    words.ask[String](parallelism = 5)(ref)

  replies
    // continue processing of the replies from the actor
    .map(reply ⇒ reply.toLowerCase)
    .runWith(Sink.ignore)
  //#ask
}
|
2014-12-18 10:34:59 +01:00
|
|
|
|
|
|
|
|
"calling external service with mapAsync" in {
  val probe = TestProbe()
  val addressSystem = new AddressSystem
  val emailServer = new EmailServer(probe.ref)

  //#tweet-authors
  val authors: Source[Author, NotUsed] =
    tweets
      .filter(tweet ⇒ tweet.hashtags.contains(akkaTag))
      .map(tweet ⇒ tweet.author)
  //#tweet-authors

  //#email-addresses-mapAsync
  val emailAddresses: Source[String, NotUsed] =
    authors
      .mapAsync(4) { author ⇒
        addressSystem.lookupEmail(author.handle)
      }
      .collect { case Some(found) ⇒ found }
  //#email-addresses-mapAsync

  //#send-emails
  val sendEmails: RunnableGraph[NotUsed] =
    emailAddresses
      .mapAsync(4) { address ⇒
        val email = Email(to = address, title = "Akka", body = "I like your tweet")
        emailServer.send(email)
      }
      .to(Sink.ignore)

  sendEmails.run()
  //#send-emails

  // the probe must see the recipients in exactly this order
  val expectedRecipients = List(
    "rolandkuhn@somewhere.com",
    "patriknw@somewhere.com",
    "bantonsson@somewhere.com",
    "drewhk@somewhere.com",
    "ktosopl@somewhere.com",
    "mmartynas@somewhere.com",
    "akkateam@somewhere.com")
  expectedRecipients.foreach(recipient ⇒ probe.expectMsg(recipient))
}
|
|
|
|
|
|
2018-04-12 22:06:37 +09:00
|
|
|
"actorRefWithAck" in {
  //#actorRefWithAck
  val words: Source[String, NotUsed] = Source(List("hello", "hi"))

  // sent from actor to stream to "ack" processing of given element
  val AckMessage = AckingReceiver.Ack

  // sent from stream to actor to indicate start, end or failure of stream:
  val InitMessage = AckingReceiver.StreamInitialized
  val OnCompleteMessage = AckingReceiver.StreamCompleted
  val onErrorMessage = (cause: Throwable) ⇒ AckingReceiver.StreamFailure(cause)

  val probe = TestProbe()
  val receiverProps = Props(new AckingReceiver(probe.ref, ackWith = AckMessage))
  val receiver = system.actorOf(receiverProps)
  val sink = Sink.actorRefWithAck(
    receiver,
    onInitMessage = InitMessage,
    ackMessage = AckMessage,
    onCompleteMessage = OnCompleteMessage,
    onFailureMessage = onErrorMessage
  )

  words
    .map(word ⇒ word.toLowerCase)
    .runWith(sink)

  // the receiver reports every step to the probe, in this order
  List("Stream initialized!", "hello", "hi", "Stream completed!")
    .foreach(step ⇒ probe.expectMsg(step))
  //#actorRefWithAck
}
|
|
|
|
|
|
|
|
|
|
// Messages exchanged between a stream and the AckingReceiver actor.
//#actorRefWithAck-actor
object AckingReceiver {
  // reply from the actor to the stream
  case object Ack

  // stream lifecycle notifications handled by the actor
  case object StreamInitialized
  case object StreamCompleted
  final case class StreamFailure(ex: Throwable)
}
|
|
|
|
|
|
2018-04-30 21:12:04 +09:00
|
|
|
/**
 * Actor on the receiving end of an acknowledgement-based stream.
 *
 * It logs each stream event and mirrors it to `probe`. After the init message and after
 * every `String` element it replies `ackWith` to the sender.
 *
 * @param probe   actor that is told about initialization, each element and completion
 * @param ackWith acknowledgement sent back to the stream; must be the same message the
 *                sink was configured to expect as its ack message
 */
class AckingReceiver(probe: ActorRef, ackWith: Any) extends Actor with ActorLogging {
  import AckingReceiver._

  def receive: Receive = {
    case StreamInitialized ⇒
      log.info("Stream initialized!")
      probe ! "Stream initialized!"
      // reply with the configured ack (not a hard-coded one) so the stream proceeds
      // sending more elements
      sender() ! ackWith

    case el: String ⇒
      log.info("Received element: {}", el)
      probe ! el
      sender() ! ackWith // ack to allow the stream to proceed sending more elements

    case StreamCompleted ⇒
      log.info("Stream completed!")
      probe ! "Stream completed!"

    case StreamFailure(ex) ⇒
      // failures are only logged; the probe is not notified
      log.error(ex, "Stream failed!")
  }
}
//#actorRefWithAck-actor
|
|
|
|
|
|
2015-02-04 09:26:32 +01:00
|
|
|
"lookup email with mapAsync and supervision" in {
  val addressSystem = new AddressSystem2
  val authors: Source[Author, NotUsed] =
    tweets
      .filter(tweet ⇒ tweet.hashtags.contains(akkaTag))
      .map(tweet ⇒ tweet.author)

  //#email-addresses-mapAsync-supervision
  import ActorAttributes.supervisionStrategy
  import Supervision.resumingDecider

  // the supervision attribute is attached to the lookup flow only
  val resumingLookup: Flow[Author, String, NotUsed] =
    Flow[Author]
      .mapAsync(4)(author ⇒ addressSystem.lookupEmail(author.handle))
      .withAttributes(supervisionStrategy(resumingDecider))

  val emailAddresses: Source[String, NotUsed] =
    authors.via(resumingLookup)
  //#email-addresses-mapAsync-supervision
}
|
|
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
"calling external service with mapAsyncUnordered" in {
  val probe = TestProbe()
  val addressSystem = new AddressSystem
  val emailServer = new EmailServer(probe.ref)

  //#external-service-mapAsyncUnordered
  val authors: Source[Author, NotUsed] =
    tweets
      .filter(tweet ⇒ tweet.hashtags.contains(akkaTag))
      .map(tweet ⇒ tweet.author)

  val emailAddresses: Source[String, NotUsed] =
    authors
      .mapAsyncUnordered(4) { author ⇒
        addressSystem.lookupEmail(author.handle)
      }
      .collect { case Some(found) ⇒ found }

  val sendEmails: RunnableGraph[NotUsed] =
    emailAddresses
      .mapAsyncUnordered(4) { address ⇒
        val email = Email(to = address, title = "Akka", body = "I like your tweet")
        emailServer.send(email)
      }
      .to(Sink.ignore)

  sendEmails.run()
  //#external-service-mapAsyncUnordered

  // arrival order is not asserted here, only the set of recipients
  val expectedRecipients = Set(
    "rolandkuhn@somewhere.com",
    "patriknw@somewhere.com",
    "bantonsson@somewhere.com",
    "drewhk@somewhere.com",
    "ktosopl@somewhere.com",
    "mmartynas@somewhere.com",
    "akkateam@somewhere.com")
  probe.receiveN(7).toSet should be(expectedRecipients)
}
|
|
|
|
|
|
|
|
|
|
"careful managed blocking with mapAsync" in {
  val probe = TestProbe()
  val addressSystem = new AddressSystem
  val smsServer = new SmsServer(probe.ref)

  val authors = tweets
    .filter(tweet ⇒ tweet.hashtags.contains(akkaTag))
    .map(tweet ⇒ tweet.author)

  val phoneNumbers =
    authors
      .mapAsync(4)(author ⇒ addressSystem.lookupPhoneNumber(author.handle))
      .collect { case Some(found) ⇒ found }

  //#blocking-mapAsync
  val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")

  val sendTextMessages: RunnableGraph[NotUsed] =
    phoneNumbers
      .mapAsync(4) { phoneNo ⇒
        // the synchronous send is wrapped in a Future that runs on the dispatcher
        // looked up above
        Future {
          val message = TextMessage(to = phoneNo, body = "I like your tweet")
          smsServer.send(message)
        }(blockingExecutionContext)
      }
      .to(Sink.ignore)

  sendTextMessages.run()
  //#blocking-mapAsync

  // arrival order is not asserted here, only the set of recipients
  val expectedNumbers = Set(
    "rolandkuhn".hashCode.toString,
    "patriknw".hashCode.toString,
    "bantonsson".hashCode.toString,
    "drewhk".hashCode.toString,
    "ktosopl".hashCode.toString,
    "mmartynas".hashCode.toString,
    "akkateam".hashCode.toString)
  probe.receiveN(7).toSet should be(expectedNumbers)
}
|
|
|
|
|
|
|
|
|
|
"careful managed blocking with map" in {
  val probe = TestProbe()
  val addressSystem = new AddressSystem
  val smsServer = new SmsServer(probe.ref)

  val authors = tweets
    .filter(tweet ⇒ tweet.hashtags.contains(akkaTag))
    .map(tweet ⇒ tweet.author)

  val phoneNumbers =
    authors
      .mapAsync(4)(author ⇒ addressSystem.lookupPhoneNumber(author.handle))
      .collect { case Some(found) ⇒ found }

  //#blocking-map
  val onBlockingDispatcher = ActorAttributes.dispatcher("blocking-dispatcher")
  val send = Flow[String]
    .map { phoneNo ⇒
      val message = TextMessage(to = phoneNo, body = "I like your tweet")
      smsServer.send(message)
    }
    .withAttributes(onBlockingDispatcher)
  val sendTextMessages: RunnableGraph[NotUsed] =
    phoneNumbers
      .via(send)
      .to(Sink.ignore)

  sendTextMessages.run()
  //#blocking-map

  // the probe must see the phone numbers in exactly this order
  val handlesInOrder = List(
    "rolandkuhn",
    "patriknw",
    "bantonsson",
    "drewhk",
    "ktosopl",
    "mmartynas",
    "akkateam")
  handlesInOrder.foreach(handle ⇒ probe.expectMsg(handle.hashCode.toString))
}
|
|
|
|
|
|
|
|
|
|
"calling actor service with mapAsync" in {
  val probe = TestProbe()
  val databaseProps = Props(classOf[DatabaseService], probe.ref)
  val database = system.actorOf(databaseProps, "db")

  //#save-tweets
  import akka.pattern.ask

  val akkaTweets: Source[Tweet, NotUsed] =
    tweets.filter(tweet ⇒ tweet.hashtags.contains(akkaTag))

  implicit val timeout: Timeout = Timeout(3.seconds)
  val saveTweets: RunnableGraph[NotUsed] =
    akkaTweets
      .mapAsync(4) { tweet ⇒
        database ? Save(tweet)
      }
      .to(Sink.ignore)
  //#save-tweets

  saveTweets.run()

  // the database actor reports the author handles in exactly this order
  val handlesInOrder = List(
    "rolandkuhn",
    "patriknw",
    "bantonsson",
    "drewhk",
    "ktosopl",
    "mmartynas",
    "akkateam")
  handlesInOrder.foreach(handle ⇒ probe.expectMsg(handle))
}
|
|
|
|
|
|
|
|
|
|
"illustrate ordering and parallelism of mapAsync" in {
  val probe = TestProbe()

  // Shadows Predef.println inside this test: only lines starting with "after:" are
  // forwarded to the probe, everything else is dropped.
  def println(s: String): Unit =
    if (s.startsWith("after:")) probe.ref ! s

  //#sometimes-slow-mapAsync
  implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
  val service = new SometimesSlowService

  val settings = ActorMaterializerSettings(system)
    .withInputBuffer(initialSize = 4, maxSize = 4)
  implicit val materializer: ActorMaterializer = ActorMaterializer(settings)

  Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
    .map { elem ⇒
      println(s"before: $elem")
      elem
    }
    .mapAsync(4)(elem ⇒ service.convert(elem))
    .runForeach(elem ⇒ println(s"after: $elem"))
  //#sometimes-slow-mapAsync

  // results must reach the probe in the order of the input elements
  val expectedInOrder = List(
    "after: A",
    "after: B",
    "after: C",
    "after: D",
    "after: E",
    "after: F",
    "after: G",
    "after: H",
    "after: I",
    "after: J")
  expectedInOrder.foreach(line ⇒ probe.expectMsg(line))
}
|
|
|
|
|
|
|
|
|
|
"illustrate ordering and parallelism of mapAsyncUnordered" in {
  val probe = TestProbe()

  // Shadows Predef.println inside this test: only lines starting with "after:" are
  // forwarded to the probe, everything else is dropped.
  def println(s: String): Unit =
    if (s.startsWith("after:")) probe.ref ! s

  //#sometimes-slow-mapAsyncUnordered
  implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
  val service = new SometimesSlowService

  val settings = ActorMaterializerSettings(system)
    .withInputBuffer(initialSize = 4, maxSize = 4)
  implicit val materializer: ActorMaterializer = ActorMaterializer(settings)

  Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
    .map { elem ⇒
      println(s"before: $elem")
      elem
    }
    .mapAsyncUnordered(4)(elem ⇒ service.convert(elem))
    .runForeach(elem ⇒ println(s"after: $elem"))
  //#sometimes-slow-mapAsyncUnordered

  // arrival order is not asserted here, only the set of results
  val expectedLines = Set(
    "after: A",
    "after: B",
    "after: C",
    "after: D",
    "after: E",
    "after: F",
    "after: G",
    "after: H",
    "after: I",
    "after: J")
  probe.receiveN(10).toSet should be(expectedLines)
}
|
|
|
|
|
|
2018-05-22 18:27:54 +09:00
|
|
|
"illustrate use of source queue" in {
  //#source-queue
  val bufferSize = 10
  val elementsToProcess = 5

  // consumer side: squares every queued element and prints it
  val printCompleted = Sink.foreach[Int](x ⇒ println(s"completed $x"))
  val queue = Source
    .queue[Int](bufferSize, OverflowStrategy.backpressure)
    .throttle(elementsToProcess, 3.second)
    .map(x ⇒ x * x)
    .toMat(printCompleted)(Keep.left)
    .run()

  val source = Source(1 to 10)

  implicit val ec = system.dispatcher
  // producer side: offers one element at a time and prints the outcome of each offer
  source
    .mapAsync(1) { x ⇒
      queue.offer(x).map {
        case QueueOfferResult.Enqueued    ⇒ println(s"enqueued $x")
        case QueueOfferResult.Dropped     ⇒ println(s"dropped $x")
        case QueueOfferResult.Failure(ex) ⇒ println(s"Offer failed ${ex.getMessage}")
        case QueueOfferResult.QueueClosed ⇒ println("Source Queue closed")
      }
    }
    .runWith(Sink.ignore)
  //#source-queue
}
|
|
|
|
|
|
2014-12-18 10:34:59 +01:00
|
|
|
}
|