随笔-314  评论-209  文章-0  trackbacks-0
  2017年7月28日
原文链接:http://www.cnblogs.com/juandx/p/4962089.html python中对文件、文件夹(文件操作函数)的操作需要涉及到os模块和shutil模块。 得到当前工作目录,即当前Python脚本工作的目录路径: os.getcwd() 返回指定目录下的所有文件和目录名:os.listdir() 函数用来删除一个文件:os.remove() 删除多个目录:os.removedirs(r“c:\python”) 检验给出的路径是否是一个文件:os.path.isfile() 检验给出的路径是否是一个目录:os.path.isdir() 判断是否是绝对路径:os.path.isabs() 检验给出的路径是否真地存:os.path.exists() 返回一个路径的目录名和文件名:os.path.split() eg os.path.split(‘/home/swaroop/byte/code/poem.txt’) 结果:(‘/home/swaroop/byte/code’, ‘poem.txt’) 分离扩展名:os.path.splitext() 获取路径名:os.path.dirname() 获取文件名:os.path.basename() 运行shell命令: os.system() 读取和设置环境变量:os.getenv() 与os.putenv() 给出当前平台使用的行终止符:os.linesep Windows使用’\r\n’,Linux使用’\n’而Mac使用’\r’ 指示你正在使用的平台:os.name 对于Windows,它是’nt’,而对于Linux/Unix用户,它是’posix’ 重命名:os.rename(old, new) 创建多级目录:os.makedirs(r“c:\python\test”) 创建单个目录:os.mkdir(“test”) 获取文件属性:os.stat(file) 修改文件权限与时间戳:os.chmod(file) 终止当前进程:os.exit() 获取文件大小:os.path.getsize(filename) 文件操作: os.mknod(“test.txt”) 创建空文件 fp = open(“test.txt”,w) 直接打开一个文件,如果文件不存在则创建文件 关于open 模式: w 以写方式打开, a 以追加模式打开 (从 EOF 开始, 必要时创建新文件) r+ 以读写模式打开 w+ 以读写模式打开 (参见 w ) a+ 以读写模式打开 (参见 a ) rb 以二进制读模式打开 wb 以二进制写模式打开 (参见 w ) ab 以二进制追加模式打开 (参见 a ) rb+ 以二进制读写模式打开 (参见 r+ ) wb+ 以二进制读写模式打开 (参见 w+ ) ab+ 以二进制读写模式打开 (参见 a+ ) fp.read([size]) #size为读取的长度,以byte为单位 fp.readline([size]) #读一行,如果定义了size,有可能返回的只是一行的一部分 fp.readlines([size]) #把文件每一行作为一个list的一个成员,并返回这个list。其实它的内部是通过循环调用readline()来实现的。如果提供size参数,size是表示读取内容的总长,也就是说可能只读到文件的一部分。 fp.write(str) #把str写到文件中,write()并不会在str后加上一个换行符 fp.writelines(seq) #把seq的内容全部写到文件中(多行一次性写入)。这个函数也只是忠实地写入,不会在每行后面加上任何东西。 fp.close() #关闭文件。python会在一个文件不用后自动关闭文件,不过这一功能没有保证,最好还是养成自己关闭的习惯。 如果一个文件在关闭后还对其进行操作会产生ValueError fp.flush() #把缓冲区的内容写入硬盘 fp.fileno() #返回一个长整型的”文件标签“ fp.isatty() #文件是否是一个终端设备文件(unix系统中的) fp.tell() #返回文件操作标记的当前位置,以文件的开头为原点 fp.next() #返回下一行,并将文件操作标记位移到下一行。把一个file用于for … in file这样的语句时,就是调用next()函数来实现遍历的。 fp.seek(offset[,whence]) #将文件打操作标记移到offset的位置。这个offset一般是相对于文件的开头来计算的,一般为正数。但如果提供了whence参数就不一定了,whence可以为0表示从头开始计算,1表示以当前位置为原点计算。2表示以文件末尾为原点进行计算。需要注意,如果文件以a或a+的模式打开,每次进行写操作时,文件操作标记会自动返回到文件末尾。 fp.truncate([size]) #把文件裁成规定的大小,默认的是裁到当前文件操作标记的位置。如果size比文件的大小还要大,依据系统的不同可能是不改变文件,也可能是用0把文件补到相应的大小,也可能是以一些随机的内容加上去。 目录操作: os.mkdir(“file”) 创建目录 复制文件: shutil.copyfile(“oldfile”,”newfile”) oldfile和newfile都只能是文件 shutil.copy(“oldfile”,”newfile”) oldfile只能是文件夹,newfile可以是文件,也可以是目标目录 复制文件夹: shutil.copytree(“olddir”,”newdir”) olddir和newdir都只能是目录,且newdir必须不存在 重命名文件(目录) os.rename(“oldname”,”newname”) 文件或目录都是使用这条命令 移动文件(目录) shutil.move(“oldpos”,”newpos”) 删除文件 os.remove(“file”) 删除目录 os.rmdir(“dir”)只能删除空目录 shutil.rmtree(“dir”) 空目录、有内容的目录都可以删 转换目录 os.chdir(“path”) 换路径 Python读写文件 1.open 使用open打开文件后一定要记得调用文件对象的close()方法。比如可以用try/finally语句来确保最后能关闭文件。 file_object = open(‘thefile.txt’) try: all_the_text = file_object.read( ) finally: file_object.close( ) 注:不能把open语句放在try块里,因为当打开文件出现异常时,文件对象file_object无法执行close()方法。 2.读文件 读文本文件 input = open('data', 'r') #第二个参数默认为r input = open('data') 1 2 3 读二进制文件 input = open('data', 'rb') 1 读取所有内容 file_object = open('thefile.txt') try: all_the_text = file_object.read( ) finally: file_object.close( ) 1 2 3 4 5 读固定字节 file_object = open('abinfile', 'rb') try: while True: chunk = file_object.read(100) if not chunk: break do_something_with(chunk) finally: file_object.close( ) 1 2 3 4 5 6 7 8 9 读每行 list_of_all_the_lines = file_object.readlines( ) 1 如果文件是文本文件,还可以直接遍历文件对象获取每行: for line in file_object: process line 1 2 3.写文件 写文本文件 output = open('data', 'w') 1 写二进制文件 output = open('data', 'wb') 1 追加写文件 output = open('data', 'w+') 1 写数据 file_object = open('thefile.txt', 'w') file_object.write(all_the_text) file_object.close( ) 1 2 3 写入多行 file_object.writelines(list_of_text_strings) 1 注意,调用writelines写入多行在性能上会比使用write一次性写入要高。 在处理日志文件的时候,常常会遇到这样的情况:日志文件巨大,不可能一次性把整个文件读入到内存中进行处理,例如需要在一台物理内存为 2GB 的机器上处理一个 2GB 的日志文件,我们可能希望每次只处理其中 200MB 的内容。 在 Python 中,内置的 File 对象直接提供了一个 readlines(sizehint) 函数来完成这样的事情。以下面的代码为例: file = open('test.log', 'r')sizehint = 209715200 # 200Mposition = 0lines = file.readlines(sizehint)while not file.tell() - position < 0: position = file.tell() lines = file.readlines(sizehint) 1 每次调用 readlines(sizehint) 函数,会返回大约 200MB 的数据,而且所返回的必然都是完整的行数据,大多数情况下,返回的数据的字节数会稍微比 sizehint 指定的值大一点(除最后一次调用 readlines(sizehint) 函数的时候)。通常情况下,Python 会自动将用户指定的 sizehint 的值调整成内部缓存大小的整数倍。 file在python是一个特殊的类型,它用于在python程序中对外部的文件进行操作。在python中一切都是对象,file也不例外,file有file的方法和属性。下面先来看如何创建一个file对象: file(name[, mode[, buffering]]) 1 file()函数用于创建一个file对象,它有一个别名叫open(),可能更形象一些,它们是内置函数。来看看它的参数。它参数都是以字符串的形式传递的。name是文件的名字。 mode是打开的模式,可选的值为r w a U,分别代表读(默认) 写 添加支持各种换行符的模式。用w或a模式打开文件的话,如果文件不存在,那么就自动创建。此外,用w模式打开一个已经存在的文件时,原有文件的内容会被清空,因为一开始文件的操作的标记是在文件的开头的,这时候进行写操作,无疑会把原有的内容给抹掉。由于历史的原因,换行符在不同的系统中有不同模式,比如在 unix中是一个\n,而在windows中是‘\r\n’,用U模式打开文件,就是支持所有的换行模式,也就说‘\r’ ‘\n’ ‘\r\n’都可表示换行,会有一个tuple用来存贮这个文件中用到过的换行符。不过,虽说换行有多种模式,读到python中统一用\n代替。在模式字符的后面,还可以加上+ b t这两种标识,分别表示可以对文件同时进行读写操作和用二进制模式、文本模式(默认)打开文件。 buffering如果为0表示不进行缓冲;如果为1表示进行“行缓冲“;如果是一个大于1的数表示缓冲区的大小,应该是以字节为单位的。 file对象有自己的属性和方法。先来看看file的属性。 closed #标记文件是否已经关闭,由close()改写 encoding #文件编码 mode #打开模式 name #文件名 newlines #文件中用到的换行模式,是一个tuple softspace #boolean型,一般为0,据说用于print 1 2 3 4 5 6 file的读写方法: F.read([size]) #size为读取的长度,以byte为单位 F.readline([size]) #读一行,如果定义了size,有可能返回的只是一行的一部分 F.readlines([size]) #把文件每一行作为一个list的一个成员,并返回这个list。其实它的内部是通过循环调用readline()来实现的。如果提供size参数,size是表示读取内容的总长,也就是说可能只读到文件的一部分。 F.write(str) #把str写到文件中,write()并不会在str后加上一个换行符 F.writelines(seq) #把seq的内容全部写到文件中。这个函数也只是忠实地写入,不会在每行后面加上任何东西。 file的其他方法: F.close() #关闭文件。python会在一个文件不用后自动关闭文件,不过这一功能没有保证,最好还是养成自己关闭的习惯。如果一个文件在关闭后还对其进行操作会产生ValueError F.flush() #把缓冲区的内容写入硬盘 F.fileno() #返回一个长整型的”文件标签“ F.isatty() #文件是否是一个终端设备文件(unix系统中的) F.tell() #返回文件操作标记的当前位置,以文件的开头为原点 F.next() #返回下一行,并将文件操作标记位移到下一行。把一个file用于for ... in file这样的语句时,就是调用next()函数来实现遍历的。 F.seek(offset[,whence]) #将文件打操作标记移到offset的位置。这个offset一般是相对于文件的开头来计算的,一般为正数。但如果提供了whence参数就不一定了,whence可以为0表示从头开始计算,1表示以当前位置为原点计算。2表示以文件末尾为原点进行计算。需要注意,如果文件以a或a+的模式打开,每次进行写操作时,文件操作标记会自动返回到文件末尾。 F.truncate([size]) #把文件裁成规定的大小,默认的是裁到当前文件操作标记的位置。如果size比文件的大小还要大,依据系统的不同可能是不改变文件,也可能是用0把文件补到相应的大小,也可能是以一些随机的内容加上去。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 http://www.cnblogs.com/allenblogs/archive/2010/09/13/1824842.html http://www.cnblogs.com/rollenholt/archive/2012/04/23/2466179.html
posted @ 2018-11-28 11:52 xzc 阅读(538) | 评论 (0)编辑 收藏
首先 dfs.replication这个参数是个client参数,即node level参数。需要在每台datanode上设置。 其实默认为3个副本已经够用了,设置太多也没什么用。 一个文件,上传到hdfs上时指定的是几个副本就是几个。以后你修改了副本数,对已经上传了的文件也不会起作用。可以再上传文件的同时指定创建的副本数 Hadoop dfs -D dfs.replication=1 -put 70M logs/2 可以通过命令来更改已经上传的文件的副本数: hadoop fs -setrep -R 3 / 查看当前hdfs的副本数 hadoop fsck -locations FSCK started by hadoop from /172.18.6.112 for path / at Thu Oct 27 13:24:25 CST 2011 ....................Status: HEALTHY Total size: 4834251860 B Total dirs: 21 Total files: 20 Total blocks (validated): 82 (avg. block size 58954290 B) Minimally replicated blocks: 82 (100.0 %) Over-replicated blocks: 0 (0.0 %) Under-replicated blocks: 0 (0.0 %) Mis-replicated blocks: 0 (0.0 %) Default replication factor: 3 Average block replication: 3.0 Corrupt blocks: 0 Missing replicas: 0 (0.0 %) Number of data-nodes: 3 Number of racks: 1 FSCK ended at Thu Oct 27 13:24:25 CST 2011 in 10 milliseconds The filesystem under path '/' is HEALTHY 某个文件的副本数,可以通过ls中的文件描述符看到 hadoop dfs -ls -rw-r--r-- 3 hadoop supergroup 153748148 2011-10-27 16:11 /user/hadoop/logs/201108/impression_witspixel2011080100.thin.log.gz 如果你只有3个datanode,但是你却指定副本数为4,是不会生效的,因为每个datanode上只能存放一个副本。 参考:http://blog.csdn.net/lskyne/article/details/8898666
posted @ 2018-11-26 11:52 xzc 阅读(849) | 评论 (0)编辑 收藏
转自:https://www.cnblogs.com/shabbylee/p/6792555.html 由于历史原因,Python有两个大的版本分支,Python2和Python3,又由于一些库只支持某个版本分支,所以需要在电脑上同时安装Python2和Python3,因此如何让两个版本的Python兼容,如何让脚本在对应的Python版本上运行,这个是值得总结的。 对于Ubuntu 16.04 LTS版本来说,Python2(2.7.12)和Python3(3.5.2)默认同时安装,默认的python版本是2.7.12。 当然你也可以用python2来调用。 如果想调用python3,就用python3. 对于Windows,就有点复杂了。因为不论python2还是python3,python可执行文件都叫python.exe,在cmd下输入python得到的版本号取决于环境变量里哪个版本的python路径更靠前,毕竟windows是按照顺序查找的。比如环境变量里的顺序是这样的: 那么cmd下的python版本就是2.7.12。 反之,则是python3的版本号。 这就带来一个问题了,如果你想用python2运行一个脚本,一会你又想用python3运行另一个脚本,你怎么做?来回改环境变量显然很麻烦。 网上很多办法比较简单粗暴,把两个python.exe改名啊,一个改成python2.exe,一个改成python3.exe。这样做固然可以,但修改可执行文件的方式,毕竟不是很好的方法。 我仔细查找了一些python技术文档,发现另外一个我觉得比较好的解决办法。 借用py的一个参数来调用不同版本的Python。py -2调用python2,py -3调用的是python3. 当python脚本需要python2运行时,只需在脚本前加上,然后运行py xxx.py即可。 #! python2 当python脚本需要python3运行时,只需在脚本前加上,,然后运行py xxx.py即可。 #! python3 就这么简单。 同时,这也完美解决了在pip在python2和python3共存的环境下报错,提示Fatal error in launcher: Unable to create process using '"'的问题。 当需要python2的pip时,只需 py -2 -m pip install xxx 当需要python3的pip时,只需 py -3 -m pip install xxx python2和python3的pip package就这样可以完美分开了。
posted @ 2018-11-16 09:38 xzc 阅读(619) | 评论 (0)编辑 收藏
Sentry权限控制通过Beeline(Hiveserver2 SQL 命令行接口)输入Grant 和 Revoke语句来配置。语法跟现在的一些主流的关系数据库很相似。需要注意的是:当sentry服务启用后,我们必须使用beeline接口来执行hive查询,Hive Cli并不支持sentry。 CREATE ROLE Statement CREATE ROLE语句创建一个可以被赋权的角色。权限可以赋给角色,然后再分配给各个用户。一个用户被分配到角色后可以执行该角色的权限。 只有拥有管理员的角色可以create/drop角色。默认情况下,hive、impala和hue用户拥有管理员角色。 CREATE ROLE [role_name]; DROP ROLE Statement DROP ROLE语句可以用来从数据库中移除一个角色。一旦移除,之前分配给所有用户的该角色将会取消。之前已经执行的语句不会受到影响。但是,因为hive在执行每条查询语句之前会检查用户的权限,处于登录活跃状态的用户会话会受到影响。 DROP ROLE [role_name]; GRANT ROLE Statement GRANT ROLE语句可以用来给组授予角色。只有sentry的管理员用户才能执行该操作。 GRANT ROLE role_name [, role_name] TO GROUP (groupName) [,GROUP (groupName)] REVOKE ROLE Statement REVOKE ROLE语句可以用来从组移除角色。只有sentry的管理员用户才能执行该操作。 REVOKE ROLE role_name [, role_name] FROM GROUP (groupName) [,GROUP (groupName)] GRANT (PRIVILEGE) Statement 授予一个对象的权限给一个角色,该用户必须为sentry的管理员用户。 GRANT (PRIVILEGE) [, (PRIVILEGE) ] ON (OBJECT) (object_name) TO ROLE (roleName) [,ROLE (roleName)] REVOKE (PRIVILEGE) Statement 因为只有认证的管理员用户可以创建角色,从而只有管理员用户可以取消一个组的权限。 REVOKE (PRIVILEGE) [, (PRIVILEGE) ] ON (OBJECT) (object_name) FROM ROLE (roleName) [,ROLE (roleName)] GRANT (PRIVILEGE) ... WITH GRANT OPTION 在cdh5.2中,你可以委托给其他角色来授予和解除权限。比如,一个角色被授予了WITH GRANT OPTION的权限可以GRANT/REVOKE同样的权限给其他角色。因此,如果一个角色有一个库的所有权限并且设置了 WITH GRANT OPTION,该角色分配的用户可以对该数据库和其中的表执行GRANT/REVOKE语句。 GRANT (PRIVILEGE) ON (OBJECT) (object_name) TO ROLE (roleName) WITH GRANT OPTION 只有一个带GRANT选项的特殊权限的角色或者它的父级权限可以从其他角色解除这种权限。一旦下面的语句执行,所有跟其相关的grant权限将会被解除。 REVOKE (RIVILEGE) ON (BJECT) (bject_name) FROM ROLE (roleName) Hive目前不支持解除之前赋予一个角色 WITH GRANT OPTION 的权限。要想移除WITH GRANT OPTION、解除权限,可以重新去除 WITH GRANT OPTION这个标记来再次附权。 SET ROLE Statement SET ROLE语句可以给当前会话选择一个角色使之生效。一个用户只能启用分配给他的角色。任何不存在的角色和当前用户不能使用的角色是不能生效的。如果没有使用任何角色,用户将会使用任何一个属于他的角色的权限。 选择一个角色使用: To enable a specific role: 使用所有的角色: To enable a specific role: 关闭所有角色 SET ROLE NONE; SHOW Statement 显示当前用户拥有库、表、列相关权限的数据库: SHOW DATABASES; 显示当前用户拥有表、列相关权限的表; SHOW TABLES; 显示当前用户拥有SELECT权限的列: SHOW COLUMNS (FROM|IN) table_name [(FROM|IN) db_name]; 显示当前系统中所有的角色(只有管理员用户可以执行): SHOW ROLES; 显示当前影响当前会话的角色: SHOW CURRENT ROLES; 显示指定组的被分配到的所有角色(只有管理员用户和指定组内的用户可以执行) SHOW ROLE GRANT GROUP (groupName); SHOW语句可以用来显示一个角色被授予的权限或者显示角色的一个特定对象的所有权限。 显示指定角色的所有被赋予的权限。(只有管理员用户和指定角色分配到的用户可以执行)。下面的语句也会显示任何列级的权限。 SHOW GRANT ROLE (roleName); 显示指定对象的一个角色的所有被赋予的权限(只有管理员用户和指定角色分配到的用户可以执行)。下面的语句也会显示任何列级的权限。 SHOW GRANT ROLE (roleName) on (OBJECT) (objectName); ----------------------------我也是有底线的-----------------------------
posted @ 2018-09-03 18:19 xzc 阅读(485) | 评论 (0)编辑 收藏
     摘要: Python 里面的编码和解码也就是 unicode 和 str 这两种形式的相互转化。编码是 unicode -> str,相反的,解码就是 str -> unicode。剩下的问题就是确定何时需要进行编码或者解码了.关于文件开头的"编码指示",也就是 # -*- codin...  阅读全文
