#
首先 本文中的 hadoop join 在实际开发没有用处!
如果在开发中 请使用 cascading groupby, 进行 hadoop join,
本文只是为探讨弄懂 cascading 实现做准备。
当然 如果有有人 hadoop join 过 请联系我,大家交流下 !
文件可能需要的一些参考:
hadoop jython ( windows )
jython ,jython 编译以及jar 包
少量 linux shell
本文介绍 hadoop 可能使用到的 join 接口测试 ,已经参考:
使用Hadoop实现Inner Join操作的方法【from淘宝】:http://labs.chinamobile.com/groups/58_547
下面 测试后 ,我这大体上 对 hadoop join 的方式是这样理解的 (猜想):
数据1 ; 数据2
job1.map( 数据1 ) =(临时文件1)> 文件标示1+需要join列 数据
job2.map( 数据2 ) =(临时文件2)> 文件标示2+需要join列 数据
临时文件 mapred.join.expr 生成
job3.map ->
文件标示1+需要join列 : 数据
文件标示2+需要join列 : 数据
......
job3.Combiner - >
需要join列 : 文件标示1+数据
需要join列 : 文件标示2+数据
job3.Reducer->
需要join列 : 使用 java-list > 生成
文件2-列x [ 数据,数据... ]
文件1-列x [ 数据,数据... ]
然后 你这 left join ,或 inner join 或 xxx join 逻辑 就自己来吧
结果集合
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/1
1
2
3
4
5
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/2
2
4
3
1
修改 ..../hadoop-0.18.3/src/examples/python/compile
#!/usr/bin/env bash
export HADOOP_HOME=/home/xx/del/jobs/tools/hadoop-0.18.3
export CASCADING_HOME=/home/xx/del/jobs/tools/cascading-1.0.16-hadoop-0.18.3
export JYTHON_HOME=/home/xx/del/jobs/tools/jython2.2.1
export CLASSPATH="$HADOOP_HOME/hadoop-0.18.3-core.jar"
# so that filenames w/ spaces are handled correctly in loops below
IFS=
# add libs to CLASSPATH
for f in $HADOOP_HOME/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $CASCADING_HOME/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $CASCADING_HOME/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $JYTHON_HOME/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
# restore ordinary behaviour
unset IFS
/home/xx/del/jobs/tools/jython2.2.1/jythonc -p org.apache.hadoop.examples -d -j $1.jar -c $1.py
/home/xx/del/jobs/tools/hadoop-0.18.3/bin/hadoop jar $1.jar $2 $3 $4 $5 $6 $7 $8 $9
简单 数据 链接 :
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName("wordcount")
conf.setMapperClass( tMap )
FileInputFormat.setInputPaths(conf,[ Path(sp) for sp in args[1:-1]])
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setOutputPath(Path(args[-1]))
JobClient.runJob(conf)
if __name__ == "__main__":main(sys.argv)
运行
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc78
结果:
[xx@localhost wc78]$ cat ../wc78/part-00000
0 1
0 2
2 4
2 2
4 3
4 3
6 1
6 4
8 5
简单的数据 join :
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName("wordcount")
conf.setMapperClass( tMap )
conf.set("mapred.join.expr", CompositeInputFormat.compose("override",TextInputFormat, args[1:-1] ) )
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setInputFormat(CompositeInputFormat)
conf.setOutputPath(Path(args[-1]))
JobClient.runJob(conf)
if __name__ == "__main__":main(sys.argv)
运行结果 ( ) :
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc79
[xx@localhost wc78]$ cat ../wc79/part-00000
0 2
2 4
4 3
6 1
8 5
参考 : hadoop window 搭建 后,由于对 py 的语法喜欢 ,一直想 把hadoop,改成jython 的
这次 在 自己电脑上 终于 完成,下面介绍过程:
测试环境:
依然的 windows + cygwin
hadoop 0.18 # C:/cygwin/home/lky/tools/java/hadoop-0.18.3
jython 2.2.1 # C:/jython2.2.1
参考: PythonWordCount
启动 hadoop 并到 hdoop_home 下
# 在云环境中创建 input 目录
$>bin/hadoop dfs -mkdir input
# 在 包 hadoop 的 NOTICE.txt 拷贝到 input 目录下
$>bin/hadoop dfs -copyFromLocal c:/cygwin/home/lky/tools/java/hadoop-0.18.3/NOTICE.txt hdfs:///user/lky/input
$>cd src/examples/python
# 创建 个 脚本 ( jy->jar->hd run ) 一步完成!
# 当然 在 linux 写个脚本比这 好看 呵呵!
$>vim run.bat
"C:\Program Files\Java\jdk1.6.0_11\bin\java.exe" -classpath "C:\jython2.2.1\jython.jar;%CLASSPATH%" org.python.util.jython C:\jython2.2.1\Tools\jythonc\jythonc.py -p org.apache.hadoop.examples -d -j wc.jar -c %1
sh C:\cygwin\home\lky\tools\java\hadoop-0.18.3\bin\hadoop jar wc.jar %2 %3 %4 %5 %6 %7 %8 %9
# 修改 jythonc 打包 环境 。 +hadoop jar
$>vim C:\jython2.2.1\Tools\jythonc\jythonc.py
# Copyright (c) Corporation for National Research Initiatives
# Driver script for jythonc2. See module main.py for details
import sys,os,glob
for fn in glob.glob('c:/cygwin/home/lky/tools/java/hadoop-0.18.3/*.jar') :sys.path.append(fn)
for fn in glob.glob('c:/jython2.2.1/*.jar') :sys.path.append(fn)
for fn in glob.glob('c:/cygwin/home/lky/tools/java/hadoop-0.18.3/lib/*.jar') :sys.path.append(fn)
import main
main.main()
import os
os._exit(0)
# 运行
C:/cygwin/home/lky/tools/java/hadoop-0.18.3/src/examples/python>
run.bat WordCount.py hdfs:///user/lky/input file:///c:/cygwin/home/lky/tools/java/hadoop-0.18.3/tmp2
结果输出:
cat c:/cygwin/home/lky/tools/java/hadoop-0.18.3/tmp2/part-00000
(http://www.apache.org/). 1
Apache 1
Foundation 1
Software 1
The 1
This 1
by 1
developed 1
includes 1
product 1
software 1
下面重头来了 :(简洁的 jy hdoop 代码)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class WordCountMap(Mapper, MapReduceBase):
one = IntWritable(1)
def map(self, key, value, output, reporter):
for w in value.toString().split():
output.collect(Text(w), self.one)
class Summer(Reducer, MapReduceBase):
def reduce(self, key, values, output, reporter):
sum = 0
while values.hasNext():
sum += values.next().get()
output.collect(key, IntWritable(sum))
def printUsage(code):
print "wordcount [-m <maps>] [-r <reduces>] <input> <output>"
sys.exit(code)
def main(args):
conf = JobConf(WordCountMap);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text);
conf.setOutputValueClass(IntWritable);
conf.setMapperClass(WordCountMap);
conf.setCombinerClass(Summer);
conf.setReducerClass(Summer);
try:
flags, other_args = getopt.getopt(args[1:], "m:r:")
except getopt.GetoptError:
printUsage(1)
if len(other_args) != 2:
printUsage(1)
for f,v in flags:
if f == "-m":
conf.setNumMapTasks(int(v))
elif f == "-r":
conf.setNumReduceTasks(int(v))
conf.setInputPath(Path(other_args[0]))
conf.setOutputPath(Path(other_args[1]))
JobClient.runJob(conf);
if __name__ == "__main__":
main(sys.argv)
谢谢 同事 孙超 讲解
这就 把他的 思想 画个图
代码:
# -*- coding: UTF8 -*-
import sys
# 最小 支持度
sup_min = int(sys.argv[1])
ss = ","
# 交易 数据 库
D=[
'A,B,C,D',
'B,C,E',
'A,B,C,E',
'B,D,E',
'A,B,C,D'
]
print "交易数据库展现"
for arr in D : print arr
print
'''
rows=int(sys.argv[1])
D=[]
for tid in open('BuyMusic.20090722.mob.prodIds').readlines()[:rows] :
D.append(tid.split("\n")[0].split("\t")[1])
print "读取 文件结束 BuyMusic.20090722.mob.prodIds !"
'''
#全局 频繁项 收集
sup_data_map = {}
#全局 最大频繁项 收集
is_zsup={}
# 遍历过程 临时 局部 频繁项 收集
mapL = {}
# 第一次 频繁项 收集
def find_frequent_1_itemset(I):
if I=='null' or I=='' : return
if mapL.has_key(I): mapL[I]+=1
else: mapL[I]=1
map(find_frequent_1_itemset,[ I for TID in D for I in TID.split(ss) ])
# 刷选掉 小于 最小支持度 的 频繁项
def remove_not_sup_min(map,supmin=sup_min):
for k in [k for k,v in map.items() if v<supmin] :
del map[k]
remove_not_sup_min(mapL)
print "第一次 筛选 频繁项 结束!"
print mapL
# 装载 全局 频繁项 最大频繁项
for k,v in mapL.items() :
sup_data_map[k]=v
is_zsup[k]=v
# 判定 是否 'BD' 属于 'BCD' 中
isInTid = lambda I,TID : len(I.split(ss)) == len([i for i in I if i in TID.split(ss)])
# 组合 [A,B] + [A,C] = [A,B.C]
def comb(arr1,arr2):
tmap={}
for v in arr1+arr2 : tmap[v]=""
return tmap.keys()
# apriori 迭代核心
def runL(mapL,dep):
mapL2 = {}
C={}
keys = mapL.keys()
iik=""
jjk=""
# 根据 上次 频繁项 ,生成本次 '可能频繁项' 集合
for ii in range(len(keys)) :
for jj in range(ii+1,len(keys)) :
keystr=comb([ch for ch in keys[ii].split(ss)],[ch for ch in keys[jj].split(ss)])
if not len(keystr) == dep : continue
keystr.sort()
tk=ss.join(keystr)
if not tk in C : C[tk]=(keys[ii],keys[jj])
# '可能频繁项' 对比 交易数据库 计数
for tk,z in C.items():
for TID in D:
if isInTid(tk,TID) :
if mapL2.has_key(tk): mapL2[tk]+=1
else: mapL2[tk]=1
# 刷选掉 小于 最小支持度 的 频繁项
remove_not_sup_min(mapL2)
for k,v in is_zsup.items() :
for k1,v1 in mapL2.items() :
if isInTid(k,k1) :
del is_zsup[k]
break
# 全局 频繁项 ,最大频繁项 收集
for k,v in mapL2.items() :
sup_data_map[k]=v
is_zsup[k]=v
print "第"+str(dep)+"次 筛选 频繁项 结束!"
return mapL2
# 真正 运行
ii=1
while mapL :
ii=ii+1
mapL = runL(mapL,ii)
print mapL
# 全局 频繁项 中 去除 最大频繁项
for k,v in is_zsup.items() :
if sup_data_map.has_key(k) : del sup_data_map[k]
print "频繁项"
print sup_data_map
print
print "最大频繁项"
print is_zsup
print
print "可信度 展现"
for k,v in sup_data_map.items() :
for k1,v1 in is_zsup.items() :
if isInTid(k,k1) :
print k,"->",k1,"\t%.1f" %((float(is_zsup[k1])/float(sup_data_map[k]))*100)+"%"
结果:
-bash-3.00$ python ap.py 2
交易数据库展现
A,B,C,D
B,C,E
A,B,C,E
B,D,E
A,B,C,D
第一次 筛选 频繁项 结束!
{'A': 3, 'C': 4, 'B': 5, 'E': 3, 'D': 3}
第2次 筛选 频繁项 结束!
{'C,D': 2, 'C,E': 2, 'A,D': 2, 'A,B': 3, 'A,C': 3, 'B,E': 3, 'B,D': 3, 'B,C': 4}
第3次 筛选 频繁项 结束!
{'A,B,D': 2, 'A,B,C': 3, 'B,C,D': 2, 'B,C,E': 2, 'A,C,D': 2}
第4次 筛选 频繁项 结束!
{'A,B,C,D': 2}
第5次 筛选 频繁项 结束!
{}
频繁项
{'A': 3, 'C': 4, 'B': 5, 'E': 3, 'D': 3, 'C,D': 2, 'C,E': 2, 'A,D': 2, 'A,B': 3, 'A,C': 3, 'A,B,D': 2, 'B,C,D': 2, 'A,C,D': 2, 'B,E': 3, 'B,D': 3, 'B,C': 4, 'A,B,C': 3}
最大频繁项
{'B,C,E': 2, 'A,B,C,D': 2}
可信度 展现
A -> A,B,C,D 66.7%
C -> B,C,E 50.0%
C -> A,B,C,D 50.0%
B -> B,C,E 40.0%
B -> A,B,C,D 40.0%
E -> B,C,E 66.7%
D -> A,B,C,D 66.7%
C,D -> A,B,C,D 100.0%
C,E -> B,C,E 100.0%
A,D -> A,B,C,D 100.0%
A,B -> A,B,C,D 66.7%
A,C -> A,B,C,D 66.7%
A,B,D -> A,B,C,D 100.0%
B,C,D -> A,B,C,D 100.0%
A,C,D -> A,B,C,D 100.0%
B,E -> B,C,E 66.7%
B,D -> A,B,C,D 66.7%
B,C -> B,C,E 50.0%
B,C -> A,B,C,D 50.0%
A,B,C -> A,B,C,D 66.7%
一些特殊正则元字符说明:
1. *? 和 +? 和 {n,}? 懒惰匹配
1.1 非懒惰 ↓
echo "ab2c121a" |perl -ne 'print $1 if /(.*)"d/;' #print ab2c12
1.2 懒惰 ↓
echo "ab2c121a" |perl -ne 'print $1 if /(.*?)"d/;' #print ab
2. 回溯引用和前后查找:
2.1 向前查找 (?=..) ↓
echo "ab2c121a" |perl -ne 'print $1 if /(.*?)(?=2)/;' #print ab
2.2 向后查找 (?<=..) ↓
echo "ab2c121a" |perl -ne 'print $1 if /(?<=2)(.*)(?=2)/;' #print c1
2.3 负向-前/后 查找 (?!..) (?<!..)
#不能匹配 .. ↓
echo "ab2c121a" |perl -ne 'print $1 if /(?<!2)(c.*)/;' #print 无
echo "ab2c121a" |perl -ne 'print $1 if /(?<!3)(c.*)/;' #print c121a
2.4 条件 ?() = if ?()| = if else
# ?() 例如 <p> </p> 必须同时出现 ↓ ↓
echo "<p>xx</p>"|perl -ne 'print $2 if /(<p>)?("w*)(?(1)<"/p>)/' #print xx
echo "<p>xx"|perl -ne 'print $2,""n" if /(<p>)?("w*)(?(1)<"/p>)/' #print 空
echo "xx"|perl -ne 'print $2 if /(<p>)?("w*)(?(1)<"/p>)/' #print xx
# ?()| 例如 还是上面的,
# 当 有<p> 可以接</p> 也可以接 数字结尾 ↓
echo "<p>xx1</p>"|perl -ne 'print $2 if /(<p>)?("w*)(?(1)<"/p>|"d)/' #print xx1
echo "<p>xx1"|perl -ne 'print $2 if /(<p>)?("w*)(?(1)<"/p>|"d)/' # print xx
当熟悉 hash db python bsddb (db-key 转)
使用确实很方便,但是没有 想 关系数据库中的 select order by 查询 ,感觉比较郁闷! 上网 一顿 google ......
import bsddb
db = bsddb.btopen('/tmp/spam.db', 'c')
for i in range(10): db['%d'%i] = '%d'% (i*i)
db['3'] # 9
db.keys() # ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
db.set_location('6') # 36
db.previous() # 25
db.next() # 36
db.next() # 47
这可以定位,并且 previous , next 什么的 (不过目前好像是针对 string 自然 排序!)
这里比较实用的 demo
import bsddb
db = bsddb.btopen('/tmp/spam2.db', 'c')
db["2009-08-14 22:00"]="gg"
db["2009-08-15 22:00"]="cc"
db["2009-07-15 00:00"]="tt"
db["2009-08-16 22:00"]="gg"
# 注意 这 统配 等价 正则 = 2009-08-15.*
# 开始 以为能使用 正则 ,但不能 。只能简单的 xxx.* 形式的
db.set_location('2009-08-15') # ('2009-08-15 22:00', 'cc')
db.next() # ('2009-08-16 22:00', 'gg')
db.set_location('2009-08-15') # ('2009-08-15 22:00', 'cc')
db.previous() #('2009-08-14 22:00', 'gg')
转:http://www.daniweb.com/forums/thread31449.html
什么都不说了,直接看代码吧。
注解 应该写的比较详细
# liukaiyi
# 注 k-means ,维度类型 - 数值形式 ( 199 或 23.13 )
import sys, math, random
# -- 类化 '数据'
# 在 n-维度空间
class Point:
def __init__(self, coords, reference=None):
self.coords = coords
self.n = len(coords)
self.reference = reference
def __repr__(self):
return str(self.coords)
# -- 类化 '聚集点 / 聚类平均距离 点 '
# -- 在 n-维度空间
# -- k-means 核心类
# -- 每次 聚集各点 围绕她 进行聚集
# -- 并提供方法 求-聚集后的计算中心点,同时记入 此次 中心点(聚集各点平均距离),为下一次聚集提供中心点.
class Cluster:
def __init__(self, points):
if len(points) == 0: raise Exception("ILLEGAL: EMPTY CLUSTER")
self.points = points
self.n = points[0].n
for p in points:
if p.n != self.n: raise Exception("ILLEGAL: MULTISPACE CLUSTER")
# 求 聚集各点后 平均点
self.centroid = self.calculateCentroid()
def __repr__(self):
return str(self.points)
# 更新 中心点,并返回 原中心点 与 现中心点(聚集各点平均距离)距离
def update(self, points):
old_centroid = self.centroid
self.points = points
self.centroid = self.calculateCentroid()
return getDistance(old_centroid, self.centroid)
# 计算平均点 (聚集/收集各点(离本类的中心点)最近数据,后生成新的 中心点 )
def calculateCentroid(self):
centroid_coords = []
# 维度 迭代
for i in range(self.n):
centroid_coords.append(0.0)
# 收集各点 迭代
for p in self.points:
centroid_coords[i] = centroid_coords[i]+p.coords[i]
centroid_coords[i] = centroid_coords[i]/len(self.points)
return Point(centroid_coords)
# -- 返回根据 k-means 聚集形成的 数据集
def kmeans(points, k, cutoff):
# Randomly sample k Points from the points list, build Clusters around them
initial = random.sample(points, k)
clusters = []
for p in initial: clusters.append(Cluster([p]))
# 迭代 k-means 直到 每次迭代 各收集点 别的 最多 不超过 0.5
while True:
# k 个收集 数组
lists = []
for c in clusters: lists.append([])
# 迭代 每个 数据点 ,并计算与每个中心点距离
# 并把数据点添加入相应最短的中心点收集数组中
# 在迭代中 smallest_distance 为每个点与各中心点最短距离 参数,请注意看
for p in points:
smallest_distance = getDistance(p, clusters[0].centroid)
index = 0
for i in range(len(clusters[1:])):
distance = getDistance(p, clusters[i+1].centroid)
if distance < smallest_distance:
smallest_distance = distance
index = i+1
# 添加到 离最短中心距离的 数组中
lists[index].append(p)
# 聚集完,计算新 中心点
# 并 cluster.centroid 属性记入下 新中心点(下一次 聚集的中心点 )
# 并 计算与上一次 中心点 距离 ,如果 差值在 cutoff 0.5 以下 ,跳出迭代 (结束,返回最后一次 聚集集合)
biggest_shift = 0.0
for i in range(len(clusters)):
shift = clusters[i].update(lists[i])
biggest_shift = max(biggest_shift, shift)
if biggest_shift < cutoff: break
return clusters
# -- 得到欧几里德距离两点之间
def getDistance(a, b):
# Forbid measurements between Points in different spaces
if a.n != b.n: raise Exception("ILLEGAL: NON-COMPARABLE POINTS")
# Euclidean distance between a and b is sqrt(sum((a[i]-b[i])^2) for all i)
ret = 0.0
for i in range(a.n):
ret = ret+pow((a.coords[i]-b.coords[i]), 2)
return math.sqrt(ret)
# -- 在 n-维度 空间中创建 随机点
# -- 随机生成 测试数据
def makeRandomPoint(n, lower, upper):
coords = []
for i in range(n): coords.append(random.uniform(lower, upper))
return Point(coords)
# main
def main(args):
# 参数说明
# num_points, n, k, cutoff, lower, upper
# 随机数据数量 , 维度, 聚集数, 跳出迭代最小距离 , 维度数最大值,维度数最小值
num_points, n, k, cutoff, lower, upper = 10, 2, 3, 0.5, -200, 200
# 在 n-维度空间里 , 创建 num_points 随机点
# 测试数据生成
points = []
for i in range(num_points): points.append(makeRandomPoint(n, lower, upper))
# 使用 k-means 算法,来 聚集数据点 (算法入口点)
clusters = kmeans(points, k, cutoff)
print "\nPOINTS:"
for p in points: print "P:", p
print "\nCLUSTERS:"
for c in clusters: print "C:", c
if __name__ == "__main__": main(sys.argv)
1. vi /etc/vsftpd/vsftpd.conf
添加:
listen=YES
tcp_wrappers=YES
port_enable=YES
ftp_data_port=20
listen_port=21
listen_address=0.0.0.0
port_promiscuous=NO
no_anon_password=NO
anon_mkdir_write_enable=no
2.将chroot_list_enable=YES前的#去掉
并将chroot_list_file=/etc/vsftpd.chroot_list 前的#去掉
3.创建用户
useradd 用户
passwd 用户
4. vi /etc/vsftpd.chroot_list
将 用户 添加到文件里
5.修改用户的登录路径(主目录)
vi /etc/passwd
如:data:x:516:516::/home/data/data:/sbin/nologin
6.启动vsftp
service vsftpd restart
Java 代码:
package com.xunjie.dmsp.olduser;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
/**
* test.txt:
* 1 a
* 2 b
* 3 c
*
* /data/hadoop/hadoop/bin/hadoop jar
* dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar
* hdfs:/user/hadoop/test/lky/test.txt
* file:///data/hadoop/test/lky/output
*/
public class Test2 {
public static void main(String[] args) {
//设定输入文件
String sourcePath= args[0];
//设置输出文件夹
String sinkPath = args[1];
//定义读取列
Fields inputfields = new Fields("num", "value");
//定义分解正则,默认 \t
RegexSplitter spliter = new RegexSplitter(inputfields);
//管道定义
Pipe p1 = new Pipe( "test" );
//管道嵌套:
//分解日志源文件,输出给定字段
p1 = new Each(p1,new Fields("line") ,spliter);
//设定输入和输出 ,使用 泛型Hfs
Tap source = new Hfs( new TextLine(), sourcePath );
Tap sink = new Hfs( new TextLine() , sinkPath );
//配置job
Properties properties = new Properties();
properties.setProperty("hadoop.job.ugi", "hadoop,hadoop");
FlowConnector.setApplicationJarClass( properties, Main.class );
FlowConnector flowConnector = new FlowConnector(properties);
Flow importFlow = flowConnector.connect( "import flow", source,sink,p1);
importFlow.start();
importFlow.complete();
}
}
这特殊关注下, 开启慢查询。在web开发中很有帮助
MYSQL启用日志,和查看日志
|
时间:2009-01-21 17:33:57 来源:http://wasabi.javaeye.com/blog/318962 作者:kenbli |
mysql有以下几种日志:
错误日志: -log-err
查询日志: -log
慢查询日志: -log-slow-queries
更新日志: -log-update
二进制日志: -log-bin
是否启用了日志
mysql>show variables like 'log_%';
怎样知道当前的日志
mysql> show master status;
顯示二進制日志數目
mysql> show master logs;
看二进制日志文件用mysqlbinlog
shell>mysqlbinlog mail-bin.000001
或者shell>mysqlbinlog mail-bin.000001 | tail
在配置文件中指定log的輸出位置.
Windows:Windows 的配置文件为 my.ini,一般在 MySQL 的安装目录下或者 c:\Windows 下。
Linux:Linux 的配置文件为 my.cnf ,一般在 /etc 下。
在linux下:
- # 在[mysqld] 中輸入
- #log
- log-error=/usr/local/mysql/log/error.log
- log=/usr/local/mysql/log/mysql.log
- long_query_time=2
- log-slow-queries= /usr/local/mysql/log/slowquery.log
windows下:
- # 在[mysqld] 中輸入
- #log
- log-error="E:/PROGRA~1/EASYPH~1.0B1/mysql/logs/error.log"
- log="E:/PROGRA~1/EASYPH~1.0B1/mysql/logs/mysql.log"
- long_query_time=2
- log-slow-queries= "E:/PROGRA~1/EASYPH~1.0B1/mysql/logs/slowquery.log"
开启慢查询
long_query_time =2 --是指执行超过多久的sql会被log下来,这里是2秒
log-slow-queries= /usr/local/mysql/log/slowquery.log --将查询返回较慢的语句进行记录
log-queries-not-using-indexes = nouseindex.log --就是字面意思,log下来没有使用索引的query
log=mylog.log --对所有执行语句进行记录 |
|