Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
Note
This is my knowledge transfer on how to get started with Spark development in Java.
Here is the outline of the slides:
1. Introduction to Spark
   - Background of Spark
   - Definition of Spark: a general-purpose big data analytics engine
   - Core modules of Spark
   - Ecosystem components of Spark
2. Getting started with Spark code

Maven configuration:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>spark-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.13</scala.version>
        <spark.version>3.3.1</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
    </dependencies>
</project>
1. Offline batch processing scenario
Requirement 1: offline batch processing
The data comes from the results of a Portuguese bank's telephone survey.
Key columns in the raw data:

lname    fname    province  city     age  dur  cam
Santos   Antonio  Lisbon    Porto    20   261  1
Silva    Maria    Porto     Braga    57   149  1
Pereira  Jorge    Braga     Faro     37   226  1
Martins  Ana      Faro      Coimbra  40   151  1
…        …        …         …        …    …    …

Requirements:
- Read the CSV file with Spark
- Filter out records whose call duration is shorter than 50 seconds
- Filter out customers younger than 20
- Build a full-name field by joining the last-name and first-name columns with a comma
- Build a detailed-address field by joining the province and city columns with a comma
- Group customers by age and compute, per group, the total number of customers, the average number of call contacts, and the average call duration
- Write the results to a MySQL database

Code implementation:
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import java.util.Properties;
import static org.apache.spark.sql.functions.*;

public class spark01_helloworld {
    public static void main(String[] args) {
        SparkSession sparkSession = new SparkSession.Builder()
                .appName("csv2db")
                .master("local[*]")
                .getOrCreate();
        Dataset<Row> bankTable = sparkSession.read()
                .format("csv")
                .option("header", "true")
                .load("C:\\Users\\JiaRui\\Desktop\\bank-additional-full.csv");
        // TODO: 1. Filter out records whose call duration is shorter than 50 seconds
        bankTable = bankTable.filter("duration >= 50");
        // TODO: 2. Filter out customers younger than 20 (via Spark SQL on a temp view)
        bankTable.createOrReplaceTempView("bank_table");
        bankTable = sparkSession.sql("select * from bank_table where age >= 20");
        // TODO: 3. Build the full name by joining the last-name and first-name columns with a comma
        bankTable = bankTable.withColumn(
                "name",
                concat(bankTable.col("lname"), lit(", "), bankTable.col("fname"))
        );
        // TODO: 4. Build the detailed address by joining the province and city columns with a comma (via a UDF)
        UserDefinedFunction mergeUdf = udf(
                (String x, String y) -> x + ", " + y, DataTypes.StringType
        );
        sparkSession.udf().register("merge", mergeUdf);
        bankTable = bankTable.withColumn("address",
                call_udf("merge", col("province"), col("city")));
        // TODO: 5. Group customers by age and compute the total number of customers,
        //          the average number of calls, and the average call duration per group
        Dataset<Row> reportTable = bankTable.groupBy("age")
                .agg(count("age").name("total_customers"),
                        round(avg("campaign"), 2).name("avg_calls"),
                        round(avg("duration"), 2).name("avg_dur"))
                .orderBy("age");
        // TODO: 6. Write both tables to MySQL
        String url = "jdbc:mysql://172.20.121.222:3306/test";
        Properties properties = new Properties();
        properties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        properties.setProperty("user", "root");
        properties.setProperty("password", "123456");
        bankTable.write()
                .mode(SaveMode.Overwrite)
                .jdbc(url, "bank_table", properties);
        reportTable.write()
                .mode(SaveMode.Overwrite)
                .jdbc(url, "report_table", properties);
        sparkSession.stop();
    }
}
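To verify the write, the tables can be read back through the same JDBC connection. A small follow-up sketch, reusing the url and properties defined above and placed before sparkSession.stop():

// Read the report table back from MySQL to check the result of the write
Dataset<Row> check = sparkSession.read()
        .jdbc(url, "report_table", properties);
check.show();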
2. Real-time stream processing scenario
Requirement 2: real-time stream processing
The data source is a real-time stream of stock prices, updated every minute.
Sample record:
key: 000002.SZ value: 1502126681, 22.71, 21.54, 22.32, 22.17
The fields are, in order: stock code (the key), event timestamp, current price, bid price, ask price, and average traded price.
Requirements:
- Read the data from Kafka
- Record the highest and lowest price of each stock within every half-hour window
- Write the results to HBase

Code implementation:
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.*;

public class spark02_streaming {
    public static void main(String[] args) throws Exception {
        // Requires the spark-sql-kafka-0-10_${scala.version} dependency for the Kafka source
        SparkSession sparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("StockCCI")
                .getOrCreate();
        // Window length, maximum tolerated lateness (watermark), and trigger interval
        String windowDuration = "30 minutes";
        String waterThreshold = "5 minutes";
        String triggerTime = "1 minutes";
        sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker1:port1,broker2:port2")
                .option("subscribe", "stock")
                .load()
                // The Kafka value is a comma-separated string:
                // timestamp, price, bidprice, sellprice, avgprice
                .selectExpr(
                        "CAST(key AS STRING) AS companyno",
                        "split(CAST(value AS STRING), ',')[0] AS timestamp",
                        "split(CAST(value AS STRING), ',')[1] AS price",
                        "split(CAST(value AS STRING), ',')[2] AS bidprice",
                        "split(CAST(value AS STRING), ',')[3] AS sellprice",
                        "split(CAST(value AS STRING), ',')[4] AS avgprice")
                // The timestamp field is Unix epoch seconds, so cast it via LONG
                .selectExpr(
                        "companyno",
                        "CAST(CAST(timestamp AS LONG) AS TIMESTAMP) AS timestamp",
                        "CAST(price AS DOUBLE) AS price",
                        "CAST(bidprice AS DOUBLE) AS bidprice",
                        "CAST(sellprice AS DOUBLE) AS sellprice",
                        "CAST(avgprice AS DOUBLE) AS avgprice")
                // Set the watermark
                .withWatermark("timestamp", waterThreshold)
                // Group by a 30-minute event-time window and the stock code
                .groupBy(
                        window(col("timestamp"), windowDuration),
                        col("companyno"))
                // Compute the highest and lowest price per window
                .agg(
                        max(col("price")).as("max_price"),
                        min(col("price")).as("min_price"))
                .writeStream()
                .outputMode("append")
                .trigger(Trigger.ProcessingTime(triggerTime))
                // "HBaseWriter" refers to a custom sink (see the foreachBatch sketch below);
                // Spark ships no built-in HBase streaming sink
                .format("HBaseWriter")
                .start()
                .awaitTermination();
    }
}
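Spark does not ship an HBase streaming sink, so format("HBaseWriter") above assumes a custom data source is on the classpath. A common alternative is foreachBatch, where each micro-batch is written with the plain HBase client. The sketch below is illustrative only: the table name "stock_stats", the column family "cf", and the row-key layout are assumptions, and it requires the org.apache.hbase:hbase-client dependency.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class HBaseForeachBatch {
    // Usage: .writeStream().foreachBatch((VoidFunction2<Dataset<Row>, Long>) HBaseForeachBatch::writeBatch).start()
    public static void writeBatch(Dataset<Row> batch, Long batchId) {
        batch.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
            // Open one HBase connection per partition
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Table table = conn.getTable(TableName.valueOf("stock_stats"))) {
                while (rows.hasNext()) {
                    Row r = rows.next();
                    // Row key: stock code + window start time (assumed layout)
                    String rowKey = r.getString(r.fieldIndex("companyno"))
                            + "_" + r.getStruct(r.fieldIndex("window")).getTimestamp(0).getTime();
                    Put put = new Put(Bytes.toBytes(rowKey));
                    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("max_price"),
                            Bytes.toBytes(r.getDouble(r.fieldIndex("max_price"))));
                    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("min_price"),
                            Bytes.toBytes(r.getDouble(r.fieldIndex("min_price"))));
                    table.put(put);
                }
            }
        });
    }
}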
3. Machine learning scenario
Requirement 3: machine learning
The data source consists of text documents and score labels.
Training data:

id  text                  label
0L  "a b c spark"         1.0
1L  "b d"                 0.0
2L  "spark f g h"         1.0
3L  "hadoop mapr"         0.0

Test data:

id  text
4L  "spark i j"
5L  "l m n"
6L  "spark hadoop spark"
7L  "apache hadoop"

Requirements:
- Tokenize the documents
- Convert the tokens into feature vectors
- Train the model
- Predict (whether a document is spam)

Code implementation:
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class spark03_mlpipeline {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("mlpipeline")
                .getOrCreate();
        Dataset<Row> training = sparkSession.read()
                .format("csv")
                .option("header", "true")
                .schema(new StructType(new StructField[]{
                        new StructField("id", DataTypes.LongType, false, Metadata.empty()),
                        new StructField("text", DataTypes.StringType, false, Metadata.empty()),
                        new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
                }))
                .load("C:\\Users\\JiaRui\\Desktop\\training.csv");
        // Tokenize the text column into words
        Tokenizer tokenizer = new Tokenizer()
                .setInputCol("text")
                .setOutputCol("words");
        // Hash the words into a fixed-length feature vector
        HashingTF hashingTF = new HashingTF()
                .setNumFeatures(1000)
                .setInputCol(tokenizer.getOutputCol())
                .setOutputCol("features");
        LogisticRegression lr = new LogisticRegression()
                .setMaxIter(10)
                .setRegParam(0.001);
        // Chain tokenizer -> hashingTF -> logistic regression into a single pipeline
        Pipeline pipeline = new Pipeline()
                .setStages(new PipelineStage[]{tokenizer, hashingTF, lr});
        PipelineModel model = pipeline.fit(training);
        Dataset<Row> test = sparkSession.read()
                .format("csv")
                .option("header", "true")
                .schema(new StructType(new StructField[]{
                        new StructField("id", DataTypes.LongType, false, Metadata.empty()),
                        new StructField("text", DataTypes.StringType, false, Metadata.empty())
                }))
                .load("C:\\Users\\JiaRui\\Desktop\\test.csv");
        Dataset<Row> res = model.transform(test)
                .select("id", "text", "probability", "prediction");
        res.show();
    }
}
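Once fitted, the PipelineModel can optionally be persisted and reloaded for later scoring. A minimal continuation of the example above; the save path is only a placeholder:

// Optional: persist the fitted pipeline and reload it later (path is illustrative)
model.write().overwrite().save("C:\\Users\\JiaRui\\Desktop\\spam-model");
PipelineModel reloaded = PipelineModel.load("C:\\Users\\JiaRui\\Desktop\\spam-model");
reloaded.transform(test).select("id", "prediction").show();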
4. Graph computation scenario
Requirement 4: graph computation
Requirement: use GraphX to compute PageRank scores.
Code implementation:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.graphx.Graph;
import org.apache.spark.graphx.lib.PageRank;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import scala.Option;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class spark04_grahpx {
    public static void main(String[] args) {
        SparkSession sparkSession = new SparkSession.Builder()
                .appName("pageRank")
                .master("local[*]")
                .getOrCreate();
        sparkSession.sparkContext().setLogLevel("WARN");
        // Each line holds one edge: sourceVertexId  neighborVertexId
        // (GraphX vertex IDs must be numeric, so both columns are parsed as longs)
        JavaRDD<String> lines = sparkSession.read()
                .textFile("C:\\Users\\JiaRui\\Desktop\\pageRank.csv")
                .javaRDD();
        JavaRDD<Tuple2<Object, Object>> edges = lines.map(s -> {
            String[] parts = s.split("\\s+");
            return new Tuple2<>((Object) Long.parseLong(parts[0]), (Object) Long.parseLong(parts[1]));
        });
        ClassTag<Object> tag = ClassTag$.MODULE$.Any();
        Graph<Object, Object> graph = Graph.fromEdgeTuples(
                edges.rdd(),
                1,                               // default vertex attribute
                Option.empty(),                  // do not merge duplicate edges
                StorageLevel.MEMORY_AND_DISK(),
                StorageLevel.MEMORY_AND_DISK(),
                tag);
        // Run PageRank until the per-iteration change drops below the tolerance
        Graph<Object, Object> rankedGraph =
                PageRank.runUntilConvergence(graph, 0.001, 0.001, tag, tag);
        JavaRDD<Tuple2<Object, Object>> pageRankValues = rankedGraph.vertices().toJavaRDD();
        pageRankValues.foreach(x -> System.out.println(x));
    }
}
3. Spark data processing and analysis scenarios
4. Spark's distributed computing framework and the MapReduce programming model
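As a quick illustration of the MapReduce style that Spark generalizes, here is a minimal word-count sketch using Spark's Java RDD API. The class name and input path are placeholders of my own, not part of the original slides:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;

public class spark_wordcount {
    public static void main(String[] args) {
        SparkSession sparkSession = new SparkSession.Builder()
                .appName("wordCount")
                .master("local[*]")
                .getOrCreate();
        JavaRDD<String> lines = sparkSession.read()
                .textFile("path/to/input.txt")   // placeholder path
                .javaRDD();
        JavaPairRDD<String, Integer> counts = lines
                // "map" phase: split each line into words and emit (word, 1) pairs
                .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                // "reduce" phase: sum the counts per word
                .reduceByKey(Integer::sum);
        counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
        sparkSession.stop();
    }
}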
5. Spark's resilient distributed dataset (RDD) and structured Dataset
// Read a JSON file
spark.read.json("examples/src/main/resources/people.json");
// Read a CSV file
spark.read.csv("examples/src/main/resources/people.csv");
// Read a Parquet file
spark.read.parquet("examples/src/main/resources/people.parquet");
// Read an ORC file
spark.read.orc("examples/src/main/resources/people.orc");
// Read a plain text file
spark.read.text("examples/src/main/resources/people.txt");
// Create a DataFrame from an external database over JDBC
spark.read.format("jdbc")
    .option("url", "jdbc:driver")
    .option("user", "")
    .option("password", "")
    .load();
// Conditional query
df.select("age").where("name is not null and age > 10").foreach(println(_));
// Group and count
df.groupBy("age").count().foreach(println(_));
// Joins:
// supported join types include inner, cross, outer, full, full_outer, left, left_outer, left_semi and left_anti
leftDF.join(rightDF, leftDF("pid") === rightDF("fid"), "left_outer");
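Since this section contrasts RDDs with Datasets, a short sketch of moving between the two in the Java API may help. The Person bean, the class name, and the reuse of the people.json sample are my own assumptions:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.Serializable;

public class spark_rdd_dataset {
    // Simple bean used for the typed Dataset (assumed schema: name, age)
    public static class Person implements Serializable {
        private String name;
        private Long age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
    }

    public static void main(String[] args) {
        SparkSession spark = new SparkSession.Builder()
                .appName("rddAndDataset")
                .master("local[*]")
                .getOrCreate();
        // Start from an untyped DataFrame (Dataset<Row>)
        Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
        // Dataset -> RDD: drop down to the RDD API when fine-grained control is needed
        JavaRDD<Row> rowRdd = df.javaRDD();
        System.out.println("rows: " + rowRdd.count());
        // DataFrame -> typed Dataset using a bean encoder
        Dataset<Person> people = df.as(Encoders.bean(Person.class));
        // RDD -> Dataset: rebuild a typed Dataset from an RDD of beans
        Dataset<Person> fromRdd = spark.createDataset(people.rdd(), Encoders.bean(Person.class));
        fromRdd.show();
        spark.stop();
    }
}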
6. Spark abstractions, architecture, and runtime environments
Initializing a SparkSession for a specific runtime environment:
SparkSession sparkSession = new SparkSession.Builder()
        .appName("newApp")
        // "yarn-client" was removed in Spark 2.x; use master "yarn" and set the deploy mode separately
        .master("yarn")
        .config("spark.submit.deployMode", "client")
        .config("spark.executor.instances", "10")
        .config("spark.executor.memory", "10g")
        .getOrCreate();