PyMySQL:Python 操作 MySQL 数据库全指南
PyMySQL 是 Python3 中用于连接和操作 MySQL 数据库的开源库,实现了 Python 数据库 API v2.0,是 MySQLdb 的替代方案,支持与 MySQL 数据库的交互(如增删改查、事务处理等)。
一、PyMySQL 基础:安装与核心流程
1. 什么是 PyMySQL?
PyMySQL 是 Python3 中用于连接和操作 MySQL 数据库的开源库,实现了 Python 数据库 API v2.0,是 MySQLdb 的替代方案,支持与 MySQL 数据库的交互(如增删改查、事务处理等)。
2. 安装方法
通过 pip 命令安装,推荐使用清华源加速:
bash
pip install pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple
3. 核心使用流程
PyMySQL 操作 MySQL 需遵循固定步骤,确保资源正确使用和数据一致性:
- 创建数据库连接对象:通过
pymysql.connect()
建立 Python 与 MySQL 的连接,需指定连接参数(主机、端口、用户名、密码等)。 - 获取游标对象:通过连接对象的
cursor()
方法获取游标,用于执行 SQL 语句。 - 执行 SQL 语句:使用游标对象的
execute()
(单条)或executemany()
(多条)方法执行 SQL。 - 提交事务与关闭资源:通过
commit()
提交事务(修改操作必需),若失败则rollback()
回滚;最后关闭游标和连接释放资源。
二、核心对象与参数说明
1. 连接对象(connect)
通过pymysql.connect()
创建,关键参数如下:
参数名 | 说明 | 示例值 |
---|---|---|
host |
数据库主机 IP 或域名 | 127.0.0.1 (本地) |
port |
端口号(默认 3306) | 3306 |
user |
登录用户名 | root |
password |
登录密码 | 123456 |
db |
要连接的数据库名 | spider |
charset |
字符集(避免中文乱码) | utf8 |
cursorclass |
游标类型(默认元组,可选字典) | pymysql.cursors.DictCursor |
2. 游标对象(cursor)
常用方法:
方法 | 说明 |
---|---|
execute(sql) |
执行单条 SQL 语句 |
executemany(sql, vals) |
批量执行 SQL(vals 为数据集合) |
fetchone() |
获取结果集的下一行 |
fetchall() |
获取结果集的所有行 |
rowcount |
返回受影响的行数(如查询结果数) |
close() |
关闭游标 |
三、MySQL 表操作:增删改查实战
1. 创建表
通过CREATE TABLE
语句创建表,需指定字段名、类型、约束(如主键、非空)。
示例(创建学生表students
):
import pymysql
# 建立连接
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider', port=3306)
cursor = db.cursor()
# 创建表SQL
sql = """
create table if not exists students(
id varchar(255) not null,
name varchar(255),
age int not null,
primary key(id)
)
"""
cursor.execute(sql) # 执行创建表
db.close() # 关闭连接
2. 插入数据
支持单条插入、动态字典插入和批量插入,需通过事务确保数据一致性。
- 动态字典插入(灵活适配字段变化):
import pymysql data = {'id': '20120002', 'name': 'Andy', 'age': 20} # 动态数据 table = 'students' keys = ','.join(data.keys()) # 字段名:id,name,age values = ','.join(['%s'] * len(data)) # 占位符:%s,%s,%s db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider') cursor = db.cursor() sql = f"insert into {table}({keys}) values({values})" # 拼接SQL try: cursor.execute(sql, tuple(data.values())) # 执行插入 db.commit() # 提交事务 print("插入成功") except: db.rollback() # 失败回滚 finally: cursor.close() db.close()
3. 更新数据
通过UPDATE
语句修改数据,同样需事务支持。
示例(更新Bob
的年龄为 23):
import pymysql
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider')
cursor = db.cursor()
sql = "update students set age=%s where name=%s" # 占位符传参
try:
cursor.execute(sql, (23, 'Bob')) # 执行更新
db.commit()
except:
db.rollback()
finally:
db.close()
4. 删除数据
通过DELETE
语句删除数据,注意添加条件避免误删。
示例(删除年龄 > 22 的记录):
import pymysql
table = 'students'
condition = 'age > 22'
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider')
cursor = db.cursor()
sql = f"delete from {table} where {condition}"
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
finally:
db.close()
5. 查询数据
通过SELECT
语句查询,使用fetchone()
/fetchall()
获取结果。
示例(查询年龄≥20 的学生):
import pymysql
db = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='spider')
cursor = db.cursor()
sql = "select * from students where age>=20"
try:
cursor.execute(sql)
print("总条数:", cursor.rowcount)
row = cursor.fetchone() # 获取第一条
while row:
print("记录:", row)
row = cursor.fetchone() # 逐条获取
except:
print("查询失败")
finally:
db.close()
四、实战案例:爬虫数据存储
案例 1:豆瓣图书爬取与存储
任务:爬取豆瓣新书速递信息(书名、评分、作者、出版社、出版时间),存储到bookinfo
表
步骤:
- 爬取页面:通过
requests
获取网页内容。 - 创建表:定义
bookinfo
表结构(id 自增主键,bookname、score 等字段)。 - 解析数据:用 XPath 提取字段(如书名
//div[@id='content']//li//h2/a/text()
)。 - 批量存储:使用
executemany()
批量插入数据,确保效率。 -
import requests import pymysql from lxml import etree def get_html(url): # 获取网页内容 try: # 添加请求头模拟浏览器访问 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2" } r = requests.get(url, headers=headers) # 发送请求 # 设置返回内容的字符集编码 r.encoding = r.apparent_encoding r.raise_for_status() # 非正常返回抛出异常 return r.text # 返回网页的文本内容 except Exception as error: print(f"获取网页内容错误: {error}") return None def parser(html): # 解析函数 doc = etree.HTML(html) # html转换为可解析对象 out_list = [] # 调整XPath路径以匹配豆瓣新书页面结构 for row in doc.xpath('//*[@id="content"]/div[2]/div[1]/ul/li'): try: # 书名 - 修正XPath,添加/text()获取文本,处理可能的空值 title_elements = row.xpath('./div[2]/h2/a/text()') title = title_elements[0].strip() if title_elements else "未知书名" # 评分 - 修正XPath,添加/text()获取文本 score_elements = row.xpath('./div[2]/p[2]/span[2]/text()') score = score_elements[0].strip() if score_elements else "无评分" # 作者、出版社、出版日期(通过/分隔) info_elements = row.xpath('./div[2]/p[1]/text()') info_text = info_elements[0].strip() if info_elements else "" info = [item.strip() for item in info_text.split("/")] # 确保信息列表长度为3,不足则补空 while len(info) < 3: info.append("") out_list.append([title, score, info[0], info[1], info[2]]) except Exception as e: print(f"解析单条记录出错: {e}") continue return out_list def save_to_mysql(items): # 检查是否有数据 if not items: print("没有数据可保存到MySQL") return try: # 添加charset参数解决中文乱码问题 connect = pymysql.connect( host="127.0.0.1", user="root", passwd="123456", database="ppt", charset="utf8mb4" ) cursor = connect.cursor() # 确保表结构存在 cursor.execute(""" CREATE TABLE IF NOT EXISTS book ( id INT AUTO_INCREMENT PRIMARY KEY, bookname VARCHAR(255) NOT NULL, score VARCHAR(50), autor VARCHAR(255), press VARCHAR(255), pubdate VARCHAR(100) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) sql = "insert into book (bookname, score, autor, press, pubdate) values (%s, %s, %s, %s, %s)" # 使用executemany提高批量插入效率 cursor.executemany(sql, items) connect.commit() print(f"成功保存 {len(items)} 条记录到MySQL数据库") except Exception as err: connect.rollback() print(f"MySQL操作错误: {err}") finally: # 确保资源正确关闭 if 'cursor' in locals(): cursor.close() if 'connect' in locals() and connect.open: connect.close() if __name__ == "__main__": url = "https://book.douban.com/latest?icn=index-latestbook-all" print(f"开始爬取 {url} 的图书信息...") html = get_html(url) if not html: print("爬取网页失败,程序结束") else: book_list = parser(html) print(f"解析完成,共获取 {len(book_list)} 条有效图书信息") if book_list: save_to_mysql(book_list) else: print("未解析到有效图书信息")
案例 2:安居客二手房信息爬取与存储
任务:爬取重庆二手房信息(卖点、楼盘、地址、户型等),存储到houseinfo
表。
关键代码:
import requests
import pymysql
from lxml import etree
def get_html(url, time=30): # get请求通用函数
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
}
r = requests.get(url, headers=headers, timeout=time)
r.encoding = r.apparent_encoding # 设置字符集
r.raise_for_status() # 检查请求状态
return r.text # 返回网页文本
except Exception as error:
print(error)
return None
out_list = []
def parser(html):
doc = etree.HTML(html)
for row in doc.xpath("//div[@class='property']"):
title = row.xpath("a//h3/text()")[0].strip() # 卖点标题
# 房屋信息(户型、面积等)
info = row.xpath("string(./a//*[@class='property-content-info'])")
info = info.replace(" ", "").replace("\n\n", "\n").split("\n") # 清洗数据
# 楼盘、地址、价格
house = row.xpath("a//*[@class='property-content-info-comm-name']/text()")[0]
address = row.xpath("string(./a//*[@class='property-content-info-comm-address'])").strip()
price = row.xpath("a//*[@class='property-price-average']/text()")[0].replace(" ", "")
# 组合数据(卖点、楼盘、地址、户型、面积、楼层、建造年代、价格)
row_list = [
title,
house.strip(),
address,
info[0] if len(info)>=1 else "",
info[1] if len(info)>=2 else "",
info[3] if len(info)>=4 else "",
info[4] if len(info)>=5 else "",
price
]
out_list.append(row_list)
# 处理下一页
ele_next = doc.xpath("//*[@class='next next-active']/@href")
if ele_next:
new_url = ele_next[0]
if new_url.startswith("?page="):
new_url = "http://www.bspider.top/anjuke/" + new_url
parser(get_html(new_url)) # 递归解析下一页
def save_mysql(sql, val, **dbinfo):
try:
connect = pymysql.connect(** dbinfo) # 创建连接(动态参数)
cursor = connect.cursor()
cursor.executemany(sql, val) # 执行多条插入
connect.commit() # 提交事务
except Exception as err:
connect.rollback() # 回滚
print(err)
finally:
cursor.close()
connect.close()
if __name__ == "__main__":
url = "http://www.bspider.top/anjuke/"
html = get_html(url) # 获取首页HTML
parser(html) # 解析数据(含多页)
# 数据库配置
dbinfo = {
"host": "127.0.0.1",
"user": "root",
"password": "123456",
"db": "ppt",
"charset": "utf8",
"cursorclass": pymysql.cursors.DictCursor
}
# 插入SQL
sql = "INSERT into houseinfo(Title, House, Address, Struct, Area, Floor, MakeTime, price) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)"
save_mysql(sql, out_list, **dbinfo) # 保存数据
- 批量存储函数:封装
save_mysql()
,接收 SQL、数据集合和数据库参数,内部调用executemany()
。 - 分页爬取:通过解析下一页链接递归爬取多页数据,统一存储。

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)