posted @ 2018-05-18 09:52 xzc 阅读(391) | 评论 (0)编辑 收藏
一、前言
    早上醒来打开微信,同事反馈kafka集群从昨天凌晨开始写入频繁失败,赶紧打开电脑查看了kafka集群的机器监控,日志信息,发现其中一个节点的集群负载从昨天凌晨突然掉下来了,和同事反馈的时间点大概一致,于是乎就登录服务器开始干活。
二、排错
1、查看机器监控,看是否能大概定位是哪个节点有异常
技术分享
2、根据机器监控大概定位到其中一个异常节点,登录服务器查看kafka日志,发现有报错日志,并且日志就停留在这个这个时间点:
[2017-06-01 16:59:59,851] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at kafka.network.Processor.run(SocketServer.scala:413)3、查看kafka进程和监听端口情况,发现都正常,尼玛假死了
ps -ef |grep kafka        ## 查看kafka的进程
netstat -ntlp |grep 9092  ##9092kafka的监听端口4、既然已经假死了,只能重启了
ps -ef |grep kafka |grep -v grep |awk ‘{print $2}‘  | xargs kill -9  
/usr/local/kafka/bin;nohup ./kafka-server-start.sh ../config/server.properties &5、重启后在观察该节点的kafka日志,在一顿index重建之后,上面的报错信息在疯狂的刷,最后谷歌一番,解决了该问题
三、解决方案:
/usr/local/kafka/binkafka-run-class.sh去掉
-XX:+DisableExplicitGC添加
-XX:MaxDirectMemorySize=512m在一次重启kafka,问题解决。
posted @ 2018-03-08 16:35 xzc 阅读(2158) | 评论 (0)编辑 收藏
     摘要: 我们每次执行hive的hql时,shell里都会提示一段话:[python] view plaincopy...  Number of reduce tasks not specified. Estimated from input data size: 50...  阅读全文
