PHP信创=PHP项目无缝适配达梦数据库改造实战详解
·
PHP项目无缝适配达梦数据库改造实战详解
一、先搞清楚为什么要改造
达梦数据库(DM8) 是国产数据库,信创项目要求用它替换MySQL。
问题在于:你的PHP项目原来是对着MySQL写的,达梦虽然兼容SQL标准,但有很多细节不一样,直接换会报错。
改造目标:
原来:PHP → PDO/MySQLi → MySQL
改后:PHP → PDO → 达梦DM8
要求:业务代码改动最小,最好不改
达梦和MySQL的主要差异:
差异点 MySQL写法 达梦写法
─────────────────────────────────────────────────────
自增主键 AUTO_INCREMENT IDENTITY(1,1)
字符串函数 IFNULL() NVL()
日期函数 NOW() SYSDATE / CURRENT_TIMESTAMP
分页 LIMIT 10,20 LIMIT 10 OFFSET 20 或 rownum
反引号 `table_name` "table_name" 或直接不用
布尔类型 TINYINT(1)/BOOLEAN BIT 或 TINYINT
JSON类型 JSON TEXT/CLOB(DM8部分版本支持)
表名大小写 不敏感 默认大写敏感
空字符串 '' 达梦区分NULL和''
---
二、改造方案选择
方案一:直接改SQL(最暴力,改动大,不推荐)
方案二:数据库抽象层(推荐,业务代码基本不动)
方案三:中间件代理(运维层面解决,代码完全不动)
本文用方案二:封装一个兼容层,让上层代码感知不到数据库换了。
架构图:
业务代码(Model/Controller)
↓ 调用
数据库抽象层(DB类/ORM)← 我们在这里做兼容
↓ 转换SQL
达梦DM8
---
三、安装达梦PHP扩展
3.1 确认达梦版本和PHP版本
# 查看达梦版本
/opt/dmdbms/bin/disql SYSDBA/SYSDBA001 << 'EOF'
SELECT * FROM V$VERSION;
EOF
# 查看PHP版本
php -v
# 查看PHP扩展目录
php -i | grep extension_dir
# 输出类似:extension_dir => /usr/lib/php/20210902
3.2 安装达梦PDO扩展
达梦官方提供了PHP扩展,在安装包里:
# 达梦安装目录下找PHP扩展
find /opt/dmdbms -name "*.so" | grep -i php
# 或者
ls /opt/dmdbms/drivers/php/
# 通常有这几个文件:
# php_pdo_dm.so → PDO驱动(推荐用这个)
# php_dm.so → 原生DM扩展
# 复制到PHP扩展目录
PHP_EXT_DIR=$(php -i | grep "^extension_dir" | awk '{print $3}')
echo "PHP扩展目录: $PHP_EXT_DIR"
cp /opt/dmdbms/drivers/php/php8.1/php_pdo_dm.so $PHP_EXT_DIR/
# 注意:要选对PHP版本对应的目录,php7.4/php8.1/php8.2
# 设置权限
chmod 755 $PHP_EXT_DIR/php_pdo_dm.so
3.3 配置php.ini加载扩展
# 找到php.ini位置
php --ini | grep "Loaded Configuration"
# 添加扩展
echo "extension=php_pdo_dm.so" >> /etc/php/8.1/fpm/php.ini
echo "extension=php_pdo_dm.so" >> /etc/php/8.1/cli/php.ini
# 重启PHP-FPM
systemctl restart php8.1-fpm
# 验证扩展加载成功
php -m | grep -i dm
# 应该看到:pdo_dm
php -r "print_r(PDO::getAvailableDrivers());"
# 应该看到数组里有 dm
3.4 测试连接
<?php
// test_dm_connect.php
try {
$dsn = 'dm:host=127.0.0.1;port=5236;dbname=DAMENG';
$pdo = new PDO($dsn, 'SYSDBA', 'SYSDBA001', [
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
]);
$stmt = $pdo->query("SELECT * FROM V\$VERSION");
$row = $stmt->fetch();
echo "连接成功!达梦版本:\n";
print_r($row);
} catch (PDOException $e) {
echo "连接失败:" . $e->getMessage() . "\n";
}
php test_dm_connect.php
---
四、核心兼容层代码
这是整个改造的核心,封装一个 DmPDO 类,自动处理SQL差异。
4.1 达梦PDO兼容类
app/Database/DmPDO.php:
<?php
namespace App\Database;
use PDO;
use PDOStatement;
/**
* 达梦数据库PDO兼容层
* 自动转换MySQL语法到达梦语法
*/
class DmPDO extends PDO
{
private bool $isDm;
private string $dbType;
public function __construct(string $dsn, string $username, string $password, array $options = [])
{
// 判断是否是达梦数据库
$this->isDm = str_starts_with($dsn, 'dm:');
$this->dbType = $this->isDm ? 'dm' : 'mysql';
$defaultOptions = [
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_EMULATE_PREPARES => false,
];
parent::__construct($dsn, $username, $password, array_merge($defaultOptions, $options));
// 达梦连接初始化
if ($this->isDm) {
$this->initDmConnection();
}
}
private function initDmConnection(): void
{
// 设置字符集
$this->exec("SET NAMES 'UTF8'");
// 设置日期格式(兼容MySQL格式)
$this->exec("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'");
// 设置时区
$this->exec("ALTER SESSION SET TIME_ZONE='+08:00'");
}
/**
* 执行SQL前自动转换语法
*/
public function query(string $query, ?int $fetchMode = null, mixed ...$fetchModeArgs): PDOStatement|false
{
$query = $this->convertSql($query);
if ($fetchMode !== null) {
return parent::query($query, $fetchMode, ...$fetchModeArgs);
}
return parent::query($query);
}
public function prepare(string $query, array $options = []): PDOStatement|false
{
$query = $this->convertSql($query);
return parent::prepare($query, $options);
}
public function exec(string $statement): int|false
{
$statement = $this->convertSql($statement);
return parent::exec($statement);
}
/**
* SQL语法转换核心方法
*/
public function convertSql(string $sql): string
{
if (!$this->isDm) {
return $sql; // MySQL不需要转换
}
$sql = $this->convertBackticks($sql); // 反引号转双引号
$sql = $this->convertLimit($sql); // LIMIT语法
$sql = $this->convertFunctions($sql); // 函数转换
$sql = $this->convertDataTypes($sql); // 数据类型
$sql = $this->convertAutoIncrement($sql); // 自增
$sql = $this->convertBooleans($sql); // 布尔值
$sql = $this->convertGroupConcat($sql); // GROUP_CONCAT
$sql = $this->convertInsertIgnore($sql); // INSERT IGNORE
$sql = $this->convertOnDuplicateKey($sql); // ON DUPLICATE KEY
return $sql;
}
/**
* 反引号转双引号
* MySQL: `table_name` → 达梦: "table_name"
*/
private function convertBackticks(string $sql): string
{
return str_replace('`', '"', $sql);
}
/**
* LIMIT语法转换
* MySQL: LIMIT 20, 10 → 达梦: LIMIT 10 OFFSET 20
* MySQL: LIMIT 10 → 达梦: LIMIT 10(这个一样,不用转)
*/
private function convertLimit(string $sql): string
{
// 匹配 LIMIT offset, count 格式
return preg_replace_callback(
'/\bLIMIT\s+(\d+)\s*,\s*(\d+)\b/i',
function ($matches) {
$offset = (int)$matches[1];
$count = (int)$matches[2];
return "LIMIT {$count} OFFSET {$offset}";
},
$sql
);
}
/**
* 函数名转换
*/
private function convertFunctions(string $sql): string
{
$replacements = [
// 空值处理
'/\bIFNULL\s*\(/i' => 'NVL(',
'/\bIFNULL\b/i' => 'NVL',
// 条件函数
'/\bIF\s*\(\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*\)/i' => 'DECODE($1, TRUE, $2, $3)',
// 字符串函数
'/\bGROUP_CONCAT\s*\(/i' => 'WM_CONCAT(',
'/\bCONCAT_WS\s*\(/i' => 'CONCAT_WS(', // 达梦支持
// 日期函数
'/\bNOW\s*\(\s*\)/i' => 'SYSDATE',
'/\bCURDATE\s*\(\s*\)/i' => 'TRUNC(SYSDATE)',
'/\bCURTIME\s*\(\s*\)/i' => 'TO_CHAR(SYSDATE,\'HH24:MI:SS\')',
'/\bUNIX_TIMESTAMP\s*\(\s*\)/i' => 'DATEDIFF(SECOND, \'1970-01-01\', SYSDATE)',
'/\bDATE_FORMAT\s*\((.+?),\s*\'%Y-%m-%d\'\)/i' => 'TO_CHAR($1, \'YYYY-MM-DD\')',
'/\bDATE_FORMAT\s*\((.+?),\s*\'%Y-%m-%d %H:%i:%s\'\)/i' => 'TO_CHAR($1, \'YYYY-MM-DD HH24:MI:SS\')',
'/\bDATE_FORMAT\s*\((.+?),\s*\'%Y%m%d\'\)/i' => 'TO_CHAR($1, \'YYYYMMDD\')',
'/\bDATE_ADD\s*\((.+?),\s*INTERVAL\s+(\d+)\s+DAY\)/i' => 'DATEADD(DAY, $2, $1)',
'/\bDATE_ADD\s*\((.+?),\s*INTERVAL\s+(\d+)\s+HOUR\)/i' => 'DATEADD(HOUR, $2, $1)',
'/\bDATE_SUB\s*\((.+?),\s*INTERVAL\s+(\d+)\s+DAY\)/i' => 'DATEADD(DAY, -$2, $1)',
'/\bDATEDIFF\s*\((.+?),\s*(.+?)\)/i' => 'DATEDIFF(DAY, $2, $1)',
'/\bYEAR\s*\(/i' => 'YEAR(', // 达梦支持
'/\bMONTH\s*\(/i' => 'MONTH(', // 达梦支持
'/\bDAY\s*\(/i' => 'DAY(', // 达梦支持
// 数学函数
'/\bRAND\s*\(\s*\)/i' => 'RAND()', // 达梦支持
// 字符串处理
'/\bLOCATE\s*\(/i' => 'INSTR(', // 参数顺序不同,需特殊处理
'/\bMID\s*\(/i' => 'SUBSTR(',
'/\bSUBSTRING\s*\(/i' => 'SUBSTR(',
// 类型转换
'/\bCAST\s*\((.+?)\s+AS\s+SIGNED\)/i' => 'CAST($1 AS INT)',
'/\bCAST\s*\((.+?)\s+AS\s+UNSIGNED\)/i' => 'CAST($1 AS INT)',
'/\bCAST\s*\((.+?)\s+AS\s+CHAR\)/i' => 'CAST($1 AS VARCHAR)',
];
foreach ($replacements as $pattern => $replacement) {
$sql = preg_replace($pattern, $replacement, $sql);
}
return $sql;
}
/**
* 数据类型转换(用于CREATE TABLE)
*/
private function convertDataTypes(string $sql): string
{
$replacements = [
'/\bTINYINT\s*\(1\)/i' => 'BIT',
'/\bINT\s+AUTO_INCREMENT/i' => 'INT IDENTITY(1,1)',
'/\bBIGINT\s+AUTO_INCREMENT/i' => 'BIGINT IDENTITY(1,1)',
'/\bAUTO_INCREMENT/i' => 'IDENTITY(1,1)',
'/\bDATETIME/i' => 'TIMESTAMP',
'/\bTEXT\b/i' => 'CLOB',
'/\bLONGTEXT\b/i' => 'CLOB',
'/\bMEDIUMTEXT\b/i' => 'CLOB',
'/\bBLOB\b/i' => 'BLOB',
'/\bLONGBLOB\b/i' => 'BLOB',
'/\bDOUBLE\b/i' => 'DOUBLE',
'/\bFLOAT\b/i' => 'FLOAT',
'/\bJSON\b/i' => 'CLOB',
'/\bENUM\s*\([^)]+\)/i' => 'VARCHAR(50)',
'/\bSET\s*\([^)]+\)/i' => 'VARCHAR(200)',
// 字符集声明(达梦不需要)
'/\s+CHARACTER\s+SET\s+\w+/i' => '',
'/\s+COLLATE\s+\w+/i' => '',
'/\s+DEFAULT\s+CHARSET\s*=\s*\w+/i' => '',
'/\s+ENGINE\s*=\s*\w+/i' => '',
];
foreach ($replacements as $pattern => $replacement) {
$sql = preg_replace($pattern, $replacement, $sql);
}
return $sql;
}
/**
* 自增主键转换
*/
private function convertAutoIncrement(string $sql): string
{
// 处理 AUTO_INCREMENT=xxx 表选项
$sql = preg_replace('/\s+AUTO_INCREMENT\s*=\s*\d+/i', '', $sql);
return $sql;
}
/**
* 布尔值转换
* MySQL: true/false → 达梦: 1/0
*/
private function convertBooleans(string $sql): string
{
$sql = preg_replace('/\bTRUE\b/i', '1', $sql);
$sql = preg_replace('/\bFALSE\b/i', '0', $sql);
return $sql;
}
/**
* GROUP_CONCAT 转 WM_CONCAT
* 注意:WM_CONCAT不支持ORDER BY和SEPARATOR,复杂场景需要用LISTAGG
*/
private function convertGroupConcat(string $sql): string
{
// 简单的GROUP_CONCAT(field)
$sql = preg_replace(
'/\bGROUP_CONCAT\s*\(\s*([^)]+?)\s*\)/i',
'WM_CONCAT($1)',
$sql
);
// 带SEPARATOR的GROUP_CONCAT(转LISTAGG)
$sql = preg_replace(
'/\bGROUP_CONCAT\s*\(\s*([^)]+?)\s+SEPARATOR\s+\'([^\']+)\'\s*\)/i',
"LISTAGG($1, '$2') WITHIN GROUP (ORDER BY $1)",
$sql
);
return $sql;
}
/**
* INSERT IGNORE 转换
* MySQL: INSERT IGNORE INTO → 达梦: INSERT INTO(用MERGE代替)
*/
private function convertInsertIgnore(string $sql): string
{
// 简单处理:去掉IGNORE关键字(可能插入重复数据时报错,需业务层处理)
return preg_replace('/\bINSERT\s+IGNORE\b/i', 'INSERT', $sql);
}
/**
* ON DUPLICATE KEY UPDATE 转换
* 达梦用MERGE INTO实现,这里做简单处理
*/
private function convertOnDuplicateKey(string $sql): string
{
// 这个比较复杂,标记出来让开发者手动处理
if (stripos($sql, 'ON DUPLICATE KEY UPDATE') !== false) {
// 记录日志,提醒需要手动改造
error_log("[DM兼容层警告] 检测到ON DUPLICATE KEY UPDATE,需要手动改造为MERGE INTO: " . $sql);
}
return $sql;
}
/**
* 获取最后插入的ID
* 达梦用IDENTITY_VAL_LOCAL()
*/
public function lastInsertId(?string $name = null): string|false
{
if ($this->isDm) {
try {
$stmt = parent::query("SELECT IDENTITY_VAL_LOCAL()");
$result = $stmt->fetchColumn();
return $result !== false ? (string)$result : false;
} catch (\Exception $e) {
return false;
}
}
return parent::lastInsertId($name);
}
public function isDm(): bool
{
return $this->isDm;
}
public function getDbType(): string
{
return $this->dbType;
}
}
---
五、数据库连接工厂
app/Database/ConnectionFactory.php:
<?php
namespace App\Database;
use PDO;
class ConnectionFactory
{
private static ?PDO $instance = null;
private static array $config = [];
public static function setConfig(array $config): void
{
self::$config = $config;
}
public static function getInstance(): PDO
{
if (self::$instance === null) {
self::$instance = self::createConnection();
}
return self::$instance;
}
private static function createConnection(): PDO
{
$config = self::$config;
$driver = $config['driver'] ?? 'mysql';
return match ($driver) {
'dm' => self::createDmConnection($config),
'mysql' => self::createMysqlConnection($config),
'pgsql' => self::createPgsqlConnection($config),
default => throw new \InvalidArgumentException("不支持的数据库驱动: {$driver}"),
};
}
private static function createDmConnection(array $config): DmPDO
{
$dsn = sprintf(
'dm:host=%s;port=%s;dbname=%s',
$config['host'] ?? '127.0.0.1',
$config['port'] ?? '5236',
$config['dbname'] ?? 'DAMENG'
);
return new DmPDO(
$dsn,
$config['username'] ?? 'SYSDBA',
$config['password'] ?? 'SYSDBA001',
$config['options'] ?? []
);
}
private static function createMysqlConnection(array $config): DmPDO
{
$dsn = sprintf(
'mysql:host=%s;port=%s;dbname=%s;charset=%s',
$config['host'] ?? '127.0.0.1',
$config['port'] ?? '3306',
$config['dbname'] ?? 'myapp',
$config['charset'] ?? 'utf8mb4'
);
return new DmPDO(
$dsn,
$config['username'] ?? 'root',
$config['password'] ?? '',
$config['options'] ?? []
);
}
private static function createPgsqlConnection(array $config): PDO
{
$dsn = sprintf(
'pgsql:host=%s;port=%s;dbname=%s',
$config['host'] ?? '127.0.0.1',
$config['port'] ?? '5432',
$config['dbname'] ?? 'myapp'
);
return new PDO(
$dsn,
$config['username'] ?? 'postgres',
$config['password'] ?? '',
[PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]
);
}
public static function reset(): void
{
self::$instance = null;
}
}
配置文件 config/database.php:
<?php
return [
// 切换这一行就能换数据库,其他代码不动
'driver' => env('DB_DRIVER', 'mysql'), // mysql 或 dm
// MySQL配置
'mysql' => [
'driver' => 'mysql',
'host' => env('DB_HOST', '127.0.0.1'),
'port' => env('DB_PORT', '3306'),
'dbname' => env('DB_DATABASE', 'myapp'),
'username' => env('DB_USERNAME', 'root'),
'password' => env('DB_PASSWORD', ''),
'charset' => 'utf8mb4',
],
// 达梦配置
'dm' => [
'driver' => 'dm',
'host' => env('DM_HOST', '127.0.0.1'),
'port' => env('DM_PORT', '5236'),
'dbname' => env('DM_DATABASE', 'DAMENG'),
'username' => env('DM_USERNAME', 'SYSDBA'),
'password' => env('DM_PASSWORD', 'SYSDBA001'),
],
];
---
六、QueryBuilder查询构造器
封装一个查询构造器,让写SQL更方便,同时自动处理兼容性。
app/Database/QueryBuilder.php:
<?php
namespace App\Database;
use PDO;
class QueryBuilder
{
private PDO $pdo;
private string $table = '';
private array $wheres = [];
private array $bindings = [];
private array $selects = ['*'];
private array $orders = [];
private array $joins = [];
private ?int $limit = null;
private ?int $offset = null;
private bool $isDm;
public function __construct(PDO $pdo)
{
$this->pdo = $pdo;
$this->isDm = ($pdo instanceof DmPDO) && $pdo->isDm();
}
public function table(string $table): static
{
$clone = clone $this;
$clone->table = $table;
return $clone;
}
public function select(string ...$columns): static
{
$clone = clone $this;
$clone->selects = $columns;
return $clone;
}
public function where(string $column, mixed $operator, mixed $value = null): static
{
$clone = clone $this;
// 支持两参数写法:where('name', 'John') 等同于 where('name', '=', 'John')
if ($value === null) {
$value = $operator;
$operator = '=';
}
$placeholder = ':where_' . count($clone->wheres) . '_' . preg_replace('/[^a-zA-Z0-9]/', '_', $column);
$clone->wheres[] = "{$column} {$operator} {$placeholder}";
$clone->bindings[$placeholder] = $value;
return $clone;
}
public function whereIn(string $column, array $values): static
{
$clone = clone $this;
$placeholders = [];
foreach ($values as $i => $value) {
$key = ":wherein_{$i}_" . preg_replace('/[^a-zA-Z0-9]/', '_', $column);
$placeholders[] = $key;
$clone->bindings[$key] = $value;
}
$clone->wheres[] = "{$column} IN (" . implode(', ', $placeholders) . ")";
return $clone;
}
public function whereNull(string $column): static
{
$clone = clone $this;
$clone->wheres[] = "{$column} IS NULL";
return $clone;
}
public function whereNotNull(string $column): static
{
$clone = clone $this;
$clone->wheres[] = "{$column} IS NOT NULL";
return $clone;
}
public function whereBetween(string $column, mixed $min, mixed $max): static
{
$clone = clone $this;
$keyMin = ":between_min_" . preg_replace('/[^a-zA-Z0-9]/', '_', $column);
$keyMax = ":between_max_" . preg_replace('/[^a-zA-Z0-9]/', '_', $column);
$clone->wheres[] = "{$column} BETWEEN {$keyMin} AND {$keyMax}";
$clone->bindings[$keyMin] = $min;
$clone->bindings[$keyMax] = $max;
return $clone;
}
public function orderBy(string $column, string $direction = 'ASC'): static
{
$clone = clone $this;
$direction = strtoupper($direction) === 'DESC' ? 'DESC' : 'ASC';
$clone->orders[] = "{$column} {$direction}";
return $clone;
}
public function limit(int $limit): static
{
$clone = clone $this;
$clone->limit = $limit;
return $clone;
}
public function offset(int $offset): static
{
$clone = clone $this;
$clone->offset = $offset;
return $clone;
}
public function join(string $table, string $first, string $operator, string $second): static
{
$clone = clone $this;
$clone->joins[] = "INNER JOIN {$table} ON {$first} {$operator} {$second}";
return $clone;
}
public function leftJoin(string $table, string $first, string $operator, string $second): static
{
$clone = clone $this;
$clone->joins[] = "LEFT JOIN {$table} ON {$first} {$operator} {$second}";
return $clone;
}
/**
* 分页(自动处理MySQL和达梦的差异)
*/
public function paginate(int $page, int $perPage = 15): array
{
$total = $this->count();
$offset = ($page - 1) * $perPage;
$items = $this->limit($perPage)->offset($offset)->get();
return [
'data' => $items,
'total' => $total,
'per_page' => $perPage,
'current_page' => $page,
'last_page' => (int)ceil($total / $perPage),
'from' => $offset + 1,
'to' => min($offset + $perPage, $total),
];
}
public function get(): array
{
$sql = $this->buildSelectSql();
$stmt = $this->pdo->prepare($sql);
$stmt->execute($this->bindings);
return $stmt->fetchAll();
}
public function first(): ?array
{
$result = $this->limit(1)->get();
return $result[0] ?? null;
}
public function count(): int
{
$clone = clone $this
✻ Cogitated for 1m 40s
> 继续
✢ Precipitating… ●P $clone->selects = ['COUNT(*) as aggregate'];
$clone->orders = [];
$clone->limit = null;
$clone->offset = null;
$sql = $clone->buildSelectSql(); $stmt = $this->pdo->prepare($sql);
$stmt->execute($clone->bindings);
$row = $stmt->fetch();
return (int)($row['aggregate'] ?? $row['AGGREGATE'] ?? 0);
}
public function insert(array $data): bool
{
$columns = array_keys($data);
$placeholders = array_map(fn($col) => ":{$col}", $columns);
$colList = implode(', ', $columns);
$valList = implode(', ', $placeholders);
$sql = "INSERT INTO {$this->table} ({$colList}) VALUES ({$valList})";
$stmt = $this->pdo->prepare($sql);
return $stmt->execute($data);
}
public function insertGetId(array $data): int|false
{
$result = $this->insert($data);
if (!$result) {
return false;
}
return (int)$this->pdo->lastInsertId();
}
public function update(array $data): int
{
$sets = [];
$bindings = [];
foreach ($data as $col => $val) {
$key = ":set_{$col}";
$sets[] = "{$col} = {$key}";
$bindings[$key] = $val;
}
$sql = "UPDATE {$this->table} SET " . implode(', ', $sets);
if (!empty($this->wheres)) {
$sql .= " WHERE " . implode(' AND ', $this->wheres);
}
$stmt = $this->pdo->prepare($sql);
$stmt->execute(array_merge($bindings, $this->bindings));
return $stmt->rowCount();
}
public function delete(): int
{
$sql = "DELETE FROM {$this->table}";
if (!empty($this->wheres)) {
$sql .= " WHERE " . implode(' AND ', $this->wheres);
}
$stmt = $this->pdo->prepare($sql);
$stmt->execute($this->bindings);
return $stmt->rowCount();
}
private function buildSelectSql(): string
{
$cols = implode(', ', $this->selects);
$sql = "SELECT {$cols} FROM {$this->table}";
if (!empty($this->joins)) {
$sql .= ' ' . implode(' ', $this->joins);
}
if (!empty($this->wheres)) {
$sql .= ' WHERE ' . implode(' AND ', $this->wheres);
}
if (!empty($this->orders)) {
$sql .= ' ORDER BY ' . implode(', ', $this->orders);
}
// 分页:MySQL和达梦都支持 LIMIT x OFFSET y
if ($this->limit !== null) {
$sql .= " LIMIT {$this->limit}";
}
if ($this->offset !== null) {
$sql .= " OFFSET {$this->offset}";
}
return $sql;
}
/**
* 执行原生SQL(自动走兼容层转换)
*/
public function raw(string $sql, array $bindings = []): array
{
$stmt = $this->pdo->prepare($sql);
$stmt->execute($bindings);
return $stmt->fetchAll();
}
public function rawExec(string $sql, array $bindings = []): int
{
$stmt = $this->pdo->prepare($sql);
$stmt->execute($bindings);
return $stmt->rowCount();
}
}
---
七、Model基类
app/Database/Model.php:
<?php
namespace App\Database;
use PDO;
abstract class Model
{
// 子类必须定义这两个
protected static string $table = '';
protected static string $primaryKey = 'id';
// 允许批量赋值的字段
protected static array $fillable = [];
// 自动维护时间戳
protected static bool $timestamps = true;
protected static ?PDO $pdo = null;
// 模型数据
protected array $attributes = [];
protected array $original = [];
protected bool $exists = false;
public static function setConnection(PDO $pdo): void
{
static::$pdo = $pdo;
}
protected static function getConnection(): PDO
{
if (static::$pdo === null) {
throw new \RuntimeException('数据库连接未初始化,请先调用 Model::setConnection()');
}
return static::$pdo;
}
protected static function newQuery(): QueryBuilder
{
return (new QueryBuilder(static::getConnection()))->table(static::$table);
}
// ===== 查询方法 =====
public static function all(): array
{
return static::newQuery()->get();
}
public static function find(int|string $id): ?static
{
$row = static::newQuery()
->where(static::$primaryKey, $id)
->first();
return $row ? static::newFromArray($row) : null;
}
public static function findOrFail(int|string $id): static
{
$model = static::find($id);
if ($model === null) {
throw new \RuntimeException(static::class . " ID={$id} 不存在");
}
return $model;
}
public static function where(string $column, mixed $operator, mixed $value = null): QueryBuilder
{
return static::newQuery()->where($column, $operator, $value);
}
public static function paginate(int $page = 1, int $perPage = 15): array
{
return static::newQuery()->paginate($page, $perPage);
}
// ===== 写入方法 =====
public static function create(array $data): static
{
$model = new static();
$model->fill($data);
$model->save();
return $model;
}
public function fill(array $data): static
{
foreach ($data as $key => $value) {
if (empty(static::$fillable) || in_array($key, static::$fillable)) {
$this->attributes[$key] = $value;
}
}
return $this;
}
public function save(): bool
{
$data = $this->attributes;
if (static::$timestamps) {
$now = date('Y-m-d H:i:s');
if (!$this->exists) {
$data['created_at'] = $now;
}
$data['updated_at'] = $now;
}
if ($this->exists) {
// 更新
$affected = static::newQuery()
->where(static::$primaryKey, $this->attributes[static::$primaryKey])
->update($data);
return $affected > 0;
} else {
// 插入
$id = static::newQuery()->insertGetId($data);
if ($id !== false) {
$this->attributes[static::$primaryKey] = $id;
$this->exists = true;
return true;
}
return false;
}
}
public function delete(): bool
{
if (!$this->exists) {
return false;
}
$affected = static::newQuery()
->where(static::$primaryKey, $this->attributes[static::$primaryKey])
->delete();
$this->exists = false;
return $affected > 0;
}
public static function destroy(int|string ...$ids): int
{
return static::newQuery()
->whereIn(static::$primaryKey, $ids)
->delete();
}
// ===== 魔术方法 =====
public function __get(string $name): mixed
{
return $this->attributes[$name] ?? null;
}
public function __set(string $name, mixed $value): void
{
$this->attributes[$name] = $value;
}
public function __isset(string $name): bool
{
return isset($this->attributes[$name]);
}
public function toArray(): array
{
return $this->attributes;
}
protected static function newFromArray(array $data): static
{
$model = new static();
$model->attributes = $data;
$model->original = $data;
$model->exists = true;
return $model;
}
}
---
八、具体业务Model示例
app/Models/User.php:
<?php
namespace App\Models;
use App\Database\Model;
class User extends Model
{
protected static string $table = 'users';
protected static string $primaryKey = 'id';
protected static array $fillable = [
'name', 'email', 'password', 'phone',
'status', 'avatar', 'department_id',
];
// 根据邮箱查找用户
public static function findByEmail(string $email): ?static
{
$row = static::newQuery()
->where('email', $email)
->first();
return $row ? static::newFromArray($row) : null;
}
// 获取某部门的活跃用户(分页)
public static function getActiveByDept(int $deptId, int $page = 1): array
{
return static::newQuery()
->where('department_id', $deptId)
->where('status', 1)
->orderBy('created_at', 'DESC')
->paginate($page, 20);
}
// 搜索用户
public static function search(string $keyword, int $page = 1): array
{
return static::newQuery()
->where('name', 'LIKE', "%{$keyword}%")
->orderBy('id', 'DESC')
->paginate($page, 15);
}
}
app/Models/Order.php:
<?php
namespace App\Models;
use App\Database\Model;
class Order extends Model
{
protected static string $table = 'orders';
protected static string $primaryKey = 'id';
protected static array $fillable = [
'order_no', 'user_id', 'amount',
'status', 'remark', 'paid_at',
];
// 统计某用户订单总金额
public static function sumAmountByUser(int $userId): float
{
$db = static::getConnection();
$stmt = $db->prepare(
"SELECT SUM(amount) as total FROM orders WHERE user_id = :uid AND status = :status"
);
$stmt->execute([':uid' => $userId, ':status' => 1]);
$row = $stmt->fetch();
// 达梦返回的列名可能是大写,做兼容处理
return (float)($row['total'] ?? $row['TOTAL'] ?? 0);
}
// 按月统计订单数(日期函数兼容)
public static function countByMonth(int $year, int $month): int
{
$db = static::getConnection();
// 用标准SQL写法,MySQL和达梦都支持
$stmt = $db->prepare(
"SELECT COUNT(*) as cnt FROM orders
WHERE YEAR(created_at) = :year
AND MONTH(created_at) = :month"
);
$stmt->execute([':year' => $year, ':month' => $month]);
$row = $stmt->fetch();
return (int)($row['cnt'] ?? $row['CNT'] ?? 0);
}
}
---
九、Laravel框架适配达梦
如果你用的是Laravel,不需要自己写上面那些,用官方扩展包。
9.1 安装达梦Laravel扩展
# 达梦官方提供了Laravel扩展
# 方式1:composer安装(如果有网络)
composer require laravel-dm8/laravel-dm8
# 方式2:手动安装(内网环境)
# 把扩展包放到 packages/ 目录
mkdir -p packages/laravel-dm8
# 解压达梦提供的Laravel扩展包到此目录
# 在 composer.json 里添加本地路径
composer.json 添加本地包:
{
"repositories": [
{
"type": "path",
"url": "./packages/laravel-dm8"
}
],
"require": {
"laravel-dm8/laravel-dm8": "*"
}
}
composer update
9.2 Laravel配置文件
config/database.php:
<?php
return [
'default' => env('DB_CONNECTION', 'mysql'),
'connections' => [
'mysql' => [
'driver' => 'mysql',
'host' => env('DB_HOST', '127.0.0.1'),
'port' => env('DB_PORT', '3306'),
'database' => env('DB_DATABASE', 'myapp'),
'username' => env('DB_USERNAME', 'root'),
'password' => env('DB_PASSWORD', ''),
'charset' => 'utf8mb4',
'collation' => 'utf8mb4_unicode_ci',
'prefix' => '',
'strict' => true,
],
// 达梦数据库配置
'dm' => [
'driver' => 'dm',
'host' => env('DM_HOST', '127.0.0.1'),
'port' => env('DM_PORT', '5236'),
'database' => env('DM_DATABASE', 'DAMENG'),
'username' => env('DM_USERNAME', 'SYSDBA'),
'password' => env('DM_PASSWORD', 'SYSDBA001'),
'charset' => 'utf8',
'prefix' => '',
'schema' => env('DM_SCHEMA', 'SYSDBA'),
],
],
];
.env:
# 切换数据库只改这一行
DB_CONNECTION=dm
DM_HOST=127.0.0.1
DM_PORT=5236
DM_DATABASE=DAMENG
DM_USERNAME=SYSDBA
DM_PASSWORD=SYSDBA001
DM_SCHEMA=MYAPP
9.3 Laravel Migration适配达梦
database/migrations/2024_01_01_create_users_table.php:
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
use Illuminate\Support\Facades\DB;
return new class extends Migration
{
public function up(): void
{
Schema::create('users', function (Blueprint $table) {
$table->id(); // 自动处理自增
$table->string('name', 100);
$table->string('email', 200)->unique();
$table->string('password', 255);
$table->string('phone', 20)->nullable();
$table->tinyInteger('status')->default(1);
$table->unsignedBigInteger('department_id')->nullable();
// 达梦不支持JSON类型,用text代替
if (config('database.default') === 'dm') {
$table->text('extra_info')->nullable();
} else {
$table->json('extra_info')->nullable();
}
$table->timestamps(); // created_at / updated_at
$table->softDeletes(); // deleted_at
});
}
public function down(): void
{
Schema::dropIfExists('users');
}
};
9.4 Laravel Eloquent注意事项
<?php
namespace App\Models;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
class User extends Model
{
use SoftDeletes;
protected $table = 'users';
protected $fillable = [
'name', 'email', 'password',
'phone', 'status', 'department_id',
];
protected $hidden = ['password'];
protected $casts = [
'status' => 'integer',
'extra_info' => 'array', // 达梦存TEXT,自动JSON序列化
];
// 达梦列名默认大写,需要统一处理
public function getAttribute($key)
{
// 先尝试原始key,再尝试大写key
$value = parent::getAttribute($key);
if ($value === null && config('database.default') === 'dm') {
$value = parent::getAttribute(strtoupper($key));
}
return $value;
}
// 查询作用域:只查活跃用户
public function scopeActive($query)
{
return $query->where('status', 1);
}
// 关联:属于某部门
public function department()
{
return $this->belongsTo(Department::class);
}
// 关联:有多个订单
public function orders()
{
return $this->hasMany(Order::class);
}
}
---
十、数据迁移工具
从MySQL迁移数据到达梦的脚本。
scripts/migrate_mysql_to_dm.php:
<?php
/**
* MySQL → 达梦 数据迁移脚本
* 用法:php migrate_mysql_to_dm.php --tables=users,orders
*/
// 解析命令行参数
$options = getopt('', ['tables:', 'batch:', 'skip-create']);
$tables = isset($options['tables']) ? explode(',', $options['tables']) : [];
$batch = (int)($options['batch'] ?? 500);
$skipCreate = isset($options['skip-create']);
// MySQL连接
$mysql = new PDO(
'mysql:host=127.0.0.1;port=3306;dbname=myapp;charset=utf8mb4',
'root',
'mysql_password',
[PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]
);
// 达梦连接
$dm = new PDO(
'dm:host=127.0.0.1;port=5236;dbname=DAMENG',
'SYSDBA',
'SYSDBA001',
[PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]
);
$dm->exec("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'");
// 如果没指定表,迁移所有表
if (empty($tables)) {
$stmt = $mysql->query("SHOW TABLES");
$tables = $stmt->fetchAll(PDO::FETCH_COLUMN);
}
echo "开始迁移,共 " . count($tables) . " 张表\n";
echo str_repeat('-', 60) . "\n";
foreach ($tables as $table) {
migrateTable($mysql, $dm, $table, $batch, $skipCreate);
}
echo "\n迁移完成!\n";
// ===== 迁移单张表 =====
function migrateTable(PDO $mysql, PDO $dm, string $table, int $batch, bool $skipCreate): void
{
echo "\n[{$table}] 开始迁移...\n";
// 1. 获取MySQL表结构
$createSql = getMysqlCreateSql($mysql, $table);
// 2. 转换为达梦建表语句
if (!$skipCreate) {
$dmCreateSql = convertCreateSql($createSql, $table);
try {
// 先删除旧表
$dm->exec("DROP TABLE IF EXISTS \"{$table}\"");
$dm->exec($dmCreateSql);
echo " [√] 建表成功\n";
} catch (\Exception $e) {
echo " [×] 建表失败: " . $e->getMessage() . "\n";
echo " SQL: {$dmCreateSql}\n";
return;
}
}
// 3. 迁移数据
$total = (int)$mysql->query("SELECT COUNT(*) FROM `{$table}`")->fetchColumn();
echo " 总记录数: {$total}\n";
if ($total === 0) {
echo " [√] 无数据,跳过\n";
return;
}
$offset = 0;
$migrated = 0;
$errors = 0;
while ($offset < $total) {
$rows = $mysql->query(
"SELECT * FROM `{$table}` LIMIT {$batch} OFFSET {$offset}"
)->fetchAll(PDO::FETCH_ASSOC);
if (empty($rows)) {
break;
}
// 批量插入达梦
$dm->beginTransaction();
try {
foreach ($rows as $row) {
insertRow($dm, $table, $row);
$migrated++;
}
$dm->commit();
} catch (\Exception $e) {
$dm->rollBack();
$errors++;
echo " [×] 批次插入失败 (offset={$offset}): " . $e->getMessage() . "\n";
}
$offset += $batch;
$percent = round($migrated / $total * 100, 1);
echo " 进度: {$migrated}/{$total} ({$percent}%)\r";
}
echo "\n [√] 迁移完成,成功: {$migrated},失败批次: {$errors}\n";
}
// ===== 获取MySQL建表语句 =====
function getMysqlCreateSql(PDO $mysql, string $table): string
{
$stmt = $mysql->query("SHOW CREATE TABLE `{$table}`");
$row = $stmt->fetch(PDO::FETCH_ASSOC);
return $row['Create Table'] ?? '';
}
// ===== 转换建表SQL =====
function convertCreateSql(string $sql, string $table): string
{
// 替换表名格式
$sql = preg_replace('/CREATE TABLE `' . $table . '`/', "CREATE TABLE \"{$table}\"", $sql);
// 列名反引号转双引号
$sql = preg_replace('/`(\w+)`/', '"$1"', $sql);
// 数据类型转换
$typeMap = [
'/\bTINYINT\(1\)/i' => 'BIT',
'/\bTINYINT\(\d+\)/i' => 'TINYINT',
'/\bSMALLINT\(\d+\)/i' => 'SMALLINT',
'/\bMEDIUMINT\(\d+\)/i' => 'INT',
'/\bINT\(\d+\)\s+UNSIGNED/i' => 'INT',
'/\bBIGINT\(\d+\)\s+UNSIGNED/i' => 'BIGINT',
'/\bINT\(\d+\)/i' => 'INT',
'/\bBIGINT\(\d+\)/i' => 'BIGINT',
'/\bDATETIME\b/i' => 'TIMESTAMP',
'/\bTEXT\b/i' => 'CLOB',
'/\bLONGTEXT\b/i' => 'CLOB',
'/\bMEDIUMTEXT\b/i' => 'CLOB',
'/\bTINYTEXT\b/i' => 'VARCHAR(255)',
'/\bBLOB\b/i' => 'BLOB',
'/\bLONGBLOB\b/i' => 'BLOB',
'/\bMEDIUMBLOB\b/i' => 'BLOB',
'/\bJSON\b/i' => 'CLOB',
'/\bENUM\([^)]+\)/i' => 'VARCHAR(50)',
'/\bSET\([^)]+\)/i' => 'VARCHAR(200)',
'/\bDOUBLE\b/i' => 'DOUBLE',
'/\bFLOAT\b/i' => 'FLOAT',
'/\bBIT\(1\)/i' => 'BIT',
'/AUTO_INCREMENT/i' => 'IDENTITY(1,1)',
'/\s+CHARACTER SET \w+/i' => '',
'/\s+COLLATE \w+/i' => '',
'/\s+COMMENT\s+\'[^\']*\'/i' => '',
'/\s+DEFAULT CHARSET=\w+/i' => '',
'/\s+ENGINE=\w+/i' => '',
'/\s+AUTO_INCREMENT=\d+/i' => '',
'/\s+ROW_FORMAT=\w+/i' => '',
];
foreach ($typeMap as $pattern => $replacement) {
$sql = preg_replace($pattern, $replacement, $sql);
}
// 处理索引(达梦语法略有不同)
$sql = preg_replace('/\s+KEY\s+"(\w+)"\s+\([^)]+\),?\n/i', "\n", $sql);
$sql = preg_replace('/\s+UNIQUE KEY\s+"(\w+)"\s+\(([^)]+)\)/i', ' UNIQUE ($2)', $sql);
// 清理多余逗号
$sql = preg_replace('/,\s*\)/s', "\n)", $sql);
return $sql;
}
// ===== 插入单行数据 =====
function insertRow(PDO $dm, string $table, array $row): void
{
$columns = array_keys($row);
$colList = implode(', ', array_map(fn($c) => "\"{$c}\"", $columns));
$valList = implode(', ', array_map(fn($c) => ":{$c}", $columns));
$sql = "INSERT INTO \"{$table}\" ({$colList}) VALUES ({$valList})";
$stmt = $dm->prepare($sql);
// 处理特殊值
foreach ($row as $key => &$value) {
if ($value === '') {
$value = null; // 达梦区分空字符串和NULL,统一转NULL
}
if (is_bool($value)) {
$value = $value ? 1 : 0;
}
}
unset($value);
$stmt->execute($row);
}
# 迁移所有表
php scripts/migrate_mysql_to_dm.php
# 只迁移指定表
php scripts/migrate_mysql_to_dm.php --tables=users,orders,products
# 跳过建表(只迁移数据)
php scripts/migrate_mysql_to_dm.php --tables=users --skip-create
# 每批500条
php scripts/migrate_mysql_to_dm.php --batch=500
---
十一、达梦列名大写问题专项处理
达梦默认把列名存为大写,这是最常见的坑。
<?php
// app/Database/DmResultNormalizer.php
namespace App\Database;
/**
* 达梦查询结果列名统一转小写
* 解决达梦返回 ['ID'=>1, 'NAME'=>'张三'] 而代码期望 ['id'=>1, 'name'=>'张三'] 的问题
*/
class DmResultNormalizer
{
/**
* 单行结果列名转小写
*/
public static function normalizeRow(?array $row): ?array
{
if ($row === null) {
return null;
}
return array_change_key_case($row, CASE_LOWER);
}
/**
* 多行结果列名转小写
*/
public static function normalizeRows(array $rows): array
{
return array_map([self::class, 'normalizeRow'], $rows);
}
}
在 QueryBuilder 的 get() 和 first() 里加上这个处理:
public function get(): array
{
$sql = $this->buildSelectSql();
$stmt = $this->pdo->prepare($sql);
$stmt->execute($this->bindings);
$rows = $stmt->fetchAll();
// 达梦列名转小写
if ($this->isDm) {
$rows = DmResultNormalizer::normalizeRows($rows);
}
return $rows;
}
public function first(): ?array
{
$result = $this->limit(1)->get();
return $result[0] ?? null;
}
---
十二、事务处理
<?php
// app/Database/Transaction.php
namespace App\Database;
use PDO;
use Throwable;
class Transaction
{
public static function run(PDO $pdo, callable $callback): mixed
{
$pdo->beginTransaction();
try {
$result = $callback($pdo);
$pdo->commit();
return $result;
} catch (Throwable $e) {
$pdo->rollBack();
throw $e;
}
}
}
使用示例:
<?php
use App\Database\Transaction;
use App\Database\ConnectionFactory;
// 转账业务(事务示例)
function transfer(int $fromId, int $toId, float $amount): void
{
$pdo = ConnectionFactory::getInstance();
Transaction::run($pdo, function (PDO $pdo) use ($fromId, $toId, $amount) {
$db = new \App\Database\QueryBuilder($pdo);
// 扣款
$affected = $db->table('accounts')
->where('id', $fromId)
->where('balance', '>=', $amount)
->update(['balance' => $p
✻ Cogitated for 1m 48s
> 继续
✢ Spinning… ●S // 扣款(用原生SQL保证原子性)
$stmt = $pdo->prepare(
"UPDATE accounts SET balance = balance - :amount WHERE id = :id AND balance >= :amount"
); $stmt->execute([':amount' => $amount, ':id' => $fromId]);
if ($stmt->rowCount() === 0) {
throw new \RuntimeException('余额不足或账户不存在');
}
// 加款
$stmt2 = $pdo->prepare(
"UPDATE accounts SET balance = balance + :amount WHERE id = :id"
);
$stmt2->execute([':amount' => $amount, ':id' => $toId]);
if ($stmt2->rowCount() === 0) {
throw new \RuntimeException('目标账户不存在');
}
// 记录流水
$stmt3 = $pdo->prepare(
"INSERT INTO transfer_logs (from_id, to_id, amount, created_at)
VALUES (:from_id, :to_id, :amount, :now)"
);
$stmt3->execute([
':from_id' => $fromId,
':to_id' => $toId,
':amount' => $amount,
':now' => date('Y-m-d H:i:s'),
]);
});
}
---
十三、特殊SQL改造案例
这里列出实际项目中最常遇到的SQL改造场景,每个都给出MySQL写法和达梦写法对比。
13.1 ON DUPLICATE KEY UPDATE → MERGE INTO
<?php
// app/Database/DmMergeBuilder.php
namespace App\Database;
use PDO;
/**
* 把 MySQL 的 INSERT ... ON DUPLICATE KEY UPDATE
* 转换为达梦的 MERGE INTO 语法
*/
class DmMergeBuilder
{
private PDO $pdo;
private bool $isDm;
public function __construct(PDO $pdo)
{
$this->pdo = $pdo;
$this->isDm = ($pdo instanceof DmPDO) && $pdo->isDm();
}
/**
* 插入或更新(upsert)
*
* @param string $table 表名
* @param array $insertData 插入的数据
* @param array $updateData 冲突时更新的字段(不传则更新所有非主键字段)
* @param string $uniqueKey 唯一键字段名,默认 id
*/
public function upsert(
string $table,
array $insertData,
array $updateData = [],
string $uniqueKey = 'id'
): bool {
if ($this->isDm) {
return $this->dmMerge($table, $insertData, $updateData, $uniqueKey);
}
return $this->mysqlUpsert($table, $insertData, $updateData, $uniqueKey);
}
/**
* 达梦:MERGE INTO 实现
*
* 生成的SQL示例:
* MERGE INTO "users" T
* USING (SELECT :id AS id, :name AS name, :email AS email FROM DUAL) S
* ON (T.id = S.id)
* WHEN MATCHED THEN UPDATE SET T.name = S.name, T.email = S.email
* WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (S.id, S.name, S.email)
*/
private function dmMerge(
string $table,
array $insertData,
array $updateData,
string $uniqueKey
): bool {
$columns = array_keys($insertData);
// USING子句:把参数包装成虚拟表
$usingCols = implode(', ', array_map(
fn($col) => ":{$col} AS {$col}",
$columns
));
// ON条件
$onClause = "T.\"{$uniqueKey}\" = S.\"{$uniqueKey}\"";
// WHEN MATCHED THEN UPDATE
if (empty($updateData)) {
// 默认更新除唯一键外的所有字段
$updateData = array_filter(
$columns,
fn($col) => $col !== $uniqueKey
);
}
$updateSets = implode(', ', array_map(
fn($col) => "T.\"{$col}\" = S.\"{$col}\"",
(array)$updateData
));
// WHEN NOT MATCHED THEN INSERT
$insertCols = implode(', ', array_map(fn($c) => "\"{$c}\"", $columns));
$insertVals = implode(', ', array_map(fn($c) => "S.\"{$c}\"", $columns));
$sql = "MERGE INTO \"{$table}\" T
USING (SELECT {$usingCols} FROM DUAL) S
ON ({$onClause})
WHEN MATCHED THEN
UPDATE SET {$updateSets}
WHEN NOT MATCHED THEN
INSERT ({$insertCols}) VALUES ({$insertVals})";
$stmt = $this->pdo->prepare($sql);
return $stmt->execute($insertData);
}
/**
* MySQL:INSERT ... ON DUPLICATE KEY UPDATE 实现
*/
private function mysqlUpsert(
string $table,
array $insertData,
array $updateData,
string $uniqueKey
): bool {
$columns = array_keys($insertData);
$placeholders = array_map(fn($c) => ":{$c}", $columns);
$colList = implode(', ', array_map(fn($c) => "`{$c}`", $columns));
$valList = implode(', ', $placeholders);
if (empty($updateData)) {
$updateData = array_filter($columns, fn($c) => $c !== $uniqueKey);
}
$updateSets = implode(', ', array_map(
fn($col) => "`{$col}` = VALUES(`{$col}`)",
(array)$updateData
));
$sql = "INSERT INTO `{$table}` ({$colList}) VALUES ({$valList})
ON DUPLICATE KEY UPDATE {$updateSets}";
$stmt = $this->pdo->prepare($sql);
return $stmt->execute($insertData);
}
}
使用示例:
<?php
$merger = new \App\Database\DmMergeBuilder($pdo);
// 有则更新name和email,无则插入
$merger->upsert(
table: 'users',
insertData: ['id' => 1, 'name' => '张三', 'email' => 'zs@example.com', 'status' => 1],
updateData: ['name', 'email'],
uniqueKey: 'id'
);
---
13.2 批量插入
<?php
// app/Database/BulkInserter.php
namespace App\Database;
use PDO;
class BulkInserter
{
private PDO $pdo;
private bool $isDm;
private int $chunkSize;
public function __construct(PDO $pdo, int $chunkSize = 200)
{
$this->pdo = $pdo;
$this->isDm = ($pdo instanceof DmPDO) && $pdo->isDm();
$this->chunkSize = $chunkSize;
}
/**
* 批量插入
* MySQL支持 INSERT INTO t VALUES (...),(...),...
* 达梦也支持这个语法,但单次不能太多,分批处理
*/
public function insert(string $table, array $rows): int
{
if (empty($rows)) {
return 0;
}
$total = 0;
$chunks = array_chunk($rows, $this->chunkSize);
$columns = array_keys($rows[0]);
foreach ($chunks as $chunk) {
$total += $this->insertChunk($table, $columns, $chunk);
}
return $total;
}
private function insertChunk(string $table, array $columns, array $rows): int
{
$colQuote = $this->isDm ? '"' : '`';
$colList = implode(', ', array_map(
fn($c) => "{$colQuote}{$c}{$colQuote}",
$columns
));
$placeholderGroups = [];
$bindings = [];
foreach ($rows as $i => $row) {
$group = [];
foreach ($columns as $col) {
$key = ":{$col}_{$i}";
$group[] = $key;
$bindings[$key] = $row[$col] ?? null;
}
$placeholderGroups[] = '(' . implode(', ', $group) . ')';
}
$tblQuote = $this->isDm ? '"' : '`';
$sql = "INSERT INTO {$tblQuote}{$table}{$tblQuote} ({$colList}) VALUES "
. implode(', ', $placeholderGroups);
$stmt = $this->pdo->prepare($sql);
$stmt->execute($bindings);
return $stmt->rowCount();
}
}
使用示例:
<?php
$inserter = new \App\Database\BulkInserter($pdo, chunkSize: 200);
$rows = [];
for ($i = 1; $i <= 1000; $i++) {
$rows[] = [
'name' => "用户{$i}",
'email' => "user{$i}@example.com",
'status' => 1,
'created_at' => date('Y-m-d H:i:s'),
];
}
$count = $inserter->insert('users', $rows);
echo "插入了 {$count} 条记录\n";
---
13.3 全文搜索改造
<?php
// MySQL的LIKE在达梦里完全兼容,但全文索引语法不同
// MySQL全文搜索(不兼容达梦)
// SELECT * FROM articles WHERE MATCH(title, content) AGAINST('关键词')
// 改造后:统一用LIKE(性能差但兼容)
function searchArticles(PDO $pdo, string $keyword, int $page = 1): array
{
$isDm = ($pdo instanceof \App\Database\DmPDO) && $pdo->isDm();
$kw = "%{$keyword}%";
if ($isDm) {
// 达梦用LIKE,或者达梦自己的全文检索(需要建全文索引)
$sql = "SELECT id, title, content, created_at
FROM \"articles\"
WHERE \"title\" LIKE :kw OR \"content\" LIKE :kw
ORDER BY \"created_at\" DESC
LIMIT :limit OFFSET :offset";
} else {
$sql = "SELECT id, title, content, created_at
FROM `articles`
WHERE `title` LIKE :kw OR `content` LIKE :kw
ORDER BY `created_at` DESC
LIMIT :limit OFFSET :offset";
}
$perPage = 15;
$offset = ($page - 1) * $perPage;
$stmt = $pdo->prepare($sql);
$stmt->bindValue(':kw', $kw, PDO::PARAM_STR);
$stmt->bindValue(':limit', $perPage, PDO::PARAM_INT);
$stmt->bindValue(':offset', $offset, PDO::PARAM_INT);
$stmt->execute();
return $stmt->fetchAll();
}
---
13.4 存储过程调用
<?php
// 达梦存储过程调用方式
function callDmProcedure(PDO $pdo, string $procName, array $params = []): array
{
// 达梦调用存储过程
$placeholders = implode(', ', array_fill(0, count($params), '?'));
$sql = "CALL {$procName}({$placeholders})";
$stmt = $pdo->prepare($sql);
foreach ($params as $i => $param) {
$stmt->bindValue($i + 1, $param);
}
$stmt->execute();
return $stmt->fetchAll();
}
// MySQL存储过程调用(语法一样,兼容)
// CALL procedure_name(param1, param2)
---
十四、ThinkPHP6完整适配达梦
ThinkPHP6有自己的数据库驱动机制,需要写一个达梦驱动。
extend/think/db/connector/Dm.php:
<?php
namespace think\db\connector;
use PDO;
use think\db\PDOConnection;
class Dm extends PDOConnection
{
// 达梦默认配置
protected array $config = [
'type' => 'dm',
'hostname' => '127.0.0.1',
'hostport' => '5236',
'database' => 'DAMENG',
'username' => 'SYSDBA',
'password' => 'SYSDBA001',
'charset' => 'utf8',
'schema' => 'SYSDBA',
'prefix' => '',
'debug' => false,
'deploy' => 0,
'rw_separate' => false,
];
public function parseDsn(array $config): string
{
return sprintf(
'dm:host=%s;port=%s;dbname=%s',
$config['hostname'],
$config['hostport'],
$config['database']
);
}
public function getFields(string $tableName): array
{
[$tableName] = explode(' ', $tableName);
$schema = strtoupper($this->config['schema'] ?? 'SYSDBA');
$tableName = strtoupper(trim($tableName, '"'));
$sql = "SELECT COLUMN_NAME, DATA_TYPE, NULLABLE, DATA_DEFAULT
FROM ALL_TAB_COLUMNS
WHERE OWNER = '{$schema}' AND TABLE_NAME = '{$tableName}'
ORDER BY COLUMN_ID";
$stmt = $this->query($sql, [], false, true);
$fields = [];
foreach ($stmt as $row) {
$row = array_change_key_case($row, CASE_LOWER);
$name = strtolower($row['column_name']);
$fields[$name] = [
'name' => $name,
'type' => strtolower($row['data_type']),
'notnull' => $row['nullable'] === 'N',
'default' => $row['data_default'],
'primary' => false,
'autoinc' => false,
];
}
// 查主键
$pkSql = "SELECT cols.column_name
FROM all_constraints cons, all_cons_columns cols
WHERE cons.constraint_type = 'P'
AND cons.constraint_name = cols.constraint_name
AND cons.owner = '{$schema}'
AND cols.table_name = '{$tableName}'";
$pkStmt = $this->query($pkSql, [], false, true);
foreach ($pkStmt as $pk) {
$pkName = strtolower($pk['COLUMN_NAME'] ?? $pk['column_name']);
if (isset($fields[$pkName])) {
$fields[$pkName]['primary'] = true;
$fields[$pkName]['autoinc'] = true;
}
}
return $fields;
}
public function getTables(string $dbName = ''): array
{
$schema = strtoupper($this->config['schema'] ?? 'SYSDBA');
$sql = "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '{$schema}' ORDER BY TABLE_NAME";
$stmt = $this->query($sql, [], false, true);
return array_map(
fn($row) => strtolower($row['TABLE_NAME'] ?? $row['table_name']),
$stmt
);
}
// 覆盖limit方法,兼容达梦语法
public function parseLimitSql(string $sql, int $offset, int $limit): string
{
if ($limit > 0) {
$sql .= " LIMIT {$limit}";
}
if ($offset > 0) {
$sql .= " OFFSET {$offset}";
}
return $sql;
}
protected function supportSavepoint(): bool
{
return true;
}
}
ThinkPHP6 config/database.php 注册达梦驱动:
<?php
return [
'default' => env('DB_DRIVER', 'mysql'),
'connections' => [
'mysql' => [
'type' => 'mysql',
'hostname' => env('DB_HOST', '127.0.0.1'),
'database' => env('DB_DATABASE', 'myapp'),
'username' => env('DB_USERNAME', 'root'),
'password' => env('DB_PASSWORD', ''),
'hostport' => env('DB_PORT', '3306'),
'charset' => 'utf8mb4',
'prefix' => env('DB_PREFIX', ''),
],
'dm' => [
'type' => '\\think\\db\\connector\\Dm', // 指向自定义驱动
'hostname' => env('DM_HOST', '127.0.0.1'),
'database' => env('DM_DATABASE', 'DAMENG'),
'username' => env('DM_USERNAME', 'SYSDBA'),
'password' => env('DM_PASSWORD', 'SYSDBA001'),
'hostport' => env('DM_PORT', '5236'),
'charset' => 'utf8',
'schema' => env('DM_SCHEMA', 'SYSDBA'),
'prefix' => env('DB_PREFIX', ''),
],
],
];
---
十五、完整初始化入口
把所有组件串起来,项目启动时调用一次。
bootstrap/database.php:
<?php
use App\Database\ConnectionFactory;
use App\Database\Model;
// 读取配置
$driver = $_ENV['DB_DRIVER'] ?? 'mysql';
$config = match ($driver) {
'dm' => [
'driver' => 'dm',
'host' => $_ENV['DM_HOST'] ?? '127.0.0.1',
'port' => $_ENV['DM_PORT'] ?? '5236',
'dbname' => $_ENV['DM_DATABASE'] ?? 'DAMENG',
'username' => $_ENV['DM_USERNAME'] ?? 'SYSDBA',
'password' => $_ENV['DM_PASSWORD'] ?? 'SYSDBA001',
],
default => [
'driver' => 'mysql',
'host' => $_ENV['DB_HOST'] ?? '127.0.0.1',
'port' => $_ENV['DB_PORT'] ?? '3306',
'dbname' => $_ENV['DB_DATABASE'] ?? 'myapp',
'username' => $_ENV['DB_USERNAME'] ?? 'root',
'password' => $_ENV['DB_PASSWORD'] ?? '',
'charset' => 'utf8mb4',
],
};
// 初始化连接工厂
ConnectionFactory::setConfig($config);
// 把连接注入到所有Model
Model::setConnection(ConnectionFactory::getInstance());
// 输出当前数据库类型(调试用,生产环境删掉)
// echo "当前数据库: {$driver}\n";
项目入口 public/index.php:
<?php
require_once __DIR__ . '/../vendor/autoload.php';
// 加载环境变量
$dotenv = Dotenv\Dotenv::createImmutable(__DIR__ . '/..');
$dotenv->load();
// 初始化数据库
require_once __DIR__ . '/../bootstrap/database.php';
// 后续路由/框架启动...
---
十六、验证改造效果的测试脚本
tests/dm_compat_test.php:
<?php
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../bootstrap/database.php';
use App\Database\ConnectionFactory;
use App\Database\QueryBuilder;
use App\Database\DmMergeBuilder;
use App\Database\BulkInserter;
$pdo = ConnectionFactory::getInstance();
$db = new QueryBuilder($pdo);
$pass = 0;
$fail = 0;
function test(string $name, callable $fn): void
{
global $pass, $fail;
try {
$fn();
echo " [√] {$name}\n";
$pass++;
} catch (\Throwable $e) {
echo " [×] {$name}: " . $e->getMessage() . "\n";
$fail++;
}
}
echo "\n===== 达梦兼容性测试 =====\n\n";
// 1. 基础连接
echo "【连接测试】\n";
test('PDO连接正常', function () use ($pdo) {
$stmt = $pdo->query("SELECT 1+1 AS result");
$row = $stmt->fetch();
$val = $row['result'] ?? $row['RESULT'];
assert($val == 2, "期望2,得到{$val}");
});
// 2. CRUD
echo "\n【CRUD测试】\n";
test('建测试表', function () use ($pdo) {
$pdo->exec("DROP TABLE IF EXISTS \"dm_test\"");
$pdo->exec("CREATE TABLE \"dm_test\" (
\"id\" INT IDENTITY(1,1) PRIMARY KEY,
\"name\" VARCHAR(100),
\"score\" DECIMAL(10,2),
\"is_active\" BIT DEFAULT 1,
\"created_at\" TIMESTAMP
)");
});
test('INSERT', function () use ($db) {
$id = $db->table('dm_test')->insertGetId([
'name' => '测试用户',
'score' => 99.5,
'is_active' => 1,
'created_at' => date('Y-m-d H:i:s'),
]);
assert($id > 0, "插入失败,ID={$id}");
});
test('SELECT', function () use ($db) {
$row = $db->table('dm_test')->where('name', '测试用户')->first();
assert($row !== null, '查询结果为空');
assert(($row['name'] ?? $row['NAME']) === '测试用户', '名称不匹配');
});
test('UPDATE', function () use ($db) {
$affected = $db->table('dm_test')
->where('name', '测试用户')
->update(['score' => 100.0]);
assert($affected > 0, '更新影响行数为0');
});
test('分页查询', function () use ($db) {
// 先插入多条
for ($i = 1; $i <= 5; $i++) {
$db->table('dm_test')->insert([
'name' => "用户{$i}",
'score' => $i * 10.0,
'is_active' => 1,
'created_at' => date('Y-m-d H:i:s'),
]);
}
$result = $db->table('dm_test')->paginate(1, 3);
assert(count($result['data']) === 3, '分页数量不对');
assert($result['total'] >= 6, '总数不对');
});
test('DELETE', function () use ($db) {
$affected = $db->table('dm_test')
->where('name', 'LIKE', '用户%')
->delete();
assert($affected > 0, '删除影响行数为0');
});
// 3. SQL转换
echo "\n【SQL转换测试】\n";
test('LIMIT offset,count 转换', function () use ($pdo) {
if (!($pdo instanceof \App\Database\DmPDO)) {
return; // MySQL跳过
}
$converted = $pdo->convertSql("SELECT * FROM t LIMIT 20, 10");
assert(
str_contains($converted, 'LIMIT 10 OFFSET 20'),
"转换结果不对: {$converted}"
);
});
test('反引号转双引号', function () use ($pdo) {
if (!($pdo instanceof \App\Database\DmPDO)) {
return;
}
$converted = $pdo->convertSql("SELECT `id`, `name` FROM `users`");
assert(
str_contains($converted, '"id"'),
"反引号未转换: {$converted}"
);
});
test('IFNULL 转 NVL', function () use ($pdo) {
if (!($pdo instanceof \App\Database\DmPDO)) {
return;
}
$converted = $pdo->convertSql("SELECT IFNULL(name, '无名') FROM users");
assert(
str_contains($converted, 'NVL('),
"IFNULL未转换: {$converted}"
);
});
test('NOW() 转 SYSDATE', function () use ($pdo) {
if (!($pdo instanceof \App\Database\DmPDO)) {
return;
}
$converted = $pdo->convertSql("SELECT NOW() as t");
assert(
str_contains($converted, 'SYSDATE'),
"NOW()未转换: {$converted}"
);
});
// 4. MERGE INTO
echo "\n【Upsert测试】\n";
test('MERGE INTO / ON DUPLICATE KEY', function () use ($pdo) {
$merger = new DmMergeBuilder($pdo);
$result = $merger->upsert(
table: 'dm_test',
insertData: ['id' => 999, 'name' => 'upsert测试', 'score' => 88.0, 'is_active' => 1, 'created_at' =>
date('Y-m-d H:i:s')],
updateData: ['name', 'score'],
uniqueKey: 'id'
);
assert($result === true, 'upsert失败');
});
// 5. 批量插入
echo "\n【批量插入测试】\n";
test('批量插入100条', function () use ($pdo) {
$inserter = new BulkInserter($pdo, 50);
$rows = [];
for ($i = 1; $i <= 100; $i++) {
$rows[] = [
'name' => "批量用户{$i}",
'score' => round($i * 1.5, 2),
'is_active' => 1,
'created_at' => date('Y-m-d H:i:s'),
];
}
$count = $inserter->insert('dm_test', $rows);
assert($count === 100, "期望插入100条,实际{$count}条");
});
// 6. 清理
test('清理测试表', function () use ($pdo) {
$pdo->exec("DROP TABLE IF EXISTS \"dm_test\"");
});
// 输出结果
echo "\n" . str_repeat('=', 40) . "\n";
echo "测试完成:通过 {$pass} 个,失败 {$fail} 个\n";
echo str_repeat('=', 40) . "\n\n";
exit($fail > 0 ? 1 : 0);
# 运行测试
php tests/dm_compat_test.php
# 切换到MySQL测试
DB_DRIVER=mysql php tests/dm_compat_test.php
---
十七、常见报错速查
┌──────────────────────────────┬──────────────────────────────┬─────────────────────────────────────────────┐
│ 报错信息 │ 原因 │ 解决方法 │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ Column 'XXX' not found │ 达梦列名大写,代码用小写访问 │ 用 array_change_key_case($row, CASE_LOWER) │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ syntax error near LIMIT │ 用了 LIMIT x,y 格式 │ 改为 LIMIT y OFFSET x │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ invalid identifier │ 用了MySQL保留字作列名 │ 加双引号 "order" "desc" │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ table or view does not exist │ 表名大小写不对 │ 达梦默认大写,建表时统一用大写或加引号 │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ value too large for column │ VARCHAR长度不够 │ 达梦VARCHAR按字节算,中文占3字节,长度要×3 │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ cannot insert NULL │ 空字符串被当NULL │ 达梦区分 '' 和 NULL,检查NOT NULL字段 │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ driver not found: dm │ PDO扩展未加载 │ 检查 php -m | grep dm,重启php-fpm │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ character set mismatch │ 字符集不一致 │ 建库时指定 CHARSET=1,连接时 SET NAMES UTF8 │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ sequence not found │ 用了MySQL的lastInsertId方式 │ 改用 IDENTITY_VAL_LOCAL() │
├──────────────────────────────┼──────────────────────────────┼─────────────────────────────────────────────┤
│ object already exists │ 重复建表 │ 先 DROP TABLE IF EXISTS 再建 │
└──────────────────────────────┴──────────────────────────────┴─────────────────────────────────────────────┘
---
总结
整个改造的核心思路就三层:
第一层:DmPDO 兼容类
→ 拦截所有SQL,自动转换语法差异
→ 业务代码完全感知不到
第二层:QueryBuilder + Model
→ 封装常用操作,不直接写SQL
→ 分页、CRUD、事务统一处理
第三层:特殊场景单独处理
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)