程序员的自我修养
Home » 分类目录 » Apache Spark

spark streaming一些坑爹事

4条评论1,591次浏览

kafka无限重连

现象

spark任务/kafka集群会隔一段时间出错,日志基本都会显示too many open files。查看socket连接会发现会有几千到几十万的连接。

仔细的查看spark的运行日志,会发现有如下错误:

我们的kafka中每个topic都是12个分区,对于连接1个topic的spark streaming任务,每次刷出上面的日志后,通过命令netstat -antup |grep 9092或者ss -atp |grep 9092会发现对应的spark任务的连接数又多12个。

排查

基本可以确定是连接没有正确关闭导致的,gitlab上看了下最近的改动:由于每个batch的处理时间接近batch的间隔时间,所以把处理逻辑全部放到了线程池中去处理。大概的代码如下:

回滚了这个改动后,日志不再刷出上面的错误日志,查看连接数也一直是固定的12,正好对应12个kafka partition。于是将代码改成:

然后一切都正常了。

从结果来看,真正去kafka读取数据是发生在读取partition变量内的数据时。若直接将partition变量放入线程池,则导致无法使用已有的连接,然后就会重新建立连接,而且在读取完数据后也没有正确的关闭连接,从而导致了上面的错误。

stopGracefullyOnShutdown

为了能优雅的关闭spark streaming,可以SparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

然而坑爹的事情是,需要等到下一个batch才能优雅的关闭,不论当前的batch是否已经处理完。比如batch间隔是10分钟,每个batch处理时间是3分钟。若在10:00调用关闭任务的命令,spark不会在10:03执行完当前batch后关闭,而是直到下一个batch执行完才关闭,也就是10:13。对于batch间隔小于30s的倒是也能接受这样的设定,但对于10min这样的间隔,简直是无法忍受。

此外,还可以加上:

把stream当成集合

这里将永远也进入不了partition.foreach,因为partition.size会遍历一遍partition,而遍历一遍的stream就代表消费完毕了。╮(╯_╰)╭

分类:Apache Spark
标签:,

KafkaUtils.createDirectStream

9条评论16,490次浏览

官网上对这个新接口的介绍很多,大致就是不与zookeeper交互,直接去kafka中读取数据,自己维护offset,于是速度比KafkaUtils.createStream要快上很多。但有利就有弊:无法进行offset的监控。

项目中需要尝试使用这个接口,同时还要进行offset的监控,于是只能按照官网所说的,自己将offset写入zookeeper。

方法1

这个方法只有3个参数,使用起来最为方便,但是每次启动的时候默认从Latest offset开始读取,或者设置参数auto.offset.reset="smallest"后将会从Earliest offset开始读取。

显然这2种读取位置都不适合生产环境。
(更多…)

标签:, ,

Spark集群问题小结

1条评论2,626次浏览

JAVA版本问题

master进程以jdk7运行,worker以jdk6运行。

产生原因

集群中的机器曾由jdk6升级至jdk7,升级后未卸载jdk6。由未知原因(初步推测ssh配置的问题),导致ssh host "java -version"命令结果显示为jdk6,但ssh至host后再运行java -version命令则显示jdk7。而spark运行worker是通过ssh来启动的,所以导致worker全部以jdk6启动。

解决方法

在spark执行用户下的.bashrc中加入$JAVA_HOME, $PATH等环境变量,然后重启集群。

pid文件问题

运行$SPARK_HOME/sbin/下的各种stop开头的sh文件都会显示类似下方错误:

(更多…)

分类:Apache Spark
标签:

战5渣系列——Spark Streaming启动问题

0条评论4,816次浏览

测试集群上Flume监控本地文件夹+Spark Streaming跑的没问题,但放到生产环境上来测试却一直报错,启动命令如下:

报错信息如下:

(更多…)

分类:Apache Spark, 战5渣
标签:,

Flume+Spark Steaming初探

2条评论10,255次浏览

