ivaneeo's blog

自由的力量,自由的生活。

  BlogJava :: 首页 :: 联系 :: 聚合  :: 管理
  669 Posts :: 0 Stories :: 64 Comments :: 0 Trackbacks

#

DBUS_SESSION_BUS_ADDRESS="tcp:host=localhost,port=732"
posted @ 2011-02-18 15:32 ivaneeo 阅读(1619) | 评论 (2)编辑 收藏

这个话题来自: Nutz的issue 361

在考虑这个issue时, 我一直倾向于使用系统变量file.encoding来改变JVM的默认编码.

今天,我想到, 这个系统变量,对JVM的影响到底有多大呢?
我使用最简单的方法看看这个变量的影响–在JDK 1.6.0_20的src.zip文件中,查找包含file.encoding字眼的文件.
共找到4个, 分别是:
先上重头戏 java.nio.Charset类:

View Code JAVA
    public static Charset defaultCharset() {
            if (defaultCharset == null) {
            synchronized (Charset.class) {
            java.security.PrivilegedAction pa =
            new GetPropertyAction("file.encoding");
            String csn = (String)AccessController.doPrivileged(pa);
            Charset cs = lookup(csn);
            if (cs != null)
            defaultCharset = cs;
            else
            defaultCharset = forName("UTF-8");
            }
            }
            return defaultCharset;
            }

java.net.URLEncoder的静态构造方法,影响到的方法 java.net.URLEncoder.encode(String)

View Code JAVA

com.sun.org.apache.xml.internal.serializer.Encoding的getMimeEncoding方法(209行起)

View Code JAVA

最后一个javax.print.DocFlavor类的静态构造方法:

View Code JAVA

可以看到,系统变量file.encoding影响到
1. Charset.defaultCharset() Java环境中最关键的编码设置
2. URLEncoder.encode(String) Web环境中最常遇到的编码使用
3. com.sun.org.apache.xml.internal.serializer.Encoding 影响对无编码设置的xml文件的读取
4. javax.print.DocFlavor 影响打印的编码

故,影响还是很大的哦, 可以说是Java中编码的一个关键钥匙!

posted @ 2011-01-31 14:28 ivaneeo 阅读(4039) | 评论 (0)编辑 收藏

我们大家在做J2EE项目开发的时候,都会用到Application Server,然后配置Connection Pool,Data Source,但不知道大家有没有留意到,其实我们绝大部分的应用用的都是Apache的DBCP机制。
 
JES,Weblogic,JBoss等等的大型App Server,其中一个好处就是提供了Admin Console,让配置做起来就像傻瓜式的,Step By Step就可以了,下面举个用Tomcat的应用例子,深入一点探讨DBCP的配置都做了些什么。(当然得配置Server.xml了,但是其实JES和Weblogic等等的大型App Server,也是可以同样修改Server.xml或这Domain.xml来达到同一目的的,只不过有了Admin Console,大家容易避免犯错,但其实我觉得,要深入了解一个App Server,避免不了深入了解配置文件里面的内容)。
 
当使用DBCP(通常我们都是用Oracle的了)时候,不知道大家有没有遇到一个情况,当数据库连接因为某种原因断掉(有可能时网络问题,导致App Server跑了一天后,第二天再跑马上爆错误),再从Connection Pool中获取连接而又不做Validate,这时候取得的Connection实际上已经是无效的了,从而导致程序一跑,马上爆Connect Reset错误。
 
其实只要你了解一下DBCP的运作机制和相关属性的话,这个问题就很容易避免了。
 
DBCP使用Apache的ObjectPool作为Connection Pool的实现,在构造GenericObjectPool的时候,会生成一个Inner Class Evictor,实现Runnable的接口。如果属性_timeBetweenEvictionRunsMillis > 0,每过_timeBetweenEvictionRunsMillis毫秒后Evictor会调用evict method,检查Object的idle time是否大于属性_minEvictableIdleTimeMillis毫秒(如果_minEvictableIdleTimeMillis设置为<=0则忽略,使用default value 30分钟),如果是则销毁该Object,否则就激活并进行Validate,然后调用ensureMinIdle method检查确保Connection Pool中的Object个数不小于属性_minIdle。在调用returnObject method把Object放回ObjectPool时候,需要检查该Object是否有效,然后调用PoolableObjectFactory的passivateObject method使Object处于inactive状态,再检查ObjectPool中的对象个数是否小于属性_maxIdle,是则可以把该Object放回到ObjectPool,否则销毁此Object。
 
除此之外,还有几个比较重要的属性,_testOnBorrow,_testOnReturn,_testWhileIdle,这些属性的意思是取得,返回对象,空闲时候是否进行Valiadte,检查对象是否有效。默认都为False,只有把这些属性设为True,再提供_validationQuery语句就可以保证DBCP始终有效了,例如,Oracle中就完全可以使用select 1 from dual来进行验证,这里要注意的是,DBCP要求_validationQuery语句查询的Result Set必须为非空。
 
在Tomcat的Server.xml,我们可以看看下面的这个例子:
 
<Resource name="lda/raw"
              type="javax.sql.DataSource"
               password="lda_master"
               driverClassName="oracle.jdbc.driver.OracleDriver"
               maxIdle="30" minIdle="2" maxWait="60000" maxActive="1000" 
               testOnBorrow="true" testWhileIdle="true" validationQuery="select 1 from dual"
               username="lda_master" url="jdbc:oracle:thin:@192.160.100.107:15537:lcststd"/>
 
