一、PyMySQL 基础:安装与核心流程

1. 什么是 PyMySQL?

PyMySQL 是 Python3 中用于连接和操作 MySQL 数据库的开源库,实现了 Python 数据库 API v2.0,是 MySQLdb 的替代方案,支持与 MySQL 数据库的交互(如增删改查、事务处理等)。

2. 安装方法

通过 pip 命令安装,推荐使用清华源加速:

bash

pip install pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple

3. 核心使用流程

PyMySQL 操作 MySQL 需遵循固定步骤,确保资源正确使用和数据一致性:

  1. 创建数据库连接对象:通过pymysql.connect()建立 Python 与 MySQL 的连接,需指定连接参数(主机、端口、用户名、密码等)。
  2. 获取游标对象:通过连接对象的cursor()方法获取游标,用于执行 SQL 语句。
  3. 执行 SQL 语句:使用游标对象的execute()(单条)或executemany()(多条)方法执行 SQL。
  4. 提交事务与关闭资源:通过commit()提交事务(修改操作必需),若失败则rollback()回滚;最后关闭游标和连接释放资源。

二、核心对象与参数说明

1. 连接对象(connect)

通过pymysql.connect()创建,关键参数如下:

参数名 说明 示例值
host 数据库主机 IP 或域名 127.0.0.1(本地)
port 端口号(默认 3306) 3306
user 登录用户名 root
password 登录密码 123456
db 要连接的数据库名 spider
charset 字符集(避免中文乱码) utf8
cursorclass 游标类型(默认元组,可选字典) pymysql.cursors.DictCursor

2. 游标对象(cursor)

常用方法:

方法 说明
execute(sql) 执行单条 SQL 语句
executemany(sql, vals) 批量执行 SQL(vals为数据集合)
fetchone() 获取结果集的下一行
fetchall() 获取结果集的所有行
rowcount 返回受影响的行数(如查询结果数)
close() 关闭游标

三、MySQL 表操作:增删改查实战

1. 创建表

通过CREATE TABLE语句创建表,需指定字段名、类型、约束(如主键、非空)。
示例(创建学生表students):

import pymysql
# 建立连接
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider', port=3306)
cursor = db.cursor()
# 创建表SQL
sql = """
create table if not exists students(
    id varchar(255) not null, 
    name varchar(255), 
    age int not null, 
    primary key(id)
)
"""
cursor.execute(sql)  # 执行创建表
db.close()  # 关闭连接

2. 插入数据

支持单条插入、动态字典插入和批量插入,需通过事务确保数据一致性。

  • 动态字典插入(灵活适配字段变化):

    import pymysql
    data = {'id': '20120002', 'name': 'Andy', 'age': 20}  # 动态数据
    table = 'students'
    keys = ','.join(data.keys())  # 字段名:id,name,age
    values = ','.join(['%s'] * len(data))  # 占位符:%s,%s,%s
    db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider')
    cursor = db.cursor()
    sql = f"insert into {table}({keys}) values({values})"  # 拼接SQL
    try:
        cursor.execute(sql, tuple(data.values()))  # 执行插入
        db.commit()  # 提交事务
        print("插入成功")
    except:
        db.rollback()  # 失败回滚
    finally:
        cursor.close()
        db.close()
    

3. 更新数据

通过UPDATE语句修改数据,同样需事务支持。
示例(更新Bob的年龄为 23):

import pymysql
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider')
cursor = db.cursor()
sql = "update students set age=%s where name=%s"  # 占位符传参
try:
    cursor.execute(sql, (23, 'Bob'))  # 执行更新
    db.commit()
except:
    db.rollback()
finally:
    db.close()

4. 删除数据

通过DELETE语句删除数据,注意添加条件避免误删。
示例(删除年龄 > 22 的记录):

import pymysql
table = 'students'
condition = 'age > 22'
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider')
cursor = db.cursor()
sql = f"delete from {table} where {condition}"
try:
    cursor.execute(sql)
    db.commit()
