关注akka有一段时间了,一直想在项目中用上,可惜没有合适的使用场景。
最近在做hbase的数据迁移,背景就不说了,反正很操蛋。最后的方案是dump全部的数据到磁盘上然后读取后写到新集群。
为了能尽量将rowkey分散开来,除了将region(startkey,endkey)的list打乱顺序外,还需要同时打开较多文件,每条记录随机写到某个文件中。这里终于用上了akka,避免了写的使用synchronized这样的方式。
Main函数,创建500个LogActor,每个LogActor写一个文件。采用RoundRobin的方式轮询写记录。Worker是具体读hbase的类,这里不贴代码。numOfReader为24,因为服务器的cpu core是24,设置大于24的值也最多24个并行。当然可以设置默认的最大值,但既然推荐最大值是cpu核心数那还是就按照推荐的来吧。
总结来说就是24个worker读取hbase的数据然后发送到某个LogActor写入到文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
object Reader extends App { val system = ActorSystem("test") def apply(msg: String) = router.route(msg, null) val numOfLogs = 500 var router = Router( RoundRobinRoutingLogic(), (1 to numOfLogs).map { i => ActorRefRoutee(system.actorOf(Props(new LogActor(i)))) } ) val numOfReader = 24 (1 to numOfReader).par.foreach { id => val worker = new Worker(id) var workOpt = Works.get while(workOpt.isDefined) { worker.doWork(workOpt.get) workOpt = Works.get } } system.terminate() } |
写记录只需调用
LogActor代码:
1 2 3 4 5 6 7 8 9 10 11 12 |
class LogActor(id: Int) extends Actor { val stream = new FileOutputStream(s"/home/***/data/w$id", true) override def receive: Receive = { case msg: String => stream.write((msg + "\n").getBytes) stream.flush() } @throws[Exception](classOf[Exception]) override def postStop(): Unit = stream.close() } |
第一个akka的实际应用,在此记录。
(转载本站文章请注明作者和出处 程序员的自我修养 – SelfUp.cn ,请勿用于任何商业用途)