这样一来,就能够解决Connect Reset的问题了。刚才说了,其实很多App Server都会有相应的配置地方,只是大型的服务器正好提供了Admin Console,上面可以显式的配置Connection Pool,也有明显的属性选择,这里就不一一详述了,都是眼见的功夫。

本文出自 “jayenho” 博客,转载请与作者联系!

posted @ 2011-01-29 11:30 ivaneeo 阅读(412) | 评论 (0)编辑 收藏

摘要: 公共安全管理的一般过程分为监测、预警、决策和处置,前两者属于安全事故发生前的防范,后两者属于事故发生后的紧急处理.公共安全事件发生的隐患越早被识别,处理就可以越及时,损失就越小. 
关键词: 物联网

  公共安全是指多数人的生命、健康和公私财产安全,其涵盖范围包括自然灾害,如地震、洪涝等;技术灾害,如交通事故、火灾、爆炸等;社会灾害,如骚乱、恐怖主义袭击等;公共卫生事件,如食品、药品安全和突发疫情等。我国的公共安全形势严峻,每年死亡人数超过20万,伤残人数超过200万;每年经济损失近9000亿元,相当于GDP的3.5%,远高于中等发达国家1%??2%左右的水平。

  公共安全重在预先感知

  公共安全管理的一般过程分为监测、预警、决策和处置,前两者属于安全事故发生前的防范,后两者属于事故发生后的紧急处理。公共安全事件发生的隐患越早被识别,处理就可以越及时,损失就越小。管理公共安全事故的重点应该在发生前,而不只是在发生之后。但目前的情况是,安全的防范技术难度大,同时也往往容易被人们所忽视。

  物联网是安防的重要技术手段,目前公众所说的物联网就是带有传感/标识器的智能感知信息网络系统,涵盖了当初的物联网、传感网等概念,是在传感、识别、接入网、无线通信网、互联网、计算技术、信息处理和应用软件、智能控制等信息集成基础上的新发展。

  公共安全管理的关键是预先感知。感知的对象很多,例如地表、堤坝、道路交通、公共区域、危化品、周界、水资源、食品药品生产环节以及疫情等容易引起公共安全事故发生的源头、场所和环节;感知的内容包括震动、压力、流量、图像、声音、光线、气体、温湿度、浓度、化学成分、标签信息、生物信息等;感知的目的就是要准确获取管理对象的异常变化。

  公共安全中需要感知的对象、内容和数量非常巨大,感知之间的关联关系也错综复杂,要做到准确、及时和无遗漏,光靠人工识别基本无法做到、也不现实。物联网的智能化应用将转变传统管理模式,大幅度提高公共管理水平。

  求解技术局限

  物联网基本层次结构按照普遍的理解划分为感知层、网络层和应用层。感知层由传感器、RFID和传感网络组成,负责数据采集;网络层一般指三大电信公司的宽带网、Wi-Fi、GPRS/CDMA、3G/4G等,负责数据传输;应用层是基于信息数据汇集之上的各类应用。

  感知层上的局限是:目前的传感器在较为复杂的环境下,难以做到准确、快速感知;高性能传感器的成本过高,对使用环境要求苛刻,限制了推广;传感器标准不统一,大家各自为政。

  网络层比传感层和应用层要成熟,几大电信公司不遗余力地扩展网络能力;三网合一的推进将进一步扩大覆盖面、提高传输能力。

  应用层上,专业系统条块分割,形成“信息孤岛”,限制了应用进一步提升和发展;各专业系统之间技术体系标准不统一,存在互联互通的技术障碍。

  另外,在管理层面,也存在条块分割、难以形成统一指挥的局面;而且创新和产业体系不成熟,创新能力不够,存在较多简单模仿和贴牌,不利于形成产业持续、健康发展的局面。

  不过,目前安防行业也出现了一些新的趋势,将会影响到整个公共安全领域。

  IP化

  安防系统的数字化及网络的普及为IP化提供了条件;安防系统从分散的单点系统朝分布式监控、集中管理方向发展,联网监控报警是大势所趋,“平安城市”等项目的大规模建设也加快了联网监控的步伐。

  IT化

  传统IT厂商及电信运营商全面进入安防行业,安防行业面临重大洗牌;IT行业成熟的结构化、标准化体系架构设计和理念将应用于安防领域,推动安防行业升级。

  网管化

  安防系统规模越来越大、监控范围越来越广、监控对象越来越多,海量存储对系统的管理和维护提出了更高要求,需要做到电信级可管可控。

  面对这些新的趋势,公共安全的信息系统建设需要迈上一个智能化综合管理的阶梯。

  系统的融合性

  物联网将横向整合各个孤立的安防系统,实现信息互通和联动,形成立体化决策支撑体系,使决策更智能、更科学。

  平台的开放性

  物联网采用标准化的开放平台,带动整个安防产业链发展,形成安防行业规模化的态势。

  应用的灵活性

  采用安防通用平台+应用子集的顶层设计架构,在保证系统标准化、可重用的同时,满足应用的多样性和不同应用的个性化要求。

  系统的智能性

  智能感知通过多感知协同,在感知阶段就完成信息的智能处理,滤除无效、冗余信息,保证信息获取的真实、全面和有效;智能分析对感知信息进行分析,对数据特征进行准确地识别和判断;智能决策通过专家系统等形成最优的决策建议;智能处置对异常情况和突发事件进行有效的处置,如结合特定业务和应用的报警联动;智能管理系统具有智能的自动配置管理、自动诊断和告警、故障自动愈合和多种接入方式的远程维护功能,尽量减轻人工负担。

  政策科研双管齐下

  正如物联网是信息化发展的更高阶段,基于物联网的智能安防、安监系统也是公共安全行业发展的大势所趋。

  在政策与管理层面,需要将公共安全信息科技列入国家战略性发展规划范围;整合现有相关资源,引导资源投入和技术、产品创新;推进产学研结合,在有条件的地方开展示范项目建设。

  在技术层面,公共安全中的物联网关键技术研究包括5大方面:

  1. 物联网智能安防、安监通用平台的建设,设备的接入和管理,中间件、体系结构和标准的确立。

  2. 高性能传感器和传感设备的研发,例如新型材料、纳米材料、生物技术、仿生技术、极低功耗、MEMS。

  3. 云计算的利用,包括异构数据海量存储和管理、智能信息处理、主动决策等。

  4. 基础资源管理,包括万亿量级节点的标识、异构网络融合、自治认知。

  5. 物联网的安全,涉及物的真实性、联的可靠性、网的健壮性。

  物联网的发展将极大地拓宽安防、安监的范围和内涵,未来的安防、安监将会渗透进人们生活的方方面面,成为物联网的一个基本功能;物联网对安防、安监行业的推动不仅体现在提升传统安防、安监的技术上,还将对整个行业的产业格局、业务模式产生重大影响,今后新的安防、安监应用、运营模式也将会层出不穷。

