Start by reading this series of articles to get a basic understanding of Spark:
6. A deep dive into how Spark runs: jobs, stages and tasks
8. Case study | Spark in finance | intraday trend prediction
9. Setting up an IPython + Notebook + Spark development environment
Books
Baidu Cloud drive:
Learning Spark (spark 快速大数据分析): https://pan.baidu.com/s/1D7GMbGeKEVg7edJwujehGA, extraction code: he2d
Server layout
master ip: 192.168.10.170
user: spark
passwd: s1
spark_root_dir: /home/spark/spark-2.4.0-bin-hadoop2.7
slave ips: 192.168.10.171, 192.168.10.172
user: spark
passwd: s1
spark_root_dir: /home/spark/spark-2.4.0-bin-hadoop2.7
Installation
1 Download the desired release package and unpack it:
tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz
2 Configure JAVA_HOME
export JAVA_HOME=/home/spark/jdk1.8.0_191
3 Configure the Python API:
cd spark_home_dir/python
sudo python3 setup.py install
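A quick sanity check that the bindings are importable (a minimal sketch; run it with the same python3 used above):
import pyspark
print(pyspark.__version__)   # should print 2.4.0 for this build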
4 Verify the setup:
spark@spark1:~/spark-2.4.0-bin-hadoop2.7$ ./bin/spark-submit examples/src/main/python/pi.py
If the configuration succeeded, output like the following is printed:
2018-12-12 02:53:31 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-12-12 02:53:31 INFO SparkContext:54 - Running Spark version 2.4.0
2018-12-12 02:53:31 INFO SparkContext:54 - Submitted application: PythonPi
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing view acls to: spark
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing modify acls to: spark
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing view acls groups to:
2018-12-12 02:53:31 INFO SecurityManager:54 - Changing modify acls groups to:
2018-12-12 02:53:31 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); groups with view permissions: Set(); users with modify permissions: Set(spark); groups with modify permissions: Set()
2018-12-12 02:53:31 INFO Utils:54 - Successfully started service 'sparkDriver' on port 34371.
2018-12-12 02:53:31 INFO SparkEnv:54 - Registering MapOutputTracker
2018-12-12 02:53:31 INFO SparkEnv:54 - Registering BlockManagerMaster
2018-12-12 02:53:31 INFO BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2018-12-12 02:53:31 INFO BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2018-12-12 02:53:31 INFO DiskBlockManager:54 - Created local directory at /tmp/blockmgr-0f28f494-80f9-4d57-880f-ff3ab72fc569
2018-12-12 02:53:31 INFO MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2018-12-12 02:53:31 INFO SparkEnv:54 - Registering OutputCommitCoordinator
2018-12-12 02:53:32 INFO log:192 - Logging initialized @1724ms
2018-12-12 02:53:32 INFO Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2018-12-12 02:53:32 INFO Server:419 - Started @1791ms
2018-12-12 02:53:32 INFO AbstractConnector:278 - Started ServerConnector@8aee1e3{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-12-12 02:53:32 INFO Utils:54 - Successfully started service 'SparkUI' on port 4040.
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4f06548c{/jobs,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@291e161b{/jobs/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler 97e6{/jobs/job,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4b23942e{/jobs/job/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1254fd9f{/stages,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@49fa3d94{/stages/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3c3aa98a{/stages/stage,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@46ecf8e5{/stages/stage/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2f736f4b{/stages/pool,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3acf3392{/stages/pool/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7721ef93{/storage,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@56ab1a42{/storage/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@62163eeb{/storage/rdd,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2106c298{/storage/rdd/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5923ec5c{/environment,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@57d49430{/environment/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4e60947{/executors,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@13e31941{/executors/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@55e4566d{/executors/threadDump,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@26e23df5{/executors/threadDump/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@127aa45f{/static,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@60a5aa78{/,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2b225b78{/api,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@62fead0{/jobs/job/kill,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@62f13110{/stages/stage/kill,null,AVAILABLE, }
2018-12-12 02:53:32 INFO SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://spark1:4040
2018-12-12 02:53:32 INFO Executor:54 - Starting executor ID driver on host localhost
2018-12-12 02:53:32 INFO Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42589.
2018-12-12 02:53:32 INFO NettyBlockTransferService:54 - Server created on spark1:42589
2018-12-12 02:53:32 INFO BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2018-12-12 02:53:32 INFO BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO BlockManagerMasterEndpoint:54 - Registering block manager spark1:42589 with 366.3 MB RAM, BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, spark1, 42589, None)
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@526f5cb1{/metrics/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO SharedState:54 - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/spark/spark-2.4.0-bin-hadoop2.7/spark-warehouse').
2018-12-12 02:53:32 INFO SharedState:54 - Warehouse path is 'file:/home/spark/spark-2.4.0-bin-hadoop2.7/spark-warehouse'.
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@61e254a2{/SQL,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4c890f2{/SQL/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@460c2a3f{/SQL/execution,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7462cbcf{/SQL/execution/json,null,AVAILABLE, }
2018-12-12 02:53:32 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@116110ac{/static/sql,null,AVAILABLE, }
2018-12-12 02:53:32 INFO StateStoreCoordinatorRef:54 - Registered StateStoreCoordinator endpoint
2018-12-12 02:53:33 INFO SparkContext:54 - Starting job: reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44
2018-12-12 02:53:33 INFO DAGScheduler:54 - Got job 0 (reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44) with 2 output partitions
2018-12-12 02:53:33 INFO DAGScheduler:54 - Final stage: ResultStage 0 (reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44)
2018-12-12 02:53:33 INFO DAGScheduler:54 - Parents of final stage: List()
2018-12-12 02:53:33 INFO DAGScheduler:54 - Missing parents: List()
2018-12-12 02:53:33 INFO DAGScheduler:54 - Submitting ResultStage 0 (PythonRDD[1] at reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44), which has no missing parents
2018-12-12 02:53:33 INFO MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 6.2 KB, free 366.3 MB)
2018-12-12 02:53:33 INFO MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.2 KB, free 366.3 MB)
2018-12-12 02:53:33 INFO BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on spark1:42589 (size: 4.2 KB, free: 366.3 MB)
2018-12-12 02:53:33 INFO SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1161
2018-12-12 02:53:33 INFO DAGScheduler:54 - Submitting 2 missing tasks from ResultStage 0 (PythonRDD[1] at reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44) (first 15 tasks are for partitions Vector(0, 1))
2018-12-12 02:53:33 INFO TaskSchedulerImpl:54 - Adding task set 0.0 with 2 tasks
2018-12-12 02:53:33 INFO TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7852 bytes)
2018-12-12 02:53:33 INFO TaskSetManager:54 - Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7852 bytes)
2018-12-12 02:53:33 INFO Executor:54 - Running task 1.0 in stage 0.0 (TID 1)
2018-12-12 02:53:33 INFO Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
2018-12-12 02:53:33 INFO PythonRunner:54 - Times: total = 378, boot = 246, init = 48, finish = 84
2018-12-12 02:53:33 INFO PythonRunner:54 - Times: total = 382, boot = 242, init = 52, finish = 88
2018-12-12 02:53:33 INFO Executor:54 - Finished task 1.0 in stage 0.0 (TID 1). 1421 bytes result sent to driver
2018-12-12 02:53:33 INFO Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 1421 bytes result sent to driver
2018-12-12 02:53:33 INFO TaskSetManager:54 - Finished task 1.0 in stage 0.0 (TID 1) in 426 ms on localhost (executor driver) (1/2)
2018-12-12 02:53:33 INFO TaskSetManager:54 - Finished task 0.0 in stage 0.0 (TID 0) in 446 ms on localhost (executor driver) (2/2)
2018-12-12 02:53:33 INFO TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
2018-12-12 02:53:33 INFO PythonAccumulatorV2:54 - Connected to AccumulatorServer at host: 127.0.0.1 port: 41033
2018-12-12 02:53:33 INFO DAGScheduler:54 - ResultStage 0 (reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44) finished in 0.536 s
2018-12-12 02:53:33 INFO DAGScheduler:54 - Job 0 finished: reduce at /home/spark/spark-2.4.0-bin-hadoop2.7/examples/src/main/python/pi.py:44, took 0.576000 s
Pi is roughly 3.150560
2018-12-12 02:53:33 INFO AbstractConnector:318 - Stopped Spark@8aee1e3{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-12-12 02:53:33 INFO SparkUI:54 - Stopped Spark web UI at http://spark1:4040
2018-12-12 02:53:33 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-12-12 02:53:33 INFO MemoryStore:54 - MemoryStore cleared
2018-12-12 02:53:33 INFO BlockManager:54 - BlockManager stopped
2018-12-12 02:53:33 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2018-12-12 02:53:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-12-12 02:53:33 INFO SparkContext:54 - Successfully stopped SparkContext
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Shutdown hook called
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c09679ff-e0e0-4e24-839b-74de1930969a
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c42919fe-751c-4223-9f83-530cbd978945
2018-12-12 02:53:34 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c09679ff-e0e0-4e24-839b-74de1930969a/pyspark-876b59fd-18f9-4284-8131-b09ac5642632
Quick start
Programming guide
Environment variable reference
export JAVA_HOME=/home/spark/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin
export PYSPARK_PYTHON=python3
export SPARK_HOME=/home/spark/spark-2.4.0-bin-hadoop2.7
export PYSPARK_SUBMIT_ARGS="--master spark://192.168.10.170:7077 --deploy-mode client"
Or:
export JAVA_HOME=/home/spark/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin
export SPARK_HOME=/home/spark/spark-2.4.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=192.168.10.170"
export MASTER="spark://192.168.10.170:7077"
export PYSPARK_SUBMIT_ARGS="--master spark://192.168.10.170:7077 --deploy-mode client"
export HADOOP_HOME=/home/spark/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/
export SPARK_VERSION=2.4.0
export SCALA_VERSION=2.11.8
Starting a worker manually
spark-class org.apache.spark.deploy.worker.Worker spark://spark1:7077
PySpark basics
Run the following command from the Spark directory to start the Spark shell:
./bin/pyspark
Or, if PySpark is already installed in your current environment via pip, simply run:
pyspark
Sample run:
spark@spark1:~/spark-2.4.0-bin-hadoop2.7$ pyspark
Spark's main abstraction is a distributed collection of items called a Dataset.
Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Because of Python's dynamic nature, we do not need Datasets to be strongly typed.
As a result, all Datasets in Python are Dataset[Row], and we call them DataFrames to stay consistent with the data frame concept in pandas and R.
Let's create a new DataFrame from the text of the README file in the Spark source directory:
>>> textFile = spark.read.text("README.md")
Count the number of lines in the file and fetch the first row:
>>> textFile.count()  # Number of rows in this DataFrame
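The first row can be fetched with first(); for example (the exact contents depend on your copy of README.md):
>>> textFile.first()   # First row in this DataFrame
Row(value='# Apache Spark')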
Now let's transform this DataFrame into a new one.
We call filter, a transformation, to return a new DataFrame containing a subset of the rows of the original file:
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
We can chain transformations and actions together:
>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
20
For example, let's find the line with the most words:
from pyspark.sql.functions import *
This first maps each line to an integer value and aliases it as "numWords", creating a new DataFrame.
Then agg is called on that DataFrame to find the largest word count. The arguments of both select and agg are Column;
we can use df.colName to get a column from a DataFrame, or import pyspark.sql.functions, which provides many convenient functions for building a new Column from an old one.
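A minimal sketch of that computation, assuming the textFile DataFrame from above (the resulting number depends on the file):
>>> from pyspark.sql.functions import size, split, col, max as max_
>>> wordsPerLine = textFile.select(size(split(textFile.value, r"\s+")).alias("numWords"))
>>> wordsPerLine.agg(max_(col("numWords"))).collect()   # e.g. [Row(max(numWords)=...)]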
MapReduce, popularized by Hadoop, is a common data flow pattern. Spark can implement MapReduce flows easily:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
Here we use the explode function inside select to transform a Dataset of lines into a Dataset of words,
then combine the groupBy and count operators to compute the number of occurrences of each word in the file, producing a DataFrame with two columns, "word" and "count".
To collect the word counts in our shell, we call collect:
>>> wordCounts.collect()
Setting up a cluster
On spark1, spark2 and spark3, edit the following file under spark_home_dir:
vim /home/spark/spark-2.4.0-bin-hadoop2.7/conf/spark-env.sh
Add the following lines:
export SPARK_MASTER_IP=master
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=512m
export SPARK_WORKER_INSTANCES=4
SPARK_MASTER_IP: address of the master host, here 192.168.10.170
SPARK_WORKER_CORES: number of CPU cores allocated to each worker
SPARK_WORKER_MEMORY: maximum amount of memory allocated to each worker
SPARK_WORKER_INSTANCES: number of worker instances started on each node
On spark1, under spark_home_dir, start the master:
./sbin/start-master.sh
On spark2, under spark_home_dir, start a worker:
./sbin/start-slave.sh spark://spark1:7077
On spark3, under spark_home_dir, start a worker:
./sbin/start-slave.sh spark://spark1:7077
Once everything has started, open http://spark1:8080 in a browser to inspect the Spark standalone cluster.
Notes:
spark machine learning api
Starting the Spark/Hadoop cluster with a script
### cluster_setup.sh
Running Spark from Jupyter
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=192.168.10.170" pyspark
Alternatively, we can put these two variables into the environment by adding the following to ~/.bashrc:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=192.168.10.170"
Reload the environment variables:
source ~/.bashrc
From then on, simply typing pyspark in a shell is enough.
Using PySpark from PyCharm
Set the following environment variables in Run/Debug Configurations:
SPARK_HOME=/home/spark/spark-2.4.0-bin-hadoop2.7
PYSPARK_DRIVER_PYTHON=
PYSPARK_PYTHON=/usr/bin/python3
MASTER=spark://192.168.10.170:7077
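With those variables in place, an ordinary script run from PyCharm can build its own SparkSession; a minimal sketch (the app name and toy data are made up for illustration):
from pyspark.sql import SparkSession

# .master() can be omitted if the MASTER / PYSPARK_SUBMIT_ARGS variables above already point at the cluster
spark = (SparkSession.builder
         .appName("pycharm-smoke-test")
         .master("spark://192.168.10.170:7077")
         .getOrCreate())

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
df.show()
spark.stop()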
DStream transformation operations
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through the function func |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items |
filter(func) | Return a new DStream containing only the elements of the source DStream for which func returns true |
repartition(numPartitions) | Change the level of parallelism of this DStream by creating more or fewer partitions |
union(otherStream) | Return a new DStream containing the union of the elements of the source DStream and otherStream |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements of each RDD of the source DStream with the function func; the function should be associative so the computation can be parallelized |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values of each key are aggregated with the given reduce function. Note: by default this uses Spark's default number of parallel tasks for grouping; pass numTasks to set a different number |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs |
cogroup(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples |
transform(func) | Create a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream; this allows arbitrary RDD operations on the DStream |
updateStateByKey(func) | Return a new "state" DStream where the state of each key is updated by applying the given function to the previous state and the new values |
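The table above mirrors the Spark Streaming DStream API. A minimal word-count sketch in PySpark that exercises flatMap, map and reduceByKey (it assumes something is writing text lines to localhost port 9999, e.g. nc -lk 9999):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")   # need >= 2 threads: one for the receiver, one for processing
ssc = StreamingContext(sc, 5)                       # 5-second batch interval

lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                     # print the first elements of each batch

ssc.start()
ssc.awaitTermination()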
Basic Spark operations
Use parallelize to parallelize an array:
spark@spark1:~/spark-2.4.0-bin-hadoop2.7/test_app_build$ spark-shell
Use foreach with the built-in println to print the elements of lines:
scala> lines.foreach(println)
Use the built-in map to turn each element of the array into a key-value pair:
scala> val lines2 = lines.map(world => (world,1))
lines2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:25
scala> lines2.foreach(println)
(hello,1)
(spark,1)
(hello,1)
(world,1)
(!,1)
Use the built-in filter to select elements that satisfy a condition:
scala> val lines3 = lines.filter(word => word.contains("hello"))
scala> lines3.foreach(println)
hello
hello
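For comparison, roughly the same parallelize / map / filter steps in PySpark (a sketch; the sample data mirrors the Scala output above):
lines = sc.parallelize(["hello", "spark", "hello", "world", "!"])
lines2 = lines.map(lambda w: (w, 1))              # ('hello', 1), ('spark', 1), ...
lines3 = lines.filter(lambda w: "hello" in w)     # keep only elements containing "hello"
print(lines2.collect())
print(lines3.collect())                           # ['hello', 'hello']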
Set operations
Start spark-shell and read a file:
spark@spark1:~/spark-2.4.0-bin-hadoop2.7$ spark-shell
18/12/13 02:55:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/13 02:56:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://spark1:4041
Spark context available as 'sc' (master = local[*], app id = local-1544669763113).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val inputs= sc.textFile("data.txt")
inputs: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> inputs.foreach(println)
hello I am hear ,
hello word !
hello spark ,
hello ethan .
Use the built-in flatMap to operate on each line:
scala> val lines = input
input_file_name inputs
scala> val lines = inputs.flatMap(line => line.split(" "))
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:25
scala> lines.foreach(println)
hello
word
!
hello
spark
,
hello
ethan
.
hello
I
am
hear
,
Initialize two RDDs for the set operations that follow:
scala> var rdd1 = sc.parallelize(Array("coffe","coffe","panda","monkey","tea"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd1.foreach(println)
panda
coffe
coffe
monkey
tea
scala> val rdd2 = sc.parallelize(Array("coffe","monkey","kitty"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd2.foreach(println)
coffe
monkey
kitty
Deduplicate the elements of rdd1:
scala> var rdd_distinct = rdd1.distinct()
rdd_distinct: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at distinct at <console>:25
scala> rdd_distinct.foreach(println)
tea
monkey
panda
coffe
Compute the union of the two sets:
scala> val rdd_union = rdd1.union(rdd2)
Compute the intersection of the two sets:
scala> val rdd_inter = rdd1.intersection(rdd2)
rdd_inter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at intersection at <console>:27
scala> rdd_inter
res9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at intersection at <console>:27
scala> rdd_inter.foreach(println)
monkey
coffe
Compute the difference of the two sets (elements of rdd1 that are not in rdd2):
scala> val rdd_sub = rdd1.subtract(rdd2)
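For reference, the same set operations in PySpark (a sketch on the same toy data):
rdd1 = sc.parallelize(["coffe", "coffe", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffe", "monkey", "kitty"])

print(rdd1.distinct().collect())            # rdd1 with duplicates removed
print(rdd1.union(rdd2).collect())           # all elements of both RDDs (duplicates kept)
print(rdd1.intersection(rdd2).collect())    # ['coffe', 'monkey'] (order may vary)
print(rdd1.subtract(rdd2).collect())        # elements of rdd1 that are not in rdd2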
Key-value operations
Read a file and build key-value data:
scala> val rdd = sc.textFile("data.txt")
rdd: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.foreach(println)
hello I am hear ,
hello word !
hello spark ,
hello ethan .
scala> val rdd2 = rdd.map(line => (line.split(" ")(0),line))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:25
scala> rdd2.foreach(println)
(hello,hello I am hear ,)
(hello,hello word !)
(hello,hello spark ,)
(hello,hello ethan .)
reduceByKey
scala> val rdd3 = sc.parallelize(Array((1,2),(3,4),(3,6)))
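The same reduceByKey step in PySpark, as a minimal sketch:
rdd3 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)   # merge the values of each key
print(rdd4.collect())                         # [(1, 2), (3, 10)]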
groupByKey
scala> val rdd5=rdd3.groupByKey()
rdd5: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:25
scala> rdd5.foreach(println)
(3,CompactBuffer(4, 6))
(1,CompactBuffer(2))
Inspect the RDD's keys:
scala> val rdd6 = rdd3.keys
rdd6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at keys at <console>:25
scala> rdd6.foreach(println)
1
3
3
sortByKey
scala> val rdd7 = rdd3.sortByKey()
combineByKey(): (createCombiner, mergeValue, mergeCombiners, partitioner)
As combineByKey walks through the elements of a partition, each element's key has either been seen before in that partition or not.
If it is a new key, the createCombiner() function we supply is used to create the initial accumulator for that key;
if the key already exists in this partition, mergeValue() is used to merge the new value into the existing accumulator. Finally, mergeCombiners() merges the accumulators of the same key across partitions.
For example, computing average scores:
scala> val scores = sc.parallelize(Array(("jake",80.0),("jake",90.0),("jake",85.0),("mike",85.0),("mike",92.0),("mike",90.0)))
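A PySpark sketch of the average-score computation described above, using the same toy scores:
scores = sc.parallelize([("jake", 80.0), ("jake", 90.0), ("jake", 85.0),
                         ("mike", 85.0), ("mike", 92.0), ("mike", 90.0)])

sum_count = scores.combineByKey(
    lambda v: (v, 1),                          # createCombiner: first value seen for a key
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue: fold another value into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeCombiners: merge accumulators across partitions

averages = sum_count.mapValues(lambda t: t[0] / t[1])
print(averages.collect())                      # [('jake', 85.0), ('mike', 89.0)] (order may vary)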
Configuring Spark JDBC
Download the mysql-connector jar
https://dev.mysql.com/downloads/file/?id=480090
To let Spark talk to a MySQL server we need the Connector/J driver for MySQL. Download the archive, unpack it, copy the mysql-connector-java jar (5.1.47 here) into the Spark directory, and then add it to the class path in conf/spark-defaults.conf as follows:
spark.driver.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.47-bin.jar
spark.executor.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.47-bin.jar
Spark SQL
Creating a SparkSession
from pyspark.sql import SparkSession
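Continuing from that import, a minimal builder sketch (the application name is arbitrary):
spark = (SparkSession.builder
         .appName("spark-sql-demo")    # arbitrary application name
         .getOrCreate())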
Creating DataFrames
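A minimal sketch of creating a DataFrame, here from a small in-memory list (the toy rows are illustrative; reading a JSON or CSV file with spark.read works the same way):
df = spark.createDataFrame(
    [("Michael", 29), ("Andy", 30), ("Justin", 19)],   # toy rows
    ["name", "age"])
df.show()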
Dataset operations
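A sketch of typical untyped DataFrame operations, reusing the df built in the previous sketch:
df.printSchema()                              # inspect the inferred schema
df.select("name").show()                      # project a single column
df.select(df["name"], df["age"] + 1).show()   # column arithmetic
df.filter(df["age"] > 21).show()              # row filtering
df.groupBy("age").count().show()              # aggregation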
Running SQL queries programmatically
// Register the DataFrame as a SQL temporary view
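In PySpark that step looks roughly like this (a sketch, again reusing the df from the earlier sketch):
df.createOrReplaceTempView("people")           # register the DataFrame as a SQL temporary view
sqlDF = spark.sql("SELECT * FROM people WHERE age > 21")
sqlDF.show()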
Global Temporary View
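A global temporary view is registered in the system database global_temp and is shared across sessions; a minimal sketch:
df.createGlobalTempView("people_global")
spark.sql("SELECT * FROM global_temp.people_global").show()
spark.newSession().sql("SELECT * FROM global_temp.people_global").show()   # visible from a new session too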
Creating a JDBC connection
jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.10.170:3306").option("dbtable", "HKIA_v1.flight_schedule").option("user", "hkia_v1").option("password", "Asdf168!!").load()
jdbcDF.show()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-d4912bf3e00e> in <module>()
----> 1 jdbcDF.show()
/home/spark/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
376 """
377 if isinstance(truncate, bool) and truncate:
--> 378 print(self._jdf.showString(n, 20, vertical))
379 else:
380 print(self._jdf.showString(n, int(truncate), vertical))
/home/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/home/spark/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/home/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o38.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: KILLED
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)