程序员的自我修养
Home » Apache Pig » 关于Apache Pig

关于Apache Pig

0条评论1,979次浏览

网上关于介绍pig、安装Pig以及pig原理的文章够多了。以下记录一些入门时遇到的一些问题,算是一些实战经验吧。

关于脏数据过滤

Pig除了用FILTER进行按条件过滤数据以外,还有一种脏数据无法处理。如,我的数据共29个字段,以“,”分隔。现在有一部分的数据,缺失了部分字段,导致以“,”分隔后,数组长度小于29。因为不知道到底缺失的是哪个字段,这样的数据已经毫无意义。在Pig中如何过滤掉呢?翻遍了Pig的Document,去各种社区提问也没人能回答。最后尽然在FAQ中看到了答案,可以用ARTIY(*)来进行过滤。如:

关于PigStorage

PigStorage可以自定义分隔符,如PigStorage(','),PigStorage('|')。但上次遇到一个奇葩的csv文件,里面的所有数据全部加上的引号,导致数据大小直接变大了1倍不说,而且以','分隔后的数据全部是"1352288xxxx"这样带着引号无法转换为chararray以外类型的数据。数据样例:

"1","2","3","4"

后来想尝试使用'\",\"'来进行分隔,然后对第一个和最后一个字段进行SUBSTRING处理,结果却报错了。具体错误没保存下来,意思好像是PigStorage只能以char为分隔符。

无奈,只能用shell命令替换掉所有的引号再进行下一步处理。

遇到这样的文件,除了祈祷文件不要太大以外,最好的处理方式是给数据发送方上一课。you see see your po data!!!

关于DEFINE (macros)

官方文档上的restrictions:

  • Macros are not allowed inside a FOREACH nested block.
  • Macros cannot contain Grunt shell commands.
  • Macros cannot include a user-defined schema that has a name collision with an alias in the macro.

后来遇到这样的一个场景,数据一共207个字段,每个字段都是double类型。对于每个字段,都要按照3个公式求出三个衍伸值,如平均值。

运行正常,结果正确。貌似很easy。

但对于207个字段来说,全部粘贴复制下来也真够恶心的。这还只是一个平均值,还有2个衍伸值的算法跟复杂,公式更长,肯定不能这么干。

所以定义macros:

程序:

看上去很美,好像解决了问题,结果运行时候报错了,具体错误没保存下来。

为何会报错,仔细阅读官方关于macros给出的示例,发现macros中的in和out,都是以“行”的形式存在,没有以“列”或“组”形式存在的示例。结合上面遇到的错误,我想,pig的macros,只能处理行形式的数据,离真正的“宏”还有点距离。

最后解决方法:由于太懒,不想207×3次的粘贴复制+修改众多参数,于是编写了一个MapReduce搞定了。

关于UDF构造函数的调用次数

通常来说UDF构造函数只会调用一次,其exec方法会执行多次。可以在构造函数中处理比较耗时但只需处理一次的工作。

关于选出表中某字段重复数大于N的方法

用sql非常简单

用pig如何做到?貌似可以这么写:

理解上是没错,不过会报错。查了不少资料都没有说到这个问题的。

目前我只能这么做:

若有更好的办法请告知,不甚感激。

关于in和not in

PigLatin的in和not in好像只能支持tuple?一般来说遇到需要in的地方可以用各种join搞定。但是not in则比较麻烦,具体可以参考subquery not in pig

简单的说就是利用left join,然后对结果进行一次filter,条件是join的key为null

示例:$ cat data

1|13522880000|11
2|13522880000|22
3|18201600000|33
4|15313080000|44
5|15313080000|55

$ cat fdata

1
2
5

结果:

3|18201600000|33|
4|15313080000|44|

关于store与HBaseStorage

从HBase读取数据很简单,官网上有示例:

可选参数:

  • -loadKey=(true|false) Load the row key as the first value in every tuple returned from HBase (default=false)
  • -gt=minKeyVal Return rows with a rowKey greater than minKeyVal
  • -lt=maxKeyVal Return rows with a rowKey less than maxKeyVal
  • -gte=minKeyVal Return rows with a rowKey greater than or equal to minKeyVal
  • -lte=maxKeyVal Return rows with a rowKey less than or equal to maxKeyVal
  • -limit=numRowsPerRegion Max number of rows to retrieve per region
  • -caching=numRows Number of rows to cache (faster scans, more memory)
  • -delim=delimiter Column delimiter in columns list (default is whitespace)
  • -ignoreWhitespace=(true|false) When delim is set to something other than whitespace, ignore spaces when parsing column list (default=true)
  • -caster=(HBaseBinaryConverter|Utf8StorageConverter) Class name of Caster to use to convert values (default=Utf8StorageConverter). The default caster can be overridden with the pig.hbase.caster config param. Casters must implement LoadStoreCaster.
  • -noWAL=(true|false) During storage, sets the write ahead to false for faster loading into HBase (default=false). To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
  • -minTimestamp=timestamp Return cell values that have a creation timestamp greater or equal to this value
  • -maxTimestamp=timestamp Return cell values that have a creation timestamp less than this value
  • -timestamp=timestamp Return cell values that have a creation timestamp equal to this value

那么如何向HBase存数据?虽然官网没有示例,但简单尝试后,发现也很简单:

$ cat pig/fdata

1|10086|qwe
2|10000|wer
3|10001|ert
4|10086|rty
5|10011|tyu

scan一下hbase:scan 'pigstore'

ROW COLUMN+CELL
1 column=cf:one, timestamp=1402367076331, value=10086
1 column=cf:two, timestamp=1402367076331, value=qwe
2 column=cf:one, timestamp=1402367076332, value=10000
2 column=cf:two, timestamp=1402367076332, value=wer
3 column=cf:one, timestamp=1402367076332, value=10001
3 column=cf:two, timestamp=1402367076332, value=ert
4 column=cf:one, timestamp=1402367076332, value=10086
4 column=cf:two, timestamp=1402367076332, value=rty
5 column=cf:one, timestamp=1402367076332, value=10011
5 column=cf:two, timestamp=1402367076332, value=tyu

可以看到第一列是用来当做key的。

关于传递参数和使用参数

传递参数:-p,每个参数前都需要有-p

使用参数文件:

cat parm.parms

STARTTIME=20140608
ENDTIME=20140708
START=1
END=1024

Pig脚本里面使用参数:

(转载本站文章请注明作者和出处 程序员的自我修养 – SelfUp.cn ,请勿用于任何商业用途)
分类:Apache Pig
标签:
发表评论


profile
  • 文章总数:78篇
  • 评论总数:252条
  • 分类总数:31个
  • 标签总数:43个
  • 运行时间:946天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • pacificLee: :twisted:
  • 小码: 为什么没有后面的呢,只有前10个
  • Anonymous: :lol:
  • Anonymous: :razz: 楼主是属于会聊天的。 我想问,sqoop发了几个版本了,应该没这些问题了吧。
  • Anonymous: Config.kafkaConfig.kafkaGroupI d 这个是指自己配置的group id 还是从 import org.apache.kafka.common.config .Config 这个类...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 那个方法是在 spark-streaming_2.10 中 kafka...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 你确定 kafka 里面有这个类 ? 个人在kafka 最新 稳定版...
  • Anonymous: :roll:
  • Anonymous: 很不错,试问有java版的吗?
  • Anonymous: 赞
  • Anonymous: 哈哈 看楼主的吐槽乐死了 where子句是可以写的 同样找不到资料 一点点试出来的 select id from xxxx where ${CONDITIONS} and 1=1 and 2=2 limit 4
  • EVIL: 我在运行完C4.5的代码后,显示 defined object DecisionTreeTest 是什么意思?这是有错误吗?运行结果在哪里看?
  • sf: 楼主的问题,我都遇到。。。没办法项目已经定型了,最后都硬着头 皮一个一个的改了源码
  • zz: 我去,楼主你真及时,我们今天上了新的HTTP2 push之后也发现速度曲线很奇怪,开始有200k/min,跟 另一台老的推送协议速度差不多,但是过了一会,立马降到只有几k /min,百思不得其解,我们还用了一个海外代理,在...
  • qi365: :mad: 很可恶,百度助纣为虐~
  • qi365: :? :shock: haha~ very good~
  • 张是大: 《深入浅出Spark机器学习实战(用户行为分析)》 课程网盘下载:http://pan.baidu.com/s/ 1mixvUli 密码:1pfn
  • Anonymous: :???:
  • Anonymous: 我用着sqoop感觉还可以,select 几十个字段也没事,估计是版本低。。
  • Anonymous: :grin: