Akka Actor模拟实现YARN
代码实现:
pom.xml
<!-- Shared build settings and the version numbers referenced by the dependencies below. -->
<!-- NOTE(review): Akka 2.4.x is documented as requiring a Java 8 runtime; confirm the 1.7 compiler level is intentional. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <scala.actors.version>2.11.8</scala.actors.version>
    <akka.version>2.4.17</akka.version>
</properties>

<dependencies>
    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- Akka actors (local actor runtime) -->
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>${akka.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-actors -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-actors</artifactId>
        <version>${scala.actors.version}</version>
    </dependency>

    <!-- Akka remoting (provides the akka.tcp transport used between the two processes) -->
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-remote_2.11</artifactId>
        <version>${akka.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.5</version>
    </dependency>
</dependencies>
Constant:用于存放 ActorSystem 和 Actor 的名称
/**
  * Names of the actor systems and top-level actors used by the two processes.
  *
  * The ResourceManager names are also interpolated into the remote actor path
  * that the NodeManager uses to look up the ResourceManager, so both sides must
  * read them from here.
  */
object Constant {
  /** Name of the ResourceManager's ActorSystem. */
  val RMRS: String = "MyResourceManagerActorSystem"

  /** Name of the ResourceManager actor. */
  val RMA: String = "MyResourceManagerActor"

  /** Name of the NodeManager's ActorSystem. */
  val NMAS: String = "MyNodeManagerActorSystem"

  /** Name of the NodeManager actor. */
  val NMA: String = "MyNodeManagerActor"
}
Message:用于存放各 Actor 的 receive 方法中进行模式匹配的消息类型
/**
  * Registration request, sent NodeManager -> ResourceManager.
  *
  * @param nodemanagerid unique id chosen by the NodeManager
  * @param memory        memory amount the NodeManager reports
  * @param cpu           CPU amount the NodeManager reports
  */
case class RegisterNodeManager(nodemanagerid: String, memory: Int, cpu: Int)

/**
  * Registration acknowledgement, sent ResourceManager -> NodeManager.
  *
  * @param resourcemanagerhostname address string the ResourceManager reports for itself
  */
case class RegisteredNodeManager(resourcemanagerhostname: String)

/**
  * Periodic heartbeat, sent NodeManager -> ResourceManager.
  *
  * @param nodemanagerid id of the NodeManager that is still alive
  */
case class Heartbeat(nodemanagerid: String)

/**
  * Bookkeeping record the ResourceManager keeps for one registered NodeManager.
  *
  * @param nodemanagerid id the NodeManager registered with
  * @param memory        memory amount reported at registration
  * @param cpu           CPU amount reported at registration
  */
class NodeManagerInfo(val nodemanagerid: String, val memory: Int, val cpu: Int) {
  /**
    * Wall-clock time (epoch millis) of the most recent sign of life.
    *
    * Starts at creation time rather than 0: with a zero default, a timeout
    * check that runs before the first heartbeat arrives computes an age of
    * decades and evicts a node that has only just registered.
    */
  var lastHeartBeatTime: Long = System.currentTimeMillis()
}

/** Timer tick the ResourceManager sends to itself to trigger the expiry check. */
case object CheckTimeOut

/** Timer tick the NodeManager sends to itself to trigger the next heartbeat. */
case object SendMessage
MyResourceManager
/**
  * Actor playing the ResourceManager role.
  *
  * It accepts registrations from NodeManagers, records the time of each
  * heartbeat, and every five seconds drops NodeManagers whose last heartbeat is
  * older than the timeout.
  *
  * @param hostname host name reported back to NodeManagers on registration
  * @param port     port reported back to NodeManagers on registration
  */
class MyResourceManager(var hostname: String, var port: Int) extends Actor {

  // A node is considered dead once its last heartbeat is older than this.
  private val heartbeatTimeoutMillis: Long = 15000L

  // Registered NodeManagers keyed by id. This map is the only registry: a
  // parallel set of NodeManagerInfo (a class without equals/hashCode) cannot
  // de-duplicate anything and can drift out of sync with the map.
  private val id2nodemanagerinfo = new mutable.HashMap[String, NodeManagerInfo]()

  // Handle of the periodic CheckTimeOut timer, kept so it can be cancelled.
  private var timeoutCheck: Option[akka.actor.Cancellable] = None

  /** Starts the periodic expiry check: first tick immediately, then every 5 seconds. */
  override def preStart(): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher
    // The scheduler delivers CheckTimeOut to this actor's own mailbox.
    timeoutCheck = Some(
      context.system.scheduler.schedule(Duration.Zero, 5.seconds, self, CheckTimeOut))
  }

  /**
    * Cancels the expiry timer. Without this the timer keeps firing after the
    * actor stops, and a restart (which runs preStart again) would add a second
    * timer on top of the first.
    */
  override def postStop(): Unit = {
    timeoutCheck.foreach(_.cancel())
    timeoutCheck = None
  }

  override def receive: Receive = {
    // Registration request from a NodeManager.
    case RegisterNodeManager(nodemanagerid, memory, cpu) =>
      val info = new NodeManagerInfo(nodemanagerid, memory, cpu)
      // Registration itself counts as a sign of life; otherwise a check that
      // runs before the first heartbeat could evict the node immediately.
      info.lastHeartBeatTime = System.currentTimeMillis()
      // put() replaces any earlier record registered under the same id.
      id2nodemanagerinfo.put(nodemanagerid, info)
      // Acknowledge so the NodeManager can start sending heartbeats.
      sender() ! RegisteredNodeManager(hostname + ":" + port)

    // Heartbeat from a NodeManager: refresh its timestamp.
    case Heartbeat(nodemanagerid) =>
      id2nodemanagerinfo.get(nodemanagerid) match {
        case Some(info) =>
          info.lastHeartBeatTime = System.currentTimeMillis()
        case None =>
          // Unknown or already evicted node. Indexing the map directly would
          // throw, restart this actor and wipe every registration.
          println(s"Ignoring heartbeat from unregistered NodeManager $nodemanagerid")
      }

    // Periodic tick: evict NodeManagers that have been silent for too long.
    case CheckTimeOut =>
      val now = System.currentTimeMillis()
      // Materialise the expired ids first so the map is not mutated while iterated.
      val expiredIds = id2nodemanagerinfo.valuesIterator
        .filter(info => now - info.lastHeartBeatTime > heartbeatTimeoutMillis)
        .map(_.nodemanagerid)
        .toList
      id2nodemanagerinfo --= expiredIds
      println("当前注册成功的节点数" + id2nodemanagerinfo.size)
  }
}

/** Entry point that starts the ResourceManager actor system and its actor. */
object MyResourceManager {

  /**
    * @param args `<hostname> <port>`, for example `localhost 9999`
    */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of an index or number-format exception.
    val parsedPort =
      if (args.length >= 2) scala.util.Try(args(1).toInt).toOption else None
    if (parsedPort.isEmpty) {
      Console.err.println("Usage: MyResourceManager <hostname> <port>")
      sys.exit(1)
    }

    val resourceManagerHostname = args(0)
    val resourceManagerPort = parsedPort.get

    // The hostname is quoted so values such as dotted IP addresses are always
    // read as a single string by the config parser.
    val strConfig =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$resourceManagerHostname"
         |akka.remote.netty.tcp.port = $resourceManagerPort
      """.stripMargin
    val conf = ConfigFactory.parseString(strConfig)

    val rmrs: ActorSystem = ActorSystem(Constant.RMRS, conf)
    rmrs.actorOf(
      Props(new MyResourceManager(resourceManagerHostname, resourceManagerPort)),
      Constant.RMA)
  }
}
MyNodeManager
/**
  * Actor playing the NodeManager role.
  *
  * On start it registers with the ResourceManager. Once the registration is
  * acknowledged it sends a heartbeat every four seconds.
  *
  * @param resourcemanagerhostname host of the ResourceManager actor system
  * @param resourcemanagerport     port of the ResourceManager actor system
  * @param memory                  memory amount reported at registration
  * @param cpu                     CPU amount reported at registration
  */
class MyNodeManager(val resourcemanagerhostname: String,
                    val resourcemanagerport: Int,
                    val memory: Int,
                    val cpu: Int) extends Actor {

  // Id generated in preStart and sent with the registration and every heartbeat.
  var nodemanagerid: String = _

  // Selection pointing at the remote ResourceManager actor; set in preStart.
  var rmRef: ActorSelection = _

  // Handle of the periodic SendMessage timer, kept so it can be cancelled and
  // so a repeated acknowledgement does not start a second timer.
  private var heartbeatTimer: Option[akka.actor.Cancellable] = None

  /** Resolves the ResourceManager, generates this node's id and registers. */
  override def preStart(): Unit = {
    // Remote path format:
    // akka.tcp://<ActorSystem name>@<remote host>:<remote port>/user/<Actor name>
    rmRef = context.actorSelection(
      s"akka.tcp://${Constant.RMRS}@${resourcemanagerhostname}:${resourcemanagerport}/user/${Constant.RMA}")
    nodemanagerid = UUID.randomUUID().toString
    rmRef ! RegisterNodeManager(nodemanagerid, memory, cpu)
  }

  /**
    * Cancels the heartbeat timer. Without this the timer keeps firing after the
    * actor stops, and a restart (which re-registers under a new id) would end
    * up with an additional timer once the new registration is acknowledged.
    */
  override def postStop(): Unit = {
    heartbeatTimer.foreach(_.cancel())
    heartbeatTimer = None
  }

  override def receive: Receive = {
    // The ResourceManager accepted the registration: start heartbeating.
    case RegisteredNodeManager(masterURL) =>
      println(masterURL)
      if (heartbeatTimer.isEmpty) {
        import scala.concurrent.duration._
        import context.dispatcher
        // schedule(initialDelay, interval, receiver, message): first tick
        // immediately, then every 4 seconds, delivered to this actor itself.
        heartbeatTimer = Some(
          context.system.scheduler.schedule(Duration.Zero, 4.seconds, self, SendMessage))
      }

    // Timer tick: forward a heartbeat to the ResourceManager.
    case SendMessage =>
      rmRef ! Heartbeat(nodemanagerid)
      println(Thread.currentThread().getId)
  }
}

/** Entry point that starts the NodeManager actor system and its actor. */
object MyNodeManager {

  /**
    * @param args `<hostname> <rm-hostname> <rm-port> <memory> <cores> <port>`,
    *             for example `localhost localhost 9999 1000 5 8888`
    */
  def main(args: Array[String]): Unit = {
    // Parse the four numeric arguments up front; any failure yields a usage
    // message instead of an index or number-format exception.
    val numbers: Option[Seq[Int]] =
      if (args.length >= 6) scala.util.Try(args.slice(2, 6).map(_.toInt).toSeq).toOption
      else None
    if (numbers.isEmpty) {
      Console.err.println(
        "Usage: MyNodeManager <hostname> <rm-hostname> <rm-port> <memory> <cores> <port>")
      sys.exit(1)
    }

    val hostname = args(0)
    val rmHostname = args(1)
    val Seq(rmPort, nodeManagerMemory, nodeManagerCores, nodeManagerPort) = numbers.get

    // The hostname is quoted so values such as dotted IP addresses are always
    // read as a single string by the config parser.
    val strConfig =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$hostname"
         |akka.remote.netty.tcp.port = $nodeManagerPort
      """.stripMargin
    val conf = ConfigFactory.parseString(strConfig)

    val nmas: ActorSystem = ActorSystem(Constant.NMAS, conf)
    nmas.actorOf(
      Props(new MyNodeManager(rmHostname, rmPort, nodeManagerMemory, nodeManagerCores)),
      Constant.NMA)
  }
}
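最后启动运行:先启动 MyResourceManager(参数示例:localhost 9999),再启动 MyNodeManager(参数示例:localhost localhost 9999 1000 5 8888)。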
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。