一、MySQL 数据库连接配置

1. 基础连接配置

python

复制

下载

# 配置JDBC连接属性
jdbc_props = {
    'user': 'root',
    'password': '123456',
    'driver': 'com.mysql.cj.jdbc.Driver',
    'useSSL': 'false'  # 禁用SSL连接
}

# 配置数据库URL
jdbc_url = 'jdbc:mysql://localhost:3306/people?serverTimezone=UTC'

2. 高级连接选项(可选)

python

复制

下载

# 连接池配置示例
advanced_props = {
    **jdbc_props,
    'connectionPool': 'HikariCP',
    'minimumIdle': '5',
    'maximumPoolSize': '20'
}

二、数据读写操作

1. 从MySQL读取数据

python

复制

下载

# 读取整表
full_df = spark.read.jdbc(
    url=jdbc_url,
    table='people_info',
    properties=jdbc_props
)

# 条件查询读取(推荐方式)
query_df = spark.read.jdbc(
    url=jdbc_url,
    table='(SELECT * FROM people_info WHERE height > 170) as filtered',
    properties=jdbc_props
)

# 显示数据
query_df.show(5, truncate=False)

2. 数据写入MySQL

python

复制

下载

# 准备数据
csv_df = spark.read.csv(
    'file:///data/people_info.csv',
    header=True,
    inferSchema=True
)

# 写入选项
write_options = {
    'truncate': 'true',  # 写入前清空表
    'batchsize': '1000'  # 批量写入大小
}

# 写入操作
csv_df.write.jdbc(
    url=jdbc_url,
    table='people_info',
    mode='overwrite',  # 可选项: append, overwrite, ignore, error
    properties=jdbc_props,
    **write_options
)

三、多格式数据读取

1. JSON格式读取

python

复制

下载

# 标准读取方式
json_df1 = spark.read.json('file:///data/people_info.json')

# 指定schema读取
from pyspark.sql.types import *
schema = StructType([
    StructField("id", LongType()),
    StructField("gender", StringType()),
    StructField("height", IntegerType())
])
json_df2 = spark.read.schema(schema).json('file:///data/people_info.json')

# 多文件通配符读取
multi_json_df = spark.read.json('file:///data/people_*.json')

2. Parquet格式处理

python

复制

下载

# 基础读取
parquet_df = spark.read.parquet('file:///data/users.parquet')

# 分区表读取
partition_df = spark.read.parquet('file:///data/users/year=2023/month=07')

# 写入Parquet
df.write.parquet(
    'file:///output/people.parquet',
    mode='overwrite',
    compression='snappy'  # 压缩格式
)

四、DataFrame数据处理

1. 数据过滤

python

复制

下载

# SQL表达式过滤
filtered1 = df.filter("age > 15 AND score > 80")

# DataFrame API过滤
from pyspark.sql.functions import col
filtered2 = df.where((col('age') > 15) & (col('score') > 80)

# 复杂条件示例
complex_filter = df.where(
    (col('gender') == 'F') & 
    ((col('age').between(10, 20) | (col('score') > 90))
)

2. 数据排序

python

复制

下载

# 单列排序
sorted1 = df.orderBy('age', ascending=False)

# 多列排序
from pyspark.sql.functions import desc, asc
sorted2 = df.orderBy([desc('age'), asc('score')])

# 随机采样
sampled = df.orderBy(rand()).limit(100)

3. 数据聚合

python

复制

下载

# 基础聚合
agg1 = df.groupBy('gender').agg(
    avg('height').alias('avg_height'),
    max('age').alias('max_age')
)

# 多维度聚合
from pyspark.sql.functions import countDistinct
agg2 = df.groupBy('gender', (df.age > 18).alias('is_adult')).agg(
    count('*').alias('count'),
    countDistinct('user_id').alias('unique_users')
)

五、性能优化建议

  1. 读取优化

    python

    复制

    下载

    # 分区读取大型表
    spark.read.jdbc(
        url=jdbc_url,
        table='large_table',
        column='id',  # 分区列
        lowerBound=1,
        upperBound=100000,
        numPartitions=10,
        properties=jdbc_props
    )
  2. 缓存策略

    python

    复制

    下载

    df.cache()  # 内存缓存
    df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)  # 内存+磁盘
  3. 写入优化

    python

    复制

    下载

    # 控制并行度
    df.coalesce(5).write.jdbc(...)
    
    # 批量写入配置
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

六、常见问题解决方案

  1. MySQL连接问题

    • 添加时区参数:?serverTimezone=UTC

    • 确保MySQL驱动在classpath中

  2. 数据类型转换

    python

    复制

    下载

    from pyspark.sql.functions import cast
    df = df.withColumn('age', col('age').cast(IntegerType()))
  3. 空值处理

    python

    复制

    下载

    df.na.fill({'age': 0, 'score': 50.0})  # 填充空值
    df.na.drop()  # 删除含空值的行
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