posted @ 2018-03-07 11:21 xzc 阅读(1494) | 评论 (1)编辑 收藏
     摘要: spark 累加历史主要用到了窗口函数,而进行全部统计,则需要用到rollup函数 1  应用场景:   1、我们需要统计用户的总使用时长(累加历史)   2、前台展现页面需要对多个维度进行查询,如:产品、地区等等   3、需要展现的表格头如: 产品、2015-04、2015-05、2015-06 2 原始数据: product_code |event_date |dur...  阅读全文
posted @ 2017-10-23 22:05 xzc 阅读(864) | 评论 (0)编辑 收藏
     摘要: Spark1.4发布,支持了窗口分析函数(window functions)。在离线平台中,90%以上的离线分析任务都是使用Hive实现,其中必然会使用很多窗口分析函数,如果SparkSQL支持窗口分析函数, 那么对于后面Hive向SparkSQL中的迁移的工作量会大大降低,使用方式如下: 1、初始化数据 创建表 [sql] view plain cop...  阅读全文
posted @ 2017-10-23 22:04 xzc 阅读(680) | 评论 (0)编辑 收藏

SparkSQL相关语句总结

1.in 不支持子查询 eg. select * from src where key in(select key from test);
支持查询个数 eg. select * from src where key in(1,2,3,4,5);
in 40000个 耗时25.766秒
in 80000个 耗时78.827秒

2.union all/union
不支持顶层的union all eg. select key from src UNION ALL select key from test;
支持select * from (select key from src union all select key from test)aa;
不支持 union
支持select distinct key from (select key from src union all select key from test)aa;

3.intersect 不支持

4.minus 不支持

5.except 不支持

6.inner join/join/left outer join/right outer join/full outer join/left semi join 都支持
left outer join/right outer join/full outer join 中间必须有outer
join是最简单的关联操作,两边关联只取交集;
left outer join是以左表驱动,右表不存在的key均赋值为null;
right outer join是以右表驱动,左表不存在的key均赋值为null;
full outer join全表关联,将两表完整的进行笛卡尔积操作,左右表均可赋值为null;
left semi join最主要的使用场景就是解决exist in;
Hive不支持where子句中的子查询,SQL常用的exist in子句在Hive中是不支持的
不支持子查询 eg. select * from src aa where aa.key in(select bb.key from test bb);
可用以下两种方式替换:
select * from src aa left outer join test bb on aa.key=bb.key where bb.key <> null;
select * from src aa left semi join test bb on aa.key=bb.key;
大多数情况下 JOIN ON 和 left semi on 是对等的
A,B两表连接,如果B表存在重复数据
当使用JOIN ON的时候,A,B表会关联出两条记录,应为ON上的条件符合; 
而是用LEFT SEMI JOIN 当A表中的记录,在B表上产生符合条件之后就返回,不会再继续查找B表记录了,
所以如果B表有重复,也不会产生重复的多条记录。 
left outer join 支持子查询 eg. select aa.* from src aa left outer join (select * from test111)bb on aa.key=bb.a;

7. hive四中数据导入方式
1)从本地文件系统中导入数据到Hive表
create table wyp(id int,name string) ROW FORMAT delimited fields terminated by '\t' STORED AS TEXTFILE;
load data local inpath 'wyp.txt' into table wyp;
2)从HDFS上导入数据到Hive表
[wyp@master /home/q/hadoop-2.2.0]$ bin/hadoop fs -cat /home/wyp/add.txt
hive> load data inpath '/home/wyp/add.txt' into table wyp;
3)从别的表中查询出相应的数据并导入到Hive表中
hive> create table test(
> id int, name string
> ,tel string)
> partitioned by
> (age int)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY '\t'
> STORED AS TEXTFILE;

注:test表里面用age作为了分区字段,分区:在Hive中,表的每一个分区对应表下的相应目录,所有分区的数据都是存储在对应的目录中。
比如wyp表有dt和city两个分区,则对应dt=20131218city=BJ对应表的目录为/user/hive/warehouse/dt=20131218/city=BJ,
所有属于这个分区的数据都存放在这个目录中。

hive> insert into table test
> partition (age='25')
> select id, name, tel
> from wyp;

也可以在select语句里面通过使用分区值来动态指明分区:
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> insert into table test
> partition (age)
> select id, name,
> tel, age
> from wyp;

Hive也支持insert overwrite方式来插入数据
hive> insert overwrite table test
> PARTITION (age)
> select id, name, tel, age
> from wyp;

Hive还支持多表插入
hive> from wyp
> insert into table test
> partition(age)
> select id, name, tel, age
> insert into table test3
> select id, name
> where age>25;
4)在创建表的时候通过从别的表中查询出相应的记录并插入到所创建的表中
hive> create table test4
> as
> select id, name, tel
> from wyp;

