网上关于介绍pig、安装Pig以及pig原理的文章够多了。以下记录一些入门时遇到的一些问题,算是一些实战经验吧。
关于脏数据过滤
Pig除了用FILTER进行按条件过滤数据以外,还有一种脏数据无法处理。如,我的数据共29个字段,以“,”分隔。现在有一部分的数据,缺失了部分字段,导致以“,”分隔后,数组长度小于29。因为不知道到底缺失的是哪个字段,这样的数据已经毫无意义。在Pig中如何过滤掉呢?翻遍了Pig的Document,去各种社区提问也没人能回答。最后尽然在FAQ中看到了答案,可以用ARTIY(*)来进行过滤。如:
1 2 |
A = LOAD ..; B = FILTER A BY ARTIY(*) != 29; --去除数据长度不足29的记录 |
关于PigStorage
PigStorage可以自定义分隔符,如PigStorage(','),PigStorage('|')。但上次遇到一个奇葩的csv文件,里面的所有数据全部加上的引号,导致数据大小直接变大了1倍不说,而且以','分隔后的数据全部是"1352288xxxx"这样带着引号无法转换为chararray以外类型的数据。数据样例:
"1","2","3","4"
后来想尝试使用'\",\"'来进行分隔,然后对第一个和最后一个字段进行SUBSTRING处理,结果却报错了。具体错误没保存下来,意思好像是PigStorage只能以char为分隔符。
无奈,只能用shell命令替换掉所有的引号再进行下一步处理。
1 |
sed "s/\"//g" data.csv |
遇到这样的文件,除了祈祷文件不要太大以外,最好的处理方式是给数据发送方上一课。you see see your po data!!!
关于DEFINE (macros)
官方文档上的restrictions:
- Macros are not allowed inside a FOREACH nested block.
- Macros cannot contain Grunt shell commands.
- Macros cannot include a user-defined schema that has a name collision with an alias in the macro.
后来遇到这样的一个场景,数据一共207个字段,每个字段都是double类型。对于每个字段,都要按照3个公式求出三个衍伸值,如平均值。
1 2 3 |
A = LOAD ...; B = GROUP A BY $0; R = FOREACH A GENERATE group, SUM($1.$1)/COUNT($1.$1), SUM($1.$2)/COUNT($1.$2) ...; |
运行正常,结果正确。貌似很easy。
但对于207个字段来说,全部粘贴复制下来也真够恶心的。这还只是一个平均值,还有2个衍伸值的算法跟复杂,公式更长,肯定不能这么干。
所以定义macros:
1 2 3 |
DEFINE m_avg(in) RETURNS out { $out = SUM($in)/COUNT($in); } |
程序:
1 2 3 |
A = LOAD ...; B = GROUP A BY $0; R = FOREACH A GENERATE group, m_avg($1.$1), m_avg($1.$2) ...; |
看上去很美,好像解决了问题,结果运行时候报错了,具体错误没保存下来。
为何会报错,仔细阅读官方关于macros给出的示例,发现macros中的in和out,都是以“行”的形式存在,没有以“列”或“组”形式存在的示例。结合上面遇到的错误,我想,pig的macros,只能处理行形式的数据,离真正的“宏”还有点距离。
最后解决方法:由于太懒,不想207×3次的粘贴复制+修改众多参数,于是编写了一个MapReduce搞定了。
关于UDF构造函数的调用次数
通常来说UDF构造函数只会调用一次,其exec方法会执行多次。可以在构造函数中处理比较耗时但只需处理一次的工作。
关于选出表中某字段重复数大于N的方法
用sql非常简单
1 |
select * from (table) where [conditions] group by [columns] having count(*) > N |
用pig如何做到?貌似可以这么写:
1 2 3 |
A = LOAD 'pig/data' USING PigStorage('|'); B = GROUP A BY $0; C = FILTER B BY COUNT(B)>N; |
理解上是没错,不过会报错。查了不少资料都没有说到这个问题的。
目前我只能这么做:
1 2 3 4 |
A = LOAD 'pig/data' USING PigStorage('|'); B = GROUP A BY $0; BB = FOREACH B GENERATE group, COUNT(B); C = FILTER BB BY $1 > N; |
若有更好的办法请告知,不甚感激。
关于in和not in
PigLatin的in和not in好像只能支持tuple?一般来说遇到需要in的地方可以用各种join搞定。但是not in则比较麻烦,具体可以参考subquery not in pig
简单的说就是利用left join,然后对结果进行一次filter,条件是join的key为null
示例:$ cat data
1|13522880000|11
2|13522880000|22
3|18201600000|33
4|15313080000|44
5|15313080000|55
$ cat fdata
1
2
5
1 2 3 4 5 |
A = LOAD 'pig/data' USING PigStorage('|') AS (a,b,c); B = LOAD 'pig/fdata' USING PigStorage('|') AS (d); C = JOIN A BY $0 LEFT OUTER, B BY $0; D = FILTER C BY $3 is null; STORE D INTO 'pig/result' USING PigStorage('|'); |
结果:
3|18201600000|33|
4|15313080000|44|
关于store与HBaseStorage
从HBase读取数据很简单,官网上有示例:
1 2 3 4 |
raw = LOAD 'hbase://SomeTableName' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage( 'info:first_name info:last_name tags:work_* info:*', '-loadKey=true -limit=5') AS (id:bytearray, first_name:chararray, last_name:chararray, tags_map:map[], info_map:map[]); |
可选参数:
- -loadKey=(true|false) Load the row key as the first value in every tuple returned from HBase (default=false)
- -gt=minKeyVal Return rows with a rowKey greater than minKeyVal
- -lt=maxKeyVal Return rows with a rowKey less than maxKeyVal
- -gte=minKeyVal Return rows with a rowKey greater than or equal to minKeyVal
- -lte=maxKeyVal Return rows with a rowKey less than or equal to maxKeyVal
- -limit=numRowsPerRegion Max number of rows to retrieve per region
- -caching=numRows Number of rows to cache (faster scans, more memory)
- -delim=delimiter Column delimiter in columns list (default is whitespace)
- -ignoreWhitespace=(true|false) When delim is set to something other than whitespace, ignore spaces when parsing column list (default=true)
- -caster=(HBaseBinaryConverter|Utf8StorageConverter) Class name of Caster to use to convert values (default=Utf8StorageConverter). The default caster can be overridden with the pig.hbase.caster config param. Casters must implement LoadStoreCaster.
- -noWAL=(true|false) During storage, sets the write ahead to false for faster loading into HBase (default=false). To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
- -minTimestamp=timestamp Return cell values that have a creation timestamp greater or equal to this value
- -maxTimestamp=timestamp Return cell values that have a creation timestamp less than this value
- -timestamp=timestamp Return cell values that have a creation timestamp equal to this value
那么如何向HBase存数据?虽然官网没有示例,但简单尝试后,发现也很简单:
$ cat pig/fdata
1|10086|qwe
2|10000|wer
3|10001|ert
4|10086|rty
5|10011|tyu
1 2 |
A = LOAD 'pig/fdata' USING PigStorage('|') AS (a,b); STORE A INTO 'hbase://pigstore' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('cf:one,cf:two'); |
scan一下hbase:scan 'pigstore'
ROW COLUMN+CELL
1 column=cf:one, timestamp=1402367076331, value=10086
1 column=cf:two, timestamp=1402367076331, value=qwe
2 column=cf:one, timestamp=1402367076332, value=10000
2 column=cf:two, timestamp=1402367076332, value=wer
3 column=cf:one, timestamp=1402367076332, value=10001
3 column=cf:two, timestamp=1402367076332, value=ert
4 column=cf:one, timestamp=1402367076332, value=10086
4 column=cf:two, timestamp=1402367076332, value=rty
5 column=cf:one, timestamp=1402367076332, value=10011
5 column=cf:two, timestamp=1402367076332, value=tyu
可以看到第一列是用来当做key的。
关于传递参数和使用参数
传递参数:-p,每个参数前都需要有-p
1 |
pig -p GTP=$STARTTIME -p START=$1 -p LTP=$ENDTIME -p END=$2 yk.pig |
使用参数文件:
cat parm.parms
STARTTIME=20140608
ENDTIME=20140708
START=1
END=1024
1 |
pig -m parm.parms yk.pig |
Pig脚本里面使用参数:
1 |
RES = FOREACH FIN GENERATE CONCAT($1,CONCAT('$START','$END')), $0, $2; |