程序员的自我修养

KafkaUtils.createDirectStream

9条评论17,664次浏览

官网上对这个新接口的介绍很多,大致就是不与zookeeper交互,直接去kafka中读取数据,自己维护offset,于是速度比KafkaUtils.createStream要快上很多。但有利就有弊:无法进行offset的监控。

项目中需要尝试使用这个接口,同时还要进行offset的监控,于是只能按照官网所说的,自己将offset写入zookeeper。

方法1

这个方法只有3个参数,使用起来最为方便,但是每次启动的时候默认从Latest offset开始读取,或者设置参数auto.offset.reset="smallest"后将会从Earliest offset开始读取。

显然这2种读取位置都不适合生产环境。
(更多…)

标签:, ,

Spark集群问题小结

1条评论2,834次浏览

JAVA版本问题

master进程以jdk7运行,worker以jdk6运行。

产生原因

集群中的机器曾由jdk6升级至jdk7,升级后未卸载jdk6。由未知原因(初步推测ssh配置的问题),导致ssh host "java -version"命令结果显示为jdk6,但ssh至host后再运行java -version命令则显示jdk7。而spark运行worker是通过ssh来启动的,所以导致worker全部以jdk6启动。

解决方法

在spark执行用户下的.bashrc中加入$JAVA_HOME, $PATH等环境变量,然后重启集群。

pid文件问题

运行$SPARK_HOME/sbin/下的各种stop开头的sh文件都会显示类似下方错误:

(更多…)

分类:Apache Spark
标签:

Hive关于Full Join的疑问

1条评论3,013次浏览

Join与filter的顺序

根据官方文档的说法,join发生在前,filter发生在后。所以如果这样写join sql则会先将整个表join然后在按照where条件过滤:

显然这样背离了初衷。正确的写法应该是:

两种写法虽然感觉是一样的意思,理论上计算出来的结果应该也是一样。但通过实际应用,发现结果居然是不一样的,两者的结果数量可能会相差一个数量级以上,而输出正确的是第二种写法。至于为何第一种写法会出现不一样的结果,等有时间了在研究下。感觉应该是数据的问题,而不是hive有bug。

从日志也可以看到两种语句的map数量不一样,第一种会扫描全表,而第二种只扫描where条件中的特定的partition。

然而以上都不是重点

(更多…)

分类:Apache Hive
标签:

CDH集群调优:内存、Vcores和DRF

1条评论8,206次浏览

吐槽

最近“闲”来无事,通过CM把vcores使用情况调出来看了一眼,发现不论集群中有多少个任务在跑,已分配的VCores始终不会超过120。而集群的可用Vcores是360(15台机器×24虚拟核)。这就相当于CPU资源只用到了1/3,作为一个半强迫症患者绝对不能容忍这样的事情发生。

分析的过程不表,其实很简单就是几个参数的问题。本以为CM能智能的将这些东西配好,现在看来好像不行。以下记录结论。

DRF和相关参数

DRF: Dominant Resource Fairness,根据CPU和内存公平调度资源。CDH动态资源池默认采用的DRF计划策略。简单的理解就是内存不够的时候,多余的CPU就不会分配任务了,就让他空着;CPU不够的时候,多出来的内存也不会再启动任务了。

理解这个计划策略后,再查看Yarn启动任务时资源相关的参数,发现有以下几个参数可能会产生影响:

  • mapreduce.map.memory.mb,map任务内存,cdh默认1G
  • mapreduce.map.cpu.vcores,map任务虚拟CPU核数,cdh默认1
  • mapreduce.reduce.memory.mb,reduce任务内存,cdh默认1G
  • mapreduce.reduce.cpu.vcores,reduce任务虚拟CPU核数,cdh默认1
  • yarn.nodemanager.resource.memory-mb,容器内存,cdh默认8G
  • yarn.nodemanager.resource.cpu-vcores,容器虚拟CPU核数,cdh默认8,但CM会自动检测内核数并修改,我这里被自动改成了24。

可以看到默认配置下,CPU核数和内存是1:1G的比例来启动任务的。
(更多…)

分类:Apache Hadoop, CDH
标签:,

战5渣系列——Spark Streaming启动问题

0条评论5,278次浏览

测试集群上Flume监控本地文件夹+Spark Streaming跑的没问题,但放到生产环境上来测试却一直报错,启动命令如下:

报错信息如下:

(更多…)

分类:Apache Spark, 战5渣
标签:,

Flume+Spark Steaming初探

2条评论10,641次浏览

公司业务准备上流数据处理了。由于之前基础平台选用了CDH,而CDH自带Spark,且由于数据源是每隔几分钟发一组数据文件的形式来传送数据,所以最终选取用Spark Steaming来做流数据处理。

下面记录初步使用Spark Steaming和Flume的一些过程。

第一个测试:Flume(spooldir to hdfs)

原始数据通过ftp每隔几分钟拉取一批数据到本地某文件夹。于是测试了下flume监控文件夹并将新加入的文件写入hdfs的功能。