8.查看建表语句
hive> show create table test3;

9.表重命名
hive> ALTER TABLE events RENAME TO 3koobecaf; 

10.表增加列
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT); 

11.添加一列并增加列字段注释 
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment'); 

12.删除表
hive> DROP TABLE pokes; 

13.top n
hive> select * from test order by key limit 10;
14.创建数据库
Create Database baseball;

14.alter table tablename  change oldColumn newColumn column_type 修改列的名称和类型

alter table yangsy CHANGE product_no phone_no string

 

15.导入.sql文件中的sql

 spark-sql --driver-class-path /home/hadoop/hive/lib/mysql-connector-java-5.1.30-bin.jar -f testsql.sql 


insert into table CI_CUSER_20141117154351522 select mainResult.PRODUCT_NO,dw_coclbl_m02_3848.L1_01_02_01,dw_coclbl_d01_3845.L2_01_01_04 from (select PRODUCT_NO from CI_CUSER_20141114203632267) mainResult left join DW_COCLBL_M02_201407 dw_coclbl_m02_3848 on mainResult.PRODUCT_NO = dw_coclbl_m02_3848.PRODUCT_NO left join DW_COCLBL_D01_20140515 dw_coclbl_d01_3845 on dw_coclbl_m02_3848.PRODUCT_NO = dw_coclbl_d01_3845.PRODUCT_NO

insert into CI_CUSER_20141117142123638 ( PRODUCT_NO,ATTR_COL_0000,ATTR_COL_0001) select mainResult.PRODUCT_NO,dw_coclbl_m02_3848.L1_01_02_01,dw_coclbl_m02_3848.L1_01_03_01 from (select PRODUCT_NO from CI_CUSER_20141114203632267) mainResult left join DW_COCLBL_M02_201407 dw_coclbl_m02_3848 on mainResult.PRODUCT_NO = dw_coclbl_m02_3848.PRODUCT_NO 


CREATE TABLE ci_cuser_yymmddhhmisstttttt_tmp(product_no string) row format serde 'com.bizo.hive.serde.csv.CSVSerde' ; 
LOAD DATA LOCAL INPATH '/home/ocdc/coc/yuli/test123.csv' OVERWRITE INTO TABLE test_yuli2;

创建支持CSV格式的testfile文件
CREATE TABLE test_yuli7 row format serde 'com.bizo.hive.serde.csv.CSVSerde' as select * from CI_CUSER_20150310162729786;

不依赖CSVSerde的jar包创建逗号分隔的表
"create table " +listName+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" +
" as select * from " + listName1;

create table aaaa ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE as select * from

ThriftServer 开启FAIR模式
SparkSQL Thrift Server 开启FAIR调度方式:
1. 修改$SPARK_HOME/conf/spark-defaults.conf,新增
2. spark.scheduler.mode FAIR
3. spark.scheduler.allocation.file /Users/tianyi/github/community/apache-spark/conf/fair-scheduler.xml
4. 修改$SPARK_HOME/conf/fair-scheduler.xml(或新增该文件), 编辑如下格式内容
5. <?xml version="1.0"?>
6. <allocations>
7. <pool name="production">
8. <schedulingMode>FAIR</schedulingMode>
9. <!-- weight表示两个队列在minShare相同的情况下,可以使用资源的比例 -->
10. <weight>1</weight>
11. <!-- minShare表示优先保证的资源数 -->
12. <minShare>2</minShare>
13. </pool>
14. <pool name="test">
15. <schedulingMode>FIFO</schedulingMode>
16. <weight>2</weight>
17. <minShare>3</minShare>
18. </pool>
19. </allocations>
20. 重启Thrift Server
21. 执行SQL前,执行 
22. set spark.sql.thriftserver.scheduler.pool=指定的队列名

等操作完了 create table yangsy555 like CI_CUSER_YYMMDDHHMISSTTTTTT 然后insert into yangsy555 select * from yangsy555

 

创建一个自增序列表,使用row_number() over()为表增加序列号 以供分页查询

create table yagnsytest2 as SELECT ROW_NUMBER() OVER() as id,* from yangsytest;

 

 

Sparksql的解析与Hiveql的解析的执行流程:

posted @ 2017-10-23 21:03 xzc 阅读(724) | 评论 (0)编辑 收藏
如果用传统SCP远程拷贝,速度是比较慢的。现在采用lz4压缩传输。LZ4是一个非常快的无损压缩算法,压缩速度在单核300MB/S,可扩展支持多核CPU。它还具有一个非常快速的解码器,速度单核可达到和超越1GB/S。通常能够达到多核系统上的RAM速度限制。 你PV 全命为Pipe Viewer,利用它我们可以查看到命令执行的进度。 下面介绍下lz4和pv的安装,下载软件: 下载pv-1.1.4.tar.gz wget http://sourceforge.jp/projects/sfnet_pipeviewer/downloads/pipeviewer/1.1.4/pv-1.1.4.tar.bz2/ 下lz4的包难一些,可能要FQ:https://dl.dropboxusercontent.com/u/59565338/LZ4/lz4-r108.tar.gz 安装灰常简单: pv安装: [root ~]$ tar jxvf pv-1.1.4.tar.bz2 [root ~]$ cd pv-1.1.4 [root pv-1.1.4]$ ./configure && make && make install lz4安装: [root ~]$ tar zxvf lz4-r108.tar.gz [root ~]$ cd lz4-r108 [root lz4-r108]$ make && make install 用法:(-c 后指定要传输的文件,ssh -p 是指定端口,后面的ip是目标主机的ip, -xC指定传到目标主机下的那个目录下,别的不用修改): tar -c mysql-slave-3307 |pv|lz4 -B4|ssh -p10022 -c arcfour128 -o"MACs umac-64@openssh.com" 192.168.100.234 "lz4 -d |tar -xC /data" 下面是我线上传一个从库的效果: 看到了吧,25.7G 只需要接近3分钟,这样远比scp速度快上了好几倍,直接scp拷贝离散文件,很消耗IO,而使用LZ4快速压缩,对性能影响不大,传输速度快 PS:下次补充同机房不同网段的传输效果及跨机房的传输效果^0^ 作者:陆炫志 出处:xuanzhi的博客 http://www.cnblogs.com/xuanzhi201111 您的支持是对博主最大的鼓励,感谢您的认真阅读。本文版权归作者所有,欢迎转载,但请保留该声明。
posted @ 2017-09-14 18:24 xzc 阅读(430) | 评论 (0)编辑 收藏
王 腾腾 和 邵 兵 2015 年 11 月 26 日发布 WeiboGoogle+用电子邮件发送本页面 Comments 1 引子 随着云时代的来临,大数据(Big data)也获得了越来越多的关注。著云台的分析师团队认为,大数据(Big data)通常用来形容一个公司创造的大量非结构化和半结构化数据,这些数据在下载到关系型数据库用于分析时会花费过多时间和金钱。大数据分析常和云计算联系到一起,因为实时的大型数据集分析需要像 MapReduce 一样的框架来向数十、数百或甚至数千的电脑分配工作。 “大数据”在互联网行业指的是这样一种现象:互联网公司在日常运营中生成、累积的用户网络行为数据。这些数据的规模是如此庞大,以至于不能用 G 或 T 来衡量。所以如何高效的处理分析大数据的问题摆在了面前。对于大数据的处理优化方式有很多种,本文中主要介绍在使用 Hadoop 平台中对数据进行压缩处理来提高数据处理效率。 压缩简介 Hadoop 作为一个较通用的海量数据处理平台,每次运算都会需要处理大量数据,我们会在 Hadoop 系统中对数据进行压缩处理来优化磁盘使用率,提高数据在磁盘和网络中的传输速度,从而提高系统处理数据的效率。在使用压缩方式方面,主要考虑压缩速度和压缩文件的可分割性。综合所述,使用压缩的优点如下: 1. 节省数据占用的磁盘空间; 2. 加快数据在磁盘和网络中的传输速度,从而提高系统的处理速度。 压缩格式 Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。 Hadoop 对每个压缩格式的支持, 详细见下表: 表 1. 压缩格式 压缩格式 工具 算法 扩展名 多文件 可分割性 DEFLATE 无 DEFLATE .deflate 不 不 GZIP gzip DEFLATE .gzp 不 不 ZIP zip DEFLATE .zip 是 是,在文件范围内 BZIP2 bzip2 BZIP2 .bz2 不 是 LZO lzop LZO .lzo 不 是 如果压缩的文件没有扩展名,则需要在执行 MapReduce 任务的时候指定输入格式。 1 2 3 4 5 hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/ hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper / usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/ reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 - jobconf mapred.reduce.tasks=1*-inputformatorg.apache.hadoop.mapred.LzoTextInputFormat* 性能对比 Hadoop 下各种压缩算法的压缩比,压缩时间,解压时间见下表: 表 2. 性能对比 压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度 gzip 8.3GB 1.8GB 17.5MB/s 58MB/s bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s 因此我们可以得出: 1) Bzip2 压缩效果明显是最好的,但是 bzip2 压缩速度慢,可分割。 2) Gzip 压缩效果不如 Bzip2,但是压缩解压速度快,不支持分割。 3) LZO 压缩效果不如 Bzip2 和 Gzip,但是压缩解压速度最快!并且支持分割! 这里提一下,文件的可分割性在 Hadoop 中是很非常重要的,它会影响到在执行作业时 Map 启动的个数,从而会影响到作业的执行效率! 所有的压缩算法都显示出一种时间空间的权衡,更快的压缩和解压速度通常会耗费更多的空间。在选择使用哪种压缩格式时,我们应该根据自身的业务需求来选择。 下图是在本地压缩与通过流将压缩结果上传到 BI 的时间对比。 图 1. 时间对比 图 1. 时间对比 使用方式 MapReduce 可以在三个阶段中使用压缩。 1. 输入压缩文件。如果输入的文件是压缩过的,那么在被 MapReduce 读取时,它们会被自动解压。 2.MapReduce 作业中,对 Map 输出的中间结果集压缩。实现方式如下: 1)可以在 core-site.xml 文件中配置,代码如下 图 2. core-site.xml 代码示例 图 2. core-site.xml 代码示例 2)使用 Java 代码指定 1 2 conf.setCompressMapOut(true); conf.setMapOutputCompressorClass(GzipCode.class); 最后一行代码指定 Map 输出结果的编码器。 3.MapReduce 作业中,对 Reduce 输出的最终结果集压。实现方式如下: 1)可以在 core-site.xml 文件中配置,代码如下 图 3. core-site.xml 代码示例 图 3. core-site.xml 代码示例 2)使用 Java 代码指定 1 2 conf.setBoolean(“mapred.output.compress”,true); conf.setClass(“mapred.output.compression.codec”,GzipCode.class,CompressionCodec.class); 最后一行同样指定 Reduce 输出结果的编码器。 压缩框架 我们前面已经提到过关于压缩的使用方式,其中第一种就是将压缩文件直接作为入口参数交给 MapReduce 处理,MapReduce 会自动根据压缩文件的扩展名来自动选择合适解压器处理数据。那么到底是怎么实现的呢?如下图所示: 图 4. 压缩实现情形 图 4. 压缩实现情形 我们在配置 Job 作业的时候,会设置数据输入的格式化方式,使用 conf.setInputFormat() 方法,这里的入口参数是 TextInputFormat.class。 TextInputFormat.class 继承于 InputFormat.class,主要用于对数据进行两方面的预处理。一是对输入数据进行切分,生成一组 split,一个 split 会分发给一个 mapper 进行处理;二是针对每个 split,再创建一个 RecordReader 读取 split 内的数据,并按照的形式组织成一条 record 传给 map 函数进行处理。此类在对数据进行切分之前,会首先初始化压缩解压工程类 CompressionCodeFactory.class,通过工厂获取实例化的编码解码器 CompressionCodec 后对数据处理操作。 下面我们来详细的看一下从压缩工厂获取编码解码器的过程。 压缩解压工厂类 CompressionCodecFactory 压缩解压工厂类 CompressionCodeFactory.class 主要功能就是负责根据不同的文件扩展名来自动获取相对应的压缩解压器 CompressionCodec.class,是整个压缩框架的核心控制器。我们来看下 CompressionCodeFactory.class 中的几个重要方法: 1. 初始化方法 图 5. 代码示例 图 5. 代码示例 ① getCodeClasses(conf) 负责获取关于编码解码器 CompressionCodec.class 的配置信息。下面将会详细讲解。 ② 默认添加两种编码解码器。当 getCodeClass(conf) 方法没有读取到相关的编码解码器 CompressionCodec.class 的配置信息时,系统会默认添加两种编码解码器 CompressionCodec.class,分别是 GzipCode.class 和 DefaultCode.class。 ③ addCode(code) 此方法用于将编码解码器 CompressionCodec.class 添加到系统缓存中。下面将会详细讲解。 2. getCodeClasses(conf) 图 6. 代码示例 图 6. 代码示例 ① 这里我们可以看,系统读取关于编码解码器 CompressionCodec.class 的配置信息在 core-site.xml 中 io.compression.codes 下。我们看下这段配置文件,如下图所示: 图 7. 代码示例 图 7. 代码示例 Value 标签中是每个编码解码 CompressionCodec.class 的完整路径,中间用逗号分隔。我们只需要将自己需要使用到的编码解码配置到此属性中,系统就会自动加载到缓存中。 除了上述的这种方式以外,Hadoop 为我们提供了另一种加载方式:代码加载。同样最终将信息配置在 io.compression.codes 属性中,代码如下: 1 2 conf.set("io.compression.codecs","org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");) 3. addCode(code) 方法添加编码解码器 图 8. 代码示例 图 8. 代码示例 addCodec(codec) 方法入口参数是个编码解码器 CompressionCodec.class,这里我们会首先接触到它的一个方法。 ① codec.getDefaultExtension() 方法看方法名的字面意思我们就可以知道,此方法用于获取此编码解码所对应文件的扩展名,比如,文件名是 xxxx.gz2,那么这个方法的返回值就是“.bz2”,我们来看下 org.apache.hadoop.io.compress.BZip2Codec 此方法的实现代码: 图 9. 代码示例 图 9. 代码示例 ② Codecs 是一个 SortedMap 的示例。这里有个很有意思的地方,它将 Key 值,也就是通过 codec.getDefaultExtension() 方法获取到的文件扩展名进行了翻转,举个例子,比如文件名扩展名“.bz2”,将文件名翻转之后就变成了“2zb.”。 系统加载完所有的编码解码器后,我们可以得到这样一个有序映射表,如下: 图 10. 代码示例 图 10. 代码示例 现在编码解码器都有了,我们怎么得到对应的编码解码器呢?看下面这个方法。 4. getCodec() 方法 此方法用于获取文件所对应的的编码解码器 CompressionCodec.class。 图 11. 代码示例 图 11. 代码示例 getCodec(Path) 方法的输入参数是 Path 对象,保存着文件路径。 ① 将文件名翻转。如 xxxx.bz2 翻转成 2zb.xxxx。 ② 获取 codecs 集合中最接近 2zb.xxxx 的值。此方法有返回值同样是个 SortMap 对象。 在这里对返回的 SortMap 对象进行第二次筛选。 编码解码器 CompressionCodec 刚刚在介绍压缩解压工程类 CompressionCodeFactory.class 的时候,我们多次提到了压缩解压器 CompressionCodecclass,并且我们在上文中还提到了它其中的一个用于获取文件扩展名的方法 getDefaultExtension()。 压缩解压工程类 CompressionCodeFactory.class 使用的是抽象工厂的设计模式。它是一个接口,制定了一系列方法,用于创建特定压缩解压算法。下面我们来看下比较重要的几个方法: 1. createOutputStream() 方法对数据流进行压缩。 图 12. 代码示例 图 12. 代码示例 此方法提供了方法重载。 ① 基于流的压缩处理; ② 基于压缩机 Compress.class 的压缩处理 2. createInputStream() 方法对数据流进行解压。 图 13. 代码示例 图 13. 代码示例 这里的解压方法同样提供了方法重载。 ① 基于流的解压处理; ② 基于解压机 Decompressor.class 的解压处理; 关于压缩/解压流与压缩/解压机会在下面的文章中我们会详细讲解。此处暂作了解。 3. getCompressorType() 返回需要的编码器的类型。 getDefaultExtension() 获取对应文件扩展名的方法。前文已提到过,不再敖述。 压缩机 Compressor 和解压机 Decompressor 前面在编码解码器部分的 createInputStream() 和 createInputStream() 方法中我们提到过 Compressor.class 和 Decompressor.class 对象。在 Hadoop 的实现中,数据编码器和解码器被抽象成了两个接口: 1. org.apache.hadoop.io.compress.Compressor; 2. org.apache.hadoop.io.compress.Decompressor; 它们规定了一系列的方法,所以在 Hadoop 内部的编码/解码算法实现都需要实现对应的接口。在实际的数据压缩与解压缩过程,Hadoop 为用户提供了统一的 I/O 流处理模式。 我们看一下压缩机 Compressor.class,代码如下: 图 14. 代码示例 图 14. 代码示例 ① setInput() 方法接收数据到内部缓冲区,可以多次调用; ② needsInput() 方法用于检查缓冲区是否已满。如果是 false 则说明当前的缓冲区已满; ③ getBytesRead() 输入未压缩字节的总数; ④ getBytesWritten() 输出压缩字节的总数; ⑤ finish() 方法结束数据输入的过程; ⑥ finished() 方法用于检查是否已经读取完所有的等待压缩的数据。如果返回 false,表明压缩器中还有未读取的压缩数据,可以继续通过 compress() 方法读取; ⑦ compress() 方法获取压缩后的数据,释放缓冲区空间; ⑧ reset() 方法用于重置压缩器,以处理新的输入数据集合; ⑨ end() 方法用于关闭解压缩器并放弃所有未处理的输入; ⑩ reinit() 方法更进一步允许使用 Hadoop 的配置系统,重置并重新配置压缩器; 为了提高压缩效率,并不是每次用户调用 setInput() 方法,压缩机就会立即工作,所以,为了通知压缩机所有数据已经写入,必须使用 finish() 方法。finish() 调用结束后,压缩机缓冲区中保持的已经压缩的数据,可以继续通过 compress() 方法获得。至于要判断压缩机中是否还有未读取的压缩数据,则需要利用 finished() 方法来判断。 压缩流 CompressionOutputStream 和解压缩流 CompressionInputStream 前文编码解码器部分提到过 createInputStream() 方法返回 CompressionOutputStream 对象,createInputStream() 方法返回 CompressionInputStream 对象。这两个类分别继承自 java.io.OutputStream 和 java.io.InputStream。从而我们不难理解,这两个对象的作用了吧。 我们来看下 CompressionInputStream.class 的代码: 图 15. 代码示例 图 15. 代码示例 可以看到 CompressionOutputStream 实现了 OutputStream 的 close() 方法和 flush() 方法,但用于输出数据的 write() 方法以及用于结束压缩过程并将输入写到底层流的 finish() 方法和重置压缩状态的 resetState() 方法还是抽象方法,需要 CompressionOutputStream 的子类实现。 Hadoop 压缩框架中为我们提供了一个实现了 CompressionOutputStream 类通用的子类 CompressorStream.class。 图 16. 代码示例 图 16. 代码示例 CompressorStream.class 提供了三个不同的构造函数,CompressorStream 需要的底层输出流 out 和压缩时使用的压缩器,都作为参数传入构造函数。另一个参数是 CompressorStream 工作时使用的缓冲区 buffer 的大小,构造时会利用这个参数分配该缓冲区。第一个可以手动设置缓冲区大小,第二个默认 512,第三个没有缓冲区且不可使用压缩器。 图 17. 代码示例 图 17. 代码示例 在 write()、compress()、finish() 以及 resetState() 方法中,我们发现了压缩机 Compressor 的身影,前面文章我们已经介绍过压缩机的的实现过程,通过调用 setInput() 方法将待压缩数据填充到内部缓冲区,然后调用 needsInput() 方法检查缓冲区是否已满,如果缓冲区已满,将调用 compress() 方法对数据进行压缩。流程如下图所示: 图 18. 调用流程图 图 18. 调用流程图 结束语 本文深入到 Hadoop 平台压缩框架内部,对其核心代码以及各压缩格式的效率进行对比分析,以帮助读者在使用 Hadoop 平台时,可以通过对数据进行压缩处理来提高数据处理效率。当再次面临海量数据处理时, Hadoop 平台的压缩机制可以让我们事半功倍。 相关主题 Hadoop 在线 API 《Hadoop 技术内幕深入解析 HADOOP COMMON 和 HDFS 架构设计与实现原理》 developerWorks 开源技术主题:查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。
posted @ 2017-09-14 17:35 xzc 阅读(553) | 评论 (0)编辑 收藏

