本文将详细介绍如何利用Python的Pandas库,将3500万家专利信息读取并存入MySQL数据库的过程。

一、使用Pandas读取专利数据

首先,我们可以使用Pandas读取存储专利信息的表格,获取表的属性名。通过.columns方法,可以获得这些属性名,并将其用于后续的数据库表创建。

import pandas as pd

# 读取专利数据
data = pd.read_csv('patents.csv')

# 获取属性名
columns = data.columns.tolist()
print(columns)

二、属性名映射与SQL表创建

接下来,我们将获取的中文属性名映射到相应的英文属性名,并使用ChatGPT生成创建MySQL表的SQL语句。

提示词如下:

请你结合下述专利表的属性列。请你首先输出中文属性名到英文属性名的字典对应关系。再生成创建专利表的sql语句:

['专利公开号', '专利名称', '专利类型', '专利摘要', '申请人', '专利申请号', '申请日', '申请公布日',
       '授权公布号', '授权公布日', '申请地址', '主权项', '发明人', '分类号', '主分类号', '代理机构', '分案原申请号',
       '优先权', '国际申请', '国际公布', '代理人', '省份或国家代码', '法律状态', '专利领域', '专利学科',
       '多次公布']

在这里插入图片描述

以下是ChatGPT生成的中文属性名与英文属性名的对应关系:

column_mapping = {
    '专利公开号': 'publication_number',
    '专利名称': 'patent_name',
    '专利类型': 'patent_type',
    '专利摘要': 'abstract',
    '申请人': 'applicant',
    '专利申请号': 'application_number',
    '申请日': 'application_date',
    '申请公布日': 'publication_date',
    '授权公布号': 'grant_publication_number',
    '授权公布日': 'grant_publication_date',
    '申请地址': 'application_address',
    '主权项': 'claims',
    '发明人': 'inventor',
    '分类号': 'classification_number',
    '主分类号': 'main_classification_number',
    '代理机构': 'agency',
    '分案原申请号': 'original_application_number',
    '优先权': 'priority',
    '国际申请': 'international_application',
    '国际公布': 'international_publication',
    '代理人': 'agent',
    '省份或国家代码': 'province_or_country_code',
    '法律状态': 'legal_status',
    '专利领域': 'patent_field',
    '专利学科': 'patent_subject',
    '多次公布': 'multiple_publications'
}

通过将上述映射关系转化为SQL语句,我们可以创建名为patents的数据库表。建表的sql语句也可让GPT生成,自己微调一下属性和VARCHAR的size。
以下是创建表的SQL语句。

CREATE TABLE patents (
    id INT AUTO_INCREMENT PRIMARY KEY,
    publication_number VARCHAR(31),
    patent_name VARCHAR(255),
    patent_type VARCHAR(31),
    abstract TEXT,
    applicant VARCHAR(255),
    application_number VARCHAR(31),
    application_date DATE,
    publication_date DATE,
    grant_publication_number VARCHAR(31),
    grant_publication_date DATE,
    application_address VARCHAR(255),
    claims MEDIUMTEXT,
    inventor VARCHAR(1024),
    classification_number VARCHAR(1024),
    main_classification_number VARCHAR(127),
    agency VARCHAR(255),
    original_application_number VARCHAR(255),
    priority VARCHAR(512),
    international_application VARCHAR(255),
    international_publication VARCHAR(255),
    agent VARCHAR(255),
    province_or_country_code INT,
    legal_status TEXT,
    patent_field VARCHAR(255),
    patent_subject VARCHAR(255),
    multiple_publications VARCHAR(63),
    province VARCHAR(32)
);

建表代码:

import os
import pymysql
import pandas as pd
from tqdm import tqdm

# 初始化 database, password

