paulwong

#

一个PIG脚本例子分析

执行脚本:
PIGGYBANK_PATH=$PIG_HOME/contrib/piggybank/java/piggybank.jar
INPUT=pig/input/test-pig-full.txt
OUTPUT=pig/output/test-pig-output-$(date  +%Y%m%d%H%M%S)
PIGSCRIPT=analyst_status_logs.pig

#analyst_500_404_month.pig
#
analyst_500_404_day.pig
#
analyst_404_percentage.pig
#
analyst_500_percentage.pig
#
analyst_unique_path.pig
#
analyst_user_logs.pig
#
analyst_status_logs.pig


pig -p PIGGYBANK_PATH=$PIGGYBANK_PATH -p INPUT=$INPUT -p OUTPUT=$OUTPUT $PIGSCRIPT


要分析的数据源,LOG 文件
46.20.45.18 - - [25/Dec/2012:23:00:25 +0100] "GET / HTTP/1.0" 302 - "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)" "-" "-" 46.20.45.18 "" 11011AEC9542DB0983093A100E8733F8 0
46.20.45.18 - - [25/Dec/2012:23:00:25 +0100] "GET /sign-in.jspx HTTP/1.0" 200 3926 "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)" "-" "-" 46.20.45.18 "" 11011AEC9542DB0983093A100E8733F8 0
69.59.28.19 - - [25/Dec/2012:23:01:25 +0100] "GET / HTTP/1.0" 302 - "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)" "-" "-" 69.59.28.19 "" 36D80DE7FE52A2D89A8F53A012307B0A 15


PIG脚本:
--注册JAR包,因为要用到DateExtractor
register '$PIGGYBANK_PATH';

--声明一个短函数名
DEFINE DATE_EXTRACT_MM 
org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('yyyy-MM');

DEFINE DATE_EXTRACT_DD 
org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('yyyy-MM-dd');

-- pig/input/test-pig-full.txt
--把数据从变量所指的文件加载到PIG中,并定义数据列名,此时的数据集为数组(a,b,c)
raw_logs = load '$INPUT' USING org.apache.pig.piggybank.storage.MyRegExLoader('^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(\\S+) (\\S+) (HTTP[^"]+)" (\\S+) (\\S+) "([^"]*)" "([^"]*)" "(\\S+)" "(\\S+)" (\\S+) "(.*)" (\\S+) (\\S+)')
as (remoteAddr: chararray, 
n2: chararray, 
n3: chararray, 
time: chararray, 
method: chararray,
path:chararray,
protocol:chararray,
status: int, 
bytes_string: chararray, 
referrer: chararray, 
browser: chararray, 
n10:chararray,
remoteLogname: chararray, 
remoteAddr12: chararray, 
path2: chararray, 
sessionid: chararray, 
n15: chararray
);

--过滤数据
filter_logs = FILTER raw_logs BY not (browser matches '.*pingdom.*');
--item_logs = FOREACH raw_logs GENERATE browser;

--percent 500 logs
--重定义数据项,数据集只取2项status,month
reitem_percent_500_logs = FOREACH filter_logs GENERATE status,DATE_EXTRACT_MM(time) as month;
--分组数据集,此时的数据结构为MAP(a{(aa,bb,cc),(dd,ee,ff)},b{(bb,cc,dd),(ff,gg,hh)})
group_month_percent_500_logs = GROUP reitem_percent_500_logs BY (month);
--重定义分组数据集数据项,进行分组统计,此时要联合分组数据集和原数据集统计
final_month_500_logs = FOREACH group_month_percent_500_logs 
{
    --对原数据集做count,因为是在foreachj里做count的,即使是对原数据集,也会自动会加month==group的条件
    --从这里可以看出对于group里的数据集,完全没用到
    --这时是以每一行为单位的,统计MAP中的KEY-a对应的数组在原数据集中的个数
    total = COUNT(reitem_percent_500_logs);
    --对原数据集做filter,因为是在foreachj里做count的,即使是对原数据集,也会自动会加month==group的条件
    --重新过滤一下原数据集,得到status==500,month==group的数据集
    t = filter reitem_percent_500_logs by status== 500; --create a bag which contains only T values
    --重定义数据项,取group,统计结果
    generate flatten(group) as col1, 100*(double)COUNT(t)/(double)total;
}
STORE final_month_500_logs into '$OUTPUT' using PigStorage(',');