except:
    db.rollback()
finally:
    db.close()

5. 查询数据

通过SELECT语句查询,使用fetchone()/fetchall()获取结果。
示例(查询年龄≥20 的学生):

import pymysql
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider')
cursor = db.cursor()
sql = "select * from students where age>=20"
try:
    cursor.execute(sql)
    print("总条数:", cursor.rowcount)
    row = cursor.fetchone()  # 获取第一条
    while row:
        print("记录:", row)
        row = cursor.fetchone()  # 逐条获取
except:
    print("查询失败")
finally:
    db.close()

四、实战案例:爬虫数据存储

案例 1:豆瓣图书爬取与存储

任务:爬取豆瓣新书速递信息(书名、评分、作者、出版社、出版时间),存储到bookinfo

步骤

  1. 爬取页面:通过requests获取网页内容。
  2. 创建表:定义bookinfo表结构(id 自增主键,bookname、score 等字段)。
  3. 解析数据:用 XPath 提取字段(如书名//div[@id='content']//li//h2/a/text())。
  4. 批量存储:使用executemany()批量插入数据,确保效率。
  5. import requests
    import pymysql
    from lxml import etree
    
    
    def get_html(url):  # 获取网页内容
        try:
            # 添加请求头模拟浏览器访问
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"
            }
            r = requests.get(url, headers=headers)  # 发送请求
            # 设置返回内容的字符集编码
            r.encoding = r.apparent_encoding
            r.raise_for_status()  # 非正常返回抛出异常
            return r.text  # 返回网页的文本内容
        except Exception as error:
            print(f"获取网页内容错误: {error}")
            return None
    
    
    def parser(html):  # 解析函数
        doc = etree.HTML(html)  # html转换为可解析对象
        out_list = []
        # 调整XPath路径以匹配豆瓣新书页面结构
        for row in doc.xpath('//*[@id="content"]/div[2]/div[1]/ul/li'):
            try:
                # 书名 - 修正XPath,添加/text()获取文本,处理可能的空值
                title_elements = row.xpath('./div[2]/h2/a/text()')
                title = title_elements[0].strip() if title_elements else "未知书名"
    
                # 评分 - 修正XPath,添加/text()获取文本
                score_elements = row.xpath('./div[2]/p[2]/span[2]/text()')
                score = score_elements[0].strip() if score_elements else "无评分"
    
                # 作者、出版社、出版日期(通过/分隔)
                info_elements = row.xpath('./div[2]/p[1]/text()')
                info_text = info_elements[0].strip() if info_elements else ""
                info = [item.strip() for item in info_text.split("/")]
    
                # 确保信息列表长度为3,不足则补空
                while len(info) < 3:
                    info.append("")
    
                out_list.append([title, score, info[0], info[1], info[2]])
            except Exception as e:
                print(f"解析单条记录出错: {e}")
                continue
        return out_list
    
    
    def save_to_mysql(items):
        # 检查是否有数据
        if not items:
            print("没有数据可保存到MySQL")
            return
    
        try:
            # 添加charset参数解决中文乱码问题
            connect = pymysql.connect(
                host="127.0.0.1",
                user="root",
                passwd="123456",
                database="ppt",
                charset="utf8mb4"
            )
            cursor = connect.cursor()
    
            # 确保表结构存在
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS book (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    bookname VARCHAR(255) NOT NULL,
                    score VARCHAR(50),
                    autor VARCHAR(255),
                    press VARCHAR(255),
                    pubdate VARCHAR(100)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """)
    
            sql = "insert into book (bookname, score, autor, press, pubdate) values (%s, %s, %s, %s, %s)"
            # 使用executemany提高批量插入效率
            cursor.executemany(sql, items)
            connect.commit()
            print(f"成功保存 {len(items)} 条记录到MySQL数据库")
        except Exception as err:
            connect.rollback()
            print(f"MySQL操作错误: {err}")
        finally:
            # 确保资源正确关闭
            if 'cursor' in locals():
                cursor.close()
            if 'connect' in locals() and connect.open:
                connect.close()
    
    
    if __name__ == "__main__":
        url = "https://book.douban.com/latest?icn=index-latestbook-all"
        print(f"开始爬取 {url} 的图书信息...")
        html = get_html(url)
        if not html:
            print("爬取网页失败,程序结束")
        else:
            book_list = parser(html)
            print(f"解析完成,共获取 {len(book_list)} 条有效图书信息")
            if book_list:
                save_to_mysql(book_list)
            else:
                print("未解析到有效图书信息")
    

案例 2:安居客二手房信息爬取与存储

任务:爬取重庆二手房信息(卖点、楼盘、地址、户型等),存储到houseinfo表。

关键代码

import requests
import pymysql
from lxml import etree
def get_html(url, time=30):  # get请求通用函数
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
        }
        r = requests.get(url, headers=headers, timeout=time)
        r.encoding = r.apparent_encoding  # 设置字符集
        r.raise_for_status()  # 检查请求状态
        return r.text  # 返回网页文本
    except Exception as error:
        print(error)
        return None

out_list = []
def parser(html):
    doc = etree.HTML(html)
    for row in doc.xpath("//div[@class='property']"):
        title = row.xpath("a//h3/text()")[0].strip()  # 卖点标题
        # 房屋信息(户型、面积等)
        info = row.xpath("string(./a//*[@class='property-content-info'])")
        info = info.replace(" ", "").replace("\n\n", "\n").split("\n")  # 清洗数据
        # 楼盘、地址、价格
        house = row.xpath("a//*[@class='property-content-info-comm-name']/text()")[0]
        address = row.xpath("string(./a//*[@class='property-content-info-comm-address'])").strip()
        price = row.xpath("a//*[@class='property-price-average']/text()")[0].replace(" ", "")
        # 组合数据(卖点、楼盘、地址、户型、面积、楼层、建造年代、价格)
        row_list = [
            title,
            house.strip(),
            address,
            info[0] if len(info)>=1 else "",
            info[1] if len(info)>=2 else "",
            info[3] if len(info)>=4 else "",
            info[4] if len(info)>=5 else "",
            price
        ]
        out_list.append(row_list)
    # 处理下一页
    ele_next = doc.xpath("//*[@class='next next-active']/@href")
    if ele_next:
        new_url = ele_next[0]
        if new_url.startswith("?page="):
            new_url = "http://www.bspider.top/anjuke/" + new_url
        parser(get_html(new_url))  # 递归解析下一页

def save_mysql(sql, val, **dbinfo):
    try:
        connect = pymysql.connect(** dbinfo)  # 创建连接(动态参数)
        cursor = connect.cursor()
        cursor.executemany(sql, val)  # 执行多条插入
        connect.commit()  # 提交事务
    except Exception as err:
        connect.rollback()  # 回滚
        print(err)
    finally:
        cursor.close()
        connect.close()

if __name__ == "__main__":
    url = "http://www.bspider.top/anjuke/"
    html = get_html(url)  # 获取首页HTML
    parser(html)  # 解析数据(含多页)
    # 数据库配置
    dbinfo = {
        "host": "127.0.0.1",
        "user": "root",
        "password": "123456",
        "db": "ppt",
        "charset": "utf8",
        "cursorclass": pymysql.cursors.DictCursor
    }
    # 插入SQL
    sql = "INSERT into houseinfo(Title, House, Address, Struct, Area, Floor, MakeTime, price) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)"
    save_mysql(sql, out_list, **dbinfo)  # 保存数据
  • 批量存储函数:封装save_mysql(),接收 SQL、数据集合和数据库参数,内部调用executemany()
  • 分页爬取:通过解析下一页链接递归爬取多页数据,统一存储。
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