PIG中的LOAD函数,可以在LOAD数据的同时,进行正则表达式的筛选。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
* NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package org.apache.pig.piggybank.storage.apachelog;
import java.util.regex.Pattern;
import org.apache.pig.piggybank.storage.RegExLoader;
/**
* CombinedLogLoader is used to load logs based on Apache's combined log format, based on a format like
*
* LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined
*
* The log filename ends up being access_log from a line like
*
* CustomLog logs/combined_log combined
*
* Example:
*
* raw = LOAD 'combined_log' USING org.apache.pig.piggybank.storage.apachelog.CombinedLogLoader AS
* (remoteAddr, remoteLogname, user, time, method, uri, proto, status, bytes, referer, userAgent);
*
*/
public class CombinedLogLoader extends RegExLoader {
// 1.2.3.4 - - [30/Sep/2008:15:07:53 -0400] "GET / HTTP/1.1" 200 3190 "-"
// "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_5_4; en-us) AppleWebKit/525.18 (KHTML, like Gecko) Version/3.1.2 Safari/525.20.1"
private final static Pattern combinedLogPattern = Pattern
.compile("^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+.(\\S+\\s+\\S+).\\s+\"(\\S+)\\s+(.+?)\\s+(HTTP[^\"]+)\"\\s+(\\S+)\\s+(\\S+)\\s+\"([^\"]*)\"\\s+\"(.*)\"$");
public Pattern getPattern() {
return combinedLogPattern;
}
}
Server socket编程的时候,一个SERVER服务一个连接的时候,是阻塞线程的,除非用多线程来处理。
NIO只使用一条线程即可以处理多个连接。是基于事件的模式,即产生事件的时候,通知客户端处理相应的事件。
1)server端代码
/**
*
* @author Jeff
*
*/
public class HelloWorldServer {
static int BLOCK = 1024;
static String name = "";
protected Selector selector;
protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
protected CharsetDecoder decoder;
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
public HelloWorldServer(int port) throws IOException {
selector = this.getSelector(port);
Charset charset = Charset.forName("GB2312");
decoder = charset.newDecoder();
}
// 获取Selector
protected Selector getSelector(int port) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
Selector sel = Selector.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}
// 监听端口
public void listen() {
try {
for (;;) {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
process(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 处理事件
protected void process(SelectionKey key) throws IOException {
if (key.isAcceptable()) { // 接收请求
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
//设置非阻塞模式
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 读信息
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(clientBuffer);
if (count > 0) {
clientBuffer.flip();
CharBuffer charBuffer = decoder.decode(clientBuffer);
name = charBuffer.toString();
// System.out.println(name);
SelectionKey sKey = channel.register(selector,
SelectionKey.OP_WRITE);
sKey.attach(name);
} else {
channel.close();
}
clientBuffer.clear();
} else if (key.isWritable()) { // 写事件
SocketChannel channel = (SocketChannel) key.channel();
String name = (String) key.attachment();
ByteBuffer block = encoder.encode(CharBuffer
.wrap("Hello !" + name));
channel.write(block);
//channel.close();
}
}
public static void main(String[] args) {
int port = 8888;
try {
HelloWorldServer server = new HelloWorldServer(port);
System.out.println("listening on " + port);
server.listen();
} catch (IOException e) {
e.printStackTrace();
}
}
}
server主要是读取client发过来的信息,并返回一条信息
2)client端代码
/**
*
* @author Jeff
*
*/
public class HelloWorldClient {
static int SIZE = 10;
static InetSocketAddress ip = new InetSocketAddress("localhost", 8888);
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
static class Message implements Runnable {
protected String name;
String msg = "";
public Message(String index) {
this.name = index;
}
public void run() {
try {
long start = System.currentTimeMillis();
//打开Socket通道
SocketChannel client = SocketChannel.open();
//设置为非阻塞模式
client.configureBlocking(false);
//打开选择器
Selector selector = Selector.open();
//注册连接服务端socket动作
client.register(selector, SelectionKey.OP_CONNECT);
//连接
client.connect(ip);
//分配内存
ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
int total = 0;
_FOR: for (;;) {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key
.channel();
if (channel.isConnectionPending())
channel.finishConnect();
channel
.write(encoder
.encode(CharBuffer.wrap(name)));
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key
.channel();
int count = channel.read(buffer);
if (count > 0) {
total += count;
buffer.flip();
while (buffer.remaining() > 0) {
byte b = buffer.get();
msg += (char) b;
}
buffer.clear();
} else {
client.close();
break _FOR;
}
}
}
}
double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
System.out.println(msg + "used time :" + last + "s.");
msg = "";
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
String names[] = new String[SIZE];
for (int index = 0; index < SIZE; index++) {
names[index] = "jeff[" + index + "]";
new Thread(new Message(names[index])).start();
}
}
}
Java 1.5版本最终提供了对编程中最基础数据结构之一-Queue的内在支持。本文章将探究新添加到java.util包中的Queue接口,演示如何去使用这个新特性去使你的数据处理流式化。
by Kulvir Singh Bhogal (Translated by Victor Jan 2004-09-26 )
(英文原文见http://www.devx.com/Java/Article/21983/1954?pf=true )
在 计算机学科中,基础数据结构之一 — Queue。你会想起Queue是一种数据结构,在它里边的元素可以按照添加它们的相同顺序被移除。在以前的Java版本中,这中FIFO(先进先出)数 据结构很不幸被忽略了。随着Java1.5(也叫Tiger)的出现,对Queue支持第一次成为固有特性。
过去在没有Queue的情况下如何管理?
在Java 1.5以前,通常的实现方式是使用java.util.List 集合来模仿Queue。Queue的概念通过把对象添加(称为enqueuing的操作)到List的尾部(即Queue的后部)并通过从List的头部 (即Queue的前部)提取对象而从List中移除(称为dequeuing的操作)来模拟。下面代码显示了你以前可能做法。
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.Vector;
public class QueueEmulate
{
private LinkedList queueMembers;
public Object enqueue(Object element)
{
queueMembers.add (element);
return element;
}
public Object dequeue()
{
return queueMembers.removeFirst();
}
}
现在我们有了Queue
Java 1.5在java.util包中 新添加了Queue接口,下面代码中使用到它(从 sample code 的SimpleQueueUsageExample.java 截取)。注意,在Java 1.5中,java.util.LinkedList被用来实现java.util.Queue,如同java.util.List接口。也要注意我是如 何显式地声明我的只包含字符串的Queue---使用<String> 泛型(如果需要深入了解泛型,请参阅"J2SE 1.5: Java's Evolution Continues ")。这样使我省却了当从 Queue 中提取它们时不得不进行对象造型的痛苦。
Queue<String> myQueue = new LinkedList<String>();
myQueue.add("Add Me");
myQueue.add("Add Me Too");
String enqueued = myQueue.remove();
你可以看到LinkedList类的add和remove方法被分别用于执行enqueuing和dequeuing操作。实际上没有更好的可用方法;Queue接口提供了新的offer和poll方法,如下显示(截取自SimpleQueueUsageExamplePreferred ):
Queue<String> myQueue = new LinkedList<String>();
boolean offerSuccess;
// offer method tries to enqueue.
// A boolean is returned telling you if the
// attempt to add to the queue was successful or not
offerSuccess=myQueue.offer("Add Me");
offerSuccess=myQueue.offer("Add Me Too");
// peek at the head of the queue, but don't grab it
Object pull = myQueue.peek();
String enqueued = null;
// grab the head of the queue
if (pull!=null) enqueued = myQueue.poll();
System.out.println(enqueued);
如果你的Queue有固定长度限制(常常是这种情形),使用add方法向Queue中添加内容,将导致抛出一个非检查异常。当你编译SimpleQueueUsageExample的代码时,编译器将会就此问题发出警告。相反,当新的offer方法试图添加时,如果一切正常则会返回TRUE,否则,返回FALSE。 同样地, 如果你试图使用remove方法对一个空Queue操作时 ,也将导致一个非检查异常。
你 也可以使用poll方法去从Queue中提取元素。如果在Queue中没有元素存在,poll方法将会返回一个null(即不会抛出异常)。在某些情况 下,你可能不想提取头部的元素而只是想看一下。Queue接口提供了一个peek方法来做这样的事情。如果你正在处理一个空Queue,peek方法返回 null。象add-offer和remove-poll关系一样,还有peek-element关系。在Queue为空的情形下,element方法抛 出一个非检查异常。但如果在Queue中有元素,peek方法允许你查看一下第一个元素而无需真的把他从Queue中取出来。peek方法的用法在SimpleQueueUsageExamplePreferred类中有示例。
AbstractQueue类
如你所见,java.util.LinkedList类实现了java.util.Queue接口,同样,AbstractQueue也是这样。 AbstractQueue 类实现了java.util接口的一些方法(因此在它的名字中包含abstract)。而AbstractQueue将重点放在了实现offer,poll和peek方法上。另外使用一些已经提供的具体实现。
PriorityQueue类
在 PriorityQueue中,当你添加元素到Queue中时,实现了自动排序。根据你使用的PriorityQueue的不同构造器,Queue元素的 顺序要么基于他们的自然顺序要么通过PriorirtyQueue构造器传入的Comparator来确定。下面的代码示例了PirorityQueue 类的使用方法。在Queue的前边是字符串"Alabama"-由于元素在PriorityQueue中是按自然顺序排列的(此例中是按字母表顺序)。
PriorityQueue<String> priorityQueue = new PriorityQueue<String>();
priorityQueue.offer("Texas");
priorityQueue.offer("Alabama");
priorityQueue.offer("California");
priorityQueue.offer("Rhode Island");
int queueSize = priorityQueue.size();
for (int i =0; i< queueSize; i++)
{
System.out.println(priorityQueue.poll());
}
执行结果如下:
Alabama
California
Rhode Island
Texas
Queue各项按照自然顺序-字母顺序-来排列。
如上提到的,你可以创建你自己的Comparator类并提供给PirorityQueue。如此,你可以定义你自己的排序方式。在PriorityQueueComparatorUsageExample 类中可找到此方式,在其中使用了一个名为State的助手类。如你在下边看到的,在类定义中,State只简单地包含了一个名字和人口。
private String name;
private int population;
public State(String name, int population)
{
super();
this.name = name;
this.population = population;
}
public String getName()
{
return this.name;
}
public int getPopulation()
{
return this.population;
}
public String toString()
{
return getName() + " - " + getPopulation();
}
在PriorityQueueComparatorUsageExample中,Queue使用了java.util.Comparator的自定义实现来定义排列顺序(如下)。
PriorityQueue<State> priorityQueue =
new PriorityQueue(6, new Comparator<State>()
{
public int compare(State a, State b)
{
System.out.println("Comparing Populations");
int populationA = a.getPopulation();
int populationB = b.getPopulation();
if (populationB>populationA)
return 1;
else if (populationB<populationA)
return -1;
else
return 0;
}
}
);
执行PriorityQueueComparatorUsageExample 类后,添加到Queue中的State对象将按人口数量排放(从低到高)。
阻塞Queue
Queue通常限定于给定大小。迄今为止,通过Queue的实现你已经看到,使用offer或add方法enqueue Queue(并用remove或poll来dequeue Queue)都是假设如果Queue不能提供添加或移除操作,那么你不需要等待程序执行。java.util.concurrent.BlockingQueue接口实现阻塞。它添加了put和take方法。举一个例子可能更有用。
使 用原来的producer/consumer关系来假定你的producer写一个Queue(更特定是一个BlockingQueue)。你有一些 consumer正从Queue中读取,在一个有序的方式下,哪种方式是你希望看到的。基本上,每个consumer需要等待先于它并获准从Queue中 提取项目的前一个consumer。用程序构建此结构,先生成一个producer线程用于向一个Queue中写数据,然后生成一些consumer线程 从同一Queue中读取数据。注意,线程会阻塞另一线程直到当前线程做完从Queue中提取一项的操作。
下 面的代码展示了类Producer写BlockingQueue的过程。注意run方法中的对象(你有责任实现,因为你继承了Thread)在等待了随机 数量的时间(范围从100到500毫秒)后,被放进了BlockingQueue。放到Queue中的对象只是一些包含消息产生时的时间的字符串。
添加对象的实际工作是由如下语句实现的:
blockingQueue.put("Enqueued at: " + time)
put方法会抛出InterruptedException,因此,put操作需要被try...catch块包围,用来捕获被抛出的异常 (见Listing 1 )。
从producer中提取消息的是Consumer对象,它也继承自Thread对象并因此要实现run方法(见Listing 2 )。
Consumer 类在设计上是类似于Producer类的。Consumer类使用take方法去从Queue中取出(即dequeue)消息,而不是将消息放到 BlockingQueue中。如前所述,这需要等待到有什么内容确实存在于Queue中时才发生。如果producer线程停止放置(即 enqueue)对象到Queue中,那么consumer将等待到Queue的项目有效为止。下面所示的TestBlockingQueue类,产生四 个consumer线程,它们从BlockingQueue中尝试提取对象。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestBlockingQueue
{
public static void main(String args[])
{
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
Producer producer = new Producer(blockingQueue, System.out);
Consumer consumerA = new Consumer("ConsumerA", blockingQueue, System.out);
Consumer consumerB = new Consumer("ConsumerB", blockingQueue, System.out);
Consumer consumerC = new Consumer("ConsumerC", blockingQueue, System.out);
Consumer consumerD = new Consumer("ConsumerD", blockingQueue, System.out);
producer.start();
consumerA.start();
consumerB.start();
consumerC.start();
consumerD.start();
}
}
Figure 1. Consumer Threads: These threads dequeue messages from the BlockingQueue in the order that you spawned them.
下面一行创建BlockingQueue:
BlockingQueue<String> blockingQueue
= new LinkedBlockingQueue<String>();
注意,它使用BlockingQueue的LinkedBlockingQueue实现。 这是因为BlockingQueue是一个抽象类,你不能直接实例化它。你也可以使用ArrayBlockingQueueQueue类型。 ArrayBlockingQueue使用一个数组作为它的存储设备,而LinkedBlockingQueue使用一个LinkedList。 ArrayBlockingQueue的容量是固定的。对于LinkedBlockingQueue,最大值可以指定;默认是无边界的。本示例代码采用无 边界方式。
在类的执行期间,从Queue中读取对象以顺序方式执行(见下面例子的执行)。实际上,一个consumer线程阻塞其他访问BlockingQueue的线程直到它可以从Queue中取出一个对象。
DelayQueue-我是/不是不完整的
在某些情况下,存放在Queue中的对象,在它们准备被取出之前,会需要被放在另一Queue中一段时间。这时你可使用java.util.concurrent.DelayQueue类,他实现类BlockingQueue接口。DelayQueue需要Queue对象被驻留在Queue上一段指定时间。
我想用来证实它的现实例子(这可能是你非常渴望的)是关于松饼(muffins)。噢,Muffin对象(象我们正在谈论的Java-没有coffee双关意图)。假定你有一个DelayQueue并在其中放了一些Muffin对象。Muffin对象(如下所示)必须实现java.util.concurrent.Delayed 接口,以便可被放在DelayQueue中。这个接口需要Muffin对象实现getDelay方法(如下所示)。getDelay方法,实际上声明给多 长时间让对象保存在DelayQueue中。当该方法返回的值变为0或小于0时,对象就准备完毕(或在本例子中,是烤制完毕)并允许被取出(见 Listing 3 )。
Muffin类也实现compareTo(java.util.concurrent.Delayed)方法。由于Delayed接口继承自 java.lang.Comparable 类,这通过约定限制你要实现Muffin对象的bakeCompletion时间。
由于你不是真想去吃没有完全烤熟的Muffin,因此,需要将Muffin放在DelayQueue中存放推荐的烤制时间。Listing 4 ,取自DelayQueueUsageExample类,展示了从DelayQueue中enqueue和dequeue Muffin对象。
如你所见,对Muffin对象的烤制时间是使用它的构造器设置的(构造器期望烤制时间是以秒计)。
如 前所讲,Muffin对象放到DelayQueue中是不允许被取出的,直到他的延时时间(又叫烤制时间)超期。元素被从Queue中取出基于最早的延时 时间。在本例中,如果你有一些已经烤过的Muffin对象,他们将按他们已经等待多久而被取出(换句话说,最早被烤制的Muffin会在新烤制的 Muffin之前被取出)。
SynchronousQueue
在Java 1.5中,另外一种阻塞Queue实现是SynchronousQueue。相当有趣的是,该Queue没有内在容量。这是故意的,因为Queue意在用 于传递目的。这意味着,在一个同步Queue结构中,put请求必须等待来自另一线程的给SynchronousQueue的take请求。同时,一个 take请求必须等待一个来自另一线程的给SynchronousQueue的put请求。用程序来示例此概念,可参见示例代码。类似于前边的 LinkedBlockingQueue例子,它包含一个consumer(SynchConsumer),见Listing 5 。
Listing 5 中的代码使用SynchronousQueue类的 poll(long timeout,TimeUnit unit)方法。此方法允许poll过程在厌倦等待另一消费线程写SynchronousQueue之前等待一个指定时间(本例中是20秒)。
在Listing 6 中的producer(SynchProducer )使用相似的offer(E o,long timeout, TimeUnit unit)方法去放置对象到SynchronousQueue中。使用此方法允许在 厌倦等待另一线程去读取SynchronousQueue之前等待一段时间(本例中为10秒) 。
TestSynchQueue 展示了producer和consumer的动作:
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestSynchQueue
{
public static void main(String args[])
{
SynchronousQueue<String> synchQueue = new SynchronousQueue<String>();
SynchProducer producer = new SynchProducer("ProducerA",synchQueue, System.out);
SynchConsumer consumerA = new SynchConsumer("ConsumerA", synchQueue, System.out);
consumerA.start();
producer.start();
}
}
当试图明白隐藏在SynchronousQueue后面的概念时,要牢记这些Queue通常被使用在什么地方。JavaDoc中关于同步Queue指出:
"它们[同步Queue]是适合于传递设计,在那里运行在一个线程中的对象必须与运行在另外一个线程中的对象同步以便于交给它一些信息,时间或任务。"