Linux系统查看当前主机CPU、内存、机器型号及主板信息:


查看CPU信息(型号)
# cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c

 

查看内存信息
# cat /proc/meminfo

 

查看主板型号:
# dmidecode |grep -A16 "System Information$"

 

查看机器型号
# dmidecode | grep "Product Name"

 

查看当前操作系统内核信息
# uname -a

 

查看当前操作系统发行版信息
# cat /etc/issue | grep Linux

posted @ 2017-09-10 16:37 xzc 阅读(236) | 评论 (0)编辑 收藏
本文介绍Hadoop YARN最近版本中增加的几个非常有用的特性,包括: (1)ResourceManager HA 在apache hadoop 2.4或者CDH5.0.0版本之后,增加了ResourceManger HA特性,支持基于Zookeeper的热主备切换,具体配置参数可以参考Cloudera的文档:ResourceManager HA配置。 需要注意的是,ResourceManager HA只完成了第一个阶段的设计,即备ResourceManager启动后,会杀死之前正在运行的Application,然后从共享存储系统中读取这些Application的元数据信息,并重新提交这些Application。启动ApplicationMaster后,剩下的容错功能就交给ApplicationMaster实现了,比如MapReduce的ApplicationMaster会不断地将完成的任务信息写到HDFS上,这样,当它重启时,可以重新读取这些日志,进而只需重新运行那些未完成的任务。ResourceManager HA第二个阶段的任务是,备ResourceManager接管主ResourceManager后,无需杀死那些正在运行的Application,让他们像任何事情没有发生一样运行下去。 (2) 磁盘容错 在apache hadoop 2.4或者CDH5.0.0版本之后,增加了几个对多磁盘非常友好地参数,这些参数允许YARN更好地使用NodeManager上的多块磁盘,相关jira为:YARN-1781,主要新增了三个参数: yarn.nodemanager.disk-health-checker.min-healthy-disks:NodeManager上最少保证健康磁盘比例,当健康磁盘比例低于该值时,NodeManager不会再接收和启动新的Container,默认值是0.25,表示25%; yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage:一块磁盘的最高使用率,当一块磁盘的使用率超过该值时,则认为该盘为坏盘,不再使用该盘,默认是100,表示100%,可以适当调低; yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb:一块磁盘最少保证剩余空间大小,当某块磁盘剩余空间低于该值时,将不再使用该盘,默认是0,表示0MB。 (3)资源调度器 Fair Scheduler:Fair Scheduler增加了一个非常有用的新特性,允许用户在线将一个应用程序从一个队列转移到另外一个队列,比如将一个重要作业从一个低优先级队列转移到高优先级队列,操作命令是:bin/yarn application -movetoqueue appID -queue targetQueueName,相关jira为:YARN-1721。 Capacity Scheduler:Capacity Scheduler中资源抢占功能经过了充分的测试,可以使用了。 原创文章,转载请注明: 转载自董的博客 本文链接地址: http://dongxicheng.org/mapreduce-nextgen/hadoop-yarn-recently-new-features/
posted @ 2017-09-07 11:37 xzc 阅读(270) | 评论 (0)编辑 收藏
关于mapreduce程序运行在yarn上时内存的分配一直是一个让我蒙圈的事情,单独查任何一个资料都不能很好的理解透彻。于是,最近查了大量的资料,综合各种解释,终于理解到了一个比较清晰的程度,在这里将理解的东西做一个简单的记录,以备忘却。 首先,先将关于mapreduce和yarn关于内存分配的参数粘贴上: yarn.scheduler.minimum-allocation-mb yarn.scheduler.maximum-allocation-mb yarn.nodemanager.resource.memory-mb yarn.nodemanager.vmem-pmem-ratio yarn.scheduler.increment-allocation-mb mapreduce.map.memory.mb mapreduce.reduce.memory.mb mapreduce.map.java.opts mapreduce.reduce.java.opts 个人认为,针对mapreduce任务,这些参数只有放在一起学习才能真正理解,如果单独考虑,理解不清晰。下面开始详细讲解。 一、理解参数yarn.nodemanager.resource.memory-mb,yarn.nodemanager.vmem-pmem-ratio yarn.nodemanager.resource.memory-mb很简单,就是你的这台服务器节点上准备分给yarn的内存; yarn.nodemanager.vmem-pmem-ratio网上解释都是"每使用1MB物理内存,最多可用的虚拟内存数,默认2.1",但是目前我还是不太理解其作用是什么,有知道的朋友希望能详细解释下。 二、理解参数yarn.scheduler.minimum-allocation-mb和yarn.scheduler.maximum-allocation-mb 都知道,在yarn上运行程序时每个task都是在独立的Container中运行的,单个Container可以申请的最小和最大内存的限制就是这两个参数,注意,并不是这两个参数决定单个Container申请内存的大小,而仅仅是限制的一个范围。 三、理解yarn的内存规整化因子和内存规整化算法 先不说和哪个参数有关,单纯理解这一概念。举例: 假如规整化因子b=512M,上述讲的参数yarn.scheduler.minimum-allocation-mb为1024,yarn.scheduler.maximum-allocation-mb为8096,然后我打算给单个map任务申请内存资源(mapreduce.map.memory.mb): 申请的资源为a=1000M时,实际得到的Container内存大小为1024M(小于yarn.scheduler.minimum-allocation-mb的话自动设置为yarn.scheduler.minimum-allocation-mb); 申请的资源为a=1500M时,实际得到的Container内存大小为1536M,计算公式为:ceiling(a/b)*b,即ceiling(a/b)=ceiling(1500/512)=3,3*512=1536。此处假如b=1024,则Container实际内存大小为2048M 也就是说Container实际内存大小最小为yarn.scheduler.minimum-allocation-mb值,然后增加时的最小增加量为规整化因子b,最大不超过yarn.scheduler.maximum-allocation-mb 四、理解mapreduce.map.memory.mb、mapreduce.reduce.memory.mb "三"中提到的"打算给单个map任务申请内存资源"也就是a,其实就是指的"mapreduce.map.memory.mb"或"mapreduce.reduce.memory.mb",注意其值不要超过yarn.scheduler.maximum-allocation-mb 五、理解mapreduce.map.java.opts、mapreduce.reduce.java.opts 以map任务为例,Container其实就是在执行一个脚本文件,而脚本文件中,会执行一个 Java 的子进程,这个子进程就是真正的 Map Task,mapreduce.map.java.opts 其实就是启动 JVM 虚拟机时,传递给虚拟机的启动参数,而默认值 -Xmx200m 表示这个 Java 程序可以使用的最大堆内存数,一旦超过这个大小,JVM 就会抛出 Out of Memory 异常,并终止进程。而 mapreduce.map.memory.mb 设置的是 Container 的内存上限,这个参数由 NodeManager 读取并进行控制,当 Container 的内存大小超过了这个参数值,NodeManager 会负责 kill 掉 Container。在后面分析 yarn.nodemanager.vmem-pmem-ratio 这个参数的时候,会讲解 NodeManager 监控 Container 内存(包括虚拟内存和物理内存)及 kill 掉 Container 的过程。 也就是说,mapreduce.map.java.opts一定要小于mapreduce.map.memory.mb mapreduce.reduce.java.opts同mapreduce.map.java.opts一样的道理。 六、理解规整化因子指的是哪个参数 "三"中提到的规整化因子也就是b,具体指的是哪个参数和yarn使用的调度器有关,一共有三种调度器:capacity scheduler(默认调度器)、fair scheduler和fifo scheduler 当使用capacity scheduler或者fifo scheduler时,规整化因子指的就是参数yarn.scheduler.minimum-allocation-mb,不能单独配置,即yarn.scheduler.increment-allocation-mb无作用; 当使用fair scheduler时,规整化因子指的是参数yarn.scheduler.increment-allocation-mb 至此,关于yarn和mapreduce的任务内存配置问题讲完了,这也是我目前理解的层次。
posted @ 2017-08-30 21:05 xzc 阅读(303) | 评论 (0)编辑 收藏
1. 日期输出格式化