posted @ 2013-04-13 15:21 paulwong 阅读(2310) | 评论 (0)编辑 收藏

大对象XML读写

I am using JAXB and I have a large set of data which i have to marshal into a xml.Since marshalling the whole thing into xml in a single step will be using most of the memory , i want to split it into parts and write to the xml file incremently

For example if my generated output xml should be like this:
<Employees>
<employee>......</employee>
<employee>.....</employee>
<employee>.....</employee>
<employee>.....</employee>
..
...
..
</Employees>

I would like to write the <employee> sections separately into a file instead of writing the whole thing together.I am retrieving the employee details from the database and converting to xml.There are almost 8 lakh records.So marshalling the whole thing in single step will use up my memory.How can i do it?????


Use Stax API (XMLStreamWriter) as the underlying XML processing thing;
write <Employees> tag using that, and then pass XMLStreamWriter to
JAXB Marshaller, marshall employee by employee.
This is the pattern I use; similarly works well with unmarshalling.
Not sure if this is in FAQ or not, but it probably should be. 

posted @ 2013-04-12 19:18 paulwong 阅读(275) | 评论 (0)编辑 收藏

把命令行中的值传进PIG中

http://wiki.apache.org/pig/ParameterSubstitution


%pig -param input=/user/paul/sample.txt -param output=/user/paul/output/


PIG中获取
records = LOAD $input;

posted @ 2013-04-10 15:32 paulwong 阅读(352) | 评论 (0)编辑 收藏

PIG中的分组统计百分比

http://stackoverflow.com/questions/15318785/pig-calculating-percentage-of-total-for-a-field

http://stackoverflow.com/questions/13476642/calculating-percentage-in-a-pig-query

posted @ 2013-04-10 14:13 paulwong 阅读(399) | 评论 (0)编辑 收藏

CombinedLogLoader

PIG中的LOAD函数,可以在LOAD数据的同时,进行正则表达式的筛选。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
 * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 * 
 * 
http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 
*/

package org.apache.pig.piggybank.storage.apachelog;

import java.util.regex.Pattern;

import org.apache.pig.piggybank.storage.RegExLoader;

/**
 * CombinedLogLoader is used to load logs based on Apache's combined log format, based on a format like
 * 
 * LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined
 * 
 * The log filename ends up being access_log from a line like
 * 
 * CustomLog logs/combined_log combined
 * 
 * Example:
 * 
 * raw = LOAD 'combined_log' USING org.apache.pig.piggybank.storage.apachelog.CombinedLogLoader AS
 * (remoteAddr, remoteLogname, user, time, method, uri, proto, status, bytes, referer, userAgent);
 * 
 
*/

public class CombinedLogLoader extends RegExLoader {
    // 1.2.3.4 - - [30/Sep/2008:15:07:53 -0400] "GET / HTTP/1.1" 200 3190 "-"
    
// "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_5_4; en-us) AppleWebKit/525.18 (KHTML, like Gecko) Version/3.1.2 Safari/525.20.1"
    private final static Pattern combinedLogPattern = Pattern
        .compile("^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+.(\\S+\\s+\\S+).\\s+\"(\\S+)\\s+(.+?)\\s+(HTTP[^\"]+)\"\\s+(\\S+)\\s+(\\S+)\\s+\"([^\"]*)\"\\s+\"(.*)\"$");

    public Pattern getPattern() {
        return combinedLogPattern;
    }
}

posted @ 2013-04-08 11:28 paulwong 阅读(288) | 评论 (0)编辑 收藏

Analyzing Apache logs with Pig



