Akka(4): Routers – 智能任务分配。Akka(4): Routers – 智能任务分配。

   
Actor模式最可怜之助益就是是每个Actor都是一个独立的职责运算器。这种模式为咱们十分有利地拿同起大型的职责分割成多心细小任务然后分配给不同的Actor去完。优点是于设计时可以小心实现每个Actor的效果,在实质上运算时出于每个Actor都在单独的线程里运行,能充分利用多核CPU的优势实现相似并行运算的效率。我们同可将一个独的功力以不同的输入分配给多独Actor去完因达到相同的效率增长目的,这就是是Akka的Routing模式了。Routing模式的特色是有运算Actor的演算逻辑都是同等之,分别对两样之输入进行相同之演算。不过我们应当了解运算结果的相继是无能为力预测的,毕竟Actor模式是名列前茅的无序运算。Routing模式由于Router和Routee组成:Routee是负担具体运算的Actor(因为运算逻辑必须在Actor的receive里实现),Router的显要职能是把外场发来的运算指令以某种指定的点子分配给Routee去运算。可以说Router不是正式的Actor,因为它们不需要贯彻任何其他的效应,基本功能是预设嵌入的。Router的邮箱直接表示了任务分配逻辑,与标准Actor逐个运算信箱遭到信息相比,能大大提高任务分配效率。Akka自带森现的任务分配模式,以不同的算法来满足不同的任务分配要求。这些算法的布置好当布置文件要代码中定义。Router又可分Pool和Group两种模式:在Router-Pool模式中Router负责构建具有的Routee。如此有Routee都是Router的依附子级Actor,可以实现Router对Routees的一直监管。由于这种直白的监管关系,Router-Pool又足以遵循运算负载自动增减Routee,能还实惠地分配使用计算资源。Router-Group模式遭遇之Routees由外面别Actor产生,特点是会促成灵活的Routee构建和监理,可以为此不同之监管政策来保管一个Router下的Routees,比如可采用BackoffSupervisor。从另一方面来讲,Router-Group的通病是Routees的构建与治本复杂化了,而且一再用人工干预。

   
Actor模式极其酷的亮点就是是每个Actor都是一个独立的职责运算器。这种模式于咱死有益于地管同码大型的任务分割成几细小任务然后分配为不同之Actor去好。优点是在计划时方可小心实现每个Actor的意义,在实际上运算时由于每个Actor都以单身的线程里运行,能充分利用多核CPU的优势实现相似并行运算的频率。我们一样好拿一个独立的成效以不同的输入分配为多只Actor去好因高达平的频率增高目的,这就算是Akka的Routing模式了。Routing模式之风味是独具运算Actor的演算逻辑都是同的,分别针对两样的输入进行同样之演算。不过我们应有理解运算结果的次第是无能为力预测的,毕竟Actor模式是超人的无序运算。Routing模式由Router和Routee组成:Routee是负责具体运算的Actor(因为运算逻辑必须于Actor的receive里实现),Router的显要作用是管外场发来的演算指令以某种指定的不二法门分配受Routee去运算。可以说Router不是正经的Actor,因为她不需实现其他其它的法力,基本功能是预设嵌入的。Router的邮箱直接表示了任务分配逻辑,与标准Actor逐个运算信箱遭到信息相比,能大大提高任务分配效率。Akka自带森备的任务分配模式,以不同的算法来满足不同的任务分配要求。这些算法的布置好以安排文件或者代码中定义。Router又只是分Pool和Group两种模式:在Router-Pool模式中Router负责构建有的Routee。如此有Routee都是Router的附属子级Actor,可以兑现Router对Routees的直白监管。由于这种直白的监管关系,Router-Pool又有何不可随运算负载自动增减Routee,能还实用地分配使用计算资源。Router-Group模式受到的Routees由外面别Actor产生,特点是力所能及实现灵活的Routee构建和监理,可以就此不同的监管政策来管理一个Router下的Routees,比如可以应用BackoffSupervisor。从一头来讲,Router-Group的欠缺是Routees的构建与管制复杂化了,而且一再需要人工干预。