公司业务准备上流数据处理了。由于之前基础平台选用了CDH,而CDH自带Spark,且由于数据源是每隔几分钟发一组数据文件的形式来传送数据,所以最终选取用Spark Steaming来做流数据处理。

下面记录初步使用Spark Steaming和Flume的一些过程。

第一个测试:Flume(spooldir to hdfs)

原始数据通过ftp每隔几分钟拉取一批数据到本地某文件夹。于是测试了下flume监控文件夹并将新加入的文件写入hdfs的功能。

配置文件如下:

根据官方文档hdfs.fileType默认是SequenceFile,这里选用DataStream将不压缩输出文件。

若不设置hdfs.useLocalTimeStamptrue则会报下面的错误,暂时不知为何。
(更多…)

标签:,

SVD与PCA

4条评论15,221次浏览

奇异值分解

奇异值分解,singular value decomposition(SVD)是线性代数中一种重要的矩阵分解。

记得大学时学习线性代数中的特征值特征向量时,我就一直思考这个玩意算出来到底有啥用,难不成就是一群热(xian)爱(de)专(dan)研(teng)的人弄出来的数学小把戏?然后随着时间的推移,这些纯理论的东西就基本忘光了。大学的知识往往都这样的,和实际不接轨,学的时候不知道有啥用,等用的时候就忘的差不多了。

现在,在我学习线性代数后的第8年,我终于知道特征值这个玩意有啥用了。首先,先回忆下什么是特征值和特征向量吧。

特征值

对于一个方阵,其特征值和特征向量满足:

\(A\nu=\lambda\nu\)

求出所有的特征值和特征向量后,就得出了方阵A的特征值分解:

\(A=Q\Sigma Q^{-1}\)

其中 \(Q\) 是特征向量按照与特征值的对应顺序组合而成 \((\nu_1,\nu_2,..)\)\(\Sigma\) 是由特征值组成的一个对角矩阵。那么对于方阵A的特征值分解的意义又何在呢?先看下面这个例子,对于矩阵A(为了简单起见,设为对角矩阵):

\(A=\left(\begin{array}{ccc}100 & 0 & 0 \\0 & 10 & 0 \\0 & 0 & 1 \\\end{array}\right)\)

阅读全文>>

Spark 1.1.0 Basic Statistics(下)

2条评论5,963次浏览

Hypothesis testing

Hypothesis testing,假设检验。Spark目前支持皮尔森卡方检测(Pearson’s chi-squared tests),包括适配度检定和独立性检定。

皮尔森卡方检测

皮尔森卡方检测是最著名的卡方检测方法之一,一般提到卡方检测时若无特殊说明则代表使用的是皮尔森卡方检测。皮尔森卡方检测可以用来进行适配度检测独立性检测

适配度检测

适配度检测,Goodness of Fit test,验证一组观察值的次数分配是否异于理论上的分配。\(H_0\) 假设(虚无假设,null hypothesis)为一个样本中已发生事件的次数分配会服从某个特定的理论分配。通常情况下这个特定的理论分配指的是均匀分配,目前Spark默认的是均匀分配。

独立性检测

独立性检测,independence test,验证从两个变量抽出的配对观察值组是否互相独立。其虚无假设是:两个变量呈统计独立性。

检测三个步骤

  1. 计算卡方检定的统计值“ \(\chi^2\) ”:把每一个观察值和理论值的差做平方后、除以理论值、再加总
  2. 计算 \(\chi^2\) 统计值的自由度“df”
  3. 依据研究者设定的置信水平,查出自由度为df的卡方分配临界值,比较它与第1步骤得出的 \(\chi^2\) 统计值,推论能否拒绝虚无假设

适配度检测示例

场景

将五角星的5个角分别标记为1,2,3,4,5。现在旋转若干次五角星,记录每个角指向自己的次数。

第一个的结果为(1,7,2,3,18),第二个五角星的结果为(7,8,6,7,9)。现做出虚无假设:五角星的每个角指向自己的概率是相同的。

阅读全文>>

Spark 1.1.0 Basic Statistics(上)

1条评论3,703次浏览

