#注意事项:
#当运行本Notebook的程序后,如果要关闭Notebook,请选择菜单: File > Close and Halt 才能确实停止当前正在运行的程序,并且释放资源
#如果没有使用以上方法,只关闭此分页,程序仍在运行,未释放资源,当您打开并运行其他的Notebook,可能会发生错误
'spark://192.168.10.170:7077'
1 2 3 4 5 6 global Path if sc.master[0 :5 ]=="local" : Path="file:/home/hduser/pythonsparkexample/PythonProject/" else : Path="hdfs://master:9000/user/hduser/"
19.3 建立RDD、DataFrame与Spark SQL tempTable
1 RawUserRDD= sc.textFile(Path+"data/u.user" )
1 2 3 data = [1 , 2 , 3 , 4 , 5 ] distData = sc.parallelize(data) distData.collect()
[1, 2, 3, 4, 5]
943
['1|24|M|technician|85711',
'2|53|F|other|94043',
'3|23|M|writer|32067',
'4|24|M|technician|43537',
'5|33|F|other|15213']
1 2 userRDD =RawUserRDD.map(lambda line: line.split("|" )) userRDD .take(5 )
[['1', '24', 'M', 'technician', '85711'],
['2', '53', 'F', 'other', '94043'],
['3', '23', 'M', 'writer', '32067'],
['4', '24', 'M', 'technician', '43537'],
['5', '33', 'F', 'other', '15213']]
建立DataFrame 1 sqlContext = SparkSession.builder.getOrCreate()
1 2 3 4 5 6 7 8 9 10 11 from pyspark.sql import Rowuser_Rows = userRDD.map(lambda p: Row( userid=int(p[0 ]), age=int(p[1 ]), gender=p[2 ], occupation=p[3 ], zipcode=p[4 ] ) ) user_Rows.take(5 )
[Row(age=24, gender='M', occupation='technician', userid=1, zipcode='85711'),
Row(age=53, gender='F', occupation='other', userid=2, zipcode='94043'),
Row(age=23, gender='M', occupation='writer', userid=3, zipcode='32067'),
Row(age=24, gender='M', occupation='technician', userid=4, zipcode='43537'),
Row(age=33, gender='F', occupation='other', userid=5, zipcode='15213')]
1 2 3 user_df = sqlContext.createDataFrame(user_Rows) user_df .printSchema() user_df.show()
root
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- occupation: string (nullable = true)
|-- userid: long (nullable = true)
|-- zipcode: string (nullable = true)
+---+------+-------------+------+-------+
|age|gender| occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24| M| technician| 1| 85711|
| 53| F| other| 2| 94043|
| 23| M| writer| 3| 32067|
| 24| M| technician| 4| 43537|
| 33| F| other| 5| 15213|
| 42| M| executive| 6| 98101|
| 57| M|administrator| 7| 91344|
| 36| M|administrator| 8| 05201|
| 29| M| student| 9| 01002|
| 53| M| lawyer| 10| 90703|
| 39| F| other| 11| 30329|
| 28| F| other| 12| 06405|
| 47| M| educator| 13| 29206|
| 45| M| scientist| 14| 55106|
| 49| F| educator| 15| 97301|
| 21| M|entertainment| 16| 10309|
| 30| M| programmer| 17| 06355|
| 35| F| other| 18| 37212|
| 40| M| librarian| 19| 02138|
| 42| F| homemaker| 20| 95660|
+---+------+-------------+------+-------+
only showing top 20 rows
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 53| F| other| 2| 94043|
| 23| M| writer| 3| 32067|
| 24| M|technician| 4| 43537|
| 33| F| other| 5| 15213|
+---+------+----------+------+-------+
only showing top 5 rows
1 2 df=user_df.alias("df" ) df.show(5 )
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 53| F| other| 2| 94043|
| 23| M| writer| 3| 32067|
| 24| M|technician| 4| 43537|
| 33| F| other| 5| 15213|
+---+------+----------+------+-------+
only showing top 5 rows
建立Spark SQL tempTable 1 user_df.registerTempTable("user_table" )
1 sqlContext.sql(" SELECT * FROM user_table" ).show()
+---+------+-------------+------+-------+
|age|gender| occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24| M| technician| 1| 85711|
| 53| F| other| 2| 94043|
| 23| M| writer| 3| 32067|
| 24| M| technician| 4| 43537|
| 33| F| other| 5| 15213|
| 42| M| executive| 6| 98101|
| 57| M|administrator| 7| 91344|
| 36| M|administrator| 8| 05201|
| 29| M| student| 9| 01002|
| 53| M| lawyer| 10| 90703|
| 39| F| other| 11| 30329|
| 28| F| other| 12| 06405|
| 47| M| educator| 13| 29206|
| 45| M| scientist| 14| 55106|
| 49| F| educator| 15| 97301|
| 21| M|entertainment| 16| 10309|
| 30| M| programmer| 17| 06355|
| 35| F| other| 18| 37212|
| 40| M| librarian| 19| 02138|
| 42| F| homemaker| 20| 95660|
+---+------+-------------+------+-------+
only showing top 20 rows
1 sqlContext.sql(" SELECT count(*) counts FROM user_table" ).show()
+------+
|counts|
+------+
| 943|
+------+
1 2 3 4 sqlContext.sql(""" SELECT count(*) counts FROM user_table """ ).show()
+------+
|counts|
+------+
| 943|
+------+
1 sqlContext.sql(" SELECT * FROM user_table " ).show()
+---+------+-------------+------+-------+
|age|gender| occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24| M| technician| 1| 85711|
| 53| F| other| 2| 94043|
| 23| M| writer| 3| 32067|
| 24| M| technician| 4| 43537|
| 33| F| other| 5| 15213|
| 42| M| executive| 6| 98101|
| 57| M|administrator| 7| 91344|
| 36| M|administrator| 8| 05201|
| 29| M| student| 9| 01002|
| 53| M| lawyer| 10| 90703|
| 39| F| other| 11| 30329|
| 28| F| other| 12| 06405|
| 47| M| educator| 13| 29206|
| 45| M| scientist| 14| 55106|
| 49| F| educator| 15| 97301|
| 21| M|entertainment| 16| 10309|
| 30| M| programmer| 17| 06355|
| 35| F| other| 18| 37212|
| 40| M| librarian| 19| 02138|
| 42| F| homemaker| 20| 95660|
+---+------+-------------+------+-------+
only showing top 20 rows
1 sqlContext.sql(" SELECT * FROM user_table" ).show(5 )
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 53| F| other| 2| 94043|
| 23| M| writer| 3| 32067|
| 24| M|technician| 4| 43537|
| 33| F| other| 5| 15213|
+---+------+----------+------+-------+
only showing top 5 rows
1 sqlContext.sql(" SELECT * FROM user_table LIMIT 5" ).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 53| F| other| 2| 94043|
| 23| M| writer| 3| 32067|
| 24| M|technician| 4| 43537|
| 33| F| other| 5| 15213|
+---+------+----------+------+-------+
19.3 显示部分字段 1 2 userRDDnew= userRDD.map(lambda x: (x[0 ],x[3 ],x[2 ] ,x[1 ]) ) userRDDnew.take(5 )
[('1', 'technician', 'M', '24'),
('2', 'other', 'F', '53'),
('3', 'writer', 'M', '23'),
('4', 'technician', 'M', '24'),
('5', 'other', 'F', '33')]
1 user_df.select("userid" ,"occupation" ,"gender" ,"age" ).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 1|technician| M| 24|
| 2| other| F| 53|
| 3| writer| M| 23|
| 4|technician| M| 24|
| 5| other| F| 33|
+------+----------+------+---+
only showing top 5 rows
1 user_df.select( user_df.userid, user_df.occupation,user_df.gender,user_df.age ).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 1|technician| M| 24|
| 2| other| F| 53|
| 3| writer| M| 23|
| 4|technician| M| 24|
| 5| other| F| 33|
+------+----------+------+---+
only showing top 5 rows
1 df.select(df.userid,df.occupation,df.gender,df.age ).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 1|technician| M| 24|
| 2| other| F| 53|
| 3| writer| M| 23|
| 4|technician| M| 24|
| 5| other| F| 33|
+------+----------+------+---+
only showing top 5 rows
1 user_df.select(user_df.userid, user_df.occupation,df.gender,df.age ).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 1|technician| M| 24|
| 2| other| F| 53|
| 3| writer| M| 23|
| 4|technician| M| 24|
| 5| other| F| 33|
+------+----------+------+---+
only showing top 5 rows
1 df[df['userid' ],df['occupation' ],df['gender' ],df['age' ] ].show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 1|technician| M| 24|
| 2| other| F| 53|
| 3| writer| M| 23|
| 4|technician| M| 24|
| 5| other| F| 33|
+------+----------+------+---+
only showing top 5 rows
1 sqlContext.sql(" SELECT userid,occupation,gender,age FROM user_table" ).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 1|technician| M| 24|
| 2| other| F| 53|
| 3| writer| M| 23|
| 4|technician| M| 24|
| 5| other| F| 33|
+------+----------+------+---+
only showing top 5 rows
19.4 增加计算字段 1 2 userRDDnew= userRDD.map(lambda x: (x[0 ],x[3 ],x[2 ],x[1 ] ,2016 -int(x[1 ])) ) userRDDnew.take(5 )
[('1', 'technician', 'M', '24', 1992),
('2', 'other', 'F', '53', 1963),
('3', 'writer', 'M', '23', 1993),
('4', 'technician', 'M', '24', 1992),
('5', 'other', 'F', '33', 1983)]
1 df.select("userid" ,"occupation" ,"gender" ,"age" ,2016 -df.age).show(5 )
+------+----------+------+---+------------+
|userid|occupation|gender|age|(2016 - age)|
+------+----------+------+---+------------+
| 1|technician| M| 24| 1992|
| 2| other| F| 53| 1963|
| 3| writer| M| 23| 1993|
| 4|technician| M| 24| 1992|
| 5| other| F| 33| 1983|
+------+----------+------+---+------------+
only showing top 5 rows
1 df.select("userid" ,"occupation" ,"gender" ,"age" ,(2016 -df.age).alias("birthyear" )).show(5 )
+------+----------+------+---+---------+
|userid|occupation|gender|age|birthyear|
+------+----------+------+---+---------+
| 1|technician| M| 24| 1992|
| 2| other| F| 53| 1963|
| 3| writer| M| 23| 1993|
| 4|technician| M| 24| 1992|
| 5| other| F| 33| 1983|
+------+----------+------+---+---------+
only showing top 5 rows
1 2 3 sqlContext.sql(""" SELECT userid,occupation,gender,age, 2016-age birthyear FROM user_table""" ).show(5 )
+------+----------+------+---+---------+
|userid|occupation|gender|age|birthyear|
+------+----------+------+---+---------+
| 1|technician| M| 24| 1992|
| 2| other| F| 53| 1963|
| 3| writer| M| 23| 1993|
| 4|technician| M| 24| 1992|
| 5| other| F| 33| 1983|
+------+----------+------+---+---------+
only showing top 5 rows
19.5筛选数据 1 userRDD.filter(lambda r: r[3 ]=='technician' and r[2 ]=='M' and r[1 ]=='24' ).take(6 )
[['1', '24', 'M', 'technician', '85711'],
['4', '24', 'M', 'technician', '43537'],
['456', '24', 'M', 'technician', '31820'],
['717', '24', 'M', 'technician', '84105'],
['832', '24', 'M', 'technician', '77042'],
['889', '24', 'M', 'technician', '78704']]
1 user_df.filter("occupation='technician' " ).filter("gender='M' " ).filter("age=24" ).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
1 user_df.filter("occupation='technician' and gender='M' and age=24" ).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
1 df.filter((df.occupation=='technician' ) & (df.gender=='M' ) & (df.age==24 )).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
1 df.filter((df['occupation' ]=='technician' ) & (df['gender' ]=='M' ) & (df['age' ]==24 )).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
1 2 3 4 sqlContext.sql( '''SELECT * FROM user_table where occupation='technician' and gender='M' and age=24''' ).show(5 )
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
+---+------+----------+------+-------+
only showing top 5 rows
19.6 筛选数据 1 2 user_df.filter("occupation='technician' " ).filter("gender='M' " ).filter("age=24" ).show(5 )
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
+---+------+----------+------+-------+
only showing top 5 rows
1 user_df.filter("occupation='technician' and gender='M' and age=24" ).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
1 user_df.filter(user_df.occupation=='technician' ).filter(user_df.gender=='M' ).filter(user_df.age==24 ).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
1 user_df.filter((df.occupation=='technician' ) & (df.gender=='M' ) & (df.age==24 )).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
1 df.filter((df['occupation' ]=='technician' ) & (df['gender' ]=='M' ) & (df['age' ]==24 )).show()
+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24| M|technician| 1| 85711|
| 24| M|technician| 4| 43537|
| 24| M|technician| 456| 31820|
| 24| M|technician| 717| 84105|
| 24| M|technician| 832| 77042|
| 24| M|technician| 889| 78704|
+---+------+----------+------+-------+
19.6 单个字段排序数据 1 userRDD.takeOrdered(10 , key = lambda x: int(x[1 ]),)
[['30', '7', 'M', 'student', '55436'],
['471', '10', 'M', 'student', '77459'],
['289', '11', 'M', 'none', '94619'],
['142', '13', 'M', 'other', '48118'],
['609', '13', 'F', 'student', '55106'],
['628', '13', 'M', 'none', '94306'],
['674', '13', 'F', 'student', '55337'],
['880', '13', 'M', 'student', '83702'],
['206', '14', 'F', 'student', '53115'],
['813', '14', 'F', 'student', '02136']]
倒序
1 userRDD.takeOrdered(5 , key = lambda x: -1 *int(x[1 ]))
[['481', '73', 'M', 'retired', '37771'],
['767', '70', 'M', 'engineer', '00000'],
['803', '70', 'M', 'administrator', '78212'],
['860', '70', 'F', 'retired', '48322'],
['559', '69', 'M', 'executive', '10022']]
1 2 3 4 sqlContext.sql(""" SELECT userid,occupation,gender,age FROM user_table ORDER BY age""" ).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 30| student| M| 7|
| 471| student| M| 10|
| 289| none| M| 11|
| 880| student| M| 13|
| 628| none| M| 13|
+------+----------+------+---+
only showing top 5 rows
1 2 3 4 sqlContext.sql(""" SELECT userid,occupation,gender,age FROM user_table ORDER BY age DESC""" ).show(5 )
+------+-------------+------+---+
|userid| occupation|gender|age|
+------+-------------+------+---+
| 481| retired| M| 73|
| 860| retired| F| 70|
| 767| engineer| M| 70|
| 803|administrator| M| 70|
| 559| executive| M| 69|
+------+-------------+------+---+
only showing top 5 rows
1 user_df.select("userid" ,"occupation" ,"gender" ,"age" ).orderBy("age" ).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 30| student| M| 7|
| 471| student| M| 10|
| 289| none| M| 11|
| 880| student| M| 13|
| 628| none| M| 13|
+------+----------+------+---+
only showing top 5 rows
1 df.select("userid" ,"occupation" ,"gender" ,"age" ).orderBy("age" ,ascending=0 ).show(5 )
+------+-------------+------+---+
|userid| occupation|gender|age|
+------+-------------+------+---+
| 481| retired| M| 73|
| 860| retired| F| 70|
| 767| engineer| M| 70|
| 803|administrator| M| 70|
| 559| executive| M| 69|
+------+-------------+------+---+
only showing top 5 rows
1 df.select("userid" ,"occupation" ,"gender" ,"age" ).orderBy(df.age).show(5 )
+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
| 30| student| M| 7|
| 471| student| M| 10|
| 289| none| M| 11|
| 880| student| M| 13|
| 628| none| M| 13|
+------+----------+------+---+
only showing top 5 rows
1 df.select("userid" ,"occupation" ,"gender" ,"age" ).orderBy(df.age.desc()).show(5 )
+------+-------------+------+---+
|userid| occupation|gender|age|
+------+-------------+------+---+
| 481| retired| M| 73|
| 767| engineer| M| 70|
| 860| retired| F| 70|
| 803|administrator| M| 70|
| 559| executive| M| 69|
+------+-------------+------+---+
only showing top 5 rows
19.7 多字段排序数据 1 userRDD.takeOrdered(5 , key = lambda x: (-int(x[1 ]), x[2 ] ) )
[['481', '73', 'M', 'retired', '37771'],
['860', '70', 'F', 'retired', '48322'],
['767', '70', 'M', 'engineer', '00000'],
['803', '70', 'M', 'administrator', '78212'],
['559', '69', 'M', 'executive', '10022']]
1 2 3 4 sqlContext.sql(""" SELECT userid, age, gender,occupation,zipcode FROM user_table ORDER BY age DESC,gender """ ).show(5 )
+------+---+------+-------------+-------+
|userid|age|gender| occupation|zipcode|
+------+---+------+-------------+-------+
| 481| 73| M| retired| 37771|
| 860| 70| F| retired| 48322|
| 803| 70| M|administrator| 78212|
| 767| 70| M| engineer| 00000|
| 559| 69| M| executive| 10022|
+------+---+------+-------------+-------+
only showing top 5 rows
1 df.orderBy(["age" ,"gender" ],ascending=[0 ,1 ] ).show(5 )
+---+------+-------------+------+-------+
|age|gender| occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 73| M| retired| 481| 37771|
| 70| F| retired| 860| 48322|
| 70| M| engineer| 767| 00000|
| 70| M|administrator| 803| 78212|
| 69| M| executive| 559| 10022|
+---+------+-------------+------+-------+
only showing top 5 rows
1 df.orderBy(df.age.desc(),df.gender ).show(5 )
+---+------+-------------+------+-------+
|age|gender| occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 73| M| retired| 481| 37771|
| 70| F| retired| 860| 48322|
| 70| M| engineer| 767| 00000|
| 70| M|administrator| 803| 78212|
| 69| M| executive| 559| 10022|
+---+------+-------------+------+-------+
only showing top 5 rows
19.8 显示不重复数据 1 userRDD.map( lambda x:x[2 ] ).distinct().collect()
['M', 'F']
1 userRDD.map( lambda x:(x[1 ],x[2 ]) ).distinct().take(20 )
[('23', 'M'),
('42', 'M'),
('36', 'M'),
('39', 'F'),
('28', 'F'),
('47', 'M'),
('49', 'F'),
('30', 'M'),
('35', 'F'),
('42', 'F'),
('25', 'M'),
('30', 'F'),
('39', 'M'),
('49', 'M'),
('32', 'M'),
('41', 'M'),
('7', 'M'),
('38', 'F'),
('38', 'M'),
('27', 'F')]
1 sqlContext.sql(" SELECT distinct gender FROM user_table" ).show()
+------+
|gender|
+------+
| F|
| M|
+------+
1 sqlContext.sql(" SELECT distinct age,gender FROM user_table" ).show()
+---+------+
|age|gender|
+---+------+
| 39| F|
| 48| M|
| 26| M|
| 28| M|
| 54| M|
| 60| M|
| 50| M|
| 53| F|
| 30| M|
| 48| F|
| 47| M|
| 46| M|
| 56| M|
| 31| M|
| 32| M|
| 53| M|
| 29| F|
| 20| F|
| 21| F|
| 42| M|
+---+------+
only showing top 20 rows
1 user_df.select("gender" ).distinct().show()
+------+
|gender|
+------+
| F|
| M|
+------+
1 user_df.select("age" ,"gender" ).distinct().show()
+---+------+
|age|gender|
+---+------+
| 48| M|
| 39| F|
| 26| M|
| 28| M|
| 54| M|
| 60| M|
| 50| M|
| 30| M|
| 53| F|
| 48| F|
| 47| M|
| 46| M|
| 56| M|
| 31| M|
| 32| M|
| 53| M|
| 29| F|
| 20| F|
| 21| F|
| 42| M|
+---+------+
only showing top 20 rows
19.9 分组统计数据 1 2 userRDD.map(lambda x: (x[2 ],1 )) \ .reduceByKey(lambda x,y: x+y).collect()
[('M', 670), ('F', 273)]
1 userRDD.map(lambda x: ((x[2 ],x[3 ]),1 )).reduceByKey(lambda x,y: x+y).collect()
[(('M', 'technician'), 26),
(('M', 'writer'), 26),
(('M', 'lawyer'), 10),
(('M', 'scientist'), 28),
(('M', 'entertainment'), 16),
(('M', 'librarian'), 22),
(('F', 'librarian'), 29),
(('F', 'marketing'), 10),
(('M', 'marketing'), 16),
(('M', 'healthcare'), 5),
(('M', 'salesman'), 9),
(('F', 'writer'), 19),
(('F', 'lawyer'), 2),
(('F', 'healthcare'), 11),
(('F', 'scientist'), 3),
(('F', 'salesman'), 3),
(('F', 'entertainment'), 2),
(('F', 'technician'), 1),
(('F', 'other'), 36),
(('M', 'executive'), 29),
(('M', 'administrator'), 43),
(('M', 'student'), 136),
(('M', 'educator'), 69),
(('F', 'educator'), 26),
(('M', 'programmer'), 60),
(('F', 'homemaker'), 6),
(('F', 'artist'), 13),
(('M', 'engineer'), 65),
(('M', 'artist'), 15),
(('F', 'student'), 60),
(('F', 'administrator'), 36),
(('M', 'none'), 5),
(('M', 'other'), 69),
(('F', 'executive'), 3),
(('M', 'retired'), 13),
(('M', 'doctor'), 7),
(('F', 'none'), 4),
(('F', 'programmer'), 6),
(('F', 'engineer'), 2),
(('F', 'retired'), 1),
(('M', 'homemaker'), 1)]
1 2 3 4 sqlContext.sql(""" SELECT gender ,count(*) counts FROM user_table GROUP BY gender""" ).show()
+------+------+
|gender|counts|
+------+------+
| F| 273|
| M| 670|
+------+------+
1 2 3 4 5 sqlContext.sql(""" SELECT gender,occupation,count(*) counts FROM user_table GROUP BY gender,occupation """ ).show(100 )
+------+-------------+------+
|gender| occupation|counts|
+------+-------------+------+
| M| executive| 29|
| M| educator| 69|
| F| none| 4|
| F|entertainment| 2|
| F| retired| 1|
| F| artist| 13|
| F| librarian| 29|
| F| engineer| 2|
| F| healthcare| 11|
| F|administrator| 36|
| M| other| 69|
| M| homemaker| 1|
| F| lawyer| 2|
| M| programmer| 60|
| M| salesman| 9|
| M| none| 5|
| M| marketing| 16|
| M|entertainment| 16|
| M| technician| 26|
| M|administrator| 43|
| F| marketing| 10|
| F| programmer| 6|
| F| technician| 1|
| F| executive| 3|
| M| scientist| 28|
| F| educator| 26|
| M| retired| 13|
| M| healthcare| 5|
| M| writer| 26|
| M| lawyer| 10|
| M| student| 136|
| F| salesman| 3|
| M| doctor| 7|
| M| artist| 15|
| F| homemaker| 6|
| M| engineer| 65|
| F| other| 36|
| F| writer| 19|
| F| student| 60|
| F| scientist| 3|
| M| librarian| 22|
+------+-------------+------+
1 2 3 user_df.select("gender" ) \ .groupby("gender" ) \ .count().show()
+------+-----+
|gender|count|
+------+-----+
| F| 273|
| M| 670|
+------+-----+
1 2 3 4 5 user_df.select("gender" ,"occupation" ). \ groupby("gender" ,"occupation" ). \ count(). \ orderBy("gender" ,"occupation" ). \ show(100 )
+------+-------------+-----+
|gender| occupation|count|
+------+-------------+-----+
| F|administrator| 36|
| F| artist| 13|
| F| educator| 26|
| F| engineer| 2|
| F|entertainment| 2|
| F| executive| 3|
| F| healthcare| 11|
| F| homemaker| 6|
| F| lawyer| 2|
| F| librarian| 29|
| F| marketing| 10|
| F| none| 4|
| F| other| 36|
| F| programmer| 6|
| F| retired| 1|
| F| salesman| 3|
| F| scientist| 3|
| F| student| 60|
| F| technician| 1|
| F| writer| 19|
| M|administrator| 43|
| M| artist| 15|
| M| doctor| 7|
| M| educator| 69|
| M| engineer| 65|
| M|entertainment| 16|
| M| executive| 29|
| M| healthcare| 5|
| M| homemaker| 1|
| M| lawyer| 10|
| M| librarian| 22|
| M| marketing| 16|
| M| none| 5|
| M| other| 69|
| M| programmer| 60|
| M| retired| 13|
| M| salesman| 9|
| M| scientist| 28|
| M| student| 136|
| M| technician| 26|
| M| writer| 26|
+------+-------------+-----+
1 user_df.stat.crosstab("occupation" ,"gender" ).show(30 )
+-----------------+---+---+
|occupation_gender| F| M|
+-----------------+---+---+
| scientist| 3| 28|
| student| 60|136|
| writer| 19| 26|
| salesman| 3| 9|
| retired| 1| 13|
| administrator| 36| 43|
| programmer| 6| 60|
| doctor| 0| 7|
| homemaker| 6| 1|
| executive| 3| 29|
| engineer| 2| 65|
| entertainment| 2| 16|
| marketing| 10| 16|
| technician| 1| 26|
| artist| 13| 15|
| librarian| 29| 22|
| lawyer| 2| 10|
| educator| 26| 69|
| healthcare| 11| 5|
| none| 4| 5|
| other| 36| 69|
+-----------------+---+---+
1 user_df.describe().show()
+-------+-----------------+------+-------------+-----------------+------------------+
|summary| age|gender| occupation| userid| zipcode|
+-------+-----------------+------+-------------+-----------------+------------------+
| count| 943| 943| 943| 943| 943|
| mean|34.05196182396607| null| null| 472.0| 50868.78810810811|
| stddev|12.19273973305903| null| null|272.3649512449549|30891.373254138158|
| min| 7| F|administrator| 1| 00000|
| max| 73| M| writer| 943| Y1A6B|
+-------+-----------------+------+-------------+-----------------+------------------+
19.10 Join关联数据 ZipCode
1 2 3 rawDataWithHeader = sc.textFile(Path+"data/free-zipcode-database-Primary.csv" ) rawDataWithHeader .take(2 )
['"Zipcode","ZipCodeType","City","State","LocationType","Lat","Long","Location","Decommisioned","TaxReturnsFiled","EstimatedPopulation","TotalWages"',
'"00705","STANDARD","AIBONITO","PR","PRIMARY",18.14,-66.26,"NA-US-PR-AIBONITO","false",,,']
1 2 3 header = rawDataWithHeader.first() rawData = rawDataWithHeader.filter(lambda x:x !=header) rawData.first()
'"00705","STANDARD","AIBONITO","PR","PRIMARY",18.14,-66.26,"NA-US-PR-AIBONITO","false",,,'
1 2 rData=rawData.map(lambda x: x.replace("\"" , "" )) rData.first()
'00705,STANDARD,AIBONITO,PR,PRIMARY,18.14,-66.26,NA-US-PR-AIBONITO,false,,,'
1 2 ZipRDD = rData.map(lambda x: x.split("," )) ZipRDD.first()
['00705',
'STANDARD',
'AIBONITO',
'PR',
'PRIMARY',
'18.14',
'-66.26',
'NA-US-PR-AIBONITO',
'false',
'',
'',
'']
19.19.2 建立zipcode_table 1 2 3 4 5 6 7 8 9 10 from pyspark.sql import Rowzipcode_data =ZipRDD .map(lambda p: Row( zipcode=int(p[0 ]), zipCodeType=p[1 ], city=p[2 ], state=p[3 ] ) ) zipcode_data.take(5 )
[Row(city='AIBONITO', state='PR', zipCodeType='STANDARD', zipcode=705),
Row(city='ANASCO', state='PR', zipCodeType='STANDARD', zipcode=610),
Row(city='ANGELES', state='PR', zipCodeType='PO BOX', zipcode=611),
Row(city='ARECIBO', state='PR', zipCodeType='STANDARD', zipcode=612),
Row(city='ADJUNTAS', state='PR', zipCodeType='STANDARD', zipcode=601)]
1 2 zipcode_df = sqlContext.createDataFrame(zipcode_data ) zipcode_df.printSchema()
root
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zipCodeType: string (nullable = true)
|-- zipcode: long (nullable = true)
1 2 zipcode_df.registerTempTable("zipcode_table" ) zipcode_df.show(10 )
+---------+-----+-----------+-------+
| city|state|zipCodeType|zipcode|
+---------+-----+-----------+-------+
| AIBONITO| PR| STANDARD| 705|
| ANASCO| PR| STANDARD| 610|
| ANGELES| PR| PO BOX| 611|
| ARECIBO| PR| STANDARD| 612|
| ADJUNTAS| PR| STANDARD| 601|
| CASTANER| PR| PO BOX| 631|
| AGUADA| PR| STANDARD| 602|
|AGUADILLA| PR| STANDARD| 603|
|AGUADILLA| PR| PO BOX| 604|
|AGUADILLA| PR| PO BOX| 605|
+---------+-----+-----------+-------+
only showing top 10 rows
1 2 3 4 sqlContext.sql(""" SELECT z.* FROM zipcode_table z """ ).show(10 )
+---------+-----+-----------+-------+
| city|state|zipCodeType|zipcode|
+---------+-----+-----------+-------+
| AIBONITO| PR| STANDARD| 705|
| ANASCO| PR| STANDARD| 610|
| ANGELES| PR| PO BOX| 611|
| ARECIBO| PR| STANDARD| 612|
| ADJUNTAS| PR| STANDARD| 601|
| CASTANER| PR| PO BOX| 631|
| AGUADA| PR| STANDARD| 602|
|AGUADILLA| PR| STANDARD| 603|
|AGUADILLA| PR| PO BOX| 604|
|AGUADILLA| PR| PO BOX| 605|
+---------+-----+-----------+-------+
only showing top 10 rows
1 2 3 4 5 6 sqlContext.sql(""" SELECT u.* ,z.city,z.state FROM user_table u LEFT JOIN zipcode_table z ON u.zipcode = z.zipcode WHERE z.state='NY' """ ).show(10 )
+---+------+-------------+------+-------+----------------+-----+
|age|gender| occupation|userid|zipcode| city|state|
+---+------+-------------+------+-------+----------------+-----+
| 29| M| other| 478| 10019| NEW YORK| NY|
| 22| F| healthcare| 405| 10019| NEW YORK| NY|
| 22| M| student| 327| 11101|LONG ISLAND CITY| NY|
| 48| M| educator| 656| 10314| STATEN ISLAND| NY|
| 27| F| writer| 617| 11201| BROOKLYN| NY|
| 35| F| other| 760| 14211| BUFFALO| NY|
| 30| F| writer| 557| 11217| BROOKLYN| NY|
| 27| M| marketing| 806| 11217| BROOKLYN| NY|
| 32| F| other| 155| 11217| BROOKLYN| NY|
| 23| M|administrator| 509| 10011| NEW YORK| NY|
+---+------+-------------+------+-------+----------------+-----+
only showing top 10 rows
1 2 3 4 5 6 sqlContext.sql(""" SELECT z.state ,count(*) FROM user_table u LEFT JOIN zipcode_table z ON u.zipcode = z.zipcode GROUP BY z.state """ ).show(60 )
+-----+--------+
|state|count(1)|
+-----+--------+
| SC| 11|
| AZ| 14|
| LA| 6|
| MN| 78|
| NJ| 18|
| DC| 14|
| OR| 20|
| VA| 27|
| null| 35|
| RI| 3|
| KY| 11|
| WY| 1|
| NH| 6|
| MI| 23|
| NV| 3|
| WI| 22|
| ID| 7|
| CA| 116|
| CT| 17|
| NE| 6|
| MT| 2|
| NC| 19|
| VT| 5|
| MD| 27|
| DE| 3|
| MO| 17|
| IL| 50|
| ME| 2|
| ND| 2|
| WA| 24|
| MS| 3|
| AL| 3|
| IN| 9|
| AE| 1|
| OH| 32|
| TN| 12|
| IA| 14|
| NM| 2|
| PA| 34|
| SD| 1|
| NY| 60|
| TX| 51|
| WV| 3|
| GA| 19|
| MA| 35|
| KS| 4|
| CO| 20|
| FL| 24|
| AK| 5|
| AR| 1|
| OK| 9|
| AP| 1|
| UT| 9|
| HI| 2|
+-----+--------+
1 2 3 4 joined_df=user_df.join(zipcode_df , \ user_df.zipcode == zipcode_df.zipcode, "left_outer" ) joined_df.printSchema()
root
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- occupation: string (nullable = true)
|-- userid: long (nullable = true)
|-- zipcode: string (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zipCodeType: string (nullable = true)
|-- zipcode: long (nullable = true)
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
|age|gender| occupation|userid|zipcode| city|state|zipCodeType|zipcode|
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
| 59| F|administrator| 131| 15237| PITTSBURGH| PA| STANDARD| 15237|
| 17| M| student| 619| 44134| CLEVELAND| OH| STANDARD| 44134|
| 38| F|entertainment| 839| 90814| LONG BEACH| CA| STANDARD| 90814|
| 48| M|administrator| 409| 98225| BELLINGHAM| WA| STANDARD| 98225|
| 31| M| educator| 791| 20064| WASHINGTON| DC| UNIQUE| 20064|
| 51| M| engineer| 271| 22932| CROZET| VA| STANDARD| 22932|
| 17| M|entertainment| 375| 37777| LOUISVILLE| TN| STANDARD| 37777|
| 27| M| student| 758| 53706| MADISON| WI| STANDARD| 53706|
| 33| M| scientist| 272| 53706| MADISON| WI| STANDARD| 53706|
| 30| F| librarian| 344| 94117|SAN FRANCISCO| CA| STANDARD| 94117|
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
only showing top 10 rows
1 joined_df.filter("state='NY' " ).show(10 )
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
|age|gender| occupation|userid|zipcode| city|state|zipCodeType|zipcode|
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
| 29| M| other| 478| 10019| NEW YORK| NY| STANDARD| 10019|
| 22| F| healthcare| 405| 10019| NEW YORK| NY| STANDARD| 10019|
| 22| M| student| 327| 11101|LONG ISLAND CITY| NY| STANDARD| 11101|
| 48| M| educator| 656| 10314| STATEN ISLAND| NY| STANDARD| 10314|
| 27| F| writer| 617| 11201| BROOKLYN| NY| STANDARD| 11201|
| 35| F| other| 760| 14211| BUFFALO| NY| STANDARD| 14211|
| 30| F| writer| 557| 11217| BROOKLYN| NY| STANDARD| 11217|
| 27| M| marketing| 806| 11217| BROOKLYN| NY| STANDARD| 11217|
| 32| F| other| 155| 11217| BROOKLYN| NY| STANDARD| 11217|
| 23| M|administrator| 509| 10011| NEW YORK| NY| STANDARD| 10011|
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
only showing top 10 rows
1 2 GroupByState_df=joined_df.groupBy("state" ).count() GroupByState_df.show(60 )
+-----+-----+
|state|count|
+-----+-----+
| SC| 11|
| AZ| 14|
| LA| 6|
| MN| 78|
| NJ| 18|
| DC| 14|
| OR| 20|
| VA| 27|
| null| 35|
| RI| 3|
| KY| 11|
| WY| 1|
| NH| 6|
| MI| 23|
| NV| 3|
| WI| 22|
| ID| 7|
| CA| 116|
| CT| 17|
| NE| 6|
| MT| 2|
| NC| 19|
| VT| 5|
| MD| 27|
| DE| 3|
| MO| 17|
| IL| 50|
| ME| 2|
| WA| 24|
| ND| 2|
| MS| 3|
| AL| 3|
| IN| 9|
| AE| 1|
| OH| 32|
| TN| 12|
| IA| 14|
| NM| 2|
| PA| 34|
| SD| 1|
| NY| 60|
| TX| 51|
| WV| 3|
| GA| 19|
| MA| 35|
| KS| 4|
| FL| 24|
| CO| 20|
| AK| 5|
| AR| 1|
| OK| 9|
| AP| 1|
| UT| 9|
| HI| 2|
+-----+-----+
19.11 以Pandas DataFrame绘图 1 2 3 import pandas as pdGroupByState_pandas_df =GroupByState_df.toPandas().set_index('state' ) GroupByState_pandas_df
count state SC 11 AZ 14 LA 6 MN 78 NJ 18 DC 14 OR 20 VA 27 NaN 35 RI 3 KY 11 WY 1 NH 6 MI 23 NV 3 WI 22 ID 7 CA 116 CT 17 NE 6 MT 2 NC 19 VT 5 MD 27 DE 3 MO 17 IL 50 ME 2 WA 24 ND 2 MS 3 AL 3 IN 9 AE 1 OH 32 TN 12 IA 14 NM 2 PA 34 SD 1 NY 60 TX 51 WV 3 GA 19 MA 35 KS 4 CO 20 FL 24 AK 5 AR 1 OK 9 AP 1 UT 9 HI 2
1 GroupByState_pandas_df.T
state SC AZ LA MN NJ DC OR VA None RI … MA KS CO FL AK AR OK AP UT HI count 11 14 6 78 18 14 20 27 35 3 … 35 4 20 24 5 1 9 1 9 2
1 rows × 54 columns
1 2 3 4 5 import matplotlib.pyplot as plt%matplotlib inline ax = GroupByState_pandas_df ['count' ] \ .plot(kind='bar' , title ="State " ,figsize=(12 ,6 ),legend=True , fontsize=12 ) plt.show()
1 2 3 4 5 6 Occupation_df=sqlContext.sql(""" SELECT u.occupation ,count(*) counts FROM user_table u GROUP BY occupation """ )Occupation_df.show(30 )
+-------------+------+
| occupation|counts|
+-------------+------+
| librarian| 51|
| retired| 14|
| lawyer| 12|
| none| 9|
| writer| 45|
| programmer| 66|
| marketing| 26|
| other| 105|
| executive| 32|
| scientist| 31|
| student| 196|
| salesman| 12|
| artist| 28|
| technician| 27|
|administrator| 79|
| engineer| 67|
| healthcare| 16|
| educator| 95|
|entertainment| 18|
| homemaker| 7|
| doctor| 7|
+-------------+------+
1 2 Occupation_pandas_df =Occupation_df.toPandas().set_index('occupation' ) Occupation_pandas_df
counts occupation librarian 51 retired 14 lawyer 12 none 9 writer 45 programmer 66 marketing 26 other 105 executive 32 scientist 31 student 196 salesman 12 artist 28 technician 27 administrator 79 engineer 67 healthcare 16 educator 95 entertainment 18 homemaker 7 doctor 7
1 2 3 4 ax =Occupation_pandas_df['counts' ].plot(kind='pie' , title ="occupation" ,figsize=(8 ,8 ),startangle=90 ,autopct='%1.1f%%' ) ax.legend(bbox_to_anchor=(1.05 , 1 ), loc=2 , borderaxespad=0. ) plt.show()
赞赏
你的支持是我写作的莫大鼓励