column_mapping = {
    '专利公开号': 'publication_number',
    '专利名称': 'patent_name',
    '专利类型': 'patent_type',
    '专利摘要': 'abstract',
    '申请人': 'applicant',
    '专利申请号': 'application_number',
    '申请日': 'application_date',
    '申请公布日': 'publication_date',
    '授权公布号': 'grant_publication_number',
    '授权公布日': 'grant_publication_date',
    '申请地址': 'application_address',
    '主权项': 'claims',
    '发明人': 'inventor',
    '分类号': 'classification_number',
    '主分类号': 'main_classification_number',
    '代理机构': 'agency',
    '分案原申请号': 'original_application_number',
    '优先权': 'priority',
    '国际申请': 'international_application',
    '国际公布': 'international_publication',
    '代理人': 'agent',
    '省份或国家代码': 'province_or_country_code',
    '法律状态': 'legal_status',
    '专利领域': 'patent_field',
    '专利学科': 'patent_subject',
    '多次公布': 'multiple_publications'
}


def trans2int(item):
    if pd.isna(item):
        return None
    try:
        return int(eval(item))
    except:
        return None


def parse_item(row, attr_name, args: list):
    
    assert isinstance(args, list)

    # 字符截断
    trunc_item = {
        "applicant" : 255,
        "application_address": 255,
        "inventor": 1024,
        "classification_number": 1024,
        "main_classification_number": 127,
        "agency" : 255,
        "original_application_number": 127,
        "priority" : 511,
        "international_application": 255,
        "international_publication": 255,
        "agent": 127,
        "patent_field" : 255,
        "patent_subject" : 255,
        "multiple_publications": 63,
    }

    ans = []
    for attr in attr_name:
        en_attr = column_mapping[attr]
        item = row.get(attr, None)
        
        if pd.isna(item):
            ans.append(None)
            continue
        
        if en_attr == "province_or_country_code":
            item = trans2int(item)
        # 异常字符捕获
        elif isinstance(item, str):
            item = item.strip()
            # 有 空串 和 -
            if len(item) in [0, 1]:
                item = None
        
        if en_attr in trunc_item.keys():
            if isinstance(item, str):
                item = item[:trunc_item[en_attr]]
                
        ans.append(item)
    ans += args
    
    return tuple(ans)


def insert_sql_by_csv(file):
    df = pd.read_csv(file, low_memory=False)
    
    province = os.path.basename(file).split('.')[0]
    # 连接到MySQL数据库
    connection = pymysql.connect(
        host="localhost",  # MySQL数据库的主机
        user="root",  # MySQL用户名
        password=password,  # MySQL密码
        database=database,  # 你要插入数据的数据库
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
    )
    
    BATCH_SIZE = 1000

    tmp_list = list(column_mapping.values()) + ["province"]
    sql = f"""
            INSERT INTO patents (
                {", ".join(tmp_list)}
            ) VALUES (
                {', '.join(['%s'] * len(tmp_list))}
            );
            """.strip()
    
    # 插入数据到MySQL
    try:
        with connection.cursor() as cursor:
            batch = []
            for _, row in tqdm(df.iterrows(), total=len(df)):
    
                batch.append(
                        parse_item(row, list(column_mapping.keys()), [province])
                    )

                # 当批次达到 BATCH_SIZE 时执行批量插入
                if len(batch) >= BATCH_SIZE:
                    cursor.executemany(sql, batch)
                    batch = []  # 清空批次
                    
            # 插入剩余的未满批次的数据
            if batch:
                cursor.executemany(sql, batch)

            # 提交事务
            connection.commit()
            
    except Exception as e:
        print(f"插入数据时出现错误: {e}")
        connection.rollback()

    finally:
        connection.close()


if __name__ == "__main__":    
    folder = "存放专利数据的文件夹"
    print(f"文件总数: {len(os.listdir(folder))}")
    cnt = 0
    for file_name in os.listdir(folder):
        if file_name.endswith(".csv"):
            cnt += 1
            filename = os.path.join(folder, file_name)
            print(cnt, file_name)
            insert_sql_by_csv(filename)

    print("所有数据全部插入完成!")

    # mysql -h 127.0.0.1 -P 3306 -u root -p
    # nohup python build_patent_table.py > build_patent_table.log 2>&1 &
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