Spark 1.1.0于2014年9月11日发布,此次的版本将mllib完善了不少,如添加了Basic Statistics、添加了决策树的Java实现等等。现对1.1.0的新功能进行一次初步探索。

Summary statistics

Summary statistics主要提供基于列的统计信息,包括6个统计量:均值、方差、非零统计量个数、总数、最小值、最大值。

测试数据

测试代码

阅读全文>>

分类:Apache Spark
标签:,

Spark MLlib之协同过滤

29条评论24,609次浏览

什么是协同过滤

协同过滤(Collaborative Filtering, 简称CF),wiki上的定义是:简单来说是利用某兴趣相投、拥有共同经验之群体的喜好来推荐使用者感兴趣的资讯,个人透过合作的机制给予资讯相当程度的回应(如评分)并记录下来以达到过滤的目的进而帮助别人筛选资讯,回应不一定局限于特别感兴趣的,特别不感兴趣资讯的纪录也相当重要。

以上定义太拗口,举个简单的例子:我现在多年不看日本anime的新番了,最近突然又想看几部新番,但又不知道这么多新番中看哪些比较好,于是我就找几个同样喜欢日本动漫的朋友来咨询。我第一个想咨询的朋友是和我口味最像的,我们都特别喜欢看《虫师》、《黑之契约者》、《寒蝉》这样的小众动画;我问的第二个朋友和我口味差不多,他特别喜欢看《钢炼》《无头骑士异闻录》这样的动画,我虽然喜欢,但不像他那么喜欢;由于身边喜欢日本动画的朋友不多,剩下第三个可以咨询的是一个宅女,平常经常看腐、宅、基的动漫,完全跟我不是一路人,于是问了下她推荐的片子,并将这些片子打上的黑名单的标签。然后我就开始看第一个朋友推荐的片子了,要是时间特别多又很无聊我可能会看第二个朋友推荐的,但打死我也不会看第三个朋友推荐的。这就是协同过滤的一个简化、小众版。

如何进行相似度度量

接着上面的例子,我可以通过我和其它朋友共同喜欢某个或某类动漫来确定我们的口味是否一样,那么如何以数学或者机器的形式来表示这个“口味一样”呢?通常,是通过“距离”来表示,例如:欧几里得距离、皮尔逊相关度、曼哈顿距离、Jaccard系数等等。

欧几里得距离

欧几里德距离(Euclidean Distance),最初用于计算欧几里得空间中两个点的距离,在二维空间中,就是我们熟悉的两点间的距离,x、y表示两点,维度为n:

\(d(x,y)=\sqrt {(\sum_i^n (x_i-y_i)^2)}\)

相似度:

\(sim(x,y)={1\over {1+d(x,y)}}\)

阅读全文>>

Spark MLlib之决策树(上)

8条评论16,970次浏览

决策树

决策树是常用的分类算法之一,其对于探索式的知识发现往往有较好的表现。决策树原理十分简单,可处理大维度的数据,不用预先对模型的特征有所了解,这些特性使得决策树被广泛使用。决策树采用贪心算法,其建立过程同样需要训练数据。决策树算法有ID3、在ID3基础上发展起来的C4.5,以及C4.5的商业化版本C5.0,C5.0核心与C4.5相同,只是在执行效率和内存使用方面有所改进。

决策树的核心问题是决策树分支准则的确定,以及分裂点的确定。为了直观起见,推荐大家玩一个游戏:通过20个问题来猜出你心中所想的那个人

初次接触这个游戏的你是否觉得十分神奇,在20个不到的问题里真的就能猜出你心中所想的那个人,不论是你的女朋友、父母或者动漫人物、歌手、演员甚至是政界人物。其实仔细想想,一个20层的二叉树最后的叶子节点有多少个?1024*1024个,而我们能想到的人绝对是超不出这个数量的。这个网站的具体算法就是采用的类似决策树的算法,通过一个个问题来减少候选的数据,直至找出你所想的那个人。