所有日期、时间的api都在datetime模块内。

1. datetime => string

now = datetime.datetime.now()
now.strftime('%Y-%m-%d %H:%M:%S')
#输出2012-03-05 16:26:23.870105

strftime是datetime类的实例方法。

2. string => datetime

t_str = '2012-03-05 16:26:23'
d = datetime.datetime.strptime(t_str, '%Y-%m-%d %H:%M:%S')

strptime是datetime类的静态方法。

2. 日期比较操作

在datetime模块中有timedelta类,这个类的对象用于表示一个时间间隔,比如两个日期或者时间的差别。

构造方法:

datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

所有的参数都有默认值0,这些参数可以是int或float,正的或负的。

可以通过timedelta.days、tiemdelta.seconds等获取相应的时间值。

timedelta类的实例,支持加、减、乘、除等操作,所得的结果也是timedelta类的实例。比如:

year = timedelta(days=365)
ten_years = year *10
nine_years = ten_years - year

同时,date、time和datetime类也支持与timedelta的加、减运算。

datetime1 = datetime2 +/- timedelta
timedelta = datetime1 - datetime2

这样,可以很方便的实现一些功能。

1. 两个日期相差多少天。

d1 = datetime.datetime.strptime('2012-03-05 17:41:20', '%Y-%m-%d %H:%M:%S')
d2 = datetime.datetime.strptime('2012-03-02 17:41:20', '%Y-%m-%d %H:%M:%S')
delta = d1 - d2
print delta.days

输出:3

2. 今天的n天后的日期。

now = datetime.datetime.now()
delta = datetime.timedelta(days=3)
n_days = now + delta
print n_days.strftime('%Y-%m-%d %H:%M:%S')

输出:2012-03-08 17:44:50

复制代码
#coding=utf-8
import datetime
now=datetime.datetime.now()
print now
#将日期转化为字符串 datetime => string
print now.strftime('%Y-%m-%d %H:%M:%S')

t_str = '2012-03-05 16:26:23'
#将字符串转换为日期 string => datetime
d=datetime.datetime.strptime(t_str,'%Y-%m-%d %H:%M:%S')
print d

#在datetime模块中有timedelta类,这个类的对象用于表示一个时间间隔,比如两个日#期或者时间的差别。

#计算两个日期的间隔
d1 = datetime.datetime.strptime('2012-03-05 17:41:20', '%Y-%m-%d %H:%M:%S')
d2 = datetime.datetime.strptime('2012-03-02 17:41:20', '%Y-%m-%d %H:%M:%S')
delta = d1 - d2
print delta.days
print delta

#今天的n天后的日期。
now=datetime.datetime.now()
delta=datetime.timedelta(days=3)
n_days=now+delta
print n_days.strftime('%Y-%m-%d %H:%M:%S')
复制代码
posted @ 2017-08-14 23:09 xzc 阅读(1361) | 评论 (0)编辑 收藏

Shell中并没有真正意义的多线程,要实现多线程可以启动多个后端进程,最大程度利用cpu性能。

直接看代码示例吧。

(1) 顺序执行的代码

复制代码
 1 #!/bin/bash  2 date  3 for i in `seq 1 5`  4 do  5 {  6     echo "sleep 5"  7     sleep 5  8 }  9 done 10 date 
复制代码

输出:

复制代码
Sat Nov 19 09:21:51 CST 2016 sleep 5 sleep 5 sleep 5 sleep 5 sleep 5 Sat Nov 19 09:22:16 CST 2016
复制代码

(2) 并行代码

使用'&'+wait 实现“多进程”实现

复制代码
 1 #!/bin/bash  2 date  3 for i in `seq 1 5`  4 do  5 {  6     echo "sleep 5"  7     sleep 5  8 } &  9 done 10 wait  ##等待所有子后台进程结束 11 date
复制代码

输出:

复制代码
Sat Nov 19 09:25:07 CST 2016 sleep 5 sleep 5 sleep 5 sleep 5 sleep 5 Sat Nov 19 09:25:12 CST 2016
复制代码

 (3) 对于大量处理任务如何实现启动后台进程的数量可控?

  简单的方法可以使用2层for/while循环实现,每次wait内层循环的多个后台程序执行完成

  但是这种方式的问题是,如果内层循环有“慢节点”可能导致整个任务的执行执行时间长。

  更高级的实现可以看(4)

(4) 使用命名管道(fifo)实现每次启动后台进程数量可控。 

复制代码
 1 #!/bin/bash  2   3 function my_cmd(){  4     t=$RANDOM  5     t=$[t%15]  6     sleep $t  7     echo "sleep $t s"  8 }  9  10 tmp_fifofile="/tmp/$$.fifo"  11 mkfifo $tmp_fifofile      # 新建一个fifo类型的文件 12 exec 6<>$tmp_fifofile     # 将fd6指向fifo类型 13 rm $tmp_fifofile    #删也可以 14  15 thread_num=5  # 最大可同时执行线程数量 16 job_num=100   # 任务总数 17  18 #根据线程总数量设置令牌个数 19 for ((i=0;i<${thread_num};i++));do 20     echo 21 done >&6  22  23 for ((i=0;i<${job_num};i++));do # 任务数量 24     # 一个read -u6命令执行一次,就从fd6中减去一个回车符,然后向下执行, 25     # fd6中没有回车符的时候,就停在这了,从而实现了线程数量控制 26     read -u6  27  28     #可以把具体的需要执行的命令封装成一个函数 29     {    30         my_cmd 31     } & 32  33     echo >&6 # 当进程结束以后,再向fd6中加上一个回车符,即补上了read -u6减去的那个 34 done 35  36 wait 37 exec 6>&- # 关闭fd6 38 echo "over"
复制代码

 

参考:http://lawrence-zxc.github.io/2012/06/16/shell-thread/

posted @ 2017-08-02 17:01 xzc 阅读(337) | 评论 (0)编辑 收藏