下面我们事先举行只示范:

脚我们先举行只示范:

import akka.actor._
import akka.routing._
import scala.annotation.tailrec

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int)
  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._

  override def receive: Receive = {
    case FibonacciNumber(nbr) =>
      val answer = fibonacci(nbr)
      log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }
}
object RouterDemo extends App {
  import FibonacciRoutee._
  val routingSystem = ActorSystem("routingSystem")
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")

  router ! FibonacciNumber(10)
  router ! FibonacciNumber(13)
  router ! FibonacciNumber(15)
  router ! FibonacciNumber(17)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}
import akka.actor._
import akka.routing._
import scala.annotation.tailrec

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int)
  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._

  override def receive: Receive = {
    case FibonacciNumber(nbr) =>
      val answer = fibonacci(nbr)
      log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }
}
object RouterDemo extends App {
  import FibonacciRoutee._
  val routingSystem = ActorSystem("routingSystem")
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")

  router ! FibonacciNumber(10)
  router ! FibonacciNumber(13)
  router ! FibonacciNumber(15)
  router ! FibonacciNumber(17)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

在这例子里我们因而3只Routees来因指令计算Fibonacci。FibonacciRoutee只生雷同项意义:就是依照输入计算Fibonacci数。我们看,Router构建过程充分略。在咱们的事例里独自待读来布局文件内容即好了。balance-pool-router是部署文件里之一个定义项:

以斯例子里我们因而3只Routees来因指令计算Fibonacci。FibonacciRoutee只发雷同宗功能:就是按照输入计算Fibonacci数。我们看出,Router构建过程异常简短。在咱们的例子里就待读来布局文件内容即足以了。balance-pool-router是布文件里之一个概念项:

akka {
  prio-dispatcher {
    mailbox-type = "PriorityMailbox"
  }
  actor {
    deployment {
      /balance-pool-router {
        router = balancing-pool
        nr-of-instances = 3
        pool-dispatcher {
          executor = "fork-join-executor"
          # Configuration for the fork join pool
          fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 3
            # Parallelism (threads) ... ceil(available processors * factor)
            parallelism-factor = 2.0
            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 3
          }
          # Throughput defines the maximum number of messages to be
          # processed per actor before the thread jumps to the next actor.
          # Set to 1 for as fair as possible.
          throughput = 1
        }
      }
    }
  }

}
akka {
  prio-dispatcher {
    mailbox-type = "PriorityMailbox"
  }
  actor {
    deployment {
      /balance-pool-router {
        router = balancing-pool
        nr-of-instances = 3
        pool-dispatcher {
          executor = "fork-join-executor"
          # Configuration for the fork join pool
          fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 3
            # Parallelism (threads) ... ceil(available processors * factor)
            parallelism-factor = 2.0
            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 3
          }
          # Throughput defines the maximum number of messages to be
          # processed per actor before the thread jumps to the next actor.
          # Set to 1 for as fair as possible.
          throughput = 1
        }
      }
    }
  }

}

Routing模式设置的整标识是akka.actor.deployment{/balance-pool-router}。完成构建router后我们直接通往router发送计算指令,运算结果如下:

Routing模式设置的总体标识是akka.actor.deployment{/balance-pool-router}。完成构建router后我们直接为router发送计算指令,运算结果如下:

[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-5] [akka://routingSystem/user/balance-pool-router/$b] $b's answer: Fibonacci(13)=233
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-7] [akka://routingSystem/user/balance-pool-router/$a] $a's answer: Fibonacci(10)=55
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(15)=610
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(17)=1597
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-5] [akka://routingSystem/user/balance-pool-router/$b] $b's answer: Fibonacci(13)=233
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-7] [akka://routingSystem/user/balance-pool-router/$a] $a's answer: Fibonacci(10)=55
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(15)=610
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(17)=1597

咱们来看,router按部署活动构建了3独FibonacciRoutee。Routee的构建过程是无能为力人工干预的。向router发送的计量指令给分配给b,a,c,c去运算了。从出示顺序可以印证每个参与的Actor占用运算时不同,产生了无序的运算结果。

咱俩看来,router按部署活动构建了3只FibonacciRoutee。Routee的构建过程是无法人工干预的。向router发送的测算指令给分配给b,a,c,c去运算了。从显示顺序可以作证每个参与的Actor占用运算时不一,产生了无序的演算结果。

脚我们当Routee里加一个延迟效应。这样运算结果显示会再也自然把:

下我们于Routee里加一个推效应。这样运算结果显示会再度当把:

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}
object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}

因以Actor内部不能够采取Thread.sleep,所以我们因此了单scheduleOnce在延迟时间后朝好发送一个提示消息。注意,scheduleOnce是不管阂non-blocking代码,调用后先后不见面待等待计划动作。在面修改后的代码里长了监管策略SupervisorStrategy的动测试。Router的默认监管政策是Esculate,即把有Routee发生的不行提交给Router的直属父级处理。如果Router直属父级对Routee异常的处理方式是还开的口舌,那么首先重开Router,然后是用作直属子级的有Routees都见面给再度开,结果连无是咱纪念使的。所以要人为的设定Router监管政策。由于Router的SupervisorStrategy无法在装置文件被定义,所以这次咱们只有用代码方式来安装routing模式了:

因当Actor内部未能够使用Thread.sleep,所以我们就此了只scheduleOnce在延迟时间后望和睦发送一个提拔消息。注意,scheduleOnce是凭阂non-blocking代码,调用后先后不会见滞留等待计划动作。在点修改后底代码里添了监管策略SupervisorStrategy的运测试。Router的默认监管政策是Esculate,即把某部Routee发生的挺提交给Router的附属父级处理。如果Router直属父级对Routee异常的处理方式是再开的言语,那么首先重开Router,然后是当做直属子级的有所Routees都见面被重复开,结果连无是咱想如果的。所以要人为的设定Router监管政策。由于Router的SupervisorStrategy无法在安文件被定义,所以这次我们惟有用代码方式来装routing模式了:

object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}
object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

留神:我们当FibonacciRoutee的preRestart接口中增加了通向友好补发产生大信息之过程。运算结果显示:虽然出现了往往十分,router重启了f发生异常的Routee,所有消息都赢得了拍卖。

瞩目:我们于FibonacciRoutee的preRestart接口中加进了向和睦补发产生大信息的过程。运算结果显示:虽然出现了数雅,router重启了f发生大的Routee,所有信息还获得了处理。

Akka中多少routing模式支持Router-Pool
Routee的全自动增减。由于BalancingPool不支持是桩职能,下面我们尽管因此RoundRobinPool来举行个示范。由于需要定义监管政策,只有以代码中设置Resizer了:

Akka中稍微routing模式支持Router-Pool
Routee的机动增减。由于BalancingPool不支持此起意义,下面我们便因故RoundRobinPool来做只示范。由于要定义监管政策,只有在代码中装置Resizer了:

 val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )
 val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )

上述resizer设置为:Routee最少2单,可以自动增加至5独。运行后routingSystem自动增加了少只Routee: c,d。

如上resizer设置也:Routee最少2个,可以自行增加到5独。运行后routingSystem自动增加了少只Routee: c,d。

下是本次示范的总体源代码:

下面是本次示范的完全源代码:

 

 

import akka.actor._
import akka.routing._
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.Random

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}
object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  /* does not support resizing routees
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  ) */

  val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)
  router ! FibonacciNumber(27,1)
  router ! FibonacciNumber(37,1)
  router ! FibonacciNumber(47,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}
import akka.actor._
import akka.routing._
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.Random

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}
object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  /* does not support resizing routees
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  ) */

  val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)
  router ! FibonacciNumber(27,1)
  router ! FibonacciNumber(37,1)
  router ! FibonacciNumber(47,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

相关文章