多玩几次你就会发现,一般第一个或前几个问题就会问你:你描述的对象是男(女)性吗?这意味着什么,意味着第一个问题就能将候选数据减少一半左右。因为你想的那个人,除了男人就是女人了。这就是前面所说的决策树分支准则的确定。若将这个问题放在最后几个问题中,毫无疑问是个吃力不讨好的事情。那么如何才能将这些众多属性(如:性别、高矮、胖瘦、头发长短、是否是歌手、是否有money等)按照其重要程度来排个顺序,这就是ID3和C4.5算法所做的事情了。

预备知识

信息熵是信息论中的基本概念。信息论是C.E.Shannon于1948年提出并由此发展起来的,主要用于解决信息传递过程中的问题,也称为统计通信理论。信息论认为:信息是用来消除随机不确定性的,信息量的大小可由所消除的不确定大小来计量。详细了解

信息量的数学定义为:

\(I(u_i)=-log_2P(u_i)\)

其中 \(P(u_i)\) 为信息 \(u_i\) 发生的概率。信息熵是信息量的数学期望,是信源发出信息前的平均不确定性,也成为先验熵,信息熵的数学定义为:

\(Ent(U)=-\sum_iP(u_i)log_2P(u_i)\)

当已知信号 \(U\) 的概率分布 \(P(U)\) 且收到信号 \(V=v_i\) 后,发出信号的概率分布变为 \(P(U|v_j)\) ,于是信源的平均不确定性变为(也称为条件熵):

\(Ent(U|v_i)=-\sum_iP(u_i|v_i)log_2P(u_i|v_i)\)

一般来说, \(Ent(U|v_i) < Ent(U)\) ,于是定义信息增益为:

\(Gains(U,V)=Ent(U)-Ent(U|V)\)

阅读全文>>

212
profile
  • 文章总数:79篇
  • 评论总数:254条
  • 分类总数:31个
  • 标签总数:44个
  • 运行时间:1193天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • Anonymous: :arrow: :neutral: :cry:
  • Anonymous: java.io.NotSerializableExcepti on: DStream checkpointing has been enabled but the DStreams with their...
  • wick: HI,请问一下,U,S,V得到后,怎么得到近似矩阵呢(用sp ark java),谢谢。
  • Michael Whitaker: Thank you for this blog, it was very helpful in troubleshooting my own issues. It seems that no...
  • Anonymous: :mad:
  • Anonymous: :???:
  • Anonymous: :mad: :mad: :mad:
  • 洋流: 哥们,我问个问题,你把testOnborrow去掉了。。如果 得到的jedis资源是个不可用的,服务从来都不出问题么?
  • 洋流: 哥们,我问个问题,你把testOnborrow去掉了。。如果 得到的jedis资源是个不可用的,服务从来都不出问题么?
  • Anonymous: :razz: :evil: :grin:
  • 张瑞昌: 有很多,比较常见的是Jacob迭代法,一次迭代O(n^3), 迭代次数不清楚。 如果是手动算的话按照定义求就可以了
  • Anonymous: :mrgreen:
  • lc277: 你好 我想问下一般删除节点要多久,要删除的datanode大概用了 1t,解除授权已经30多小时还没完成,请问是出现什么问题了吗 麻烦告诉下谢谢 qq1844554123
  • Anonymous: 你好 我想问下一般删除节点要多久,要删除的datanode大概用了 1t,解除授权已经30多小时还没完成,请问是出现什么问题了吗
  • Anonymous: :smile: :grin: :eek:
  • 李雪璇: 想要完整代码,可以帮忙发给我吗
  • Anonymous: 请问一下,那个 user的推荐结果楼主查看了么? 为什么输入数据 最高是五分,输出结果都是7分8分啥的?怎么设置输出的分数的最 大值?
  • Anonymous: 那个 user的推荐结果楼主查看了么? 为什么输入数据 最高是五分,输出结果都是7分8分啥的?
  • Anonymous: stopGracefullyOnShutdown在yarn- client模式下我测试的无效,你的呢
  • Anonymous: 另外,import的lib包能否发个列表.