氚云低代码平台数据同步本地数据库软件
·
氚云低代码平台数据同步本地数据库软件介绍
最近需要同步氚云平台的数据至本地做数据分析,传统的方法需要反复的建立表和字段太麻烦了,就开发了本工具能省下更多的时间。
整体流程:氚云编写表结构自定义接口---->获取表结构建表—>调用氚云的标准批量同步接口同步数据。
一、软件概述
本软件是一款专为氚云低代码平台设计的数据同步工具,能够自动将氚云平台的数据同步至本地MySQL数据库,实现数据的快速备份与本地分析。软件具备自动建表、智能字段映射、时间过滤等功能,无需手动编写SQL脚本,大幅提升数据同步效率。
二、核心功能
1. 自动建表
- 根据氚云表单结构自动生成MySQL表结构
- 支持主表、子表及多值表的完整建表
- 自动添加表注释和字段注释,便于理解数据结构


2. 智能数据同步
- 支持全量数据同步和增量数据同步
- 通过时间过滤配置实现指定时间段数据同步
- 自动处理关联字段,确保数据完整性

3. 灵活配置
- 支持自定义数据库连接信息
- 可配置时间过滤规则(默认30天)
- 支持字段类型映射自定义

三、自动建表字段特性
1. 字段类型映射规则
| 氚云字段类型 | MySQL字段类型 | 说明 |
|---|---|---|
| 单行文本 | VARCHAR(255) | 常规文本存储 |
| 多行文本 | TEXT | 长文本存储 |
| 数字 | DECIMAL(20,6) | 支持小数位 |
| 日期时间 | DATETIME | 标准日期时间格式 |
| 关联字段 | VARCHAR(64) | 生成两个字段:字段名_id 和 字段名(内容相同) |
| 关联数组 | JSON | 存储关联数据数组 |
| 多选项 | VARCHAR(255) | 逗号分隔存储 |
2. 特殊字段处理
- 关联字段(Association):同时生成「字段名_id」和「字段名」两个字段,内容完全相同,便于不同场景使用
- 系统字段:自动保留氚云系统字段(如CreatedTime、ModifiedTime等)
- 多值表:表名格式为「主表名_多值表名」或「主表名_子表名_多值表名」,清晰反映表间关系
3. 表注释生成规则
- 主表:直接使用氚云表单显示名称
- 子表:格式为「主表名称_子表名称」
- 多值表:格式为「主表名称_多值表名称」或「主表名称_子表名称_多值表名称」
四、使用流程
1. 配置准备
- 编辑
config/config.json文件,配置数据库连接信息 - 设置时间过滤规则(可选)
- 配置需要同步的表单架构
2. 运行同步
- 直接运行
h3tomysqlddl.exe可执行文件 - 或通过命令行运行:
python start.py
3. 查看结果
- 同步日志保存在
logs/目录下 - 数据自动同步至配置的本地MySQL数据库
五、时间过滤配置说明
配置示例
{
"schemas": [
{
"code": "表单编码",
"time_filter": {
"days": 30,
"field": "CreatedTime",
"operator": 1
}
}
]
}
参数说明
days:过滤天数(默认30天)field:时间字段(默认"CreatedTime")operator:操作符(默认1=大于等于)
六、软件优势
1. 节省时间
- 无需手动创建表结构和编写SQL脚本
- 自动处理复杂的表关系和字段映射
- 一键启动数据同步流程
2. 数据安全
- 本地数据库备份,避免数据丢失风险
- 支持增量同步,减少数据传输量
- 完整保留数据结构和关系
3. 便于分析
- 本地数据库支持复杂SQL查询
- 可与数据分析工具(如Excel、PowerBI等)直接对接
- 快速响应数据分析需求
4. 灵活扩展
- 支持自定义字段类型映射
- 可配置同步规则和过滤条件
- 兼容氚云平台的各种字段类型
七、适用场景
- 数据备份:定期同步氚云数据至本地,确保数据安全
- 数据分析:利用本地数据库进行复杂数据分析和报表生成
- 开发测试:为开发和测试提供真实数据环境
- 数据迁移:在不同系统间迁移数据时作为中间环节
八、技术规格
- 开发语言:Python
- 支持数据库:MySQL
- 依赖:SQLAlchemy、Requests、PyInstaller
- 运行环境:Windows
九、部分核心代码
1、氚云表结构自定义接口
private Dictionary<string, object> ExtractFieldInfo(H3.DataModel.PropertySchema prop)
{
var dict = new Dictionary<string, object>();
dict["Name"] = prop.Name;
dict["DisplayName"] = prop.DisplayName;
dict["DataType"] = prop.DataType.ToString();
dict["DataTypeName"] = GetDataTypeName(prop.DataType.ToString());
// 特殊字段显示名处理(参考定时器日志)
if (prop.Name == "ObjectId") dict["DisplayName"] = "数据ID";
else if (prop.Name == "ModifiedBy") dict["DisplayName"] = "修改人";
else if (prop.Name == "WorkflowInstanceId") dict["DisplayName"] = "工作流程实例ID";
else if (prop.Name == "Status") dict["DisplayName"] = "流程状态";
// 关联表单信息
string dataTypeStr = prop.DataType.ToString();
if (dataTypeStr == "Association" || dataTypeStr == "AssociationArray")
{
dict["AssociationSchemaCode"] = prop.AssociationSchemaCode;
dict["AssociationSchemaName"] = GetSchemaName(prop.AssociationSchemaCode);
}
// 子表处理
if (dataTypeStr == "BizObjectArray")
{
var childSchema = prop.ChildSchema;
if (childSchema != null && childSchema.Properties != null)
{
var childFields = new List<Dictionary<string, object>>();
foreach (var childProp in childSchema.Properties)
{
childFields.Add(ExtractFieldInfo(childProp));
}
dict["ChildFields"] = childFields;
}
}
return dict;
}
private string GetDataTypeName(string dataType)
{
switch (dataType)
{
case "ShortString": return "单行文本";
case "Unit": return "系统组织基本单元(单)";
case "DateTime": return "日期";
case "Association": return "关联表单(单)";
case "Double": return "(数字)浮点型";
case "Int": return "(数字)整型";
case "String": return "多行文本";
case "Bool": return "是/否";
case "File": return "附件";
case "Image": return "图片";
case "Address": return "地址";
case "Map": return "位置";
case "UnitArray": return "系统组织基本单元(多)";
case "AssociationArray": return "关联表单(多)";
case "BizObjectArray": return "子表";
default: return dataType;
}
}
2、表操作
def table_exists(self, table: str):
logger.info(f"检查表是否存在: {table}")
sql = "SELECT COUNT(1) c FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = :t"
with self.engine.begin() as conn:
r = conn.execute(text(sql), {"t": table}).scalar()
exists = r and int(r) > 0
logger.info(f"表{table}存在: {exists}")
return exists
def get_columns(self, table: str):
logger.info(f"获取表{table}的字段信息")
sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE, IS_NULLABLE FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = :t"
with self.engine.begin() as conn:
res = conn.execute(text(sql), {"t": table}).fetchall()
columns = {row[0]: {"data_type": row[1], "column_type": row[2], "is_nullable": row[3]} for row in res}
logger.info(f"表{table}包含{len(columns)}个字段")
return columns
def index_exists(self, table: str, index_name: str):
logger.info(f"检查表{table}的索引{index_name}是否存在")
sql = "SELECT COUNT(1) c FROM information_schema.statistics WHERE table_schema = DATABASE() AND table_name = :t AND index_name = :i"
with self.engine.begin() as conn:
r = conn.execute(text(sql), {"t": table, "i": index_name}).scalar()
exists = r and int(r) > 0
logger.info(f"索引{index_name}存在: {exists}")
return exists
3、使用氚云标准同步接口
while retry_count <= max_retries:
start_time = time.time()
try:
logger.info(f"调用API批量加载数据: {schema_code} from={from_row} size={page_size} (尝试{retry_count+1}/{max_retries+1})")
logger.debug(f"API URL: {url}")
logger.debug(f"请求头: {headers}")
logger.debug(f"请求体: {payload}")
with httpx.Client(timeout=30) as client:
r = client.post(url, headers=headers, json=payload)
r.raise_for_status()
data = r.json()
elapsed_time = time.time() - start_time
logger.debug(f"批量数据响应: {data}")
logger.info(f"批量数据接口调用成功: {schema_code}, 耗时: {elapsed_time:.2f}秒")
return data
except httpx.HTTPStatusError as e:
error_details = {
"schema_code": schema_code,
"from_row": from_row,
"page_size": page_size,
"retry_count": retry_count,
"url": str(e.request.url),
"status_code": e.response.status_code,
"response_text": e.response.text
}
logger.error(f"批量数据接口HTTP错误: {json.dumps(error_details, ensure_ascii=False)}")
logger.error(f"请求体: {json.dumps(e.request.read(), ensure_ascii=False)}")
# 只有5xx错误才重试
if e.response.status_code >= 500:
retry_count += 1
if retry_count > max_retries:
logger.error(f"已达到最大重试次数{max_retries},停止重试")
return {
"result": "error",
"code": str(e.response.status_code),
"message": f"HTTP {e.response.status_code}: {e.response.text}",
"url": str(e.request.url),
"details": error_details
}
logger.info(f"HTTP {e.response.status_code}错误可重试,{retry_delay}秒后重试...")
time.sleep(retry_delay)
retry_delay *= 2 # 指数退避
软件特点:简单易用、自动建表、智能同步、安全可靠
核心价值:快速将氚云数据同步至本地,为数据分析和备份提供高效解决方案
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)