氚云低代码平台数据同步本地数据库软件介绍

最近需要同步氚云平台的数据至本地做数据分析,传统的方法需要反复的建立表和字段太麻烦了,就开发了本工具能省下更多的时间。
整体流程:氚云编写表结构自定义接口---->获取表结构建表—>调用氚云的标准批量同步接口同步数据。

一、软件概述

本软件是一款专为氚云低代码平台设计的数据同步工具,能够自动将氚云平台的数据同步至本地MySQL数据库,实现数据的快速备份与本地分析。软件具备自动建表、智能字段映射、时间过滤等功能,无需手动编写SQL脚本,大幅提升数据同步效率。
在这里插入图片描述

二、核心功能

1. 自动建表

  • 根据氚云表单结构自动生成MySQL表结构
  • 支持主表、子表及多值表的完整建表
  • 自动添加表注释和字段注释,便于理解数据结构
    在这里插入图片描述
    在这里插入图片描述

2. 智能数据同步

  • 支持全量数据同步和增量数据同步
  • 通过时间过滤配置实现指定时间段数据同步
  • 自动处理关联字段,确保数据完整性

3. 灵活配置

  • 支持自定义数据库连接信息
  • 可配置时间过滤规则(默认30天)
  • 支持字段类型映射自定义
    在这里插入图片描述

三、自动建表字段特性

1. 字段类型映射规则

氚云字段类型 MySQL字段类型 说明
单行文本 VARCHAR(255) 常规文本存储
多行文本 TEXT 长文本存储
数字 DECIMAL(20,6) 支持小数位
日期时间 DATETIME 标准日期时间格式
关联字段 VARCHAR(64) 生成两个字段:字段名_id 和 字段名(内容相同)
关联数组 JSON 存储关联数据数组
多选项 VARCHAR(255) 逗号分隔存储

2. 特殊字段处理

  • 关联字段(Association):同时生成「字段名_id」和「字段名」两个字段,内容完全相同,便于不同场景使用
  • 系统字段:自动保留氚云系统字段(如CreatedTime、ModifiedTime等)
  • 多值表:表名格式为「主表名_多值表名」或「主表名_子表名_多值表名」,清晰反映表间关系

3. 表注释生成规则

  • 主表:直接使用氚云表单显示名称
  • 子表:格式为「主表名称_子表名称」
  • 多值表:格式为「主表名称_多值表名称」或「主表名称_子表名称_多值表名称」

四、使用流程

1. 配置准备

  1. 编辑 config/config.json 文件,配置数据库连接信息
  2. 设置时间过滤规则(可选)
  3. 配置需要同步的表单架构

2. 运行同步

  • 直接运行 h3tomysqlddl.exe 可执行文件
  • 或通过命令行运行:python start.py

3. 查看结果

  • 同步日志保存在 logs/ 目录下
  • 数据自动同步至配置的本地MySQL数据库

五、时间过滤配置说明

配置示例

{
  "schemas": [
    {
      "code": "表单编码",
      "time_filter": {
        "days": 30,
        "field": "CreatedTime",
        "operator": 1
      }
    }
  ]
}

参数说明

  • days:过滤天数(默认30天)
  • field:时间字段(默认"CreatedTime")
  • operator:操作符(默认1=大于等于)

六、软件优势

1. 节省时间

  • 无需手动创建表结构和编写SQL脚本
  • 自动处理复杂的表关系和字段映射
  • 一键启动数据同步流程

2. 数据安全

  • 本地数据库备份,避免数据丢失风险
  • 支持增量同步,减少数据传输量
  • 完整保留数据结构和关系

3. 便于分析

  • 本地数据库支持复杂SQL查询
  • 可与数据分析工具(如Excel、PowerBI等)直接对接
  • 快速响应数据分析需求

4. 灵活扩展

  • 支持自定义字段类型映射
  • 可配置同步规则和过滤条件
  • 兼容氚云平台的各种字段类型

七、适用场景

  1. 数据备份:定期同步氚云数据至本地,确保数据安全
  2. 数据分析:利用本地数据库进行复杂数据分析和报表生成
  3. 开发测试:为开发和测试提供真实数据环境
  4. 数据迁移:在不同系统间迁移数据时作为中间环节

八、技术规格

  • 开发语言:Python
  • 支持数据库:MySQL
  • 依赖:SQLAlchemy、Requests、PyInstaller
  • 运行环境:Windows

九、部分核心代码