配置文件如下:

根据官方文档hdfs.fileType默认是SequenceFile,这里选用DataStream将不压缩输出文件。

若不设置hdfs.useLocalTimeStamptrue则会报下面的错误,暂时不知为何。
(更多…)

标签:,

Hive窗口和分析函数(上)

0条评论4,494次浏览

之前记录row_number()的使用方法,最近终于有空将窗口函数这一块完整的看一遍,在此记录。

Analytics functions

RANK()、DENSE_RANK()、ROW_NUMBER()

使用示例:

其中PARTITION by calling_nbr可选,若加上则是窗口内统计,否则则是全局统计。

部分结果如下:

(更多…)

分类:Apache Hive
标签:,

Hive常用操作记录

1条评论3,523次浏览

记录日常工作中Hive相关的常用语句。之前总是东一个文件西一个文件的丢这些语句,导致需要用的时候总是找不到,要去网上重新查。

创建外部表

添加分区

设置NULL值的替代字符

OR
(更多…)

分类:Apache Hive
标签:

Hive Custom UDF

1条评论5,831次浏览

需求

最近有这么一个需求要实现,由于话单中的区号不准确,需要根据区号-号段对应表来刷新话单中的区号信息。业务逻辑类似如下:

就是将号码与区号-号段对应表中的记录来对比,返回该号码所属于的号码段的区号。在oracle中测试了下,不加limit 1是好使的。但是放到Hive中却一直报错,不认这样的语法。尝试其它方法无果,最后决定用UDF实现。

定制UDF

最大的问题

如何在UDF中读入区号-号段对应表的数据?

Hive的UDF类没有类似setup()这样的在开头调用一次的方法,只是单纯写evaluate()方法来实现逻辑,显然不能在evaluate()方法中读入区号-号段对应表的数据,否则每一次调用该方法都会重新读一次数据,开销太大,方法也太挫。GenericUDF当中有initialize()方法,但太过复杂,没法应付这次紧急需求。
(更多…)

分类:Apache Hive
标签:

CDH集群常用管理操作

3条评论5,171次浏览

调整replication

随着业务数据的不断增加,hdfs可用空间逐渐减少。经过再三考虑决定将集群的备份数量由2变为1,也就是不在备份,每个数据块只有唯一的一份存档。

当初备份数由3调成2时只是简单的将dfs.replication由3变成了2,然后分发配置重启集群。记得当时通过hdfs dfsadmin -report查看hdfs占用大小发现大小一直在减少。心想着hdfs还挺智能的,知道备份数减少了,自己去删除多余的副本了。

结果这次将dfs.replication由2变成了1后发现hdfs空间一直保持不变。难道上次的记忆出错了,hdfs其实并不会自己去调整备份数?

后来查了相关资料,证实hdfs确实不会根据dfs.replication参数来自动调整备份数。这个参数是给client端用的,当有新文件写入时,会根据该参数确定复制几份,但集群已有的文件备份数不受该参数的影响。

于是使用命令hadoop fs -setrep -R 1 /将集群全部文件的备份数变为1。再次查看集群空间大小,发现下降了一半。成功。
(更多…)

分类:CDH
标签:
profile
  • 文章总数:81篇
  • 评论总数:247条
  • 分类总数:32个
  • 标签总数:45个
  • 运行时间:1250天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • Anonymous: :?: :razz: :sad:
  • Anonymous: 牛
  • Anonymous: 楼主你好,我偶尔也会 遇到Reconnect due to socket error: java.nio.channels.ClosedCha...
  • Anonymous: sdfs
  • Anonymous: :arrow: :neutral: :cry:
  • Anonymous: java.io.NotSerializableExcepti on: DStream checkpointing has been enabled but the DStreams with their...
  • wick: HI,请问一下,U,S,V得到 ,怎么得到近似矩阵 (用spark java),谢谢。
  • Michael Whitaker: Thank you for this blog, it was very helpful in troubleshooting my own issues. It seems that no...
  • Anonymous: :mad:
  • Anonymous: :???:
  • Anonymous: :mad: :mad: :mad:
  • 洋流: 哥们,我问个问题,你 把testOnborrow去掉了。。 如果得到的jedis资源...
  • 洋流: 哥们,我问个问题,你 把testOnborrow去掉了。。 如果得到的jedis资源...
  • Anonymous: :razz: :evil: :grin:
  • 张瑞昌: 有很多,比较常见的是 Jacob迭代法,一次迭代O (n^3),迭代次数不清楚 ...
  • Anonymous: :mrgreen:
  • lc277: 你好 我想问下一般删除节点 要多久,要删除的datano de大概用了1t,解除...
  • Anonymous: 你好 我想问下一般删除节点 要多久,要删除的datano de大概用了1t,解除...
  • Anonymous: :smile: :grin: :eek:
  • 李雪璇: 想要完整代码,可以帮 忙发给我吗