#注意事项:

#当运行本Notebook的程序后,如果要关闭Notebook,请选择菜单: File > Close and Halt 才能确实停止当前正在运行的程序,并且释放资源

#如果没有使用以上方法,只关闭此分页,程序仍在运行,未释放资源,当您打开并运行其他的Notebook,可能会发生错误

1
# Display the master URL of the SparkContext this notebook is attached to.
sc.master
'spark://192.168.10.170:7077'
1
2
3
4
5
6
# Choose where input data is read from, based on how the SparkContext was
# started: a "local..." master reads from the local filesystem, any other
# master (Hadoop YARN or Spark Standalone) reads from HDFS.
# (A module-level `global` statement is a no-op, so it is not needed here.)
if sc.master.startswith("local"):
    Path = "file:/home/hduser/pythonsparkexample/PythonProject/"
else:
    Path = "hdfs://master:9000/user/hduser/"
# To run in cluster mode (Hadoop YARN or Spark Standalone), first upload the
# data files to the HDFS directory as described in the book.

19.2 建立RDD、DataFrame与Spark SQL tempTable

1
#Step 1  读取文本文件,建立RDD
1
# Step 1: read the pipe-delimited u.user file into an RDD with one string per line.
RawUserRDD= sc.textFile(Path+"data/u.user")
1
2
3
# Side check, unrelated to the user data: distribute a small local list as an
# RDD and collect it back to the driver.
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.collect()
[1, 2, 3, 4, 5]
1
# Number of lines in the raw user file (each line holds one user record).
RawUserRDD.count()
943
1
# Peek at the first 5 raw lines: userid|age|gender|occupation|zipcode.
RawUserRDD.take(5)
['1|24|M|technician|85711',
 '2|53|F|other|94043',
 '3|23|M|writer|32067',
 '4|24|M|technician|43537',
 '5|33|F|other|15213']
1
2
# Break every raw line on "|" into a list of field strings:
# [userid, age, gender, occupation, zipcode]
userRDD = RawUserRDD.map(lambda line: line.split("|"))
userRDD.take(5)
[['1', '24', 'M', 'technician', '85711'],
 ['2', '53', 'F', 'other', '94043'],
 ['3', '23', 'M', 'writer', '32067'],
 ['4', '24', 'M', 'technician', '43537'],
 ['5', '33', 'F', 'other', '15213']]

建立DataFrame

1
# Import explicitly instead of relying on SparkSession already being present
# in the notebook namespace.
from pyspark.sql import SparkSession

# NOTE: despite its legacy name, `sqlContext` holds a SparkSession; later cells
# call sqlContext.createDataFrame(...) and sqlContext.sql(...) on it.
sqlContext = SparkSession.builder.getOrCreate()
1
2
3
4
5
6
7
8
9
10
11
from pyspark.sql import Row

# Turn each split record into a Row with typed fields: userid and age become
# ints; the other fields stay strings (zipcode included, so values with leading
# zeros such as "05201" are kept intact).
user_Rows = userRDD.map(
    lambda fields: Row(
        userid=int(fields[0]),
        age=int(fields[1]),
        gender=fields[2],
        occupation=fields[3],
        zipcode=fields[4],
    )
)
user_Rows.take(5)
[Row(age=24, gender='M', occupation='technician', userid=1, zipcode='85711'),
 Row(age=53, gender='F', occupation='other', userid=2, zipcode='94043'),
 Row(age=23, gender='M', occupation='writer', userid=3, zipcode='32067'),
 Row(age=24, gender='M', occupation='technician', userid=4, zipcode='43537'),
 Row(age=33, gender='F', occupation='other', userid=5, zipcode='15213')]
1
2
3
# Build a DataFrame from the Row RDD, print its inferred schema, then show the
# first 20 rows (show()'s default).
user_df = sqlContext.createDataFrame(user_Rows)
user_df .printSchema()
user_df.show()
root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- userid: long (nullable = true)
 |-- zipcode: string (nullable = true)

+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24|     M|   technician|     1|  85711|
| 53|     F|        other|     2|  94043|
| 23|     M|       writer|     3|  32067|
| 24|     M|   technician|     4|  43537|
| 33|     F|        other|     5|  15213|
| 42|     M|    executive|     6|  98101|
| 57|     M|administrator|     7|  91344|
| 36|     M|administrator|     8|  05201|
| 29|     M|      student|     9|  01002|
| 53|     M|       lawyer|    10|  90703|
| 39|     F|        other|    11|  30329|
| 28|     F|        other|    12|  06405|
| 47|     M|     educator|    13|  29206|
| 45|     M|    scientist|    14|  55106|
| 49|     F|     educator|    15|  97301|
| 21|     M|entertainment|    16|  10309|
| 30|     M|   programmer|    17|  06355|
| 35|     F|        other|    18|  37212|
| 40|     M|    librarian|    19|  02138|
| 42|     F|    homemaker|    20|  95660|
+---+------+-------------+------+-------+
only showing top 20 rows
1
# Display only the first 5 rows.
user_df.show(5)
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+
only showing top 5 rows
1
2
# Alias user_df as "df" (same data); later cells reference columns through it,
# e.g. df.age or df['age'].
df=user_df.alias("df")
df.show(5)
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+
only showing top 5 rows

建立Spark SQL tempTable

1
# Expose user_df to Spark SQL as the temporary view "user_table".
# createOrReplaceTempView is the Spark 2.0+ replacement for the deprecated
# registerTempTable and behaves the same (replaces any existing view).
user_df.createOrReplaceTempView("user_table")
1
# Query the temp view with Spark SQL; show() prints the first 20 rows.
sqlContext.sql(" SELECT * FROM user_table").show()
+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24|     M|   technician|     1|  85711|
| 53|     F|        other|     2|  94043|
| 23|     M|       writer|     3|  32067|
| 24|     M|   technician|     4|  43537|
| 33|     F|        other|     5|  15213|
| 42|     M|    executive|     6|  98101|
| 57|     M|administrator|     7|  91344|
| 36|     M|administrator|     8|  05201|
| 29|     M|      student|     9|  01002|
| 53|     M|       lawyer|    10|  90703|
| 39|     F|        other|    11|  30329|
| 28|     F|        other|    12|  06405|
| 47|     M|     educator|    13|  29206|
| 45|     M|    scientist|    14|  55106|
| 49|     F|     educator|    15|  97301|
| 21|     M|entertainment|    16|  10309|
| 30|     M|   programmer|    17|  06355|
| 35|     F|        other|    18|  37212|
| 40|     M|    librarian|    19|  02138|
| 42|     F|    homemaker|    20|  95660|
+---+------+-------------+------+-------+
only showing top 20 rows
1
# Count all rows in user_table; the result column is named "counts".
sqlContext.sql(" SELECT count(*) counts FROM user_table").show()
+------+
|counts|
+------+
|   943|
+------+
1
2
3
4
# Same count query, written as a multi-line triple-quoted SQL string.
sqlContext.sql("""
SELECT count(*) counts
FROM user_table
""").show()
+------+
|counts|
+------+
|   943|
+------+
1
# Select every column; show() with no argument displays 20 rows.
sqlContext.sql(" SELECT *  FROM user_table ").show()
+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24|     M|   technician|     1|  85711|
| 53|     F|        other|     2|  94043|
| 23|     M|       writer|     3|  32067|
| 24|     M|   technician|     4|  43537|
| 33|     F|        other|     5|  15213|
| 42|     M|    executive|     6|  98101|
| 57|     M|administrator|     7|  91344|
| 36|     M|administrator|     8|  05201|
| 29|     M|      student|     9|  01002|
| 53|     M|       lawyer|    10|  90703|
| 39|     F|        other|    11|  30329|
| 28|     F|        other|    12|  06405|
| 47|     M|     educator|    13|  29206|
| 45|     M|    scientist|    14|  55106|
| 49|     F|     educator|    15|  97301|
| 21|     M|entertainment|    16|  10309|
| 30|     M|   programmer|    17|  06355|
| 35|     F|        other|    18|  37212|
| 40|     M|    librarian|    19|  02138|
| 42|     F|    homemaker|    20|  95660|
+---+------+-------------+------+-------+
only showing top 20 rows
1
# No LIMIT in the query itself; show(5) restricts only what is displayed.
sqlContext.sql(" SELECT *  FROM user_table").show(5)
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+
only showing top 5 rows
1
# The SQL LIMIT clause makes the query itself return at most 5 rows.
sqlContext.sql(" SELECT *  FROM user_table LIMIT 5").show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+

19.3 显示部分字段

1
2
# RDD-style projection: reorder each record to (userid, occupation, gender, age).
userRDDnew = userRDD.map(lambda rec: (rec[0], rec[3], rec[2], rec[1]))
userRDDnew.take(5)
[('1', 'technician', 'M', '24'),
 ('2', 'other', 'F', '53'),
 ('3', 'writer', 'M', '23'),
 ('4', 'technician', 'M', '24'),
 ('5', 'other', 'F', '33')]
1
# DataFrame API: select columns by name.
user_df.select("userid","occupation","gender","age").show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows
1
# Select the same columns via attribute access on user_df.
user_df.select( user_df.userid, user_df.occupation,user_df.gender,user_df.age ).show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows
1
# Same selection, using attribute access on the alias df.
df.select(df.userid,df.occupation,df.gender,df.age  ).show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows
1
# Mixes column references from user_df and from its alias df. This resolves
# here because df was created as user_df.alias("df"), so both refer to the same
# underlying columns.
user_df.select(user_df.userid, user_df.occupation,df.gender,df.age ).show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows
1
# Bracket syntax: indexing df with several Column objects selects those columns.
df[df['userid'],df['occupation'],df['gender'],df['age']  ].show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows
1
# Equivalent column selection in Spark SQL.
sqlContext.sql(" SELECT userid,occupation,gender,age  FROM user_table").show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows

19.4 增加计算字段

1
2
# Append a derived birth year (2016 - age) after (userid, occupation, gender, age).
# NOTE(review): 2016 is a hard-coded reference year; the ages in u.user
# presumably reflect when the data was collected. Confirm the intended year.
userRDDnew = userRDD.map(
    lambda rec: (rec[0], rec[3], rec[2], rec[1], 2016 - int(rec[1]))
)
userRDDnew.take(5)
[('1', 'technician', 'M', '24', 1992),
 ('2', 'other', 'F', '53', 1963),
 ('3', 'writer', 'M', '23', 1993),
 ('4', 'technician', 'M', '24', 1992),
 ('5', 'other', 'F', '33', 1983)]
1
# Computed column 2016 - age; without an alias Spark names it "(2016 - age)".
df.select("userid","occupation","gender","age",2016-df.age).show(5)
+------+----------+------+---+------------+
|userid|occupation|gender|age|(2016 - age)|
+------+----------+------+---+------------+
|     1|technician|     M| 24|        1992|
|     2|     other|     F| 53|        1963|
|     3|    writer|     M| 23|        1993|
|     4|technician|     M| 24|        1992|
|     5|     other|     F| 33|        1983|
+------+----------+------+---+------------+
only showing top 5 rows
1
# Same computed column, renamed to "birthyear" with alias().
df.select("userid","occupation","gender","age",(2016-df.age).alias("birthyear")).show(5)
+------+----------+------+---+---------+
|userid|occupation|gender|age|birthyear|
+------+----------+------+---+---------+
|     1|technician|     M| 24|     1992|
|     2|     other|     F| 53|     1963|
|     3|    writer|     M| 23|     1993|
|     4|technician|     M| 24|     1992|
|     5|     other|     F| 33|     1983|
+------+----------+------+---+---------+
only showing top 5 rows
1
2
3
# Spark SQL version: "2016-age birthyear" names the computed expression birthyear.
sqlContext.sql("""
SELECT userid,occupation,gender,age, 2016-age birthyear
FROM user_table""").show(5)
+------+----------+------+---+---------+
|userid|occupation|gender|age|birthyear|
+------+----------+------+---+---------+
|     1|technician|     M| 24|     1992|
|     2|     other|     F| 53|     1963|
|     3|    writer|     M| 23|     1993|
|     4|technician|     M| 24|     1992|
|     5|     other|     F| 33|     1983|
+------+----------+------+---+---------+
only showing top 5 rows

19.5 筛选数据

1
# RDD filter: male technicians aged 24. The fields are still strings at this
# point, so age is compared with '24' rather than the integer 24.
userRDD.filter(
    lambda rec: rec[3] == 'technician' and rec[2] == 'M' and rec[1] == '24'
).take(6)
[['1', '24', 'M', 'technician', '85711'],
 ['4', '24', 'M', 'technician', '43537'],
 ['456', '24', 'M', 'technician', '31820'],
 ['717', '24', 'M', 'technician', '84105'],
 ['832', '24', 'M', 'technician', '77042'],
 ['889', '24', 'M', 'technician', '78704']]
1
# Three chained filters using SQL-expression strings; each narrows the previous result.
user_df.filter("occupation='technician' ").filter("gender='M' ").filter("age=24").show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+
1
# One SQL-expression string that combines all conditions with "and".
user_df.filter("occupation='technician' and gender='M' and age=24").show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+
1
# Column-expression filter combined with &. Each comparison must be
# parenthesized because Python's & binds tighter than ==.
df.filter((df.occupation=='technician' ) & (df.gender=='M' ) & (df.age==24)).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+
1
# Same filter, using df['col'] indexing instead of attribute access.
df.filter((df['occupation']=='technician' ) & (df['gender']=='M' ) & (df['age']==24)).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+
1
#Step3 使用Spark SQL 筛选数据
1
2
3
4
# Spark SQL WHERE clause with the same three conditions; display at most 5 rows.
sqlContext.sql(
'''SELECT *
FROM user_table
where occupation='technician' and gender='M' and age=24''').show(5)
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
+---+------+----------+------+-------+
only showing top 5 rows

19.5 筛选数据（续）

1
2
# Step 1: filter with several chained filter() calls (display at most 5 rows).
user_df.filter("occupation='technician' ").filter("gender='M' ").filter("age=24").show(5)
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
+---+------+----------+------+-------+
only showing top 5 rows
1
# All conditions in one SQL-expression string joined with "and".
user_df.filter("occupation='technician' and gender='M' and age=24").show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+
1
# Chained filter() calls using Column expressions on user_df.
user_df.filter(user_df.occupation=='technician' ).filter(user_df.gender=='M' ).filter(user_df.age==24).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+
1
# Filters user_df using Column objects taken from its alias df. This resolves
# because df is user_df.alias("df"), so the columns are the same; using
# user_df.<col> here would be the more consistent spelling.
user_df.filter((df.occupation=='technician' ) & (df.gender=='M' ) & (df.age==24)).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+
1
# Same conditions, written with df['col'] indexing and combined with &.
df.filter((df['occupation']=='technician' ) & (df['gender']=='M' ) &  (df['age']==24)).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

19.6 单个字段排序数据

1
# Ten youngest users. takeOrdered sorts ascending by the key; converting the
# age string to int makes 7 sort before 10.
userRDD.takeOrdered(10, key=lambda rec: int(rec[1]))
[['30', '7', 'M', 'student', '55436'],
 ['471', '10', 'M', 'student', '77459'],
 ['289', '11', 'M', 'none', '94619'],
 ['142', '13', 'M', 'other', '48118'],
 ['609', '13', 'F', 'student', '55106'],
 ['628', '13', 'M', 'none', '94306'],
 ['674', '13', 'F', 'student', '55337'],
 ['880', '13', 'M', 'student', '83702'],
 ['206', '14', 'F', 'student', '53115'],
 ['813', '14', 'F', 'student', '02136']]

倒序

1
# Five oldest users: negating the numeric age turns takeOrdered's ascending
# order into descending order by age.
userRDD.takeOrdered(5, key=lambda rec: -int(rec[1]))
[['481', '73', 'M', 'retired', '37771'],
 ['767', '70', 'M', 'engineer', '00000'],
 ['803', '70', 'M', 'administrator', '78212'],
 ['860', '70', 'F', 'retired', '48322'],
 ['559', '69', 'M', 'executive', '10022']]
1
2
3
4
# Spark SQL: sort by age ascending.
sqlContext.sql("""
SELECT userid,occupation,gender,age
FROM user_table
ORDER BY age""").show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows
1
2
3
4
# Spark SQL: sort by age descending.
sqlContext.sql("""
SELECT userid,occupation,gender,age
FROM user_table
ORDER BY age DESC""").show(5)
+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   860|      retired|     F| 70|
|   767|     engineer|     M| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows
1
# DataFrame API: orderBy on a column name sorts ascending by default.
user_df.select("userid","occupation","gender","age").orderBy("age").show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows
1
# DataFrame API: sort by age descending. ascending=False is the documented
# boolean form of the original ascending=0 (same result).
df.select("userid", "occupation", "gender", "age").orderBy("age", ascending=False).show(5)
+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   860|      retired|     F| 70|
|   767|     engineer|     M| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows
1
# orderBy with a Column object (ascending by default).
df.select("userid","occupation","gender","age").orderBy(df.age).show(5)
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows
1
# Descending sort via Column.desc(). Rows tied on age (e.g. the three users
# aged 70) are not guaranteed to appear in any particular order.
df.select("userid","occupation","gender","age").orderBy(df.age.desc()).show(5)
+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   767|     engineer|     M| 70|
|   860|      retired|     F| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows

19.7 多字段排序数据

1
# Sort by age descending (negated int), breaking ties by gender ascending
# ('F' before 'M'); keep the first 5.
userRDD.takeOrdered(5, key=lambda rec: (-int(rec[1]), rec[2]))
[['481', '73', 'M', 'retired', '37771'],
 ['860', '70', 'F', 'retired', '48322'],
 ['767', '70', 'M', 'engineer', '00000'],
 ['803', '70', 'M', 'administrator', '78212'],
 ['559', '69', 'M', 'executive', '10022']]
1
2
3
4
# Spark SQL: sort by age descending, then gender ascending.
sqlContext.sql("""
SELECT userid, age, gender,occupation,zipcode
FROM user_table
ORDER BY age DESC,gender """).show(5)
+------+---+------+-------------+-------+
|userid|age|gender|   occupation|zipcode|
+------+---+------+-------------+-------+
|   481| 73|     M|      retired|  37771|
|   860| 70|     F|      retired|  48322|
|   803| 70|     M|administrator|  78212|
|   767| 70|     M|     engineer|  00000|
|   559| 69|     M|    executive|  10022|
+------+---+------+-------------+-------+
only showing top 5 rows
1
# Multi-column sort with per-column directions: age descending, then gender
# ascending. Booleans replace the original 0/1 flags (same result).
df.orderBy(["age", "gender"], ascending=[False, True]).show(5)
+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 73|     M|      retired|   481|  37771|
| 70|     F|      retired|   860|  48322|
| 70|     M|     engineer|   767|  00000|
| 70|     M|administrator|   803|  78212|
| 69|     M|    executive|   559|  10022|
+---+------+-------------+------+-------+
only showing top 5 rows
1
# Multi-column sort with Column objects: age descending, then gender ascending.
df.orderBy(df.age.desc(),df.gender ).show(5)
+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 73|     M|      retired|   481|  37771|
| 70|     F|      retired|   860|  48322|
| 70|     M|     engineer|   767|  00000|
| 70|     M|administrator|   803|  78212|
| 69|     M|    executive|   559|  10022|
+---+------+-------------+------+-------+
only showing top 5 rows

19.8 显示不重复数据

1
# All distinct gender values, collected to the driver.
userRDD.map(lambda rec: rec[2]).distinct().collect()
['M', 'F']
1
# Distinct (age, gender) pairs; age is still a string at this point. First 20 only.
userRDD.map(lambda rec: (rec[1], rec[2])).distinct().take(20)
[('23', 'M'),
 ('42', 'M'),
 ('36', 'M'),
 ('39', 'F'),
 ('28', 'F'),
 ('47', 'M'),
 ('49', 'F'),
 ('30', 'M'),
 ('35', 'F'),
 ('42', 'F'),
 ('25', 'M'),
 ('30', 'F'),
 ('39', 'M'),
 ('49', 'M'),
 ('32', 'M'),
 ('41', 'M'),
 ('7', 'M'),
 ('38', 'F'),
 ('38', 'M'),
 ('27', 'F')]
1
# Spark SQL: distinct gender values.
sqlContext.sql(" SELECT distinct gender FROM user_table").show()
+------+
|gender|
+------+
|     F|
|     M|
+------+
1
# Spark SQL: distinct (age, gender) combinations (first 20 displayed).
sqlContext.sql(" SELECT distinct age,gender  FROM user_table").show()
+---+------+
|age|gender|
+---+------+
| 39|     F|
| 48|     M|
| 26|     M|
| 28|     M|
| 54|     M|
| 60|     M|
| 50|     M|
| 53|     F|
| 30|     M|
| 48|     F|
| 47|     M|
| 46|     M|
| 56|     M|
| 31|     M|
| 32|     M|
| 53|     M|
| 29|     F|
| 20|     F|
| 21|     F|
| 42|     M|
+---+------+
only showing top 20 rows
1
# DataFrame API: distinct gender values.
user_df.select("gender").distinct().show()
+------+
|gender|
+------+
|     F|
|     M|
+------+
1
# DataFrame API: distinct (age, gender) pairs (first 20 displayed).
user_df.select("age","gender").distinct().show()
+---+------+
|age|gender|
+---+------+
| 48|     M|
| 39|     F|
| 26|     M|
| 28|     M|
| 54|     M|
| 60|     M|
| 50|     M|
| 30|     M|
| 53|     F|
| 48|     F|
| 47|     M|
| 46|     M|
| 56|     M|
| 31|     M|
| 32|     M|
| 53|     M|
| 29|     F|
| 20|     F|
| 21|     F|
| 42|     M|
+---+------+
only showing top 20 rows

19.9 分组统计数据

1
2
# Users per gender: emit (gender, 1) pairs, then sum the ones for each key.
(userRDD
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .collect())
[('M', 670), ('F', 273)]
1
# Users per (gender, occupation) pair, keyed by a tuple.
(userRDD
    .map(lambda rec: ((rec[2], rec[3]), 1))
    .reduceByKey(lambda a, b: a + b)
    .collect())
[(('M', 'technician'), 26),
 (('M', 'writer'), 26),
 (('M', 'lawyer'), 10),
 (('M', 'scientist'), 28),
 (('M', 'entertainment'), 16),
 (('M', 'librarian'), 22),
 (('F', 'librarian'), 29),
 (('F', 'marketing'), 10),
 (('M', 'marketing'), 16),
 (('M', 'healthcare'), 5),
 (('M', 'salesman'), 9),
 (('F', 'writer'), 19),
 (('F', 'lawyer'), 2),
 (('F', 'healthcare'), 11),
 (('F', 'scientist'), 3),
 (('F', 'salesman'), 3),
 (('F', 'entertainment'), 2),
 (('F', 'technician'), 1),
 (('F', 'other'), 36),
 (('M', 'executive'), 29),
 (('M', 'administrator'), 43),
 (('M', 'student'), 136),
 (('M', 'educator'), 69),
 (('F', 'educator'), 26),
 (('M', 'programmer'), 60),
 (('F', 'homemaker'), 6),
 (('F', 'artist'), 13),
 (('M', 'engineer'), 65),
 (('M', 'artist'), 15),
 (('F', 'student'), 60),
 (('F', 'administrator'), 36),
 (('M', 'none'), 5),
 (('M', 'other'), 69),
 (('F', 'executive'), 3),
 (('M', 'retired'), 13),
 (('M', 'doctor'), 7),
 (('F', 'none'), 4),
 (('F', 'programmer'), 6),
 (('F', 'engineer'), 2),
 (('F', 'retired'), 1),
 (('M', 'homemaker'), 1)]
1
2
3
4
# Spark SQL: number of users per gender.
sqlContext.sql(""" 
SELECT gender ,count(*) counts
FROM user_table
GROUP BY gender""").show()
+------+------+
|gender|counts|
+------+------+
|     F|   273|
|     M|   670|
+------+------+
1
2
3
4
5
# Spark SQL: number of users per (gender, occupation); display up to 100 rows.
sqlContext.sql(""" 
SELECT gender,occupation,count(*) counts
FROM user_table
GROUP BY gender,occupation
""").show(100)
+------+-------------+------+
|gender|   occupation|counts|
+------+-------------+------+
|     M|    executive|    29|
|     M|     educator|    69|
|     F|         none|     4|
|     F|entertainment|     2|
|     F|      retired|     1|
|     F|       artist|    13|
|     F|    librarian|    29|
|     F|     engineer|     2|
|     F|   healthcare|    11|
|     F|administrator|    36|
|     M|        other|    69|
|     M|    homemaker|     1|
|     F|       lawyer|     2|
|     M|   programmer|    60|
|     M|     salesman|     9|
|     M|         none|     5|
|     M|    marketing|    16|
|     M|entertainment|    16|
|     M|   technician|    26|
|     M|administrator|    43|
|     F|    marketing|    10|
|     F|   programmer|     6|
|     F|   technician|     1|
|     F|    executive|     3|
|     M|    scientist|    28|
|     F|     educator|    26|
|     M|      retired|    13|
|     M|   healthcare|     5|
|     M|       writer|    26|
|     M|       lawyer|    10|
|     M|      student|   136|
|     F|     salesman|     3|
|     M|       doctor|     7|
|     M|       artist|    15|
|     F|    homemaker|     6|
|     M|     engineer|    65|
|     F|        other|    36|
|     F|       writer|    19|
|     F|      student|    60|
|     F|    scientist|     3|
|     M|    librarian|    22|
+------+-------------+------+
1
2
3
# DataFrame API: number of users per gender.
(user_df.select("gender")
    .groupby("gender")
    .count()
    .show())
+------+-----+
|gender|count|
+------+-----+
|     F|  273|
|     M|  670|
+------+-----+
1
2
3
4
5
# DataFrame API: number of users per (gender, occupation), sorted by both keys;
# display up to 100 rows.
(user_df.select("gender", "occupation")
    .groupby("gender", "occupation")
    .count()
    .orderBy("gender", "occupation")
    .show(100))
+------+-------------+-----+
|gender|   occupation|count|
+------+-------------+-----+
|     F|administrator|   36|
|     F|       artist|   13|
|     F|     educator|   26|
|     F|     engineer|    2|
|     F|entertainment|    2|
|     F|    executive|    3|
|     F|   healthcare|   11|
|     F|    homemaker|    6|
|     F|       lawyer|    2|
|     F|    librarian|   29|
|     F|    marketing|   10|
|     F|         none|    4|
|     F|        other|   36|
|     F|   programmer|    6|
|     F|      retired|    1|
|     F|     salesman|    3|
|     F|    scientist|    3|
|     F|      student|   60|
|     F|   technician|    1|
|     F|       writer|   19|
|     M|administrator|   43|
|     M|       artist|   15|
|     M|       doctor|    7|
|     M|     educator|   69|
|     M|     engineer|   65|
|     M|entertainment|   16|
|     M|    executive|   29|
|     M|   healthcare|    5|
|     M|    homemaker|    1|
|     M|       lawyer|   10|
|     M|    librarian|   22|
|     M|    marketing|   16|
|     M|         none|    5|
|     M|        other|   69|
|     M|   programmer|   60|
|     M|      retired|   13|
|     M|     salesman|    9|
|     M|    scientist|   28|
|     M|      student|  136|
|     M|   technician|   26|
|     M|       writer|   26|
+------+-------------+-----+
1
# Contingency table: one row per occupation, one count column per gender
# (first column is named "occupation_gender"); display up to 30 rows.
user_df.stat.crosstab("occupation","gender" ).show(30)
+-----------------+---+---+
|occupation_gender|  F|  M|
+-----------------+---+---+
|        scientist|  3| 28|
|          student| 60|136|
|           writer| 19| 26|
|         salesman|  3|  9|
|          retired|  1| 13|
|    administrator| 36| 43|
|       programmer|  6| 60|
|           doctor|  0|  7|
|        homemaker|  6|  1|
|        executive|  3| 29|
|         engineer|  2| 65|
|    entertainment|  2| 16|
|        marketing| 10| 16|
|       technician|  1| 26|
|           artist| 13| 15|
|        librarian| 29| 22|
|           lawyer|  2| 10|
|         educator| 26| 69|
|       healthcare| 11|  5|
|             none|  4|  5|
|            other| 36| 69|
+-----------------+---+---+
1
# Summary statistics (count, mean, stddev, min, max) for every column.
# NOTE(review): zipcode is a string column; its mean/stddev appear to come
# from whatever values Spark can cast to numbers (e.g. "Y1A6B" cannot be cast).
# Treat them as meaningless.
user_df.describe().show()
+-------+-----------------+------+-------------+-----------------+------------------+
|summary|              age|gender|   occupation|           userid|           zipcode|
+-------+-----------------+------+-------------+-----------------+------------------+
|  count|              943|   943|          943|              943|               943|
|   mean|34.05196182396607|  null|         null|            472.0| 50868.78810810811|
| stddev|12.19273973305903|  null|         null|272.3649512449549|30891.373254138158|
|    min|                7|     F|administrator|                1|             00000|
|    max|               73|     M|       writer|              943|             Y1A6B|
+-------+-----------------+------+-------------+-----------------+------------------+

19.10 Join关联数据

ZipCode

1
#wget http://federalgovernmentzipcodes.us/free-zipcode-database-Primary.csv
1
2
3
# Alternative local data directory, left commented out:
#Path="file:/home/hduser/pythonwork/ipynotebook/"
# Load the ZIP code CSV (its first line is a header row) and preview 2 lines.
rawDataWithHeader = sc.textFile(Path+"data/free-zipcode-database-Primary.csv")
rawDataWithHeader .take(2)
['"Zipcode","ZipCodeType","City","State","LocationType","Lat","Long","Location","Decommisioned","TaxReturnsFiled","EstimatedPopulation","TotalWages"',
 '"00705","STANDARD","AIBONITO","PR","PRIMARY",18.14,-66.26,"NA-US-PR-AIBONITO","false",,,']
1
2
3
# Remove the CSV header: take the first line, then keep only lines that differ from it.
header = rawDataWithHeader.first()
rawData = rawDataWithHeader.filter(lambda line: line != header)
rawData.first()
'"00705","STANDARD","AIBONITO","PR","PRIMARY",18.14,-66.26,"NA-US-PR-AIBONITO","false",,,'
1
2
# Strip every double-quote character from each CSV line.
rData = rawData.map(lambda line: line.replace("\"", ""))
rData.first()
'00705,STANDARD,AIBONITO,PR,PRIMARY,18.14,-66.26,NA-US-PR-AIBONITO,false,,,'
1
2
# Split each cleaned line on commas into a list of field strings.
# NOTE(review): a plain split assumes no field contains an embedded comma
# (the quotes have already been removed). Parse with the csv module if that can occur.
ZipRDD = rData.map(lambda line: line.split(","))
ZipRDD.first()
['00705',
 'STANDARD',
 'AIBONITO',
 'PR',
 'PRIMARY',
 '18.14',
 '-66.26',
 'NA-US-PR-AIBONITO',
 'false',
 '',
 '',
 '']

19.10.2 建立zipcode_table

1
2
3
4
5
6
7
8
9
10
from pyspark.sql import Row
# Build one Row per ZIP record from the first four CSV fields.
# zipcode is kept as a string, not an int. ZIP codes are identifiers:
# int() would drop leading zeros ("00705" -> 705) and would raise
# ValueError on any non-numeric value. It would also make this column a
# different type from user_df.zipcode (a string), which forces an
# implicit cast in every join on zipcode.
zipcode_data = ZipRDD.map(lambda p:
    Row(
        zipcode=p[0],
        zipCodeType=p[1],
        city=p[2],
        state=p[3]
    )
)
zipcode_data.take(5)
[Row(city='AIBONITO', state='PR', zipCodeType='STANDARD', zipcode=705),
 Row(city='ANASCO', state='PR', zipCodeType='STANDARD', zipcode=610),
 Row(city='ANGELES', state='PR', zipCodeType='PO BOX', zipcode=611),
 Row(city='ARECIBO', state='PR', zipCodeType='STANDARD', zipcode=612),
 Row(city='ADJUNTAS', state='PR', zipCodeType='STANDARD', zipcode=601)]
1
2
# Turn the Row RDD into a DataFrame and print the schema Spark inferred.
zipcode_df = sqlContext.createDataFrame(data=zipcode_data)
zipcode_df.printSchema()
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCodeType: string (nullable = true)
 |-- zipcode: long (nullable = true)
1
2
# Expose the DataFrame to Spark SQL under the name "zipcode_table".
# createOrReplaceTempView is the Spark 2.0+ replacement for the deprecated
# registerTempTable; this notebook already relies on Spark 2 (SparkSession).
zipcode_df.createOrReplaceTempView("zipcode_table")
zipcode_df.show(10)
+---------+-----+-----------+-------+
|     city|state|zipCodeType|zipcode|
+---------+-----+-----------+-------+
| AIBONITO|   PR|   STANDARD|    705|
|   ANASCO|   PR|   STANDARD|    610|
|  ANGELES|   PR|     PO BOX|    611|
|  ARECIBO|   PR|   STANDARD|    612|
| ADJUNTAS|   PR|   STANDARD|    601|
| CASTANER|   PR|     PO BOX|    631|
|   AGUADA|   PR|   STANDARD|    602|
|AGUADILLA|   PR|   STANDARD|    603|
|AGUADILLA|   PR|     PO BOX|    604|
|AGUADILLA|   PR|     PO BOX|    605|
+---------+-----+-----------+-------+
only showing top 10 rows
1
2
3
4
# Query the registered view through Spark SQL and show the first 10 rows.
zipcode_query = """ 
SELECT z.*
FROM zipcode_table z
"""
sqlContext.sql(zipcode_query).show(n=10)
+---------+-----+-----------+-------+
|     city|state|zipCodeType|zipcode|
+---------+-----+-----------+-------+
| AIBONITO|   PR|   STANDARD|    705|
|   ANASCO|   PR|   STANDARD|    610|
|  ANGELES|   PR|     PO BOX|    611|
|  ARECIBO|   PR|   STANDARD|    612|
| ADJUNTAS|   PR|   STANDARD|    601|
| CASTANER|   PR|     PO BOX|    631|
|   AGUADA|   PR|   STANDARD|    602|
|AGUADILLA|   PR|   STANDARD|    603|
|AGUADILLA|   PR|     PO BOX|    604|
|AGUADILLA|   PR|     PO BOX|    605|
+---------+-----+-----------+-------+
only showing top 10 rows
1
2
3
4
5
6
# Users located in New York state, with city/state taken from zipcode_table.
# An INNER JOIN states the intent directly. The WHERE filter on z.state
# discards every user without a matching ZIP row (z.state would be NULL),
# so the previous LEFT JOIN behaved exactly like an inner join anyway.
sqlContext.sql(""" 
SELECT u.* ,z.city,z.state
FROM user_table u
INNER JOIN zipcode_table z ON u.zipcode = z.zipcode
WHERE z.state='NY'
""").show(10)
+---+------+-------------+------+-------+----------------+-----+
|age|gender|   occupation|userid|zipcode|            city|state|
+---+------+-------------+------+-------+----------------+-----+
| 29|     M|        other|   478|  10019|        NEW YORK|   NY|
| 22|     F|   healthcare|   405|  10019|        NEW YORK|   NY|
| 22|     M|      student|   327|  11101|LONG ISLAND CITY|   NY|
| 48|     M|     educator|   656|  10314|   STATEN ISLAND|   NY|
| 27|     F|       writer|   617|  11201|        BROOKLYN|   NY|
| 35|     F|        other|   760|  14211|         BUFFALO|   NY|
| 30|     F|       writer|   557|  11217|        BROOKLYN|   NY|
| 27|     M|    marketing|   806|  11217|        BROOKLYN|   NY|
| 32|     F|        other|   155|  11217|        BROOKLYN|   NY|
| 23|     M|administrator|   509|  10011|        NEW YORK|   NY|
+---+------+-------------+------+-------+----------------+-----+
only showing top 10 rows
1
2
3
4
5
6
# Count users per state. The LEFT JOIN keeps users whose zipcode has no
# match in zipcode_table; they are grouped under a NULL state.
users_per_state_query = """ 
SELECT z.state ,count(*)
FROM user_table u
LEFT JOIN zipcode_table z ON u.zipcode = z.zipcode
GROUP BY z.state
"""
sqlContext.sql(users_per_state_query).show(n=60)
+-----+--------+
|state|count(1)|
+-----+--------+
|   SC|      11|
|   AZ|      14|
|   LA|       6|
|   MN|      78|
|   NJ|      18|
|   DC|      14|
|   OR|      20|
|   VA|      27|
| null|      35|
|   RI|       3|
|   KY|      11|
|   WY|       1|
|   NH|       6|
|   MI|      23|
|   NV|       3|
|   WI|      22|
|   ID|       7|
|   CA|     116|
|   CT|      17|
|   NE|       6|
|   MT|       2|
|   NC|      19|
|   VT|       5|
|   MD|      27|
|   DE|       3|
|   MO|      17|
|   IL|      50|
|   ME|       2|
|   ND|       2|
|   WA|      24|
|   MS|       3|
|   AL|       3|
|   IN|       9|
|   AE|       1|
|   OH|      32|
|   TN|      12|
|   IA|      14|
|   NM|       2|
|   PA|      34|
|   SD|       1|
|   NY|      60|
|   TX|      51|
|   WV|       3|
|   GA|      19|
|   MA|      35|
|   KS|       4|
|   CO|      20|
|   FL|      24|
|   AK|       5|
|   AR|       1|
|   OK|       9|
|   AP|       1|
|   UT|       9|
|   HI|       2|
+-----+--------+
1
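#user_df.leftOuterJoin(zipcode_df )
# Note: leftOuterJoin is a pair-RDD method and does not exist on DataFrames;
# with DataFrames use join(..., "left_outer") instead.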
1
2
3
4
# Left outer join: keep every user, attaching ZIP-table columns when the
# zipcodes match (unmatched users get nulls in city/state/zipCodeType).
# The result carries two "zipcode" columns, one from each side.
joined_df = user_df.join(
    zipcode_df,
    on=user_df.zipcode == zipcode_df.zipcode,
    how="left_outer",
)

joined_df.printSchema()
root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- userid: long (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCodeType: string (nullable = true)
 |-- zipcode: long (nullable = true)
1
# Preview the first 10 joined rows.
joined_df.show(n=10)
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
|age|gender|   occupation|userid|zipcode|         city|state|zipCodeType|zipcode|
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
| 59|     F|administrator|   131|  15237|   PITTSBURGH|   PA|   STANDARD|  15237|
| 17|     M|      student|   619|  44134|    CLEVELAND|   OH|   STANDARD|  44134|
| 38|     F|entertainment|   839|  90814|   LONG BEACH|   CA|   STANDARD|  90814|
| 48|     M|administrator|   409|  98225|   BELLINGHAM|   WA|   STANDARD|  98225|
| 31|     M|     educator|   791|  20064|   WASHINGTON|   DC|     UNIQUE|  20064|
| 51|     M|     engineer|   271|  22932|       CROZET|   VA|   STANDARD|  22932|
| 17|     M|entertainment|   375|  37777|   LOUISVILLE|   TN|   STANDARD|  37777|
| 27|     M|      student|   758|  53706|      MADISON|   WI|   STANDARD|  53706|
| 33|     M|    scientist|   272|  53706|      MADISON|   WI|   STANDARD|  53706|
| 30|     F|    librarian|   344|  94117|SAN FRANCISCO|   CA|   STANDARD|  94117|
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
only showing top 10 rows
1
# Keep only the joined rows whose state is New York.
ny_joined_df = joined_df.where("state='NY' ")
ny_joined_df.show(n=10)
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
|age|gender|   occupation|userid|zipcode|            city|state|zipCodeType|zipcode|
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
| 29|     M|        other|   478|  10019|        NEW YORK|   NY|   STANDARD|  10019|
| 22|     F|   healthcare|   405|  10019|        NEW YORK|   NY|   STANDARD|  10019|
| 22|     M|      student|   327|  11101|LONG ISLAND CITY|   NY|   STANDARD|  11101|
| 48|     M|     educator|   656|  10314|   STATEN ISLAND|   NY|   STANDARD|  10314|
| 27|     F|       writer|   617|  11201|        BROOKLYN|   NY|   STANDARD|  11201|
| 35|     F|        other|   760|  14211|         BUFFALO|   NY|   STANDARD|  14211|
| 30|     F|       writer|   557|  11217|        BROOKLYN|   NY|   STANDARD|  11217|
| 27|     M|    marketing|   806|  11217|        BROOKLYN|   NY|   STANDARD|  11217|
| 32|     F|        other|   155|  11217|        BROOKLYN|   NY|   STANDARD|  11217|
| 23|     M|administrator|   509|  10011|        NEW YORK|   NY|   STANDARD|  10011|
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
only showing top 10 rows
1
2
# Number of users per state; the null group holds users whose zipcode
# found no match in the left outer join.
GroupByState_df = joined_df.groupBy("state").count()
GroupByState_df.show(n=60)
+-----+-----+
|state|count|
+-----+-----+
|   SC|   11|
|   AZ|   14|
|   LA|    6|
|   MN|   78|
|   NJ|   18|
|   DC|   14|
|   OR|   20|
|   VA|   27|
| null|   35|
|   RI|    3|
|   KY|   11|
|   WY|    1|
|   NH|    6|
|   MI|   23|
|   NV|    3|
|   WI|   22|
|   ID|    7|
|   CA|  116|
|   CT|   17|
|   NE|    6|
|   MT|    2|
|   NC|   19|
|   VT|    5|
|   MD|   27|
|   DE|    3|
|   MO|   17|
|   IL|   50|
|   ME|    2|
|   WA|   24|
|   ND|    2|
|   MS|    3|
|   AL|    3|
|   IN|    9|
|   AE|    1|
|   OH|   32|
|   TN|   12|
|   IA|   14|
|   NM|    2|
|   PA|   34|
|   SD|    1|
|   NY|   60|
|   TX|   51|
|   WV|    3|
|   GA|   19|
|   MA|   35|
|   KS|    4|
|   FL|   24|
|   CO|   20|
|   AK|    5|
|   AR|    1|
|   OK|    9|
|   AP|    1|
|   UT|    9|
|   HI|    2|
+-----+-----+

19.11 以Pandas DataFrame绘图

1
2
3
import pandas as pd
# Collect the aggregated per-state counts to the driver, indexed by state.
state_counts_pdf = GroupByState_df.toPandas()
GroupByState_pandas_df = state_counts_pdf.set_index('state')
GroupByState_pandas_df







































































































































































































































count
state
SC 11
AZ 14
LA 6
MN 78
NJ 18
DC 14
OR 20
VA 27
NaN 35
RI 3
KY 11
WY 1
NH 6
MI 23
NV 3
WI 22
ID 7
CA 116
CT 17
NE 6
MT 2
NC 19
VT 5
MD 27
DE 3
MO 17
IL 50
ME 2
WA 24
ND 2
MS 3
AL 3
IN 9
AE 1
OH 32
TN 12
IA 14
NM 2
PA 34
SD 1
NY 60
TX 51
WV 3
GA 19
MA 35
KS 4
CO 20
FL 24
AK 5
AR 1
OK 9
AP 1
UT 9
HI 2

1
# Transposed view: one column per state, a single "count" row.
GroupByState_pandas_df.transpose()























































state SC AZ LA MN NJ DC OR VA None RI MA KS CO FL AK AR OK AP UT HI
count 11 14 6 78 18 14 20 27 35 3 35 4 20 24 5 1 9 1 9 2

1 rows × 54 columns


1
2
3
4
5
import matplotlib.pyplot as plt
%matplotlib inline
ax = GroupByState_pandas_df ['count'] \
.plot(kind='bar', title ="State ",figsize=(12,6),legend=True, fontsize=12)
plt.show()

png

1
2
3
4
5
6
# Number of users per occupation, computed with Spark SQL on user_table.
occupation_query = """ 
SELECT u.occupation ,count(*) counts
FROM user_table u
GROUP BY occupation
"""
Occupation_df = sqlContext.sql(occupation_query)
Occupation_df.show(n=30)
+-------------+------+
|   occupation|counts|
+-------------+------+
|    librarian|    51|
|      retired|    14|
|       lawyer|    12|
|         none|     9|
|       writer|    45|
|   programmer|    66|
|    marketing|    26|
|        other|   105|
|    executive|    32|
|    scientist|    31|
|      student|   196|
|     salesman|    12|
|       artist|    28|
|   technician|    27|
|administrator|    79|
|     engineer|    67|
|   healthcare|    16|
|     educator|    95|
|entertainment|    18|
|    homemaker|     7|
|       doctor|     7|
+-------------+------+
1
2
# Collect the occupation counts into pandas, indexed by occupation.
occupation_counts_pdf = Occupation_df.toPandas()
Occupation_pandas_df = occupation_counts_pdf.set_index('occupation')
Occupation_pandas_df



































































































counts
occupation
librarian 51
retired 14
lawyer 12
none 9
writer 45
programmer 66
marketing 26
other 105
executive 32
scientist 31
student 196
salesman 12
artist 28
technician 27
administrator 79
engineer 67
healthcare 16
educator 95
entertainment 18
homemaker 7
doctor 7

1
2
3
4
# Pie chart of the occupation distribution, with a percentage on each wedge
# and the legend anchored just outside the axes on the right.
ax = Occupation_pandas_df['counts'].plot(
    kind='pie',
    title="occupation",
    figsize=(8, 8),
    startangle=90,
    autopct='%1.1f%%',
)
ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.show()

png