Java code:
package com.xunjie.dmsp.olduser;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
/**
* test.txt:
* 1 a
* 2 b
* 3 c
*
* /data/hadoop/hadoop/bin/hadoop jar
* dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar
* hdfs:/user/hadoop/test/lky/test.txt
* file:///data/hadoop/test/lky/output
*/
/**
 * Cascading demo: reads a tab-separated text file, splits each line into the
 * fields ("num", "value"), and writes the result to the sink path.
 */
public class Test2 {
    /**
     * Entry point.
     *
     * @param args args[0] = source path (e.g. an HDFS file URI),
     *             args[1] = sink directory (e.g. a local file URI)
     */
    public static void main(String[] args) {
        // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: Test2 <source-path> <sink-path>");
            System.exit(1);
        }
        // Input file to read.
        String sourcePath = args[0];
        // Output directory to write to.
        String sinkPath = args[1];
        // Fields produced by splitting each input line.
        Fields inputfields = new Fields("num", "value");
        // Regex splitter; the default delimiter is \t.
        RegexSplitter spliter = new RegexSplitter(inputfields);
        // Define the pipe assembly.
        Pipe p1 = new Pipe("test");
        // Apply the splitter to the "line" field emitted by TextLine.
        p1 = new Each(p1, new Fields("line"), spliter);
        // Source and sink taps backed by Hfs (works for hdfs:, file:, ...).
        Tap source = new Hfs(new TextLine(), sourcePath);
        Tap sink = new Hfs(new TextLine(), sinkPath);
        // Job configuration.
        Properties properties = new Properties();
        properties.setProperty("hadoop.job.ugi", "hadoop,hadoop");
        // BUG FIX: was Main.class — this class is Test2, so the wrong (and
        // possibly nonexistent) class was registered as the application jar.
        FlowConnector.setApplicationJarClass(properties, Test2.class);
        FlowConnector flowConnector = new FlowConnector(properties);
        Flow importFlow = flowConnector.connect("import flow", source, sink, p1);
        importFlow.start();
        // Block until the flow finishes.
        importFlow.complete();
    }
}
Compiled by www.blogjava.net/Good-Game