1、氚云表结构自定义接口

 private Dictionary<string, object> ExtractFieldInfo(H3.DataModel.PropertySchema prop)
    {
        var dict = new Dictionary<string, object>();
        dict["Name"] = prop.Name;
        dict["DisplayName"] = prop.DisplayName;
        dict["DataType"] = prop.DataType.ToString();
        dict["DataTypeName"] = GetDataTypeName(prop.DataType.ToString());

        // 特殊字段显示名处理(参考定时器日志)
        if (prop.Name == "ObjectId") dict["DisplayName"] = "数据ID";
        else if (prop.Name == "ModifiedBy") dict["DisplayName"] = "修改人";
        else if (prop.Name == "WorkflowInstanceId") dict["DisplayName"] = "工作流程实例ID";
        else if (prop.Name == "Status") dict["DisplayName"] = "流程状态";

        // 关联表单信息
        string dataTypeStr = prop.DataType.ToString();
        if (dataTypeStr == "Association" || dataTypeStr == "AssociationArray")
        {
            dict["AssociationSchemaCode"] = prop.AssociationSchemaCode;
            dict["AssociationSchemaName"] = GetSchemaName(prop.AssociationSchemaCode);
        }

        // 子表处理
        if (dataTypeStr == "BizObjectArray")
        {
            var childSchema = prop.ChildSchema;
            if (childSchema != null && childSchema.Properties != null)
            {
                var childFields = new List<Dictionary<string, object>>();
                foreach (var childProp in childSchema.Properties)
                {
                    childFields.Add(ExtractFieldInfo(childProp));
                }
                dict["ChildFields"] = childFields;
            }
        }

        return dict;
    }

    private string GetDataTypeName(string dataType)
    {
        switch (dataType)
        {
            case "ShortString": return "单行文本";
            case "Unit": return "系统组织基本单元(单)";
            case "DateTime": return "日期";
            case "Association": return "关联表单(单)";
            case "Double": return "(数字)浮点型";
            case "Int": return "(数字)整型";
            case "String": return "多行文本";
            case "Bool": return "是/否";
            case "File": return "附件";
            case "Image": return "图片";
            case "Address": return "地址";
            case "Map": return "位置";
            case "UnitArray": return "系统组织基本单元(多)";
            case "AssociationArray": return "关联表单(多)";
            case "BizObjectArray": return "子表";
            default: return dataType;
        }
    }

2、表操作

def table_exists(self, table: str):
        logger.info(f"检查表是否存在: {table}")
        sql = "SELECT COUNT(1) c FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = :t"
        with self.engine.begin() as conn:
            r = conn.execute(text(sql), {"t": table}).scalar()
            exists = r and int(r) > 0
            logger.info(f"表{table}存在: {exists}")
            return exists

    def get_columns(self, table: str):
        logger.info(f"获取表{table}的字段信息")
        sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE, IS_NULLABLE FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = :t"
        with self.engine.begin() as conn:
            res = conn.execute(text(sql), {"t": table}).fetchall()
            columns = {row[0]: {"data_type": row[1], "column_type": row[2], "is_nullable": row[3]} for row in res}
            logger.info(f"表{table}包含{len(columns)}个字段")
            return columns
    
    def index_exists(self, table: str, index_name: str):
        logger.info(f"检查表{table}的索引{index_name}是否存在")
        sql = "SELECT COUNT(1) c FROM information_schema.statistics WHERE table_schema = DATABASE() AND table_name = :t AND index_name = :i"
        with self.engine.begin() as conn:
            r = conn.execute(text(sql), {"t": table, "i": index_name}).scalar()
            exists = r and int(r) > 0
            logger.info(f"索引{index_name}存在: {exists}")
            return exists

3、使用氚云标准同步接口

while retry_count <= max_retries:
        start_time = time.time()
        try:
            logger.info(f"调用API批量加载数据: {schema_code} from={from_row} size={page_size} (尝试{retry_count+1}/{max_retries+1})")
            logger.debug(f"API URL: {url}")
            logger.debug(f"请求头: {headers}")
            logger.debug(f"请求体: {payload}")
            with httpx.Client(timeout=30) as client:
                r = client.post(url, headers=headers, json=payload)
                r.raise_for_status()
                data = r.json()
                elapsed_time = time.time() - start_time
                logger.debug(f"批量数据响应: {data}")
                logger.info(f"批量数据接口调用成功: {schema_code}, 耗时: {elapsed_time:.2f}秒")
                return data
        except httpx.HTTPStatusError as e:
            error_details = {
                "schema_code": schema_code,
                "from_row": from_row,
                "page_size": page_size,
                "retry_count": retry_count,
                "url": str(e.request.url),
                "status_code": e.response.status_code,
                "response_text": e.response.text
            }
            logger.error(f"批量数据接口HTTP错误: {json.dumps(error_details, ensure_ascii=False)}")
            logger.error(f"请求体: {json.dumps(e.request.read(), ensure_ascii=False)}")
            # 只有5xx错误才重试
            if e.response.status_code >= 500:
                retry_count += 1
                if retry_count > max_retries:
                    logger.error(f"已达到最大重试次数{max_retries},停止重试")
                    return {
                        "result": "error",
                        "code": str(e.response.status_code),
                        "message": f"HTTP {e.response.status_code}: {e.response.text}",
                        "url": str(e.request.url),
                        "details": error_details
                    }
                logger.info(f"HTTP {e.response.status_code}错误可重试,{retry_delay}秒后重试...")
                time.sleep(retry_delay)
                retry_delay *= 2  # 指数退避

软件特点:简单易用、自动建表、智能同步、安全可靠
核心价值:快速将氚云数据同步至本地,为数据分析和备份提供高效解决方案

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