\
图注:物联网智能安防共性平台总体架构

(责编:邱文峰)

posted @ 2011-01-28 15:49 ivaneeo 阅读(453) | 评论 (0)编辑 收藏

chmod 700 ~/.ssh
chmod 600 ~/.ssh/authorized_keys
posted @ 2011-01-26 11:25 ivaneeo 阅读(246) | 评论 (0)编辑 收藏

一个图片太大了,只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。

  1. 在分布式环境中客户端创建任务并提交。
  2. InputFormat做Map前的预处理,主要负责以下工作:
    1. 验证输入的格式是否符合JobConfig的输入定义,这个在实现Map和构建Conf的时候就会知道,不定义可以是Writable的任意子类。
    2. 将input的文件切分为逻辑上的输入InputSplit,其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个block。
    3. 通过RecordReader来再次处理inputsplit为一组records,输出给Map。(inputsplit只是逻辑切分的第一步,但是如何根据文件中的信息来切分还需要RecordReader来实现,例如最简单的默认方式就是回车换行的切分)
  3. RecordReader处理后的结果作为Map的输入,Map执行定义的Map逻辑,输出处理后的key和value对应到临时中间文件。
  4. Combiner可选择配置,主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。
  5. Partitioner可选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。(后面的代码实例中有介绍使用场景)
  6. Reduce执行具体的业务逻辑,并且将处理结果输出给OutputFormat。
  7. OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如Config中配置,最后输出Reduce汇总后的结果。

业务场景和代码范例

业务场景描述:可设定输入和输出路径(操作系统的路径非HDFS路径),根据访问日志分析某一个应用访问某一个API的总次数和总流量,统计后分别输出到两个文件中。这里仅仅为了测试,没有去细分很多类,将所有的类都归并于一个类便于说明问题。


测试代码类图

LogAnalysiser就是主类,主要负责创建、提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断:

LogAnalysiser::MapClass

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable>
    {
        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException
        {   
            String line = value.toString();//没有配置RecordReader,所以默认采用line的实现,key就是行号,value就是行内容
            if (line == null || line.equals(""))
                return;
            String[] words = line.split(",");
            if (words == null || words.length < 8)
                return;
            String appid = words[1];
            String apiName = words[2];
            LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
            Text record = new Text();
            record.set(new StringBuffer("flow::").append(appid)
                            .append("::").append(apiName).toString());
            reporter.progress();
            output.collect(record, recbytes);//输出流量的统计结果,通过flow::作为前缀来标示。
            record.clear();
            record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
            output.collect(record, new LongWritable(1));//输出次数的统计结果,通过count::作为前缀来标示
        }   
    }

LogAnalysiser:: PartitionerClass

    public static class PartitionerClass implements Partitioner<Text, LongWritable>
    {
        public int getPartition(Text key, LongWritable value, int numPartitions)
        {
            if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统计分配到不同的Reduce
                if (key.toString().startsWith("flow::"))
                    return 0;
                else
                    return 1;
            else
                return 0;
        }
        public void configure(JobConf job){}   
}

LogAnalysiser:: CombinerClass

参看ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在ReduceClass中蓝色的行表示在CombinerClass中不存在。

LogAnalysiser:: ReduceClass

    public static class ReduceClass extends MapReduceBase
        implements Reducer<Text, LongWritable,Text, LongWritable>
    {
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
        {
            Text newkey = new Text();
            newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
            LongWritable result = new LongWritable();
            long tmp = 0;
            int counter = 0;
            while(values.hasNext())//累加同一个key的统计结果
            {
                tmp = tmp + values.next().get();
               
                counter = counter +1;//担心处理太久,JobTracker长时间没有收到报告会认为TaskTracker已经失效,因此定时报告一下
                if (counter == 1000)
                {
                    counter = 0;
                    reporter.progress();
                }
            }
            result.set(tmp);
            output.collect(newkey, result);//输出最后的汇总结果
        }   
    }

LogAnalysiser

	public static void main(String[] args)
{
try
{
run(args);
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();


if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/root" + shortin;
shortout = "/user/root" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);
if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn't dir!");
return;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}

JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//构建Config
FileSystem fileSys = FileSystem.get(conf);
fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//将本地文件系统的文件拷贝到HDFS中

conf.setJobName("analysisjob");
conf.setOutputKeyClass(Text.class);//输出的key类型,在OutputFormat会检查
conf.setOutputValueClass(LongWritable.class); //输出的value类型,在OutputFormat会检查
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(CombinerClass.class);
conf.setReducerClass(ReduceClass.class);
conf.setPartitionerClass(PartitionerClass.class);
conf.set("mapred.reduce.tasks", "2");//强制需要有两个Reduce来分别处理流量和次数的统计
FileInputFormat.setInputPaths(conf, shortin);//hdfs中的输入路径
FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中输出路径

Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(conf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
//删除输入和输出的临时文件
fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
fileSys.delete(new Path(shortin),true);
fileSys.delete(new Path(shortout),true);
}

以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务Class为一个可标示的命令,让hadoop jar可以执行。

public class ExampleDriver {
  public static void main(String argv[]){
    ProgramDriver pgd = new ProgramDriver();
    try {
      pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
      pgd.driver(argv);
    }
    catch(Throwable e){
      e.printStackTrace();
    }
  }
}

将代码打成jar,并且设置jar的mainClass为ExampleDriver这个类。在分布式环境启动以后执行如下语句:

hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out

在/home/wenchu/test-in中是需要分析的日志文件,执行后就会看见整个执行过程,包括了Map和Reduce的进度。执行完毕会在/home/wenchu/test-out下看到输出的内容。有两个文件:part-00000和part-00001分别记录了统计后的结果。 如果需要看执行的具体情况,可以看在输出目录下的_logs/history/xxxx_analysisjob,里面罗列了所有的Map,Reduce的创建情况以及执行情况。在运行期也可以通过浏览器来查看Map,Reduce的情况:http://MasterIP:50030/jobtracker.jsp

Hadoop集群测试

首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看集群的效果,以及一些参数配置的影响。

文件复制数为1,blocksize 5M

Slave数 处理记录数(万条) 执行时间(秒)
2 95 38
2 950 337
4 95 24
4 950 178
6 95 21
6 950 114

Blocksize 5M

Slave数 处理记录数(万条) 执行时间(秒)
2(文件复制数为1) 950 337
2(文件复制数为3) 950 339
6(文件复制数为1) 950 114
6(文件复制数为3) 950 117

文件复制数为1

Slave数 处理记录数(万条) 执行时间(秒)
6(blocksize 5M) 95 21
6(blocksize 77M) 95 26
4(blocksize 5M) 950 178
4(blocksize 50M) 950 54
6(blocksize 5M) 950 114
6(blocksize 50M) 950 44
6(blocksize 77M) 950 74

测试的数据结果很稳定,基本测几次同样条件下都是一样。通过测试结果可以看出以下几点:

  1. 机器数对于性能还是有帮助的(等于没说^_^)。
  2. 文件复制数的增加只对安全性有帮助,但是对于性能没有太多帮助。而且现在采取的是将操作系统文件拷贝到HDFS中,所以备份多了,准备的时间很长。
  3. blocksize对于性能影响很大,首先如果将block划分的太小,那么将会增加job的数量,同时也增加了协作的代价,降低了性能,但是配置的太大也会让job不能最大化并行处理。所以这个值的配置需要根据数据处理的量来考虑。
  4. 最后就是除了这个表里面列出来的结果,应该去仔细看输出目录中的_logs/history中的xxx_analysisjob这个文件,里面记录了全部的执行过程以及读写情况。这个可以更加清楚地了解哪里可能会更加耗时。

随想

“云计算”热的烫手,就和SAAS、Web2及SNS等一样,往往都是在搞概念,只有真正踏踏实实的大型互联网公司,才会投入人力物力去研究符合自己的分布式计算。其实当你的数据量没有那么大的时候,这种分布式计算也就仅仅只是一个玩具而已,只有在真正解决问题的过程中,它深层次的问题才会被挖掘出来。

这三篇文章(分布式计算开源框架Hadoop介绍,Hadoop中的集群配置和使用技巧)仅仅是为了给对分布式计算有兴趣的朋友抛个砖,要想真的掘到金子,那么就踏踏实实的去用、去想、去分析。或者自己也会更进一步地去研究框架中的实现机制,在解决自己问题的同时,也能够贡献一些什么。

前几日看到有人跪求成为架构师的方式,看了有些可悲,有些可笑,其实有多少架构师知道什么叫做架构?架构师的职责是什么?与其追求这么一个名号,还不如踏踏实实地做块石头沉到水底。要知道,积累和沉淀的过程就是一种成长。

相关阅读:

  1. 分布式计算开源框架Hadoop介绍――分布式计算开源框架Hadoop入门实践(一)
  2. Hadoop中的集群配置和使用技巧――分布式计算开源框架Hadoop入门实践(二)

作者介绍:岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。没有什么擅长或者精通,工作到现在唯一提升的就是学习能力和速度。个人Blog为:http://blog.csdn.net/cenwenchu79

志愿参与InfoQ中文站内容建设,请邮件至editors@cn.infoq.com。也欢迎大家到InfoQ中文站用户讨论组参与我们的线上讨论。