之前在论坛看到一个关于HDFS权限的问题,当时无法回答该问题。无法回答并不意味着对HDFS权限一无所知,而是不能准确完整的阐述HDFS权限,因此决定系统地学习HDFS文件权限。HDFS的文件和目录权限模型共享了POSIX(Portable Operating System Interface,可移植操作系统接口)模型的很多部分,比如每个文件和目录与一个拥有者和组相关联,文件或者目录对于拥有者、组内的其它用户和组外的其它用户有不同的权限等。与POSIX模型不同的是,HDFS中的文件没有可执行文件的概念,因而也没有setuid和setgid,虽然目录依然保留着可执行目录的概念(x),但对于目录也没有setuid和setgid。粘贴位(sticky bit)可以用在目录上,用于阻止除超级用户,目录或文件的拥有者外的任何删除或移动目录中的文件,文件上的粘贴位不起作用。

      当创建文件或目录时,拥有者为运行客户端进程的用户,组为父目录所属的组。每个访问HDFS的客户端进程有一个由用户姓名和组列表两部分组的成标识,无论何时HDFS必须对由客户端进程访问的文件或目录进行权限检查,规则如下:

 

  • 如果进程的用户名匹配文件或目录的拥有者,那么测试拥有者权限
  • 否则如果文件或目录所属的组匹配组列表中任何组,那么测试组权限
  • 否则测试其它权限

 

      如果权限检查失败,则客户端操作失败。

      从hadoop-0.22开始,hadoop支持两种不同的操作模式以确定用户,分别为simple和kerberos具体使用哪个方式由参数hadoop.security.authentication设置,该参数位于core-site.xml文件中,默认值为simple。在simple模式下,客户端进程的身份由主机的操作系统确定,比如在类Unix系统中,用户名为命令whoami的输出。在kerberos模式下,客户端进程的身份由Kerberos凭证确定,比如在一个Kerberized环境中,用户可能使用kinit工具得到了一个Kerberos ticket-granting-ticket(TGT)且使用klist确定当前的principal。当映射一个Kerberosprincipal到HDFS的用户名时,除了最主要的部分外其余部分都被丢弃,比如一个principal为todd/foobar@CORP.COMPANY.COM,将映射为HDFS上的todd。无论哪种操作模式,对于HDFS来说用户标识机制都是外部的,HDFS本身没有创建用户标,建立组或者处理用户凭证的规定。

      上面讨论了确定用户的两种模式,即simple和kerberos,下面学习如何确定用户组。用户组是通过由参数hadoop.security.group.mapping设置的组映射服务确定的,默认实现是org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback,该实现首先确定Java本地接口(JNI)是否可用,如果JNI可用,该实现将使用hadoop中的API为用户解析用户组列表。如果JNI不可用,那么使用ShellBasedUnixGroupsMapping,该实现将使用Linux/Unix中的bash –cgroups命令为用户解析用户组列表。其它实现还有LdapGroupsMapping,通过直接连接LDAP服务器来解析用户组列表。对HDFS来说,用户到组的映射是在NameNode上执行的,因而NameNode的主机系统配置决定了用户的组映射。HDFS将文件或目录的用户和组存储为字符串,并且不像Linux/Unix那样可以将用户和组转换为数字。

      每个针对文件或者目录的操作都将全路径名称传递到NameNode,然后对该路径的每次操作都将应用权限检查。客户端隐含地关联用户身份到NameNode的连接,减少改变现存客户端API的需要。总是存在这么一种情景,当在一个文件上的操作成功后,当重复该操作时可能失败,因为该文件或者路径中的某些目录已经不再存在。例如,当客户端第一次开始读取一个文件时,它向NameNode发出的第一个请求来发现该文件第一个块的位置,第二个寻找其他块的请求可能失败。另一方面,对于已经知道文件块的客户端来说,删除文件不会取消访问。通过添加权限,客户端对文件的访问在请求之间可能撤回,对于已经知道文件块的客户端来说,改变权限不会取消客户端的访问。

      HDFS中超级用户与通常熟悉的Linux或Unix中的root用户不同,HDFS的超级用户是与NameNode进程有相同标示的用户,更简单易懂些,启动NameNode的用户就为超级用户。对于谁是超级用户没有固定的定义,当NameNode启动后,该进程的标示决定了谁是超级用户。HDFS的超级用户不必是NameNode主机的超级用户,也需用所有的集群使用相同的超级用户,出于实验目的在个人工作站上运行HDFS的人自然而然的称为超级用户而不需要任何配置。另外参数dfs.permissions.superusergroup设置了超级用户,该组中的所有用户也为超级用户。超级用户在HDFS中可以执行任何操作而针对超级用户的权限检查永远不会失败。

      HDFS也提供了对POSIX ACL(访问控制列表)支持来为特定的用户或者用户组提供更加细粒度的文件权限。ACL是不同于用户和组的自然组织层次的有用的权限控制方式,ACL可以为特定的用户和组设置不同的权限,而不仅仅是文件的拥有者和文件所属的组。默认情况下,HDFS禁用ACL,因此NameNode禁止ACL的创建,为了启用ACL,需要在hdfs-site.xml中将参数dfs.namenode.acls.enabled设置为true。

      访问控制列表由一组ACL项组成,每个ACL项命名了特定的用户或组,并为其授予或拒绝读,写和执行的权限,例如:

 

user::rw- user:bruce:rwx                  #effective:r-- group::r-x                      #effective:r-- group:sales:rwx                 #effective:r-- mask::r-- other::r-- 

 

      每个ACL项由类型,可选的名称和权限字符串组成,它们之间使用冒号(:)。在上面的例子中文件的拥有者具有读写权限,文件所属的组具有读和执行的权限,其他用户具有读权限,这些设置与将文件设置为654等价(6表示拥有者的读写权限,5表示组的读和执行权限,4表示其他用户的读权限)。除此之外,还有两个扩展的ACL项,分别为用户bruce和组sales,并都授予了读写和执行的权限。mask项是一个特殊的项,用于过滤授予所有命名用户,命名组及未命名组的权限,即过滤除文件拥有者和其他用户(other)之外的任何ACL项。在该例子中,mask值有读权限,则bruce用户、sales组和文件所属的组只具有读权限。每个ACL必须有mask项,如果用户在设置ACL时没有使用mask项,一个mask项被自动加入到ACL中,该mask项是通过计算所有被mask过滤项的权限与(&运算)得出的。对拥有ACL的文件执行chmod实际改变的是mask项的权限,因为mask项扮演的是过滤器的角色,这将有效地约束所有扩展项的权限,而不是仅改变组的权限而可能漏掉其它扩展项的权限。

      访问控制列表和默认访问控制列表存在着不同,前者定义了在执行权限检查实施的规则,后者定义了新文件或者子目录创建时自动接收的ACL项,例如:

user::rwx group::r-x other::r-x default:user::rwx default:user:bruce:rwx          #effective:r-x default:group::r-x default:group:sales:rwx         #effective:r-x default:mask::r-x default:other::r-x 

      只有目录可能拥有默认访问控制列表,当创建新文件或者子目录时,自动拷贝父辈的默认访问控制列表到自己的访问控制列表中,新的子目录也拷贝父辈默认的访问控制列表到自己的默认访问控制列表中。这样,当创建子目录时默认ACL将沿着文件系统树被任意深层次地拷贝。在新的子ACL中,准确的权限由模式参数过滤。默认的umask为022,通常新目录权限为755,新文件权限为644。模式参数为未命名用户(文件的拥有者),mask及其他用户过滤拷贝的权限值。在上面的例子中,创建权限为755的子目录时,模式对最终结果没有影响,但是如果创建权限为644的文件时,模式过滤器导致新文件的ACL中文件拥有者的权限为读写,mask的权限为读以及其他用户权限为读。mask的权限意味着用户bruce和组sales只有读权限。拷贝ACL发生在文件或子目录的创建时,后面如果修改父辈的默认ACL将不再影响已存在子类的ACL。

      默认ACL必须包含所有最小要求的ACL项,包括文件拥有者项,文件所属的组项和其它用户项。如果用户没有在默认ACL中配置上述三项中的任何一个,那么该项将通过从访问ACL拷贝对应的权限来自动插入,或者如果没有访问ACL则自动插入权限位。默认ACL也必须拥有mask,如果mask没有被指定,通过计算所有被mask过滤项的权限与(&运算)自动插入mask。当一个文件拥有ACL时,权限检查的算法变为:

 

  • 如果用户名匹配文件的拥有者,则测试拥有者权限
  • 否则,如果用户名匹配命名用户项中的用户名,则测试由mask权限过滤后的该项的权限
  • 否则,如果文件所属的组匹配组列表中的任何组,并且如果这些被mask过滤的权限具有访问权限,那么使用这么权限
  • 否则,如果存在命名组项匹配组列表中的成员,并且如果这些被mask过滤的权限具有访问权限,那么使用这么权限
  • 否则,如果文件所属的组或者任何命名组项匹配组列表中的成员,但不具备访问权限,那么访问被拒绝
  • 否则测试文件的其他用户权限

 

      最佳实践时基于传统的权限位设置大部分权限要求,然后定义少量带有特殊规则的ACL增加权限位。相比较只是用权限位的文件,使用ACL的文件会在NameNode中产生额外的内存消耗。

      上面学习了HDFS中的文件权限和访问控制列表,最后学习一下如何针对权限和ACL进行配置,下表列出了其中的重要参数:

参数名

位置

用途

dfs.permissions.enabled

hdfs-site.xml

默认值为true,即启用权限检查。如果为 false,则禁用权限检查。

hadoop.http.staticuser.user

core-site.xml

默认值为dr.who,查看web UI的用户

dfs.permissions.superusergroup

hdfs-site.xml

超级用户的组名称,默认为supergroup

<fs.permissions.umask-mode

core-site.xml

创建文件和目录时使用的umask,默认值为八进制022,每位数字对应了拥有者,组和其他用户。该值既可以使用八进制数字,如022,也可以使用符号,如u=rwx,g=r-x,o=r-x(对应022)

dfs.cluster.administrators

hdfs-site.xml

被指定为ACL的集群管理员

dfs.namenode.acls.enabled

hdfs-site.xml

默认值为false,禁用ACL,设置为true则启用ACL。当ACL被禁用时,NameNode拒绝设置或者获取ACL的请求

posted @ 2017-07-28 10:55 xzc 阅读(959) | 评论 (0)编辑 收藏