Analyzing log files, churning them and extracting meaningful information is a potential use case in Hadoop. We don’t have to go in for MapReduce programming for these analyses; instead we can go for tools like Pig and Hive for this log analysis. I’d just give you a start off on the analysis part. Let us consider Pig for apache log analysis. Pig has some built in libraries that would help us load the apache log files into pig and also some cleanup operation on string values from crude log files. All the functionalities are available in the piggybank.jar mostly available under pig/contrib/piggybank/java/ directory. As the first step we need to register this jar file with our pig session then only we can use the functionalities in our Pig Latin
1.       Register PiggyBank jar
REGISTER /usr/lib/pig/contrib/piggybank/java/piggybank.jar;
Once we have registered the jar file we need to define a few functionalities to be used in our Pig Latin. For any basic apache log analysis we need a loader to load the log files in a column oriented format in pig, we can create a apache log loader as
2.       Define a log loader
DEFINE ApacheCommonLogLoader org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();
(Piggy Bank has other log loaders as well)
In apache log files the default format of date is ‘dd/MMM/yyyy:HH:mm:ss Z’ . But such a date won’t help us much in case of log analysis we may have to extract date without time stamp. For that we use DateExtractor()
3.       Define Date Extractor
DEFINE DayExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('yyyy-MM-dd');
Once we have the required functionalities with us we need to first load the log file into pig
4.       Load apachelog file into pig
--load the log files from hdfs into pig using CommonLogLoader
logs = LOAD '/userdata/bejoys/pig/p01/access.log.2011-01-01' USING ApacheCommonLogLoader AS (ip_address, rfc, userId, dt, request, serverstatus, returnobject, referersite, clientbrowser);
Now we are ready to dive in for the actual log analysis. There would be multiple information you need to extract out of a log; we’d see a few of those common requirements out here
Note: you need to first register the jar, define the classes to be used and load the log files into pig before trying out any of the pig latin below
Requirement 1: Find unique hits per day
PIG Latin
--Extracting the day alone and grouping records based on days
grpd = GROUP logs BY DayExtractor(dt) as day;
--looping through each group to get the unique no of userIds
cntd = FOREACH grpd
{
                tempId =  logs.userId;
                uniqueUserId = DISTINCT tempId;
                GENERATE group AS day,COUNT(uniqueUserId) AS cnt;
}
--sorting the processed records based on no of unique user ids in descending order
srtd = ORDER cntd BY cnt desc;
--storing the final result into a hdfs directory
STORE srtd INTO '/userdata/bejoys/pig/ApacheLogResult1';
Requirement 1: Find unique hits to websites (IPs) per day
PIG Latin
--Extracting the day alone and grouping records based on days and ip address
grpd = GROUP logs BY (DayExtractor(dt) as day,ip_address);
--looping through each group to get the unique no of userIds
cntd = FOREACH grpd
{
                tempId =  logs.userId;
                uniqueUserId = DISTINCT tempId;
                GENERATE group AS day,COUNT(uniqueUserId) AS cnt;
}
--sorting the processed records based on no of unique user ids in descending order
srtd = ORDER cntd BY cnt desc;
--storing the final result into a hdfs directory
STORE srtd INTO '/userdata/bejoys/pig/ ApacheLogResult2 ';
Note: When you use pig latin in grunt shell we need to know a few factors
1.       When we issue a pig statement in grunt and press enter only the semantic check is being done, no execution is triggered.
2.       All the pig statements are executed only after the STORE command is submitted, ie map reduce programs would be triggered only after STORE is submitted
3.       Also in this case you don’t have to load the log files again and again to pig once it is loaded we can use the same for all related operations in that session. Once you are out of the grunt shell the loaded files are lost, you’d have to perform the register and log file loading steps all over again.

posted @ 2013-04-08 02:06 paulwong 阅读(361) | 评论 (0)编辑 收藏

PIG小议

什么是PIG
是一种设计语言,通过设计数据怎么流动,然后由相应的引擎将此变成MAPREDUCE JOB去HADOOP中运行。
PIG与SQL
两者有相同之处,执行一个或多个语句,然后出来一些结果。
但不同的是,SQL要先把数据导到表中才能执行,SQL不关心中间如何做,即发一个SQL语句过去,就有结果出来。
PIG,无须导数据到表中,但要设计直到出结果的中间过程,步骤如何等等。

posted @ 2013-04-05 21:33 paulwong 阅读(362) | 评论 (0)编辑 收藏

PIG资源

