2017年5月15日
#
一、POI概述
Apache POI是Apache软件基金会的开放源码函式库,POI提供API给Java程序对Microsoft Office格式档案读和写的功能。
结构:
HSSF - 提供读写Microsoft Excel格式档案的功能。
XSSF - 提供读写Microsoft Excel OOXML格式档案的功能。
HWPF - 提供读写Microsoft Word格式档案的功能。
HSLF - 提供读写Microsoft PowerPoint格式档案的功能。
HDGF - 提供读写Microsoft Visio格式档案的功能。
使用必须引入依赖
org.apache.poi
poi
3.17
注:3.17版本是支持jdk6的最后版本
二、HSSF概况
HSSF 是Horrible SpreadSheet Format的缩写,通过HSSF,你可以用纯Java代码来读取、写入、修改Excel文件。HSSF 为读取操作提供了两类API:usermodel和eventusermodel,即“用户模型”和“事件-用户模型”。
三、 POI EXCEL文档结构类
HSSFWorkbook excel文档对象
HSSFSheet excel的sheet
HSSFRow excel的行
HSSFCell excel的单元格
HSSFFont excel字体
HSSFName 名称
HSSFDataFormat 日期格式
HSSFHeader sheet头
HSSFFooter sheet尾
HSSFCellStyle cell样式
HSSFDateUtil 日期
HSSFPrintSetup 打印
HSSFErrorConstants 错误信息表
四、EXCEL的读写操作
1、读取“区域数据.xls”并储存于list集合中,“区域数据.xls”如下图
public List
importXLS(){
ArrayList
list = new ArrayList<>();
try {
//1、获取文件输入流
InputStream inputStream = new FileInputStream("/Users/Shared/区域数据.xls");
//2、获取Excel工作簿对象
HSSFWorkbook workbook = new HSSFWorkbook(inputStream);
//3、得到Excel工作表对象
HSSFSheet sheetAt = workbook.getSheetAt(0);
//4、循环读取表格数据
for (Row row : sheetAt) {
//首行(即表头)不读取
if (row.getRowNum() == 0) {
continue;
}
//读取当前行中单元格数据,索引从0开始
String areaNum = row.getCell(0).getStringCellValue();
String province = row.getCell(1).getStringCellValue();
String city = row.getCell(2).getStringCellValue();
String district = row.getCell(3).getStringCellValue();
String postcode = row.getCell(4).getStringCellValue();
Area area = new Area();
area.setCity(city);
area.setDistrict(district);
area.setProvince(province);
area.setPostCode(postcode);
list.add(area);
}
//5、关闭流
workbook.close();
} catch (IOException e) {
e.printStackTrace();
}
return list;
}
2、导出数据到“区域数据.xls”文件中,页面数据如下图:
public void exportExcel() throws IOException {
Page
page = areaService.pageQuery(null);
List
list = page.getContent();
//1.在内存中创建一个excel文件
HSSFWorkbook hssfWorkbook = new HSSFWorkbook();
//2.创建工作簿
HSSFSheet sheet = hssfWorkbook.createSheet();
//3.创建标题行
HSSFRow titlerRow = sheet.createRow(0);
titlerRow.createCell(0).setCellValue("省");
titlerRow.createCell(1).setCellValue("市");
titlerRow.createCell(2).setCellValue("区");
titlerRow.createCell(3).setCellValue("邮编");
titlerRow.createCell(4).setCellValue("简码");
titlerRow.createCell(5).setCellValue("城市编码");
//4.遍历数据,创建数据行
for (Area area : list) {
//获取最后一行的行号
int lastRowNum = sheet.getLastRowNum();
HSSFRow dataRow = sheet.createRow(lastRowNum + 1);
dataRow.createCell(0).setCellValue(area.getProvince());
dataRow.createCell(1).setCellValue(area.getCity());
dataRow.createCell(2).setCellValue(area.getDistrict());
dataRow.createCell(3).setCellValue(area.getPostcode());
dataRow.createCell(4).setCellValue(area.getShortcode());
dataRow.createCell(5).setCellValue(area.getCitycode());
}
//5.创建文件名
String fileName = "区域数据统计.xls";
//6.获取输出流对象
HttpServletResponse response = ServletActionContext.getResponse();
ServletOutputStream outputStream = response.getOutputStream();
//7.获取mimeType
ServletContext servletContext = ServletActionContext.getServletContext();
String mimeType = servletContext.getMimeType(fileName);
//8.获取浏览器信息,对文件名进行重新编码
HttpServletRequest request = ServletActionContext.getRequest();
fileName = FileUtils.filenameEncoding(fileName, request);
//9.设置信息头
response.setContentType(mimeType);
response.setHeader("Content-Disposition","attachment;filename="+fileName);
//10.写出文件,关闭流
hssfWorkbook.write(outputStream);
hssfWorkbook.close();
}
工具类
public class FileUtils {
public static String filenameEncoding(String filename, HttpServletRequest request) throws IOException {
String agent = request.getHeader("User-Agent"); //获取浏览器
if (agent.contains("Firefox")) {
BASE64Encoder base64Encoder = new BASE64Encoder();
filename = "=?utf-8?B?"
+ base64Encoder.encode(filename.getBytes("utf-8"))
+ "?=";
} else if(agent.contains("MSIE")) {
filename = URLEncoder.encode(filename, "utf-8");
} else if(agent.contains ("Safari")) {
filename = new String (filename.getBytes ("utf-8"),"ISO8859-1");
} else {
filename = URLEncoder.encode(filename, "utf-8");
}
return filename;
}
}
写出xls文件:
五、 EXCEL常用操作方法
1、 得到Excel常用对象
POIFSFileSystem fs=newPOIFSFileSystem(new FileInputStream("d:/test.xls"));
//得到Excel工作簿对象
HSSFWorkbook wb = new HSSFWorkbook(fs);
//得到Excel工作表对象
HSSFSheet sheet = wb.getSheetAt(0);
//得到Excel工作表的行
HSSFRow row = sheet.getRow(i);
//得到Excel工作表指定行的单元格
HSSFCell cell = row.getCell((short) j);
cellStyle = cell.getCellStyle();//得到单元格样式
2、建立Excel常用对象
HSSFWorkbook wb = new HSSFWorkbook();//创建Excel工作簿对象
HSSFSheet sheet = wb.createSheet("new sheet");//创建Excel工作表对象
HSSFRow row = sheet.createRow((short)0); //创建Excel工作表的行
cellStyle = wb.createCellStyle();//创建单元格样式
row.createCell((short)0).setCellStyle(cellStyle); //创建Excel工作表指定行的单元格
row.createCell((short)0).setCellValue(1); //设置Excel工作表的值
3、设置sheet名称和单元格内容
wb.setSheetName(1, "第一张工作表",HSSFCell.ENCODING_UTF_16);
cell.setEncoding((short) 1);
cell.setCellValue("单元格内容");
4、取得sheet的数目
wb.getNumberOfSheets()
5、 根据index取得sheet对象
HSSFSheet sheet = wb.getSheetAt(0);
6、取得有效的行数
int rowcount = sheet.getLastRowNum();
7、取得一行的有效单元格个数
row.getLastCellNum();
8、单元格值类型读写
cell.setCellType(HSSFCell.CELL_TYPE_STRING); //设置单元格为STRING类型
cell.getNumericCellValue();//读取为数值类型的单元格内容
9、设置列宽、行高
sheet.setColumnWidth((short)column,(short)width);
row.setHeight((short)height);
10、添加区域,合并单元格
Region region = new Region((short)rowFrom,(short)columnFrom,(short)rowTo
,(short)columnTo);//合并从第rowFrom行columnFrom列
sheet.addMergedRegion(region);// 到rowTo行columnTo的区域
//得到所有区域
sheet.getNumMergedRegions()
11、保存Excel文件
FileOutputStream fileOut = new FileOutputStream(path);
wb.write(fileOut);
12、根据单元格不同属性返回字符串数值
public String getCellStringValue(HSSFCell cell) {
String cellValue = "";
switch (cell.getCellType()) {
case HSSFCell.CELL_TYPE_STRING://字符串类型
cellValue = cell.getStringCellValue();
if(cellValue.trim().equals("")||cellValue.trim().length()<=0)
cellValue=" ";
break;
case HSSFCell.CELL_TYPE_NUMERIC: //数值类型
cellValue = String.valueOf(cell.getNumericCellValue());
break;
case HSSFCell.CELL_TYPE_FORMULA: //公式
cell.setCellType(HSSFCell.CELL_TYPE_NUMERIC);
cellValue = String.valueOf(cell.getNumericCellValue());
break;
case HSSFCell.CELL_TYPE_BLANK:
cellValue=" ";
break;
case HSSFCell.CELL_TYPE_BOOLEAN:
break;
case HSSFCell.CELL_TYPE_ERROR:
break;
default:
break;
}
return cellValue;
}
13、常用单元格边框格式
HSSFCellStyle style = wb.createCellStyle();
style.setBorderBottom(HSSFCellStyle.BORDER_DOTTED);//下边框
style.setBorderLeft(HSSFCellStyle.BORDER_DOTTED);//左边框
style.setBorderRight(HSSFCellStyle.BORDER_THIN);//右边框
style.setBorderTop(HSSFCellStyle.BORDER_THIN);//上边框
14、设置字体和内容位置
HSSFFont f = wb.createFont();
f.setFontHeightInPoints((short) 11);//字号
f.setBoldweight(HSSFFont.BOLDWEIGHT_NORMAL);//加粗
style.setFont(f);
style.setAlignment(HSSFCellStyle.ALIGN_CENTER);//左右居中
style.setVerticalAlignment(HSSFCellStyle.VERTICAL_CENTER);//上下居中
style.setRotation(short rotation);//单元格内容的旋转的角度
HSSFDataFormat df = wb.createDataFormat();
style1.setDataFormat(df.getFormat("0.00%"));//设置单元格数据格式
cell.setCellFormula(string);//给单元格设公式
style.setRotation(short rotation);//单元格内容的旋转的角度
15、插入图片
//先把读进来的图片放到一个ByteArrayOutputStream中,以便产生ByteArray
ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
BufferedImage bufferImg = ImageIO.read(new File("ok.jpg"));
ImageIO.write(bufferImg,"jpg",byteArrayOut);
//读进一个excel模版
FileInputStream fos = new FileInputStream(filePathName+"/stencil.xlt");
fs = new POIFSFileSystem(fos);
//创建一个工作薄
HSSFWorkbook wb = new HSSFWorkbook(fs);
HSSFSheet sheet = wb.getSheetAt(0);
HSSFPatriarch patriarch = sheet.createDrawingPatriarch();
HSSFClientAnchor anchor = new HSSFClientAnchor(0,0,1023,255,(short) 0,0,(short)10,10);
patriarch.createPicture(anchor , wb.addPicture(byteArrayOut.toByteArray(),HSSFWorkbook.PICTURE_TYPE_JPEG));
16、调整工作表位置
HSSFWorkbook wb = new HSSFWorkbook();
HSSFSheet sheet = wb.createSheet("format sheet");
HSSFPrintSetup ps = sheet.getPrintSetup();
sheet.setAutobreaks(true);
ps.setFitHeight((short)1);
ps.setFitWidth((short)1);
1、在学习从文件读取数据中,写了个示例代码,读取不在同一个目录的file.txt,运行后报这个Python OSError: [Errno 22] Invalid argument:错误:
(1)、首先,在F盘的python_stu中新增了一个file.txt,同时在F盘的python_stu文件目录底下新增一个file文件夹,里面有个file_reader.py来读取python_stu文件目录底下的file.txt,代码分别如下:
file.txt:
测试
测试2
测试3
file_reader.py:
with open('F:\python_stu\file.txt') as file_obj:
contents = file_obj.read();
print(contents.rstrip());
(2)、运行后报错:
(3)、出现这种错误的原因是由于读取不到这个文件,看Traceback报的错误,最后一行,很明显读取不到file.txt,前面的F:\\python_stu没错,后面的名称怎么变了,还是x0cile.txt。
(4)、解决办法,可修改上述第一行代码为:
with open('F:\python_stu/file.txt') as file_obj:
或者:
with open('F:/python_stu/file.txt') as file_obj:
或者:
with open('F://python_stu//file.txt') as file_obj:
又或者:
with open('F:\\python_stu\\file.txt') as file_obj:
还有一些我就不附上了,上面第一种方式不统一,最好不要用,用统一的方式,而且有时候还有注意一些转义字符,比如 \t,\n也会导致报错。
前面学习了使用命令hdfs haadmin -failover手动进行故障转移,在该模式下,即使现役NameNode已经失效,系统也不会自动从现役NameNode转移到待机NameNode,下面学习如何配置部署HA自动进行故障转移。自动故障转移为HDFS部署增加了两个新组件:ZooKeeper和ZKFailoverController(ZKFC)进程。ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。HA的自动故障转移依赖于ZooKeeper的以下功能:
- 故障检测:集群中的每个NameNode在ZooKeeper中维护了一个持久会话,如果机器崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移。
- 现役NameNode选择:ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode。
ZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:
- 健康监测:ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。
- ZooKeeper会话管理:当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。
- 基于ZooKeeper的选择:如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责运行故障转移进程以使它的本地NameNode为active。故障转移进城与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为active状态。
在典型部署中,ZooKeeper守护进程运行在三个或者五个节点上,但由于ZooKeeper本身需要较少的资源,所以将ZooKeeper部署在与现役NameNode和待机NameNode相同的主机上,还可以将ZooKeeper部署到与YARN的ResourceManager相同的节点上。建议配置ZooKeeper将数据存储在与HDFS元数据不同的硬盘上以得到最好的性能和隔离性。在配置自动故障转移之前需要先停掉集群,目前在集群运行时还不可能将手动故障转移的安装转换为自动故障转移的安装。接下来看看如何配置HA的自动故障转移。首先在hdfs-site.xml中添加下面的参数,该参数的值默认为false:
- <property>
- <name>dfs.ha.automatic-failover.enabled</name>
- <value>true</value>
- </property>
在core-site.xml文件中添加下面的参数,该参数的值为ZooKeeper服务器的地址,ZKFC将使用该地址。
- <property>
- <name>ha.zookeeper.quorum</name> <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
- </property>
在HA或者HDFS联盟中,上面的两个参数还需要以NameServiceID为后缀,比如dfs.ha.automatic-failover.enabled.mycluster。除了上面的两个参数外,还有其它几个参数用于自动故障转移,比如ha.zookeeper.session-timeout.ms,但对于大多数安装来说都不是必须的。
在添加了上述的配置参数后,下一步就是在ZooKeeper中初始化要求的状态,可以在任一NameNode中运行下面的命令实现该目的,该命令将在ZooKeeper中创建znode:
在启用自动故障转移的集群中,start-dfs.sh脚本将在任何运行NameNode的主机上自动启动ZKFC守护进程,一旦ZKFC启动完毕,它们将自动选择一个NameNode为现役NameNode。如果手动管理集群中的服务,需要在每台运行NameNode的主机上手动启动ZKFC,命令为:
- hadoop-daemon.sh start zkfc
- hdfs zkfc
如果正在运行一个安全的集群,可能想确保存储在ZooKeeper中的信息也是安全的,这将阻止恶意的客户端修改ZooKeeper中的元数据或者潜在地触发一个错误的故障转移。为了保护ZooKeeper中的信息,首先在core-site.xml中添加下面的参数:
- <property>
- <name>ha.zookeeper.auth</name>
- <value>@/path/to/zk-auth.txt</value>
- </property>
- <property>
- <name>ha.zookeeper.acl</name>
- <value>@/path/to/zk-acl.txt</value>
- </property>
参数值中的@字符表示参数值保存在@后的硬盘文件中。第一个配置文件指定了ZooKeeper的认证列表,其格式与ZK CLI使用的相同,例如:digest:hdfs-zkfcs:mypassword,其中hdfs-zkfcs为ZooKeeper的用户名,mypassword为密码。其次使用下面的命令为该认证生成一个ZooKeeper访问控制列表:
- $ java -cp $ZK_HOME/lib/*:$ZK_HOME/zookeeper-3.4.2.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider hdfs-zkfcs:mypassword
- output: hdfs-zkfcs:mypassword->hdfs-zkfcs:P/OQvnYyU/nF/mGYvB/xurX8dYs=
拷贝->之后的字符串并添加digest:前缀,然后粘贴到zk-acls.txt中,例如:digest:hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=:rwcda。要想使ACLs生效,需要再次运行zkfc –formatZK。最后可能像下面这样在ZK CLI中验证ACLs:
- [zk: localhost:2181(CONNECTED) 1] getAcl /hadoop-ha
- 'digest,'hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=
- : cdrwa
在安装完成自动故障转移后,或许需要测试一下。首先定位现役NameNode,可以通过访问NameNode的web页面来确定哪个NameNode是active状态的。一旦确定了处于active状态的NameNode,就需要在该节点上制造点故障,比如使用命令kill -9 <pid of NN>模拟JVM崩溃,或重启主机或拔掉网线来模拟不同的中断。一旦触发了自动故障转移,另一个NameNode应该自动在几秒钟内变为active状态。检测到故障并触发故障转移由参数ha.zookeeper.session-timeout.ms控制,该参数为与core-site.xml中,默认为5秒。如果测试不成功,可能是配置问题,检查ZKFC和NameNode进程的日志以进一步诊断问题,通常错误都是很明显的。
理想情况下,我们应用对Yarn资源的请求应该立刻得到满足,但现实情况资源往往是有限的,特别是在一个很繁忙的集群,一个应用资源的请求经常需要等待一段时间才能的到相应的资源。在Yarn中,负责给应用分配资源的就是Scheduler。其实调度本身就是一个难题,很难找到一个完美的策略可以解决所有的应用场景。为此,Yarn提供了多种调度器和可配置的策略供我们选择。
一、调度器的选择
在Yarn中有三种调度器可以选择:FIFO Scheduler
,Capacity Scheduler
,FairS cheduler
。
FIFO Scheduler
把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。
FIFO Scheduler
是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞。在共享集群中,更适合采用Capacity Scheduler
或Fair Scheduler
,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。
下面“Yarn调度器对比图”展示了这几个调度器的区别,从图中可以看出,在FIFO 调度器中,小任务会被大任务阻塞。
而对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。
在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。如下图所示,当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
需要注意的是,在下图Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。
Yarn调度器对比图:
二、Capacity Scheduler(容器调度器)的配置
2.1 容器调度介绍
Capacity 调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。除此之外,队列内部又可以垂直划分,这样一个组织内部的多个成员就可以共享这个队列资源了,在一个队列内部,资源的调度是采用的是先进先出(FIFO)策略。
通过上面那幅图,我们已经知道一个job可能使用不了整个队列的资源。然而如果这个队列中运行多个job,如果这个队列的资源够用,那么就分配给这些job,如果这个队列的资源不够用了呢?其实Capacity调度器仍可能分配额外的资源给这个队列,这就是“弹性队列”(queue elasticity)的概念。
在正常的操作中,Capacity调度器不会强制释放Container,当一个队列资源不够用时,这个队列只能获得其它队列释放后的Container资源。当然,我们可以为队列设置一个最大资源使用量,以免这个队列过多的占用空闲资源,导致其它队列无法使用这些空闲资源,这就是”弹性队列”需要权衡的地方。
2.2 容器调度的配置
假设我们有如下层次的队列:
root ├── prod └── dev ├── eng └── science
下面是一个简单的Capacity调度器的配置文件,文件名为capacity-scheduler.xml
。在这个配置中,在root队列下面定义了两个子队列prod
和dev
,分别占40%和60%的容量。需要注意,一个队列的配置是通过属性yarn.sheduler.capacity.<queue-path>.<sub-property>
指定的,<queue-path>
代表的是队列的继承树,如root.prod
队列,<sub-property>
一般指capacity
和maximum-capacity
。
我们可以看到,dev
队列又被分成了eng
和science
两个相同容量的子队列。dev
的maximum-capacity
属性被设置成了75%,所以即使prod
队列完全空闲dev
也不会占用全部集群资源,也就是说,prod
队列仍有25%的可用资源用来应急。我们注意到,eng
和science
两个队列没有设置maximum-capacity
属性,也就是说eng
或science
队列中的job可能会用到整个dev
队列的所有资源(最多为集群的75%)。而类似的,prod
由于没有设置maximum-capacity属性,它有可能会占用集群全部资源。
Capacity容器除了可以配置队列及其容量外,我们还可以配置一个用户或应用可以分配的最大资源数量、可以同时运行多少应用、队列的ACL认证等。
2.3 队列的设置
关于队列的设置,这取决于我们具体的应用。比如,在MapReduce中,我们可以通过mapreduce.job.queuename
属性指定要用的队列。如果队列不存在,我们在提交任务时就会收到错误。如果我们没有定义任何队列,所有的应用将会放在一个default
队列中。
注意:对于Capacity调度器,我们的队列名必须是队列树中的最后一部分,如果我们使用队列树则不会被识别。比如,在上面配置中,我们使用prod
和eng
作为队列名是可以的,但是如果我们用root.dev.eng
或者dev.eng
是无效的。
三、Fair Scheduler(公平调度器)的配置
3.1 公平调度
Fair调度器的设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。在上面的“Yarn调度器对比图”展示了一个队列中两个应用的公平调度;当然,公平调度在也可以在多个队列间工作。举个例子,假设有两个用户A和B,他们分别拥有一个队列。当A启动一个job而B没有任务时,A会获得全部集群资源;当B启动一个job后,A的job会继续运行,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个job并且其它job还在运行,则它将会和B的第一个job共享B这个队列的资源,也就是B的两个job会用于四分之一的集群资源,而A的job仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享。过程如下图所示:
3.2 启用Fair Scheduler
调度器的使用是通过yarn-site.xml
配置文件中的yarn.resourcemanager.scheduler.class
参数进行配置的,默认采用Capacity Scheduler调度器。如果我们要使用Fair调度器,需要在这个参数上配置FairScheduler类的全限定名: org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
。
3.3 队列的配置
Fair调度器的配置文件位于类路径下的fair-scheduler.xml
文件中,这个路径可以通过yarn.scheduler.fair.allocation.file
属性进行修改。若没有这个配置文件,Fair调度器采用的分配策略,这个策略和3.1节介绍的类似:调度器会在用户提交第一个应用时为其自动创建一个队列,队列的名字就是用户名,所有的应用都会被分配到相应的用户队列中。
我们可以在配置文件中配置每一个队列,并且可以像Capacity 调度器一样分层次配置队列。比如,参考capacity-scheduler.xml
来配置fair-scheduler:
队列的层次是通过嵌套<queue>
元素实现的。所有的队列都是root
队列的孩子,即使我们没有配到<root>
元素里。在这个配置中,我们把dev
队列有分成了eng
和science
两个队列。
Fair调度器中的队列有一个权重属性(这个权重就是对公平的定义),并把这个属性作为公平调度的依据。在这个例子中,当调度器分配集群40:60
资源给prod
和dev
时便视作公平,eng
和science
队列没有定义权重,则会被平均分配。这里的权重并不是百分比,我们把上面的40和60分别替换成2和3,效果也是一样的。注意,对于在没有配置文件时按用户自动创建的队列,它们仍有权重并且权重值为1。
每个队列内部仍可以有不同的调度策略。队列的默认调度策略可以通过顶级元素<defaultQueueSchedulingPolicy>
进行配置,如果没有配置,默认采用公平调度。
尽管是Fair调度器,其仍支持在队列级别进行FIFO调度。每个队列的调度策略可以被其内部的<schedulingPolicy>
元素覆盖,在上面这个例子中,prod
队列就被指定采用FIFO进行调度,所以,对于提交到prod
队列的任务就可以按照FIFO规则顺序的执行了。需要注意,prod
和dev
之间的调度仍然是公平调度,同样eng
和science
也是公平调度。
尽管上面的配置中没有展示,每个队列仍可配置最大、最小资源占用数和最大可运行的应用的数量。
3.4 队列的设置
Fair调度器采用了一套基于规则的系统来确定应用应该放到哪个队列。在上面的例子中,<queuePlacementPolicy>
元素定义了一个规则列表,其中的每个规则会被逐个尝试直到匹配成功。例如,上例第一个规则specified
,则会把应用放到它指定的队列中,若这个应用没有指定队列名或队列名不存在,则说明不匹配这个规则,然后尝试下一个规则。primaryGroup
规则会尝试把应用放在以用户所在的Unix组名命名的队列中,如果没有这个队列,不创建队列转而尝试下一个规则。当前面所有规则不满足时,则触发default
规则,把应用放在dev.eng
队列中。
当然,我们可以不配置queuePlacementPolicy
规则,调度器则默认采用如下规则:
<queuePlacementPolicy> <rule name="specified" /> <rule name="user" /> </queuePlacementPolicy>
上面规则可以归结成一句话,除非队列被准确的定义,否则会以用户名为队列名创建队列。
还有一个简单的配置策略可以使得所有的应用放入同一个队列(default),这样就可以让所有应用之间平等共享集群而不是在用户之间。这个配置的定义如下:
<queuePlacementPolicy> <rule name="default" /> </queuePlacementPolicy>
实现上面功能我们还可以不使用配置文件,直接设置yarn.scheduler.fair.user-as-default-queue=false
,这样应用便会被放入default 队列,而不是各个用户名队列。另外,我们还可以设置yarn.scheduler.fair.allow-undeclared-pools=false
,这样用户就无法创建队列了。
3.5 抢占(Preemption)
当一个job提交到一个繁忙集群中的空队列时,job并不会马上执行,而是阻塞直到正在运行的job释放系统资源。为了使提交job的执行时间更具预测性(可以设置等待的超时时间),Fair调度器支持抢占。
抢占就是允许调度器杀掉占用超过其应占份额资源队列的containers,这些containers资源便可被分配到应该享有这些份额资源的队列中。需要注意抢占会降低集群的执行效率,因为被终止的containers需要被重新执行。
可以通过设置一个全局的参数yarn.scheduler.fair.preemption=true
来启用抢占功能。此外,还有两个参数用来控制抢占的过期时间(这两个参数默认没有配置,需要至少配置一个来允许抢占Container):
- minimum share preemption timeout - fair share preemption timeout
如果队列在minimum share preemption timeout
指定的时间内未获得最小的资源保障,调度器就会抢占containers。我们可以通过配置文件中的顶级元素<defaultMinSharePreemptionTimeout>
为所有队列配置这个超时时间;我们还可以在<queue>
元素内配置<minSharePreemptionTimeout>
元素来为某个队列指定超时时间。
与之类似,如果队列在fair share preemption timeout
指定时间内未获得平等的资源的一半(这个比例可以配置),调度器则会进行抢占containers。这个超时时间可以通过顶级元素<defaultFairSharePreemptionTimeout>
和元素级元素<fairSharePreemptionTimeout>
分别配置所有队列和某个队列的超时时间。上面提到的比例可以通过<defaultFairSharePreemptionThreshold>
(配置所有队列)和<fairSharePreemptionThreshold>
(配置某个队列)进行配置,默认是0.5。
在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在Job完成后的所得到的Counters是整个Job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜的原因造成map处理数据量的差异过大,使得这些平均值能代表的价值降低。Hive的执行是分阶段的,map处理数据量的差异取决于上一个stage的reduce输出,所以如何将数据均匀的分配到各个reduce中,就是解决数据倾斜的根本所在。规避错误来更好的运行比解决错误更高效。在查看了一些资料后,总结如下。
1数据倾斜的原因
1.1操作:
关键词 | 情形 | 后果 |
Join | 其中一个表较小, 但是key集中 | 分发到某一个或几个Reduce上的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值或空值过多 | 这些空值都由一个reduce处理,灰常慢 |
group by | group by 维度过小, 某值的数量过多 | 处理某值的reduce灰常耗时 |
Count Distinct | 某特殊值过多 | 处理此特殊值的reduce耗时 |
1.2原因:
1)、key分布不均匀
2)、业务数据本身的特性
3)、建表时考虑不周
4)、某些SQL语句本身就有数据倾斜
1.3表现:
任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。 最长时长远大于平均时长。
2数据倾斜的解决方案
2.1参数调节:
hive.map.aggr=true
Map 端部分聚合,相当于Combiner
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
2.2 SQL语句调节:
如何Join:
关于驱动表的选取,选用join key分布最均匀的表作为驱动表
做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
大小表Join:
使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce.
大表Join大表:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
count distinct大量相同特殊值
count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
group by维度过小:
采用sum() group by的方式来替换count(distinct)完成计算。
特殊情况特殊处理:
在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。
3典型的业务场景
3.1空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。
解决方法1: user_id为空的不参与关联(红色字体为修改后)
select * from log a join users b on a.user_id is not null and a.user_id = b.user_id union all select * from log a where a.user_id is null;
解决方法2 :赋与空值分新的key值
select * from log a left outer join users b on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1中 log读取两次,jobs是2。解决方法2 job数是1 。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。
3.2不同数据类型关联产生数据倾斜
场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。
解决方法:把数字类型转换成字符串类型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)
3.3小表不小不大,怎么用 map join 解决倾斜问题
使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。 以下例子:
select * from log a left outer join users b on a.user_id = b.user_id;
users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
解决方法:
select /*+mapjoin(x)*/* from log a
left outer join (
select /*+mapjoin(c)*/d.* from (
select distinct user_id from log ) c join users d
on c.user_id = d.user_id ) x
on a.user_id = b.user_id;
假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
4总结
使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:
1、采样log表,哪些user_id比较倾斜,得到一个结果表tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。
2、数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就像一个社会的富人不多,奇特的人不多一样。所以tmp1记录数会很少。把tmp1和users做map join生成tmp2,把tmp2读到distribute file cache。这是一个map过程。
3、map读入users和log,假如记录来自log,则检查user_id是否在tmp2里,如果是,输出到本地文件a,否则生成<user_id,value>的key,value对,假如记录来自member,生成<user_id,value>的key,value对,进入reduce阶段。
4、最终把a文件,把Stage3 reduce阶段输出的文件合并起写到hdfs。
如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:
1、对于join,在判断小表不大于1G的情况下,使用map join
2、对于group by或distinct,设定 hive.groupby.skewindata=true
3、尽量使用上述的SQL语句调节进行优化
Hive的一般学习者和培训者在谈性能优化的时候一般都会从语法和参数这些雕虫小技的角度谈优化,而不会革命性的优化Hive的性能,产生这种现象的原因有:1,历史原因和思维定势:大家学习SQL的时候一般都是就单机DB,这个时候你的性能优化技巧确实主要是SQL语法和参数调优;2,Hive的核心的性能问题往往是产生在超过规模数据集,例如说100亿条级别的数据集,以及每天处理上千上万个Hive作业的情况下产生的;上面的第二点是我们现在Hive性能调优部分要彻底解决的内容;要从根本上解决和显著的解决实际企业中Hive真正的性能优化问题,必须考虑到底什么是Hive性能的限制,我们按照优先级来说:第一重要的是:战略性架构 解决海量数据下大量Job过于频繁的IO问题,而这个问题实质上涉及了架构方面的分表 数据复用 以及分区表等调优的方式; 补充:1,海量的数据中有些数据是高频使用的数据,而有些是很少使用的,如果能够分离成为不同的表,会极大的提升效率;很多的作业可能会有共同点,抽离出来先进行计算并保留计算结果,后面的作业都可以复用;同时,底层的基础功能也可以先计算,在上层应用的时候直接拿数据结果,而不是每次都重复计算; 2,合理从用静态分区表和动态分区表,可以避免数据全局扫描及计算资源更合理的利用; 3,数据倾斜的一站式解决方案;第二重要的是:引擎和物理层面,很多内容都是普通Hive使用这不知道的! 从Hive语法和Job内部的角度去进行优化,这要求MapReduce以及Hive如何被翻译成为MapReduce要非常精通;第三重要的是:一些关键的参数;归根到底,Hive的性能优化主要考虑的是如何最大化和最有效的使用CPU Memory IO;
Hive背后的Mapper调优:
1,Mapper数过大,会产生大量小文件,由于Mapper是基于虚拟机的,过多的Mapper创建和初始化及关闭虚拟机都会消耗大量的硬件资源;
Mapper数太小,并发度过小,Job执行时间过长,无法充分利用分布式硬件资源;
2,Mapper数据由什么决定呢?
输入文件数目;
输入文件的大小;
配置参数;
默认情况下:例如一个文件800M,BLock大小是128M,那么Mapper数目就是7个,6个Mapper处理的数据是 128M, 1个Mapper处理的数据是32M;再例如,一个目录下有三个文件分别大小问5M 10M 150M
此时会产生4个Mapper,处理的数据分别是5M 10M 128M 22M;
减少Mapper的个数,就要合并小文件,这种小文件有可能是直接来自于数据源的小文件,也可能是Reducer产生的小文件;
set hive.input.format=org.apache.Hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapFiles=true;
set hive.merge.mapredFiles=true;
set hive.merge.size.per.task=256000000
set mapred.max.split.size=256000000
set mapred.min.split.size.per.node=128000000
增加Mapper的个数,一般是通过控制Hive SQL中上一个Job的Reducer个数来控制的,例如在Join操作的时候会把多个表分解为多个Job;
set mapred.map.tasks=2;
set hive.merge.mapFiles=true;
set hive.merge.mapredFiles=true;
set hive.merge.size.per.task=256000000
例如我们有5个300M的文件;按照上面的配置会产生10个Mapper,5个Mapper处理的都是256M的数据,另外5个Mapper处理的都是44M的数据,问题是:大的Mapper会数据倾斜
如何解决,设置set mapred.map.tasks=6,此时根据MapRed的运行机制,会划分6个Mapper,每个Mapper的处理数据的大小是250M, min(1500M/6, 256M) =250M
Hive背后的Reducer调优:
1,Reducer数目过大的话,会产生很多小文件,每个Reducer都会产生一个文件,如果这些小文件是下一个JOB的输入,则会需要对小文件进行合并;同样启动 初始化和销毁Reducer的虚拟机也需要消耗大量的硬件;
Reducer数据过小的话,Reduce的时间会比较长,也可能会出现数据倾斜;
2,如何控制Reducer的个数呢?
set hive.exec.reducers.byte.per.reducer=1G
set hive.exec.reducers.max=999
Reducer个数=min(999, Reducer的数据输入总量/1G);
set mapred.reduce.tasks = 10, 默认是1; 如果说当前的Reducer的结果很大,且被接下来多个Job使用其结果,我们该如何设置参数呢?一般都需要调大该参数;
什么情况下只有一个Reducer?如果不进行Group by但却需要汇总,或者说Order by,当然如果最后Reducer的数据小于默认的1G的话,也会只有一个Reducer;
1,Hive在分布式运行的时候最害怕的是数据倾斜,这是由于分布式系统的特性决定的,因为分布式系统之所以很快是由于作业平均分配给了不同的节点,不同节点同心协力,从而达到更快处理完作业的目的;
顺便说明一下,处理数据倾斜的能力是hadoop和Spark工程师最核心的竞争力之一;
2,Hive中数据倾斜的原因:
数据在分布式节点上分布不平衡;
join时某些key可能特别大;
groupBy的时候某个Key可能特别多;
count(distinct)有可能出现数据倾斜,因为其内部首先会进行groupBy操作;
3,join,我们希望join时候key是分散,如果一个key的数据量特别大,有可能会出现数据倾斜和OOM,一个核心点是:小表join大表,在reduce阶段左侧的小表会加载进内存,减少OOM的风险;
4,大表join大表的情况:数据倾斜,例如null值,解决办法一般是要打散null值,例如说使用随机数等,如果数据倾斜比较严重,采用这种方式可以提升至少一倍的速度;
5,mapJoin:小表join(超)大表的时候,可以采用mapJoin的方式把小表全部加载到Mapper端的内存中/*+MAPJOIN(table_name)*/;
6,小表join(超)大表的时候,是否会自动进行mapJoin,想进行mapJoin,需要设置:set hive.auto.convert.join=true,Hive在进行join的时候会判断左表的大小来决定是否进行mapJoin:
set hive.mapjoin.smalltable.filesize=128000000;
set hive.mapjoin.cache.numrows=100000;
上述参数可以根据实际的硬件机器的内存进行调整,对性能有至关重要的影响,因为没有了Shuffle;
对于mapJoin我们能够使用Mapper端JVM中多大的内存呢?
set hive.mapjoin.followby.gby.localtask.max.momery.usage = 0.8
set hive.mapjoin.localtask.max.memory.uage=0.9
7,groupBy,我们可以设置在Mapper端进行部分聚合,最后在Reducer端进行全局聚合
set hive.map.aggr=true;
set hive.groupby.mapaggr.checkinterval=100000
set hive.groupby.skewindata = true 内部会产生两个Job,第一个Job会通过自己的算法打散倾斜的Key并进行聚合操作且保留结果,第二个Job会完成全部的groupBy操作,会产生Mapper-Reducer-Reducer的结构
8, count(distinct),如果某个字段特别多,容易产生数据倾斜,解决思路:
在查询语句中例如对null进行过滤,在结果中加1
9, 笛卡尔积:join时候没有on条件,或者on条件无效,这个时候会使用Reducer进行笛卡尔积的操作;
1. 两者分别是什么?
Apache Hive是一个构建在Hadoop基础设施之上的数据仓库。通过Hive可以使用HQL语言查询存放在HDFS上的数据。HQL是一种类SQL语言,这种语言最终被转化为Map/Reduce. 虽然Hive提供了SQL查询功能,但是Hive不能够进行交互查询--因为它只能够在Haoop上批量的执行Hadoop。
Apache HBase是一种Key/Value系统,它运行在HDFS之上。和Hive不一样,Hbase的能够在它的数据库上实时运行,而不是运行MapReduce任务。Hive被分区为表格,表格又被进一步分割为列簇。列簇必须使用schema定义,列簇将某一类型列集合起来(列不要求schema定义)。例如,“message”列簇可能包含:“to”, ”from” “date”, “subject”, 和”body”. 每一个 key/value对在Hbase中被定义为一个cell,每一个key由row-key,列簇、列和时间戳。在Hbase中,行是key/value映射的集合,这个映射通过row-key来唯一标识。Hbase利用Hadoop的基础设施,可以利用通用的设备进行水平的扩展。
2. 两者的特点
Hive帮助熟悉SQL的人运行MapReduce任务。因为它是JDBC兼容的,同时,它也能够和现存的SQL工具整合在一起。运行Hive查询会花费很长时间,因为它会默认遍历表中所有的数据。虽然有这样的缺点,一次遍历的数据量可以通过Hive的分区机制来控制。分区允许在数据集上运行过滤查询,这些数据集存储在不同的文件夹内,查询的时候只遍历指定文件夹(分区)中的数据。这种机制可以用来,例如,只处理在某一个时间范围内的文件,只要这些文件名中包括了时间格式。
HBase通过存储key/value来工作。它支持四种主要的操作:增加或者更新行,查看一个范围内的cell,获取指定的行,删除指定的行、列或者是列的版本。版本信息用来获取历史数据(每一行的历史数据可以被删除,然后通过Hbase compactions就可以释放出空间)。虽然HBase包括表格,但是schema仅仅被表格和列簇所要求,列不需要schema。Hbase的表格包括增加/计数功能。
3. 限制
Hive目前不支持更新操作。另外,由于hive在hadoop上运行批量操作,它需要花费很长的时间,通常是几分钟到几个小时才可以获取到查询的结果。Hive必须提供预先定义好的schema将文件和目录映射到列,并且Hive与ACID不兼容。
HBase查询是通过特定的语言来编写的,这种语言需要重新学习。类SQL的功能可以通过Apache Phonenix实现,但这是以必须提供schema为代价的。另外,Hbase也并不是兼容所有的ACID特性,虽然它支持某些特性。最后但不是最重要的--为了运行Hbase,Zookeeper是必须的,zookeeper是一个用来进行分布式协调的服务,这些服务包括配置服务,维护元信息和命名空间服务。
4. 应用场景
Hive适合用来对一段时间内的数据进行分析查询,例如,用来计算趋势或者网站的日志。Hive不应该用来进行实时的查询。因为它需要很长时间才可以返回结果。
Hbase非常适合用来进行大数据的实时查询。Facebook用Hbase进行消息和实时的分析。它也可以用来统计Facebook的连接数。
5. 总结
Hive和Hbase是两种基于Hadoop的不同技术--Hive是一种类SQL的引擎,并且运行MapReduce任务,Hbase是一种在Hadoop之上的NoSQL 的Key/vale数据库。当然,这两种工具是可以同时使用的。就像用Google来搜索,用FaceBook进行社交一样,Hive可以用来进行统计查询,HBase可以用来进行实时查询,数据也可以从Hive写到Hbase,设置再从Hbase写回Hive。
摘要: 目录 1,环境准备 2,安装Hive和配置环境变量 3,安装MySQL 4,在mysql上创建hive元数据库,并对hive进行授权 5,安装jar包到hive 6,配置hive-site.xml 7,元数据存储初始化 8,启动验证hive 9,报错及解决方法
1,环境准备:
准备好Hadoop集群,参照...
阅读全文
TCP和UDP是OSI模型中的运输层中的协议。TCP提供可靠的通信传输,而UDP则常被用于让广播和细节控制交给应用的通信传输。
UDP(User Datagram Protocol)
UDP不提供复杂的控制机制,利用IP提供面向无连接的通信服务。并且它是将应用程序发来的数据在收到的那一刻,立刻按照原样发送到网络上的一种机制。
即使是出现网络拥堵的情况下,UDP也无法进行流量控制等避免网络拥塞的行为。此外,传输途中如果出现了丢包,UDO也不负责重发。甚至当出现包的到达顺序乱掉时也没有纠正的功能。如果需要这些细节控制,那么不得不交给由采用UDO的应用程序去处理。换句话说,UDP将部分控制转移到应用程序去处理,自己却只提供作为传输层协议的最基本功能。UDP有点类似于用户说什么听什么的机制,但是需要用户充分考虑好上层协议类型并制作相应的应用程序。
TCP(Transmission Control Protocol)
TCP充分实现爱呢了数据传输时各种控制功能,可以进行丢包的重发控制,还可以对次序乱掉的分包进行顺序控制。而这些在UDP中都没有。此外,TCP作为一种面向有连接的协议,只有在确认通信对端存在时才会发送数据,从而可以控制通信流量的浪费。
TCP通过检验和、序列号、确认应答、重发控制、连接管理以及窗口控制等机制实现可靠性传输。此处不一一叙述。
TCP与UDP如何加以区分使用?
TCP用于在传输层有必要实现可靠性传输的情况。由于它是面向有连接并具备顺序控制、重发控制等机制的。所以它可以为应用提供可靠传输。
另一方面,UDP主要用于那些对高速传输和实时性有较高要求的通信或广播通信。举一个IP电话进行通话的例子。如果使用TCP,数据在传送途中如果丢失会被重发,但是这样无法流畅地传输通话人的声音,会导致无法进行正常交流。而采用UDP,它不会进行重发处理。从而也就不会有声音大幅度延迟到达的问题。即使有部分数据丢失,也只是影响某一小部分的通话。此外,在多播与广播通信中也使用UDP而不是UDP。RIP、DHCP等基于广播的协议也要依赖于UDP。
TCP与UDP区别总结:
1、TCP面向连接(如打电话要先拨号建立连接);UDP是无连接的,即发送数据之前不需要建立连接
2、TCP提供可靠的服务。也就是说,通过TCP连接传送的数据,无差错,不丢失,不重复,且按序到达;UDP尽最大努力交付,即不保证可靠交付
3、TCP面向字节流,实际上是TCP把数据看成一连串无结构的字节流;UDP是面向报文的
UDP没有拥塞控制,因此网络出现拥塞不会使源主机的发送速率降低(对实时应用很有用,如IP电话,实时视频会议等)
4、每一条TCP连接只能是点到点的;UDP支持一对一,一对多,多对一和多对多的交互通信
5、TCP首部开销20字节;UDP的首部开销小,只有8个字节
6、TCP的逻辑通信信道是全双工的可靠信道,UDP则是不可靠信道
UDP如何实现可靠传输
由于在传输层UDP已经是不可靠的连接,那就要在应用层自己实现一些保障可靠传输的机制
简单来讲,要使用UDP来构建可靠的面向连接的数据传输,就要实现类似于TCP协议的
超时重传(定时器)
有序接受 (添加包序号)
应答确认 (Seq/Ack应答机制)
滑动窗口流量控制等机制 (滑动窗口协议)
等于说要在传输层的上一层(或者直接在应用层)实现TCP协议的可靠数据传输机制,比如使用UDP数据包+序列号,UDP数据包+时间戳等方法。
目前已经有一些实现UDP可靠传输的机制,比如
UDT(UDP-based Data Transfer Protocol)
基于UDP的数据传输协议(UDP-based Data Transfer Protocol,简称UDT)是一种互联网数据传输协议。UDT的主要目的是支持高速广域网上的海量数据传输,而互联网上的标准数据传输协议TCP在高带宽长距离网络上性能很差。 顾名思义,UDT建于UDP之上,并引入新的拥塞控制和数据可靠性控制机制。UDT是面向连接的双向的应用层协议。它同时支持可靠的数据流传输和部分可靠的数据报传输。 由于UDT完全在UDP上实现,它也可以应用在除了高速数据传输之外的其它应用领域,例如点到点技术(P2P),防火墙穿透,多媒体数据传输等等。
一、
1、什么是Hbase。
是一个高可靠性、高性能、列存储、可伸缩、实时读写的分布式数据库系统。
适合于存储非结构化数据,基于列的而不是基于行的模式
如图:Hadoop生态中hbase与其他部分的关系。
2、关系数据库已经流行很多年,并且hadoop已经有了HDFS和MapReduce,为什么需要HBase?
Hadoop可以很好地解决大规模数据的离线批量处理问题,但是,受限于HadoopMapReduce编程框架的高延迟数据处理机制,使得Hadoop无法满足大规模数据实时处理应用的需求
HDFS面向批量访问模式,不是随机访问模式
传统的通用关系型数据库无法应对在数据规模剧增时导致的系统扩展性和性能问题(分库分表也不能很好解决)
传统关系数据库在数据结构变化时一般需要停机维护;空列浪费存储空间
因此,业界出现了一类面向半结构化数据存储和处理的高可扩展、低写入/查询延迟的系统,例如,键值数据库、文档数据库和列族数据库(如BigTable和HBase等)
HBase已经成功应用于互联网服务领域和传统行业的众多在线式数据分析处理系统中
3、HBase与传统的关系数据库的区别
(1)数据类型:关系数据库采用关系模型,具有丰富的数据类型和存储方式,HBase则采用了更加简单的数据模型,它把数据存储为未经解释的字符串
(2)数据操作:关系数据库中包含了丰富的操作,其中会涉及复杂的多表连接。HBase操作则不存在复杂的表与表之间的关系,只有简单的插入、查询、删除、清空等,因为HBase在设计上就避免了复杂的表和表之间的关系
(3)存储模式:关系数据库是基于行模式存储的。HBase是基于列存储的,每个列族都由几个文件保存,不同列族的文件是分离的
(4)数据索引:关系数据库通常可以针对不同列构建复杂的多个索引,以提高数据访问性能。HBase只有一个索引——行键,通过巧妙的设计,HBase中的所有访问方法,或者通过行键访问,或者通过行键扫描,从而使得整个系统不会慢下来
(5)数据维护:在关系数据库中,更新操作会用最新的当前值去替换记录中原来的旧值,旧值被覆盖后就不会存在。而在HBase中执行更新操作时,并不会删除数据旧的版本,而是生成一个新的版本,旧有的版本仍然保留
(6)可伸缩性:关系数据库很难实现横向扩展,纵向扩展的空间也比较有限。相反,HBase和BigTable这些分布式数据库就是为了实现灵活的水平扩展而开发的,能够轻易地通过在集群中增加或者减少硬件数量来实现性能的伸缩
二、Hbase数据模型
1、模型概述
HBase是一个稀疏、多维度、排序的映射表,这张表的索引是行键、列族、列限定符和时间戳
每个值是一个未经解释的字符串,没有数据类型
用户在表中存储数据,每一行都有一个可排序的行键和任意多的列
表在水平方向由一个或者多个列族组成,一个列族中可以包含任意多个列,同一个列族里面的数据存储在一起
列族支持动态扩展,可以很轻松地添加一个列族或列,无需预先定义列的数量以及类型,所有列均以字符串形式存储,用户需要自行进行数据类型转换
HBase中执行更新操作时,并不会删除数据旧的版本,而是生成一个新的版本,旧有的版本仍然保留(这是和HDFS只允许追加不允许修改的特性相关的)
2、数据坐标
HBase中需要根据行键、列族、列限定符和时间戳来确定一个单元格,因此,可以视为一个“四维坐标”,即[行键,列族, 列限定符,时间戳]
键 |
值 |
[“201505003”,“Info”,“email”, 1174184619081] |
“xie@qq.com” |
[“201505003”,“Info”,“email”, 1174184620720] |
“you@163.com”
|
3、概念视图
4、物理视图
三、HBase实现原理
1、HBase的实现包括三个主要的功能组件:
(1)库函数:链接到每个客户端
(2)一个Master主服务器
(3)许多个Region服务器
主服务器Master负责管理和维护HBase表的分区信息,维护Region服务器列表,分配Region,负载均衡
Region服务器负责存储和维护分配给自己的Region,处理来自客户端的读写请求
客户端并不是直接从Master主服务器上读取数据,而是在获得Region的存储位置信息后,直接从Region服务器上读取数据
客户端并不依赖Master,而是通过Zookeeper来获得Region位置信息,大多数客户端甚至从来不和Master通信,这种设计方式使得Master负载很小
2、Region
开始只有一个Region,后来不断分裂
Region拆分操作非常快,接近瞬间,因为拆分之后的Region读取的仍然是原存储文件,直到“合并”过程把存储文件异步地写到独立的文件之后,才会读取新文件
同一个Region不会被分拆到多个Region服务器
每个Region服务器存储10-1000个Region
元数据表,又名.META.表,存储了Region和Region服务器的映射关系
当HBase表很大时, .META.表也会被分裂成多个Region
根数据表,又名-ROOT-表,记录所有元数据的具体位置
-ROOT-表只有唯一一个Region,名字是在程序中被写死的
Zookeeper文件记录了-ROOT-表的位置
客户端访问数据时的“三级寻址”
为了加速寻址,客户端会缓存位置信息,同时,需要解决缓存失效问题
寻址过程客户端只需要询问Zookeeper服务器,不需要连接Master服务器
3、HBase的三层结构中各层次的名称和作用
层次 |
名称 |
作用 |
第一层 |
Zookeper文件 |
记录了-ROOT-表的位置信息 |
第二层 |
-ROOT-表 |
记录了.META.表的Region位置信息
-ROOT-表只能有一个Region。通过-ROOT-表,就可以访问.META.表中的数据 |
第三层 |
.META.表 |
记录了用户数据表的Region位置信息,.META.表可以有多个Region,保存了HBase中所有用户数据表的Region位置信息 |
四、HBase运行机制
1、HBase系统架构
(1、客户端包含访问HBase的接口,同时在缓存中维护着已经访问过的Region位置信息,用来加快后续数据访问过程
(2、Zookeeper可以帮助选举出一个Master作为集群的总管,并保证在任何时刻总有唯一一个Master在运行,这就避免了Master的“单点失效”问题
(Zookeeper是一个很好的集群管理工具,被大量用于分布式计算,提供配置维护、域名服务、分布式同步、组服务等。)
(3. Master
主服务器Master主要负责表和Region的管理工作:
管理用户对表的增加、删除、修改、查询等操作
实现不同Region服务器之间的负载均衡
在Region分裂或合并后,负责重新调整Region的分布
对发生故障失效的Region服务器上的Region进行迁移
(4. Region服务器
Region服务器是HBase中最核心的模块,负责维护分配给自己的Region,并响应用户的读写请求
2、Region
(1、用户读写数据过程
用户写入数据时,被分配到相应Region服务器去执行
用户数据首先被写入到MemStore和Hlog中
只有当操作写入Hlog之后,commit()调用才会将其返回给客户端
当用户读取数据时,Region服务器会首先访问MemStore缓存,如果找不到,再去磁盘上面的StoreFile中寻找
(2、缓存的刷新
系统会周期性地把MemStore缓存里的内容刷写到磁盘的StoreFile文件中,清空缓存,并在Hlog里面写入一个标记、
每次刷写都生成一个新的StoreFile文件,因此,每个Store包含多个StoreFile文件
每个Region服务器都有一个自己的HLog文件,每次启动都检查该文件,确认最近一次执行缓存刷新操作之后是否发生新的写入操作;如果发现更新,则先写入MemStore,再刷写到StoreFile,最后删除旧的Hlog文件,开始为用户提供服务
(3、StroreFile的合并
每次刷写都生成一个新的StoreFile,数量太多,影响查找速度
调用Store.compact()把多个合并成一个
合并操作比较耗费资源,只有数量达到一个阈值才启动合并
3、Store工作原理
Store是Region服务器的核心
多个StoreFile合并成一个
触发分裂操作,1个父Region被分裂成两个子Region
单个StoreFile过大时,又 4、HLog工作原理
分布式环境必须要考虑系统出错。HBase采用HLog保证系统恢复
HBase系统为每个Region服务器配置了一个HLog文件,它是一种预写式日志(WriteAhead Log)
用户更新数据必须首先写入日志后,才能写入MemStore缓存,并且,直到MemStore缓存内容对应的日志已经写入磁盘,该缓存内容才能被刷写到磁盘
Zookeeper会实时监测每个Region服务器的状态,当某个Region服务器发生故障时,Zookeeper会通知Master
Master首先会处理该故障Region服务器上面遗留的HLog文件,这个遗留的HLog文件中包含了来自多个Region对象的日志记录
系统会根据每条日志记录所属的Region对象对HLog数据进行拆分,分别放到相应Region对象的目录下,然后,再将失效的Region重新分配到可用的Region服务器中,并把与该Region对象相关的HLog日志记录也发送给相应的Region服务器
Region服务器领取到分配给自己的Region对象以及与之相关的HLog日志记录以后,会重新做一遍日志记录中的各种操作,把日志记录中的数据写入到MemStore缓存中,然后,刷新到磁盘的StoreFile文件中,完成数据恢复
共用日志优点:提高对表的写操作性能;缺点:恢复时需要分拆日志
五、HBase性能
1、
行键(RowKey)
行键是按照字典序存储,因此,设计行键时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。
举个例子:如果最近写入HBase表中的数据是最可能被访问的,可以考虑将时间戳作为行键的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE- timestamp作为行键,这样能保证新写入的数据在读取时可以被快速命中。
InMemory:创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到Region服务器的缓存中,保证在读取的时候被cache命中。
Max Version:创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置setMaxVersions(1)。
Time To Live创建表的时候,可以通过HColumnDescriptor.setTimeToLive(inttimeToLive)设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置setTimeToLive(2* 24 * 60 * 60)。
2、HBaseMaster默认基于Web的UI服务端口为60010,HBase region服务器默认基于Web的UI服务端口为60030.如果master运行在名为master.foo.com的主机中,mater的主页地址就是http://master.foo.com:60010,用户可以通过Web浏览器输入这个地址查看该页面
可以查看HBase集群的当前状态
3、
NoSQL区别于关系型数据库的一点就是NoSQL不使用SQL作为查询语言,至于为何在NoSQL数据存储HBase上提供SQL接口
易使用,减少编码
4、HBase只有一个针对行健的索引
访问HBase表中的行,只有三种方式:
通过单个行健访问
通过一个行健的区间来访问
全表扫描
总结:
1、HBase数据库是BigTable的开源实现,和BigTable一样,支持大规模海量数据,分布式并发数据处理效率极高,易于扩展且支持动态伸缩,适用于廉价设备
2、HBase可以支持NativeJava API、HBaseShell、ThriftGateway、Hive等多种访问接口,可以根据具体应用场合选择相应访问方式
3、HBase实际上就是一个稀疏、多维、持久化存储的映射表,它采用行键、列键和时间戳进行索引,每个值都是未经解释的字符串。
4、HBase采用分区存储,一个大的表会被分拆许多个Region,这些Region会被分发到不同的服务器上实现分布式存储
5、HBase的系统架构包括客户端、Zookeeper服务器、Master主服务器、Region服务器。客户端包含访问HBase的接口;Zookeeper服务器负责提供稳定可靠的协同服务;Master主服务器主要负责表和Region的管理工作;Region服务器负责维护分配给自己的Region,并响应用户的读写请求
python 获得本机MAC地址:
import uuid
def get_mac_address():
mac=uuid.UUID(int=uuid.getnode()).hex[-12:]
return ":".join([mac[e:e+2] for e in range(0,11,2)])
python获取IP的方法:使用socket
import socketmyname=socket.getfqdn(socket.gethostname( ))
myaddr=socket.gethostbyname(myname)
print(myname)
print(myaddr)