Spark SQL 数据操作指南
python复制下载# 配置JDBC连接属性'useSSL': 'false'# 禁用SSL连接# 配置数据库URL。
·
一、MySQL 数据库连接配置
1. 基础连接配置
python
复制
下载
# 配置JDBC连接属性
jdbc_props = {
'user': 'root',
'password': '123456',
'driver': 'com.mysql.cj.jdbc.Driver',
'useSSL': 'false' # 禁用SSL连接
}
# 配置数据库URL
jdbc_url = 'jdbc:mysql://localhost:3306/people?serverTimezone=UTC'
2. 高级连接选项(可选)
python
复制
下载
# 连接池配置示例
advanced_props = {
**jdbc_props,
'connectionPool': 'HikariCP',
'minimumIdle': '5',
'maximumPoolSize': '20'
}
二、数据读写操作
1. 从MySQL读取数据
python
复制
下载
# 读取整表
full_df = spark.read.jdbc(
url=jdbc_url,
table='people_info',
properties=jdbc_props
)
# 条件查询读取(推荐方式)
query_df = spark.read.jdbc(
url=jdbc_url,
table='(SELECT * FROM people_info WHERE height > 170) as filtered',
properties=jdbc_props
)
# 显示数据
query_df.show(5, truncate=False)
2. 数据写入MySQL
python
复制
下载
# 准备数据
csv_df = spark.read.csv(
'file:///data/people_info.csv',
header=True,
inferSchema=True
)
# 写入选项
write_options = {
'truncate': 'true', # 写入前清空表
'batchsize': '1000' # 批量写入大小
}
# 写入操作
csv_df.write.jdbc(
url=jdbc_url,
table='people_info',
mode='overwrite', # 可选项: append, overwrite, ignore, error
properties=jdbc_props,
**write_options
)
三、多格式数据读取
1. JSON格式读取
python
复制
下载
# 标准读取方式
json_df1 = spark.read.json('file:///data/people_info.json')
# 指定schema读取
from pyspark.sql.types import *
schema = StructType([
StructField("id", LongType()),
StructField("gender", StringType()),
StructField("height", IntegerType())
])
json_df2 = spark.read.schema(schema).json('file:///data/people_info.json')
# 多文件通配符读取
multi_json_df = spark.read.json('file:///data/people_*.json')
2. Parquet格式处理
python
复制
下载
# 基础读取
parquet_df = spark.read.parquet('file:///data/users.parquet')
# 分区表读取
partition_df = spark.read.parquet('file:///data/users/year=2023/month=07')
# 写入Parquet
df.write.parquet(
'file:///output/people.parquet',
mode='overwrite',
compression='snappy' # 压缩格式
)
四、DataFrame数据处理
1. 数据过滤
python
复制
下载
# SQL表达式过滤
filtered1 = df.filter("age > 15 AND score > 80")
# DataFrame API过滤
from pyspark.sql.functions import col
filtered2 = df.where((col('age') > 15) & (col('score') > 80)
# 复杂条件示例
complex_filter = df.where(
(col('gender') == 'F') &
((col('age').between(10, 20) | (col('score') > 90))
)
2. 数据排序
python
复制
下载
# 单列排序
sorted1 = df.orderBy('age', ascending=False)
# 多列排序
from pyspark.sql.functions import desc, asc
sorted2 = df.orderBy([desc('age'), asc('score')])
# 随机采样
sampled = df.orderBy(rand()).limit(100)
3. 数据聚合
python
复制
下载
# 基础聚合
agg1 = df.groupBy('gender').agg(
avg('height').alias('avg_height'),
max('age').alias('max_age')
)
# 多维度聚合
from pyspark.sql.functions import countDistinct
agg2 = df.groupBy('gender', (df.age > 18).alias('is_adult')).agg(
count('*').alias('count'),
countDistinct('user_id').alias('unique_users')
)
五、性能优化建议
-
读取优化:
python
复制
下载
# 分区读取大型表 spark.read.jdbc( url=jdbc_url, table='large_table', column='id', # 分区列 lowerBound=1, upperBound=100000, numPartitions=10, properties=jdbc_props ) -
缓存策略:
python
复制
下载
df.cache() # 内存缓存 df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK) # 内存+磁盘
-
写入优化:
python
复制
下载
# 控制并行度 df.coalesce(5).write.jdbc(...) # 批量写入配置 spark.conf.set("spark.sql.execution.arrow.enabled", "true")
六、常见问题解决方案
-
MySQL连接问题:
-
添加时区参数:
?serverTimezone=UTC -
确保MySQL驱动在classpath中
-
-
数据类型转换:
python
复制
下载
from pyspark.sql.functions import cast df = df.withColumn('age', col('age').cast(IntegerType())) -
空值处理:
python
复制
下载
df.na.fill({'age': 0, 'score': 50.0}) # 填充空值 df.na.drop() # 删除含空值的行
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)