Hadoop Pig学习笔记(一) 各种SQL在PIG中实现
http://guoyunsky.iteye.com/blog/1317084

http://guoyunsky.iteye.com/category/196632

Hadoop学习笔记(9) Pig简介
http://www.distream.org/?p=385


[hadoop系列]Pig的安装和简单示例
http://blog.csdn.net/inkfish/article/details/5205999


Hadoop and Pig for Large-Scale Web Log Analysis
http://www.devx.com/Java/Article/48063


Pig实战
http://www.cnblogs.com/xuqiang/archive/2011/06/06/2073601.html


[原创]Apache Pig中文教程(进阶)
http://www.codelast.com/?p=4249


基于hadoop平台的pig语言对apache日志系统的分析
http://goodluck-wgw.iteye.com/blog/1107503


!!Pig语言
http://hi.baidu.com/cpuramdisk/item/a2980b78caacfa3d71442318


Embedding Pig In Java Programs
http://wiki.apache.org/pig/EmbeddedPig


一个pig事例(REGEX_EXTRACT_ALL, DBStorage,结果存进数据库)
http://www.myexception.cn/database/1256233.html


Programming Pig
http://ofps.oreilly.com/titles/9781449302641/index.html


[原创]Apache Pig的一些基础概念及用法总结(1)
http://www.codelast.com/?p=3621


!PIG手册
http://pig.apache.org/docs/r0.11.1/func.html#built-in-functions

posted @ 2013-04-05 18:19 paulwong 阅读(381) | 评论 (0)编辑 收藏

NIO Socket非阻塞模式

Server socket编程的时候,一个SERVER服务一个连接的时候,是阻塞线程的,除非用多线程来处理。

NIO只使用一条线程即可以处理多个连接。是基于事件的模式,即产生事件的时候,通知客户端处理相应的事件。

1)server端代码
    /** 
     *  
     * 
@author Jeff 
     * 
     
*/  
    
public class HelloWorldServer {  
      
        
static int BLOCK = 1024;  
        
static String name = "";  
        
protected Selector selector;  
        
protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);  
        
protected CharsetDecoder decoder;  
        
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();  
      
        
public HelloWorldServer(int port) throws IOException {  
            selector 
= this.getSelector(port);  
            Charset charset 
= Charset.forName("GB2312");  
            decoder 
= charset.newDecoder();  
        }  
      
        
// 获取Selector  
        protected Selector getSelector(int port) throws IOException {  
            ServerSocketChannel server 
= ServerSocketChannel.open();  
            Selector sel 
= Selector.open();  
            server.socket().bind(
new InetSocketAddress(port));  
            server.configureBlocking(
false);  
            server.register(sel, SelectionKey.OP_ACCEPT);  
            
return sel;  
        }  
      
        
// 监听端口  
        public void listen() {  
            
try {  
                
for (;;) {  
                    selector.select();  
                    Iterator iter 
= selector.selectedKeys().iterator();  
                    
while (iter.hasNext()) {  
                        SelectionKey key 
= (SelectionKey) iter.next();  
                        iter.remove();  
                        process(key);  
                    }  
                }  
            } 
catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
      
        
// 处理事件  
        protected void process(SelectionKey key) throws IOException {  
            
if (key.isAcceptable()) { // 接收请求  
                ServerSocketChannel server = (ServerSocketChannel) key.channel();  
                SocketChannel channel 
= server.accept();  
                
//设置非阻塞模式  
                channel.configureBlocking(false);  
                channel.register(selector, SelectionKey.OP_READ);  
            } 
else if (key.isReadable()) { // 读信息  
                SocketChannel channel = (SocketChannel) key.channel();  
                
int count = channel.read(clientBuffer);  
                
if (count > 0) {  
                    clientBuffer.flip();  
                    CharBuffer charBuffer 
= decoder.decode(clientBuffer);  
                    name 
= charBuffer.toString();  
                    
// System.out.println(name);  
                    SelectionKey sKey = channel.register(selector,  
                            SelectionKey.OP_WRITE);  
                    sKey.attach(name);  
                } 
else {  
                    channel.close();  
                }  
      
                clientBuffer.clear();  
            } 
else if (key.isWritable()) { // 写事件  
                SocketChannel channel = (SocketChannel) key.channel();  
                String name 
= (String) key.attachment();  
                  
                ByteBuffer block 
= encoder.encode(CharBuffer  
                        .wrap(
"Hello !" + name));  
                  
      
                channel.write(block);  
      
                
//channel.close();  
      
            }  
        }  
      
        
public static void main(String[] args) {  
            
int port = 8888;  
            
try {  
                HelloWorldServer server 
= new HelloWorldServer(port);  
                System.out.println(
"listening on " + port);  
                  
                server.listen();  
                  
            } 
catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }


server主要是读取client发过来的信息,并返回一条信息

2)client端代码
    /** 
     *  
     * 
@author Jeff 
     * 
     
*/  
    
