First, read this series of articles to get a basic understanding of Spark:

1. Introduction to Spark

2. Basic Spark concepts explained

3. The Spark programming model

4. Spark RDDs

5. Spark learning resources you shouldn't miss

6. A deep dive into how Spark runs: jobs, stages, and tasks

7. Big data analysis with Spark DataFrames

8. Case study | Spark applications in finance | intraday trend prediction

9. Setting up an IPython + Notebook + Spark development environment

10. Spark application performance optimization | 12 optimization techniques

11. Spark machine learning

RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

Books

Baidu Cloud:
[Spark 快速大数据分析] link: https://pan.baidu.com/s/1D7GMbGeKEVg7edJwujehGA extraction code: he2d

Server layout

master ip: 192.168.10.170
user: spark
passwd:s1
spark_root_dir:/home/spark/spark-2.4.0-bin-hadoop2.7

slave ip: 192.168.10.171, 192.168.10.172
user: spark
passwd:s1
spark_root_dir:/home/spark/spark-2.4.0-bin-hadoop2.7

Installation

download

1. Download the release package and extract it

tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz

2. Configure JAVA_HOME

export JAVA_HOME=/home/spark/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin

3. Install the Python API

cd spark_home_dir/python
sudo python3 setup.py install
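
A quick way to confirm the package is importable (a minimal sketch; it assumes python3 is the interpreter you installed into):

# verify that the PySpark package installed above can be imported
import pyspark
print(pyspark.__version__)   # expected: 2.4.0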

4. Verify the installation

spark@spark1:~/spark-2.4.0-bin-hadoop2.7$ ./bin/spark-submit examples/src/main/python/pi.py

If the configuration succeeded, you should see output like the following:

2018-12-12 02:53:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-12-12 02:53:31 INFO SparkContext:54 - Running Spark version 2.4.0
2018-12-12 02:53:31 INFO SparkContext:54 - Submitted application: PythonPi
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing view acls to: spark
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing modify acls to: spark
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing view acls groups to:
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing modify acls groups to:
2018-12-12 02:53:31 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); groups with view permissions: Set(); users with modify permissions: Set(spark); groups with modify permissions: Set()
2018-12-12 02:53:31 INFO Utils:54 - Successfully started service 'sparkDriver' on port 34371.
2018-12-12 02:53:31 INFO SparkEnv:54 - Registering MapOutputTracker
2018-12-12 02:53:31 INFO SparkEnv:54 - Registering BlockManagerMaster
2018-12-12 02:53:31 INFO BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2018-12-12 02:53:31 INFO BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2018-12-12 02:53:31 INFO DiskBlockManager:54 - Created local directory at /tmp/blockmgr-0f28f494-80f9-4d57-880f-ff3ab72fc569
2018-12-12 02:53:31 INFO MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2018-12-12 02:53:31 INFO SparkEnv:54 - Registering OutputCommitCoordinator
2018-12-12 02:53:32 INFO log:192 - Logging initialized @1724ms
2018-12-12 02:53:32 INFO Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2018-12-12 02:53:32 INFO Server:419 - Started @1791ms
2018-12-12 02:53:32 INFO AbstractConnector:278 - Started ServerConnector@8aee1e3{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-12-12 02:53:32 INFO Utils:54 - Successfully started service 'SparkUI' on port 4040.
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4f06548c{/jobs,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@291e161b{/jobs/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@fca97e6{/jobs/job,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4b23942e{/jobs/job/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1254fd9f{/stages,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@49fa3d94{/stages/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3c3aa98a{/stages/stage,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@46ecf8e5{/stages/stage/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2f736f4b{/stages/pool,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3acf3392{/stages/pool/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7721ef93{/storage,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@56ab1a42{/storage/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@62163eeb{/storage/rdd,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2106c298{/storage/rdd/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5923ec5c{/environment,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@57d49430{/environment/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4e60947{/executors,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@13e31941{/executors/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@55e4566d{/executors/threadDump,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@26e23df5{/executors/threadDump/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@127aa45f{/static,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@60a5aa78{/,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2b225b78{/api,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@62fead0{/jobs/job/kill,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@62f13110{/stages/stage/kill,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://spark1:4040
2018-12-12 02:53:32 INFO Executor:54 - Starting executor ID driver on host localhost
2018-12-12 02:53:32 INFO Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42589.
2018-12-12 02:53:32 INFO NettyBlockTransferService:54 - Server created on spark1:42589
2018-12-12 02:53:32 INFO BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2018-12-12 02:53:32 INFO BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO BlockManagerMasterEndpoint:54 - Registering block manager spark1:42589 with 366.3 MB RAM, BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@526f5cb1{/metrics/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO SharedState:54 - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/spark/spark-2.4.0-bin-hadoop2.7/spark-warehouse').
2018-12-12 02:53:32 INFO SharedState:54 - Warehouse path is 'file:/home/spark/spark-2.4.0-bin-hadoop2.7/spark-warehouse'.
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@61e254a2{/SQL,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4c890f2{/SQL/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@460c2a3f{/SQL/execution,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7462cbcf{/SQL/execution/json,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@116110ac{/static/sql,null,AVAILABLE,@Spark}
2018-12-12 02:53:32 INFO StateStoreCoordinatorRef:54 - Registered StateStoreCoordinator endpoint
2018-12-12 02:53:33 INFO SparkContext:54 - Starting job: reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44
2018-12-12 02:53:33 INFO DAGScheduler:54 - Got job 0 (reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44) with 2 output partitions
2018-12-12 02:53:33 INFO DAGScheduler:54 - Final stage: ResultStage 0 (reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44)
2018-12-12 02:53:33 INFO DAGScheduler:54 - Parents of final stage: List()
2018-12-12 02:53:33 INFO DAGScheduler:54 - Missing parents: List()
2018-12-12 02:53:33 INFO DAGScheduler:54 - Submitting ResultStage 0 (PythonRDD[1] at reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44), which has no missing parents
2018-12-12 02:53:33 INFO MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 6.2 KB, free 366.3 MB)
2018-12-12 02:53:33 INFO MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.2 KB, free 366.3 MB)
2018-12-12 02:53:33 INFO BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on spark1:42589 (size: 4.2 KB, free: 366.3 MB)
2018-12-12 02:53:33 INFO SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1161
2018-12-12 02:53:33 INFO DAGScheduler:54 - Submitting 2 missing tasks from ResultStage 0 (PythonRDD[1] at reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44) (first 15 tasks are for partitions Vector(0, 1))
2018-12-12 02:53:33 INFO TaskSchedulerImpl:54 - Adding task set 0.0 with 2 tasks
2018-12-12 02:53:33 INFO TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7852 bytes)
2018-12-12 02:53:33 INFO TaskSetManager:54 - Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7852 bytes)
2018-12-12 02:53:33 INFO Executor:54 - Running task 1.0 in stage 0.0 (TID 1)
2018-12-12 02:53:33 INFO Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
2018-12-12 02:53:33 INFO PythonRunner:54 - Times: total = 378, boot = 246, init = 48, finish = 84
2018-12-12 02:53:33 INFO PythonRunner:54 - Times: total = 382, boot = 242, init = 52, finish = 88
2018-12-12 02:53:33 INFO Executor:54 - Finished task 1.0 in stage 0.0 (TID 1). 1421 bytes result sent to driver
2018-12-12 02:53:33 INFO Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 1421 bytes result sent to driver
2018-12-12 02:53:33 INFO TaskSetManager:54 - Finished task 1.0 in stage 0.0 (TID 1) in 426 ms on localhost (executor driver) (1/2)
2018-12-12 02:53:33 INFO TaskSetManager:54 - Finished task 0.0 in stage 0.0 (TID 0) in 446 ms on localhost (executor driver) (2/2)
2018-12-12 02:53:33 INFO TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
2018-12-12 02:53:33 INFO PythonAccumulatorV2:54 - Connected to AccumulatorServer at host: 127.0.0.1 port: 41033
2018-12-12 02:53:33 INFO DAGScheduler:54 - ResultStage 0 (reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44) finished in 0.536 s
2018-12-12 02:53:33 INFO DAGScheduler:54 - Job 0 finished: reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44, took 0.576000 s
Pi is roughly 3.150560
2018-12-12 02:53:33 INFO AbstractConnector:318 - Stopped Spark@8aee1e3{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-12-12 02:53:33 INFO SparkUI:54 - Stopped Spark web UI at http://spark1:4040
2018-12-12 02:53:33 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-12-12 02:53:33 INFO MemoryStore:54 - MemoryStore cleared
2018-12-12 02:53:33 INFO BlockManager:54 - BlockManager stopped
2018-12-12 02:53:33 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2018-12-12 02:53:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-12-12 02:53:33 INFO SparkContext:54 - Successfully stopped SparkContext
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Shutdown hook called
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c09679ff-e0e0-4e24-839b-74de1930969a
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c42919fe-751c-4223-9f83-530cbd978945
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c09679ff-e0e0-4e24-839b-74de1930969a/pyspark-876b59fd-18f9-4284-8131-b09ac5642632

Quick start

Spark 2.2.x Chinese documentation

Programming guide

link

Environment variable reference

export JAVA_HOME=/home/spark/jdk1.8.0_191

export PATH=$PATH:$JAVA_HOME/bin

export PYSPARK_PYTHON=python3

export SPARK_HOME=/home/spark/spark-2.4.0-bin-hadoop2.7

export PYSPARK_SUBMIT_ARGS="--master spark://192.168.10.170:7077 --deploy-mode client"

Or:

export JAVA_HOME=/home/spark/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin

export SPARK_HOME=/home/spark/spark-2.4.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=192.168.10.170"
export MASTER="spark://192.168.10.170:7077"

export PYSPARK_SUBMIT_ARGS="--master spark://192.168.10.170:7077 --deploy-mode client"
export HADOOP_HOME=/home/spark/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/

export SPARK_VERSION=2.4.0
export SCALA_VERSION=2.11.8

Starting a worker manually

spark-class org.apache.spark.deploy.worker.Worker spark://spark1:7077

PySpark basics

Run the following command in the Spark directory to start the Spark shell:

./bin/pyspark

Or, if PySpark is already installed via pip in your current environment, you can simply run:

pyspark

Sample output:

spark@spark1:~/spark-2.4.0-bin-hadoop2.7$ pyspark
Python 3.6.7 (default, Oct 22 2018, 11:32:17)
[GCC 8.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2018-12-12 03:32:28 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Python version 3.6.7 (default, Oct 22 2018 11:32:17)
SparkSession available as 'spark'.
>>>

Spark's main abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Because of Python's dynamic nature, we don't need the Dataset to be strongly typed; all Datasets in Python are therefore Dataset[Row], and we call them DataFrames to stay consistent with the data frame concept in Pandas and R.
Let's build a new DataFrame from the text of the README file in the Spark source directory:

>>> textFile = spark.read.text("README.md")

Count the number of rows and fetch the first row:

>>> textFile.count()   # Number of rows in this DataFrame
105

>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')

Now let's transform this DataFrame into a new one. We call filter, a transformation, to return a new DataFrame containing only a subset of the rows in the original file:

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

We can chain transformations and actions together:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
20

Transformations and actions can be combined for more complex computations. Say we want to find the line with the most words:

from pyspark.sql.functions import *
textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()

This first maps each line to an integer value and aliases it as "numWords", creating a new DataFrame. agg is then called on that DataFrame to find the largest word count. The arguments to select and agg are both Column; we can use df.colName to get a column from a DataFrame, and we can also import pyspark.sql.functions, which provides many convenient functions to build a new Column from an old one.
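
As a small illustration of the two ways of referring to a Column mentioned above (a sketch only; it reuses the textFile DataFrame created from README.md earlier):

from pyspark.sql.functions import col, length

textFile.select(textFile.value).first()                        # attribute access: df.colName
textFile.select(col("value")).first()                          # col() builds the same Column by name
textFile.select(length(col("value")).alias("len")).first()     # a new Column derived from an old one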

MapReduce, as popularized by Hadoop, is a common data flow pattern. Spark can implement MapReduce flows easily:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

Here we use the explode function in select to transform a Dataset of lines into a Dataset of words, and then combine groupBy and count to compute the per-word counts in the file, producing a DataFrame with two columns, "word" and "count".
To collect the word counts in our shell, we call the collect action:

>>> wordCounts.collect()
[Row(word='online', count=1), Row(word='graphs', count=1), Row(word='["Parallel', count=1), Row(word='["Building', count=1), Row(word='thread', count=1), Row(word='documentation', count=3), Row(word='command,', count=2), Row(word='abbreviated', count=1), Row(word='overview', count=1), Row(word='rich', count=1), Row(word='set', count=2), Row(word='-DskipTests', count=1), Row(word='name', count=1), Row(word='page](http://spark.apache.org/documentation.html).', count=1), Row(word='["Specifying', count=1), Row(word='stream', count=1), Row(word='run:', count=1), Row(word='not', count=1), Row(word='programs', count=2), Row(word='tests', count=2), Row(word='./dev/run-tests', count=1), Row(word='will', count=1), Row(word='[run', count=1), Row(word='particular', count=2), Row(word='option', count=1), Row(word='Alternatively,', count=1), Row(word='by', count=1), Row(word='must', count=1), Row(word='using', count=5), Row(word='you', count=4), Row(word='MLlib', count=1), Row(word='DataFrames,', count=1), Row(word='variable', count=1), Row(word='Note', count=1), Row(word='core', count=1), Row(word='more', count=1), Row(word='protocols', count=1), Row(word='guidance', count=2), Row(word='shell:', count=2), Row(word='can', count=7), Row(word='site,', count=1), Row(word='systems.', count=1), Row(word='Maven', count=1), Row(word='[building', count=1), Row(word='configure', count=1), Row(word='for', count=12), Row(word='README', count=1), Row(word='Interactive', count=2), Row(word='how', count=3), Row(word='[Configuration', count=1), Row(word='Hive', count=2), Row(word='system', count=1), Row(word='provides', count=1), Row(word='Hadoop-supported', count=1), Row(word='pre-built', count=1), Row(word='["Useful', count=1), Row(word='directory.', count=1), Row(word='Example', count=1), Row(word='example', count=3), Row(word='Kubernetes', count=1), Row(word='one', count=3), Row(word='MASTER', count=1), Row(word='in', count=6), Row(word='library', count=1), Row(word='Spark.', count=1), Row(word='contains', count=1), Row(word='Configuration', count=1), Row(word='programming', count=1), Row(word='with', count=4), Row(word='contributing', count=1), Row(word='downloaded', count=1), Row(word='1000).count()', count=1), Row(word='comes', count=1), Row(word='machine', count=1), Row(word='Tools"](http://spark.apache.org/developer-tools.html).', count=1), Row(word='building', count=2), Row(word='params', count=1), Row(word='Guide](http://spark.apache.org/docs/latest/configuration.html)', count=1), Row(word='given.', count=1), Row(word='be', count=2), Row(word='same', count=1), Row(word='integration', count=1), Row(word='than', count=1), Row(word='Programs', count=1), Row(word='locally', count=2), Row(word='using:', count=1), Row(word='fast', count=1), Row(word='[Apache', count=1), Row(word='your', count=1), Row(word='optimized', count=1), Row(word='Developer', count=1), Row(word='R,', count=1), Row(word='should', count=2), Row(word='graph', count=1), Row(word='package', count=1), Row(word='-T', count=1), Row(word='[project', count=1), Row(word='project', count=1), Row(word='`examples`', count=2), Row(word='resource-managers/kubernetes/integration-tests/README.md', count=1), Row(word='versions', count=1), Row(word='Spark](#building-spark).', count=1), Row(word='general', count=3), Row(word='other', count=1), Row(word='learning,', count=1), Row(word='when', count=1), Row(word='submit', count=1), Row(word='Apache', count=1), Row(word='1000:', count=2), Row(word='detailed', count=2), Row(word='About', count=1), Row(word='is', 
count=7), Row(word='on', count=7), Row(word='scala>', count=1), Row(word='print', count=1), Row(word='use', count=3), Row(word='different', count=1), Row(word='following', count=2), Row(word='SparkPi', count=2), Row(word='refer', count=2), Row(word='./bin/run-example', count=2), Row(word='data', count=1), Row(word='Tests', count=1), Row(word='Versions', count=1), Row(word='Data.', count=1), Row(word='processing.', count=1), Row(word='its', count=1), Row(word='basic', count=1), Row(word='latest', count=1), Row(word='only', count=1), Row(word='<class>', count=1), Row(word='have', count=1), Row(word='runs.', count=1), Row(word='You', count=4), Row(word='tips,', count=1), Row(word='project.', count=1), Row(word='developing', count=1), Row(word='YARN,', count=1), Row(word='It', count=2), Row(word='"local"', count=1), Row(word='processing,', count=1), Row(word='built', count=1), Row(word='Pi', count=1), Row(word='thread,', count=1), Row(word='A', count=1), Row(word='APIs', count=1), Row(word='Scala,', count=1), Row(word='file', count=1), Row(word='computation', count=1), Row(word='Once', count=1), Row(word='find', count=1), Row(word='the', count=24), Row(word='To', count=2), Row(word='sc.parallelize(1', count=1), Row(word='uses', count=1), Row(word='Version', count=1), Row(word='N', count=1), Row(word='programs,', count=1), Row(word='"yarn"', count=1), Row(word='see', count=4), Row(word='./bin/pyspark', count=1), Row(word='return', count=2), Row(word='computing', count=1), Row(word='Java,', count=1), Row(word='from', count=1), Row(word='Because', count=1), Row(word='cluster', count=2), Row(word='Streaming', count=1), Row(word='More', count=1), Row(word='analysis.', count=1), Row(word='Maven](http://maven.apache.org/).', count=1), Row(word='cluster.', count=1), Row(word='Running', count=1), Row(word='Please', count=4), Row(word='talk', count=1), Row(word='distributions.', count=1), Row(word='guide,', count=1), Row(word='tests](http://spark.apache.org/developer-tools.html#individual-tests).', count=1), Row(word='There', count=1), Row(word='"local[N]"', count=1), Row(word='Try', count=1), Row(word='and', count=10), Row(word='do', count=2), Row(word='Scala', count=2), Row(word='class', count=2), Row(word='build', count=4), Row(word='3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', count=1), Row(word='setup', count=1), Row(word='need', count=1), Row(word='spark://', count=1), Row(word='Hadoop,', count=2), Row(word='Thriftserver', count=1), Row(word='are', count=1), Row(word='requires', count=1), Row(word='package.', count=1), Row(word='Enabling', count=1), Row(word='clean', count=1), Row(word='sc.parallelize(range(1000)).count()', count=1), Row(word='high-level', count=1), Row(word='SQL', count=2), Row(word='against', count=1), Row(word='of', count=5), Row(word='through', count=1), Row(word='review', count=1), Row(word='package.)', count=1), Row(word='Python,', count=2), Row(word='easiest', count=1), Row(word='no', count=1), Row(word='Testing', count=1), Row(word='several', count=1), Row(word='help', count=1), Row(word='The', count=1), Row(word='sample', count=1), Row(word='MASTER=spark://host:7077', count=1), Row(word='Big', count=1), Row(word='examples', count=2), Row(word='an', count=4), Row(word='#', count=1), Row(word='Online', count=1), Row(word='test,', count=1), Row(word='including', count=4), Row(word='usage', count=1), Row(word='Python', count=2), Row(word='at', count=2), Row(word='development', count=1), 
Row(word='Spark"](http://spark.apache.org/docs/latest/building-spark.html).', count=1), Row(word='IDE,', count=1), Row(word='way', count=1), Row(word='Contributing', count=1), Row(word='get', count=1), Row(word='that', count=2), Row(word='##', count=9), Row(word='For', count=3), Row(word='prefer', count=1), Row(word='This', count=2), Row(word='build/mvn', count=1), Row(word='builds', count=1), Row(word='running', count=1), Row(word='web', count=1), Row(word='run', count=7), Row(word='locally.', count=1), Row(word='Spark', count=16), Row(word='URL,', count=1), Row(word='a', count=9), Row(word='higher-level', count=1), Row(word='tools', count=1), Row(word='if', count=4), Row(word='available', count=1), Row(word='', count=48), Row(word='Documentation', count=1), Row(word='this', count=1), Row(word='(You', count=1), Row(word='>>>', count=1), Row(word='information', count=1), Row(word='info', count=1), Row(word='<http://spark.apache.org/>', count=1), Row(word='Shell', count=2), Row(word='environment', count=1), Row(word='built,', count=1), Row(word='module,', count=1), Row(word='them,', count=1), Row(word='`./bin/run-example', count=1), Row(word='instance:', count=1), Row(word='first', count=1), Row(word='[Contribution', count=1), Row(word='guide](http://spark.apache.org/contributing.html)', count=1), Row(word='documentation,', count=1), Row(word='[params]`.', count=1), Row(word='mesos://', count=1), Row(word='engine', count=1), Row(word='GraphX', count=1), Row(word='Maven,', count=1), Row(word='example:', count=1), Row(word='HDFS', count=1), Row(word='YARN"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version-and-enabling-yarn)', count=1), Row(word='or', count=3), Row(word='to', count=17), Row(word='Hadoop', count=3), Row(word='individual', count=1), Row(word='also', count=5), Row(word='changed', count=1), Row(word='started', count=1), Row(word='./bin/spark-shell', count=1), Row(word='threads.', count=1), Row(word='supports', count=2), Row(word='storage', count=1), Row(word='version', count=1), Row(word='instructions.', count=1), Row(word='Building', count=1), Row(word='start', count=1), Row(word='Many', count=1), Row(word='which', count=2), Row(word='And', count=1), Row(word='distribution', count=1)]

Setting up a cluster

link

On each of spark1, spark2, and spark3, edit spark-env.sh under spark_home_dir:

vim /home/spark/spark-2.4.0-bin-hadoop2.7/conf/spark-env.sh

Add the following:

export SPARK_MASTER_IP=master
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=512m
export SPARK_WORKER_INSTANCES=4

SPARK_MASTER_IP: address of the master host, currently 192.168.10.170
SPARK_WORKER_CORES: number of CPU cores allocated to each worker
SPARK_WORKER_MEMORY: maximum memory allocated to each worker
SPARK_WORKER_INSTANCES: number of worker instances to start on each node

On spark1, under spark_home_dir, start the master:

./sbin/start-master.sh

On spark2, under spark_home_dir, start a worker:

./sbin/start-slave.sh spark://spark1:7077

On spark3, under spark_home_dir, start a worker:

./sbin/start-slave.sh spark://spark1:7077

After a successful start, open http://spark1:8080 in a browser to inspect the Spark standalone cluster.

Note: the master URL must be in the form "spark://master_ip:7077".
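
For reference, a minimal PySpark sketch that connects to this standalone master using the URL format above (the application name is arbitrary):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("spark://192.168.10.170:7077")   # master URL format: spark://master_ip:7077
         .appName("cluster-smoke-test")
         .getOrCreate())

# a trivial job that should run on the workers: sum of 0..999 = 499500
print(spark.sparkContext.parallelize(range(1000)).sum())
spark.stop()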

spark machine learning api

link

Starting the Spark and Hadoop clusters with a script

#!/bin/bash
### cluster_setup.sh
### location: spark1:/home/spark/spark-2.4.0-bin-hadoop2.7/cluster_setup.sh
echo "set up spark cluster and hadoop cluster"

/home/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-master.sh
ssh spark@slave1 "/home/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-slave.sh spark://spark1:7077"
ssh spark@slave2 "/home/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-slave.sh spark://spark1:7077"
/home/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-slave.sh spark://spark1:7077   # also start a worker on the master node

/home/spark/hadoop-2.7.7/sbin/start-all.sh
ssh spark@slave1 "/home/spark/hadoop-2.7.7/sbin/start-all.sh"
ssh spark@slave2 "/home/spark/hadoop-2.7.7/sbin/start-all.sh"

Configuring Jupyter to run Spark

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook  --ip=192.168.10.170"  pyspark

We can also make these two variables permanent by adding the following to ~/.bashrc:

export PYSPARK_DRIVER_PYTHON=jupyter 
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=192.168.10.170"

Reload the environment variables:

source  ~/.bashrc

From then on, simply typing pyspark in the shell is enough.
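
A first notebook cell to sanity-check the session might look like this (sc and spark are created automatically by the pyspark launcher; the expected master URL assumes the MASTER variable set above):

print(sc.master)                               # expect spark://192.168.10.170:7077
print(sc.parallelize(range(1000)).count())     # expect 1000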

Calling PySpark from PyCharm

You need to set the following environment variables in Run/Debug Configurations:

SPARK_HOME=/home/spark/spark-2.4.0-bin-hadoop2.7
PYSPARK_DRIVER_PYTHON=
PYSPARK_PYTHON=/usr/bin/python3
MASTER=spark://192.168.10.170:7077
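
Alternatively, the same settings can be applied from the script itself before pyspark is imported, which avoids editing every Run/Debug Configuration (a sketch; the py4j zip name matches the one shipped with Spark 2.4.0):

import os, sys

os.environ["SPARK_HOME"] = "/home/spark/spark-2.4.0-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"

# make the bundled pyspark and py4j importable inside PyCharm
sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python"))
sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.10.7-src.zip"))

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://192.168.10.170:7077").appName("pycharm-demo").getOrCreate()
print(spark.range(10).count())   # expect 10
spark.stop()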

Transformation operations (DStreams)

Transformation | Meaning
map(func) | Returns a new DStream by passing each element of the source DStream through the function func.
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) | Returns a new DStream containing only the elements of the source DStream for which func returns true.
repartition(numPartitions) | Changes the level of parallelism of this DStream by creating more or fewer partitions.
union(otherStream) | Returns a new DStream containing the union of the elements of the source DStream and otherStream.
count() | Returns a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) | Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using func. The function should be associative so the computation can be parallelized.
countByValue() | When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: by default this uses Spark's default number of parallel tasks for grouping; you can pass numTasks to set a different number of tasks.
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs.
cogroup(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) | Returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to apply arbitrary RDD operations on the DStream.
updateStateByKey(func) | Returns a new "state" DStream where the state of each key is updated by applying the given function. A small PySpark Streaming example using a few of these transformations follows the table.
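
Here is the minimal PySpark Streaming sketch mentioned above, exercising flatMap, map, and reduceByKey; it assumes a text source such as `nc -lk 9999` running on localhost:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "DStreamWordCount")
ssc = StreamingContext(sc, 5)                           # 5-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)         # DStream of text lines
words = lines.flatMap(lambda line: line.split(" "))     # flatMap: one line -> many words
pairs = words.map(lambda word: (word, 1))               # map: word -> (word, 1)
counts = pairs.reduceByKey(lambda a, b: a + b)          # reduceByKey: sum the counts per word

counts.pprint()                                         # print each batch's counts
ssc.start()
ssc.awaitTermination()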

Basic Spark operations

Use parallelize to turn an array into an RDD:

spark@spark1:~/spark-2.4.0-bin-hadoop2.7/test_app_build$ spark-shell
18/12/13 01:41:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://spark1:4040
Spark context available as 'sc' (master = local[*], app id = local-1544665284869).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val lines = sc.parallelize(Array("hello","spark","hello","world","!"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

Use foreach with println to print the elements of lines:

scala> lines.foreach(println)
hello
world
spark
!
hello

Use map to turn each array element into a key-value pair:

scala> val lines2 = lines.map(world => (world,1))
lines2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:25

scala> lines2.foreach(println)
(hello,1)
(spark,1)
(hello,1)
(world,1)
(!,1)

Use filter to select elements matching a condition:

scala> val lines3 = lines.filter(word => word.contains("hello"))

scala> lines3.foreach(println)
hello
hello

Set operations

Start spark-shell and read a file:

spark@spark1:~/spark-2.4.0-bin-hadoop2.7$ spark-shell
18/12/13 02:55:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/13 02:56:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://spark1:4041
Spark context available as 'sc' (master = local[*], app id = local-1544669763113).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val inputs= sc.textFile("data.txt")
inputs: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> inputs.foreach(println)
hello I am hear ,
hello word !
hello spark ,
hello ethan .

Use flatMap to operate on each line:

scala> val lines = input
input_file_name inputs

scala> val lines = inputs.flatMap(line => line.split(" "))
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:25

scala> lines.foreach(println)
hello
word
!
hello
spark
,
hello
ethan
.
hello
I
am
hear
,

Initialize two RDDs for the set operations that follow:

scala> var rdd1 = sc.parallelize(Array("coffe","coffe","panda","monkey","tea"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd1.foreach(println)
panda
coffe
coffe
monkey
tea

scala> val rdd2 = sc.parallelize(Array("coffe","monkey","kitty"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd2.foreach(println)
coffe
monkey
kitty

Deduplicate the elements of rdd1:

scala> var rdd_distinct = rdd1.distinct()
rdd_distinct: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at distinct at <console>:25

scala> rdd_distinct.foreach(println)

tea
monkey
panda
coffe

Take the union of the two RDDs:

scala> val rdd_union = rdd1.union(rdd2)
rdd_union: org.apache.spark.rdd.RDD[String] = UnionRDD[8] at union at <console>:27

scala> rdd_union.foreach(println)
coffe
panda
coffe
monkey
tea
monkey
coffe
kitty

Take the intersection of the two RDDs:

scala> val rdd_inter= rdd1.intersection(rdd2)
rdd_inter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at intersection at <console>:27

scala> rdd_inter
res9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at intersection at <console>:27

scala> rdd_inter.foreach(println)
monkey
coffe

Take the difference of the two RDDs (elements in rdd1 that are not in rdd2):

scala> val rdd_sub=rdd1.subtract(rdd2)
rdd_sub: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at subtract at <console>:27

scala> rdd_sub.foreach(println)
panda
tea

Key-value operations

Read a file and form key-value data:

scala> val rdd=sc.textFile("data.txt")
rdd: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd.foreach(println)
hello I am hear ,
hello word !
hello spark ,
hello ethan .

scala> val rdd2 = rdd.map(line => (line.split(" ")(0),line))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:25


scala> rdd2.foreach(println)
(hello,hello I am hear ,)
(hello,hello word !)
(hello,hello spark ,)
(hello,hello ethan .)

reduceByKey

scala> val rdd3 = sc.parallelize(Array((1,2),(3,4),(3,6)))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24


scala> rdd3.foreach(println)
(3,4)
(1,2)
(3,6)


scala> val rdd4 =rdd3.reduceByKey((x,y)=>x+y)
rdd4: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25

scala> rdd4.foreach(println)
(1,2)
(3,10)

groupByKey


scala> val rdd5=rdd3.groupByKey()
rdd5: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:25

scala> rdd5.foreach(println)
(3,CompactBuffer(4, 6))
(1,CompactBuffer(2))

View the keys of the RDD:

scala> val rdd6=rdd3.keys
rdd6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at keys at <console>:25

scala> rdd6.foreach(println)
1
3
3

sortByKey

scala> val rdd7=rdd3.sortByKey()
rdd7: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at sortByKey at <console>:25

scala> rdd7.foreach
foreach foreachAsync foreachPartition foreachPartitionAsync

scala> rdd7.foreach(println)
(1,2)
(3,4)
(3,6)

combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner):
As Spark iterates over the elements of a partition, each element's key has either been seen before in that partition or it has not. For a new key, Spark uses the createCombiner() function we supply to create the initial accumulator for that key; for a key already seen in that partition, it uses mergeValue() to fold the new value into the existing accumulator; mergeCombiners() then merges the accumulators produced by different partitions.
Example: computing average scores.

scala> val scores = sc.parallelize(Array(("jake",80.0),("jake",90.0),("jake",85.0),("mike",85.0),("mike",92.0),("mike",90.0)))
scores: org.apache.spark.rdd.RDD[(String, Double)] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> scores.for
foreach foreachAsync foreachPartition foreachPartitionAsync formatted

scala> scores.foreach
foreach foreachAsync foreachPartition foreachPartitionAsync

scala> scores.foreach(println)
(jake,90.0)
(jake,80.0)
(jake,85.0)
(mike,85.0)
(mike,90.0)
(mike,92.0)


scala> val score2=scores.combineByKey(score => (1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
score2: org.apache.spark.rdd.RDD[(String, (Int, Double))] = ShuffledRDD[12] at combineByKey at <console>:25

scala> score2.foreach(println)
(jake,(3,255.0))
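
The same average-score computation in PySpark, completed with mapValues so the averages are actually produced (a sketch reusing the data from the Scala session above):

# assumes an existing SparkContext `sc`, e.g. from the pyspark shell
scores = sc.parallelize([("jake", 80.0), ("jake", 90.0), ("jake", 85.0),
                         ("mike", 85.0), ("mike", 92.0), ("mike", 90.0)])

sum_count = scores.combineByKey(
    lambda score: (1, score),                         # createCombiner: first value seen for a key
    lambda acc, score: (acc[0] + 1, acc[1] + score),  # mergeValue: fold another value into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))          # mergeCombiners: merge accumulators across partitions

averages = sum_count.mapValues(lambda cs: cs[1] / cs[0])
print(averages.collect())   # [('jake', 85.0), ('mike', 89.0)]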

Configuring Spark JDBC

Download the MySQL Connector/J jar

https://dev.mysql.com/downloads/file/?id=480090

For Spark to talk to a MySQL server we need the Connector/J driver for MySQL. Download the archive, extract it, copy mysql-connector-java-5.1.47-bin.jar into the Spark directory, and then add the class paths in conf/spark-defaults.conf as follows:

spark.driver.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.47-bin.jar
spark.executor.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.47-bin.jar

Spark SQL

Create a SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

Create DataFrames

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

Dataset operations

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+

Running SQL queries programmatically

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

Global Temporary View

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

Creating a JDBC connection

jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.10.170:3306").option("dbtable","HKIA_v1.flight_schedule").option("user", "hkia_v1").option("password", "Asdf168!!").load()

jdbcDF.printSchema()
jdbcDF.show()

On one run, calling show() failed with the following error (the standalone master killed the application):

jdbcDF.show()
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-2-d4912bf3e00e> in <module>()
----> 1 jdbcDF.show()


/home/spark/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    376         """
    377         if isinstance(truncate, bool) and truncate:
--> 378             print(self._jdf.showString(n, 20, vertical))
    379         else:
    380             print(self._jdf.showString(n, int(truncate), vertical))


/home/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:


/home/spark/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()


/home/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(


Py4JJavaError: An error occurred while calling o38.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: KILLED
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)