posted @ 2011-01-25 16:14 ivaneeo 阅读(414) | 评论 (0)编辑 收藏

     摘要: HBASE松散数据存储设计初识 - 西湖边的穷秀才-文初 - BlogJava window.onerror = ignoreError; function ignoreError() { return true; } Date.prototype.Format = function(f...  阅读全文
posted @ 2011-01-21 19:31 ivaneeo 阅读(405) | 评论 (0)编辑 收藏

一、介绍
Google的工程师为了方便自己对MapReduce的实现搞了一个叫做Sawzall的工具,Google就放了几篇论文放在网上,但这玩意在代码上不开源在设计思想是开源的,在前面一篇文章中我也提到过Hadoop也推出了类似Sawzall的Pig语言,就是根据Google放出来的论文山寨的。

Pig是对处理超大型数据集的抽象层,在MapReduce中的框架中有map和reduce两个函数,如果你亲手弄一个MapReduce实现从编写代码,编译,部署,放在Hadoop上执行这个MapReduce程序还是耗费你一定的时间的,有了Pig这个东东以后不仅仅可以简化你对MapReduce的开发,而且还可以对不同的数据之间进行转换,例如:包含在连接内的一些转化在MapReduce中不太容易去实现。

Apache Pig的运行可以纯本地的,解压,敲个“bin/pig -x local”命令直接运行,非常简单,这就是传说中的local模式,但是人们往往不是这样使用,都是将Pig与hdfs/hadoop集群环境进行对接,我看说白了Apache的Pig最大的作用就是对mapreduce算法(框架)实现了一套shell脚本 ,类似我们通常熟悉的SQL语句,在Pig中称之为Pig Latin,在这套脚本中我们可以对加载出来的数据进行排序、过滤、求和、分组(group by)、关联(Joining),Pig也可以由用户自定义一些函数对数据集进行操作,也就是传说中的UDF(user-defined functions)。

经过Pig Latin的转换后变成了一道MapReduce的作业,通过MapReduce多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳。Map() 和 Reduce() 两个函数会并行运行,即使不是在同一的系统的同一时刻也在同时运行一套任务,当所有的处理都完成之后,结果将被排序,格式化,并且保存到一个文件。Pig利用MapReduce将计算分成两个阶段,第一个阶段分解成为小块并且分布到每一个存储数据的节点上进行执行,对计算的压力进行分散,第二个阶段聚合第一个阶段执行的这些结果,这样可以达到非常高的吞吐量,通过不多的代码和工作量就能够驱动上千台机器并行计算,充分的利用计算机的资源,打消运行中的瓶颈。

所以用Pig可以对TB级别海量的数据进行查询非常轻松,并且这些海量的数据都是非结构化的数据,例如:一堆文件可能是log4j输出日志存又放于跨越多个计算机的多个磁盘上,用来记录上千台在线服务器的健康状态日志,交易日至,IP访问记录,应用服务日志等等。我们通常需要统计或者抽取这些记录,或者查询异常记录,对这些记录形成一些报表,将数据转化为有价值的信息,这样的话查询会较为复杂,此时类似MySQL这样的产品就并非能满足我们的对速度、执行效率上的需求,而用Apache的Pig就可以帮助我们去实现这样的目标。

反之,你如果在做实验的时候,把MySQL中的100行数据转换成文本文件放在在pig中进行查询,会让你非常失望,为何这短短的100行数据查询的效率极低,呵呵,因为中间有一个生成MapReduce作业的过程,这是无法避免的开销,所以小量的数据查询是不适合pig做的,就好比用关二哥的大刀切青菜一样。另外,还可以利用Pig的API在Java环境中调用,对Apache的Pig以上内容请允许我这样片面的理解,谢谢。

 
二、基本架构
 
从整体上来看大量的数据聚集在HDFS系统上,通过输入类似SQL的脚本简化对MapReduce的操作,让这几行代码/脚本去驱动上千台机器进行并行计算。
如图所示:
 Apache-Pig-Architect.jpg

Pig的实现有5个主要的部分构成:
如图所示:
apache zebra

1.Pig自己实现的一套框架对输入、输出的人机交互部分的实现,就是Pig Latin 。
2.Zebra是Pig与HDFS/Hadoop的中间层、Zebra是MapReduce作业编写的客户端,Zerbra用结构化的语言实现了对hadoop物理存储元数据的管理也是对Hadoop的数据抽象层,在Zebra中有2个核心的类 TableStore(写)/TableLoad(读)对Hadoop上的数据进行操作。
3.Pig中的Streaming主要分为4个组件: 1. Pig Latin 2. 逻辑层(Logical Layer) 3. 物理层(Physical Layer) 4. Streaming具体实现(Implementation),Streaming会创建一个Map/Reduce作业,并把它发送给合适的集群,同时监视这个作业的在集群环境中的整个执行过程。 
4.MapReduce在每台机器上进行分布式计算的框架(算法)。
5.HDFS最终存储数据的部分。

三、与Hive对比
请允许我很无聊的把飞机和火车拿来做比较,因为2者根本没有深入的可比性,虽然两者都是一种高速的交通工具,但是具体的作用范围是截然不同的,就像Hive和Pig都是Hadoop中的项目,并且Hive和pig有很多共同点,但Hive还似乎有点数据库的影子,而Pig基本就是一个对MapReduce实现的工具(脚本)。两者都拥有自己的表达语言,其目的是将MapReduce的实现进行简化,并且读写操作数据最终都是存储在HDFS分布式文件系统上。看起来Pig和Hive有些类似的地方,但也有些不同,来做一个简单的比较,先来看一张图:
hive and pig
查看大图请点击这里

再让我说几句废话:
Language
在Hive中可以执行  插入/删除 等操作,但是Pig中我没有发现有可以 插入 数据的方法,请允许我暂且认为这是最大的不同点吧。

Schemas
Hive中至少还有一个“表”的概念,但是Pig中我认为是基本没有表的概念,所谓的表建立在Pig Latin脚本中,对与Pig更不要提metadata了。

Partitions
Pig中没有表的概念,所以说到分区对于Pig来说基本免谈,如果跟Hive说“分区”(Partition)他还是能明白的。

Server
Hive可以依托于Thrift启动一个服务器,提供远程调用。 找了半天压根没有发现Pig有这样的功能,如果你有新发现可以告诉我,就好像有人开发了一个Hive的REST

Shell
在Pig 你可以执行一些个 ls 、cat 这样很经典、很cool的命令,但是在使用Hive的时候我压根就没有想过有这样的需求。

Web Interface
Hive有,Pig无

JDBC/ODBC
Pig无,Hive有


四、使用

1启动/运行  
分为2台服务器,一台作为pig的服务器,一台作为hdfs的服务器。
首先需要在pig的服务器上进行配置,将pig的配置文件指向hdfs服务器,修改pig/conf目录下的
 vim /work/pig/conf/pig.properties
 添加以下内容:
fs.default.name=hdfs://192.168.1.201:9000/    #指向HDFS服务器
mapred.job.tracker=192.168.1.201:9001          #指向MR job服务器地址

如果是第一次运行请在Hadoop的HDFS的服务器上创建root目录,并且将etc目录下的passwd文件放在HDFS的root目录下,请执行以下两条命令。
hadoop fs -mkdir /user/root
hadoop fs -put /etc/passwd /user/root/passwd

创建运行脚本,用vim命令在pig的服务器上创建javabloger_testscript.pig 文件,内容如下:
LoadFile = load 'passwd' using PigStorage(':');
Result = foreach LoadFile  generate $0 as id;
dump Result;

运行pig脚本,例如:pig javabloger_testscript.pig,执行状态如图所示:
pig

执行结果:

2.java 代码  运行并且打印运行结果
import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class  LocalPig {
    public static void main(String[] args) {
        try {
            PigServer pigServer = new PigServer("local");
            runIdQuery(pigServer, "passwd");
        } catch (Exception e) {
        }
    }

    public static void runIdQuery(PigServer pigServer, String inputFile)  throws IOException {
        pigServer.registerQuery("LoadFile = load '" + inputFile+ "' using PigStorage(':');");
        pigServer.registerQuery("Result = foreach A generate $0 as id;");
        Iterator<Tuple> result = pigServer.openIterator("Result "); 
        while (result.hasNext()) { 
               Tuple t = result.next(); 
               System.out.println(t); 
            } 
//        pigServer.store("B", "output");
        
    }
}

–end–

本文已经同步到新浪微博,点击这里访问“J2EE企业应用 顾问/咨询- H.E.'s Blog”的官方微博。
豆瓣读书  向你推荐有关 Hadoop、 Hive、 MapReduce、 NoSQL、 Pig、 云计算、 架构设计、 类别的图书。
posted @ 2011-01-21 19:28 ivaneeo 阅读(2079) | 评论 (0)编辑 收藏

http://code.google.com/p/nutla/ 

1、概述
 不管程序性能有多高,机器处理能力有多强,都会有其极限。能够快速方便的横向与纵向扩展是Nut设计最重要的原则。
 Nut是一个Lucene+Hadoop分布式搜索框架,能对千G以上索引提供7*24小时搜索服务。在服务器资源足够的情况下能达到每秒处理100万次的搜索请求。
 Nut开发环境:jdk1.6.0.21+lucene3.0.2+eclipse3.6.1+hadoop0.20.2+zookeeper3.3.1+hbase0.20.6+memcached+linux

2、特新
 a、热插拔
 b、可扩展
 c、高负载
 d、易使用,与现有项目无缝集成
e、支持排序
f、7*24服务
g、失败转移

3、搜索流程
Nut由Index、Search、Client、Cache和DB五部分构成。(Cache默认使用memcached,DB默认使用hbase)
Client处理用户请求和对搜索结果排序。Search对请求进行搜索,Search上只放索引,数据存储在DB中,Nut将索引和存储分离。Cache缓存的是搜索条件和结果文档id。DB存储着数据,Client根据搜索排序结果,取出当前页中的文档id从DB上读取数据。

用户发起搜索请求给由Nut Client构成的集群,由某个Nut Client根据搜索条件查询Cache服务器是否有该缓存,如果有缓存根据缓存的文档id直接从DB读取数据,如果没有缓存将随机选择一组搜索服务器组(Search Group i),将查询条件同时发给该组搜索服务器组里的n台搜索服务器,搜索服务器将搜索结果返回给Nut Client由其排序,取出当前页文档id,将搜索条件和当前文档id缓存,同时从DB读取数据。


4、索引流程
Hadoop Mapper/Reducer 建立索引。再将索引从HDFS分发到各个索引服务器。
对索引的更新分为两种:删除和添加(更新分解为删除和添加)。
a、删除
在HDFS上删除索引,将生成的*.del文件分发到所有的索引服务器上去或者对HDFS索引目录删除索引再分发到对应的索引服务器上去。
b、添加
新添加的数据用另一台服务器来生成。
删除和添加步骤可按不同定时策略来实现。

5、Zookeeper服务器状态管理策略

在架构设计上通过使用多组搜索服务器可以支持每秒处理100万个搜索请求。
每组搜索服务器能处理的搜索请求数在1万—1万5千之间。如果使用100组搜索服务器,理论上每秒可处理100万个搜索请求。


假如每组搜索服务器有100份索引放在100台正在运行中搜索服务器(run)上,那么将索引按照如下的方式放在备用中搜索服务器(bak)上:index 1,index 2,index 3,index 4,index 5,index 6,index 7,index 8,index 9,index 10放在B 1 上,index 6,index 7,index 8,index 9,index 10,index 11,index 12,index 13,index 14,index 15放在B 2上。。。。。。index 96,index 97,index 98,index 99,index 100,index 5,index 4,index 3,index 2,index 1放在最后一台备用搜索服务器上。那么每份索引会存在3台机器中(1份正在运行中,2份备份中)。
尽管这样设计每份索引会存在3台机器中,仍然不是绝对安全的。假如运行中的index 1,index 2,index 3同时宕机的话,那么就会有一份索引搜索服务无法正确启用。这样设计,作者认为是在安全性和机器资源两者之间一个比较适合的方案。

备用中的搜索服务器会定时检查运行中搜索服务器的状态。一旦发现与自己索引对应的服务器宕机就会向lock申请分布式锁,得到分布式锁的服务器就将自己加入到运行中搜索服务器组,同时从备用搜索服务器组中删除自己,并停止运行中搜索服务器检查服务。

为能够更快速的得到搜索结果,设计上将搜索服务器分优先等级。通常是将最新的数据放在一台或几台内存搜索服务器上。通常情况下前几页数据能在这几台搜索服务器里搜索到。如果在这几台搜索服务器上没有数据时再向其他旧数据搜索服务器上搜索。
优先搜索等级的逻辑是这样的:9最大为搜索全部服务器并且9不能作为level标识。当搜索等级level为1,搜索优先级为1的服务器,当level为2时搜索优先级为1和2的服务器,依此类推。

posted @ 2011-01-21 19:06 ivaneeo 阅读(252) | 评论 (0)编辑 收藏

HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。

HBase是Google Bigtable的开源实现,类似Google Bigtable利用GFS作为其文件存储系统,HBase利用Hadoop HDFS作为其文件存储系统;Google运行MapReduce来处理Bigtable中的海量数据,HBase同样利用Hadoop MapReduce来处理HBase中的海量数据;Google Bigtable利用 Chubby作为协同服务,HBase利用Zookeeper作为对应。

上图描述了Hadoop EcoSystem中的各层系统,其中HBase位于结构化存储层,Hadoop HDFS为HBase提供了高可靠性的底层存储支持,Hadoop MapReduce为HBase提供了高性能的计算能力,Zookeeper为HBase提供了稳定服务和failover机制。

此外,Pig和Hive还为HBase提供了高层语言支持,使得在HBase上进行数据统计处理变的非常简单。 Sqoop则为HBase提供了方便的RDBMS数据导入功能,使得传统数据库数据向HBase中迁移变的非常方便。

HBase访问接口

1.       Native Java API,最常规和高效的访问方式,适合Hadoop MapReduce Job并行批处理HBase表数据

2.       HBase Shell,HBase的命令行工具,最简单的接口,适合HBase管理使用

3.       Thrift Gateway,利用Thrift序列化技术,支持C++,PHP,Python等多种语言,适合其他异构系统在线访问HBase表数据

4.       REST Gateway,支持REST 风格的Http API访问HBase, 解除了语言限制

5.       Pig,可以使用Pig Latin流式编程语言来操作HBase中的数据,和Hive类似,本质最终也是编译成MapReduce Job来处理HBase表数据,适合做数据统计

6.       Hive,当前Hive的Release版本尚没有加入对HBase的支持,但在下一个版本Hive 0.7.0中将会支持HBase,可以使用类似SQL语言来访问HBase

HBase数据模型

Table & Column Family

Row Key Timestamp Column Family
URI Parser
r1 t3 url=http://www.taobao.com title=天天特价
t2 host=taobao.com
t1
r2 t5 url=http://www.alibaba.com content=每天…
t4 host=alibaba.com

Ø  Row Key: 行键,Table的主键,Table中的记录按照Row Key排序

Ø  Timestamp: 时间戳,每次数据操作对应的时间戳,可以看作是数据的version number

Ø  Column Family:列簇,Table在水平方向有一个或者多个Column Family组成,一个Column Family中可以由任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,所有Column均以二进制格式存储,用户需要自行进行类型转换。

Table & Region

当Table随着记录数不断增加而变大后,会逐渐分裂成多份splits,成为regions,一个region由[startkey,endkey)表示,不同的region会被Master分配给相应的RegionServer进行管理:

-ROOT- && .META. Table

HBase中有两张特殊的Table,-ROOT-和.META.

Ø  .META.:记录了用户表的Region信息,.META.可以有多个regoin

Ø  -ROOT-:记录了.META.表的Region信息,-ROOT-只有一个region

Ø  Zookeeper中记录了-ROOT-表的location

Client访问用户数据之前需要首先访问zookeeper,然后访问-ROOT-表,接着访问.META.表,最后才能找到用户数据的位置去访问,中间需要多次网络操作,不过client端会做cache缓存。

MapReduce on HBase

在HBase系统上运行批处理运算,最方便和实用的模型依然是MapReduce,如下图:

HBase Table和Region的关系,比较类似HDFS File和Block的关系,HBase提供了配套的TableInputFormat和TableOutputFormat API,可以方便的将HBase Table作为Hadoop MapReduce的Source和Sink,对于MapReduce Job应用开发人员来说,基本不需要关注HBase系统自身的细节。

HBase系统架构

Client

HBase Client使用HBase的RPC机制与HMaster和HRegionServer进行通信,对于管理类操作,Client与HMaster进行RPC;对于数据读写类操作,Client与HRegionServer进行RPC

Zookeeper

Zookeeper Quorum中除了存储了-ROOT-表的地址和HMaster的地址,HRegionServer也会把自己以Ephemeral方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的健康状态。此外,Zookeeper也避免了HMaster的单点问题,见下文描述

HMaster

HMaster没有单点问题,HBase中可以启动多个HMaster,通过Zookeeper的Master Election机制保证总有一个Master运行,HMaster在功能上主要负责Table和Region的管理工作:

1.       管理用户对Table的增、删、改、查操作

2.       管理HRegionServer的负载均衡,调整Region分布

3.       在Region Split后,负责新Region的分配

4.       在HRegionServer停机后,负责失效HRegionServer 上的Regions迁移

HRegionServer

HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据,是HBase中最核心的模块。

HRegionServer内部管理了一系列HRegion对象,每个HRegion对应了Table中的一个Region,HRegion中由多个HStore组成。每个HStore对应了Table中的一个Column Family的存储,可以看出每个Column Family其实就是一个集中的存储单元,因此最好将具备共同IO特性的column放在一个Column Family中,这样最高效。

HStore存储是HBase存储的核心了,其中由两部分组成,一部分是MemStore,一部分是StoreFiles。MemStore是Sorted Memory Buffer,用户写入的数据首先会放入MemStore,当MemStore满了以后会Flush成一个StoreFile(底层实现是HFile),当StoreFile文件数量增长到一定阈值,会触发Compact合并操作,将多个StoreFiles合并成一个StoreFile,合并过程中会进行版本合并和数据删除,因此可以看出HBase其实只有增加数据,所有的更新和删除操作都是在后续的compact过程中进行的,这使得用户的写操作只要进入内存中就可以立即返回,保证了HBase I/O的高性能。当StoreFiles Compact后,会逐步形成越来越大的StoreFile,当单个StoreFile大小超过一定阈值后,会触发Split操作,同时把当前Region Split成2个Region,父Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。下图描述了Compaction和Split的过程:

在理解了上述HStore的基本原理后,还必须了解一下HLog的功能,因为上述的HStore在系统正常工作的前提下是没有问题的,但是在分布式系统环境中,无法避免系统出错或者宕机,因此一旦HRegionServer意外退出,MemStore中的内存数据将会丢失,这就需要引入HLog了。每个HRegionServer中都有一个HLog对象,HLog是一个实现Write Ahead Log的类,在每次用户操作写入MemStore的同时,也会写一份数据到HLog文件中(HLog文件格式见后续),HLog文件定期会滚动出新的,并删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会通过Zookeeper感知到,HMaster首先会处理遗留的 HLog文件,将其中不同Region的Log数据进行拆分,分别放到相应region的目录下,然后再将失效的region重新分配,领取 到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复。

HBase存储格式

HBase中的所有数据文件都存储在Hadoop HDFS文件系统上,主要包括上述提出的两种文件类型:

1.       HFile, HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,实际上StoreFile就是对HFile做了轻量级包装,即StoreFile底层就是HFile

2.       HLog File,HBase中WAL(Write Ahead Log) 的存储格式,物理上是Hadoop的Sequence File

HFile

下图是HFile的存储格式:

首先HFile文件是不定长的,长度固定的只有其中的两块:Trailer和FileInfo。正如图中所示的,Trailer中有指针指向其他数据块的起始点。File Info中记录了文件的一些Meta信息,例如:AVG_KEY_LEN, AVG_VALUE_LEN, LAST_KEY, COMPARATOR, MAX_SEQ_ID_KEY等。Data Index和Meta Index块记录了每个Data块和Meta块的起始点。

Data Block是HBase I/O的基本单元,为了提高效率,HRegionServer中有基于LRU的Block Cache机制。每个Data块的大小可以在创建一个Table的时候通过参数指定,大号的Block有利于顺序Scan,小号Block利于随机查询。每个Data块除了开头的Magic以外就是一个个KeyValue对拼接而成, Magic内容就是一些随机数字,目的是防止数据损坏。后面会详细介绍每个KeyValue对的内部构造。

HFile里面的每个KeyValue对就是一个简单的byte数组。但是这个byte数组里面包含了很多项,并且有固定的结构。我们来看看里面的具体结构:

开始是两个固定长度的数值,分别表示Key的长度和Value的长度。紧接着是Key,开始是固定长度的数值,表示RowKey的长度,紧接着是RowKey,然后是固定长度的数值,表示Family的长度,然后是Family,接着是Qualifier,然后是两个固定长度的数值,表示Time Stamp和Key Type(Put/Delete)。Value部分没有这么复杂的结构,就是纯粹的二进制数据了。

HLogFile

上图中示意了HLog文件的结构,其实HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是“写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。

HLog Sequece File的Value是HBase的KeyValue对象,即对应HFile中的KeyValue,可参见上文描述。

结束

本文对HBase技术在功能和设计上进行了大致的介绍,由于篇幅有限,本文没有过多深入地描述HBase的一些细节技术。目前一淘的存储系统就是基于HBase技术搭建的,后续将介绍“一淘分布式存储系统”,通过实际案例来更多的介绍HBase应用。

posted @ 2011-01-21 19:04 ivaneeo 阅读(926) | 评论 (0)编辑 收藏

仅列出标题
共67页: First 上一页 15 16 17 18 19 20 21 22 23 下一页 Last