public class HelloWorldClient {  
      
        
static int SIZE = 10;  
        
static InetSocketAddress ip = new InetSocketAddress("localhost"8888);  
        
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();  
      
        
static class Message implements Runnable {  
            
protected String name;  
            String msg 
= "";  
      
            
public Message(String index) {  
                
this.name = index;  
            }  
      
            
public void run() {  
                
try {  
                    
long start = System.currentTimeMillis();  
                    
//打开Socket通道  
                    SocketChannel client = SocketChannel.open();  
                    
//设置为非阻塞模式  
                    client.configureBlocking(false);  
                    
//打开选择器  
                    Selector selector = Selector.open();  
                    
//注册连接服务端socket动作  
                    client.register(selector, SelectionKey.OP_CONNECT);  
                    
//连接  
                    client.connect(ip);  
                    
//分配内存  
                    ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);  
                    
int total = 0;  
      
                    _FOR: 
for (;;) {  
                        selector.select();  
                        Iterator iter 
= selector.selectedKeys().iterator();  
      
                        
while (iter.hasNext()) {  
                            SelectionKey key 
= (SelectionKey) iter.next();  
                            iter.remove();  
                            
if (key.isConnectable()) {  
                                SocketChannel channel 
= (SocketChannel) key  
                                        .channel();  
                                
if (channel.isConnectionPending())  
                                    channel.finishConnect();  
                                channel  
                                        .write(encoder  
                                                .encode(CharBuffer.wrap(name)));  
      
                                channel.register(selector, SelectionKey.OP_READ);  
                            } 
else if (key.isReadable()) {  
                                SocketChannel channel 
= (SocketChannel) key  
                                        .channel();  
                                
int count = channel.read(buffer);  
                                
if (count > 0) {  
                                    total 
+= count;  
                                    buffer.flip();  
      
                                    
while (buffer.remaining() > 0) {  
                                        
byte b = buffer.get();  
                                        msg 
+= (char) b;  
                                          
                                    }  
      
                                    buffer.clear();  
                                } 
else {  
                                    client.close();  
                                    
break _FOR;  
                                }  
                            }  
                        }  
                    }  
                    
double last = (System.currentTimeMillis() - start) * 1.0 / 1000;  
                    System.out.println(msg 
+ "used time :" + last + "s.");  
                    msg 
= "";  
                } 
catch (IOException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
      
        
public static void main(String[] args) throws IOException {  
          
            String names[] 
= new String[SIZE];  
      
            
for (int index = 0; index < SIZE; index++) {  
                names[index] 
= "jeff[" + index + "]";  
                
new Thread(new Message(names[index])).start();  
            }  
          
        }  
    }




posted @ 2013-03-31 13:38 paulwong 阅读(360) | 评论 (0)编辑 收藏

CSS选择器

一个完整的标签称为元素,元素里面有属性名,属性值。

选择器相当于WHERE子句,结果就是返回符合WHERE子句的元素,可能是多个。

.class
class值=class,含有class属性,且值为class的元素。

a
标签名=a,含有标签名为a

#id
id值=id,含有属性名为id,且值为id的元素。

el.class
标签名=el and class值=class,含有标签名为el,含有class属性,且值为class的元素。

posted @ 2013-03-31 10:26 paulwong 阅读(235) | 评论 (0)编辑 收藏

仅列出标题
共116页: First 上一页 68 69 70 71 72 73 74 75 76 下一页 Last