本章介绍一个完整的HLA/RTI人工智能大模型仿真示例,该例采用KY-RTI + Python + 国产大模型(通义千问Qwen2-0.5B)。这是一个接近真实环境的低空无人机巡逻态势仿真系统,稍加修改即可以成为一个实际系统。

11.1 AI大模型浪潮下的仿真新需求

        大模型技术正在深刻改变仿真领域。无论是作战仿真推演、自动驾驶仿真测试,还是工业数字孪生决策,集成AI大模型进行智能决策已经成为明显趋势。然而,一个广泛存在的问题是现有的大多数分布式仿真平台缺乏对Python脚本的原生支持。这让大量基于Python开发的AI模型难以直接接入传统的HLA/RTI仿真环境中。

        KY-RTI原生支持Python语言,且性能与C++相当,开发者可以在Python环境中直接使用HLA服务和调用大模型。在之前的文章《KY-RTI 分布仿真技术:第十章 Python程序设计》中,展示了Python聊天程序和时间管理同步的完整代码。使用Python编程极为方便,并且可以与C++、Java、Qt、C#等程序进行混合仿真,一个程序的设计者不需要关心其他程序所采用的编程语言。

        分布式仿真系统集成AI大模型后,任务调度是一个棘手问题。KY-RTI提供的HLA服务特别是时间管理机制,能够很好地满足AI模型推理任务与实时仿真环境严格同步的需求。

11.2 实战示例:低空无人机巡逻态势仿真

        本例是一个基于KY-RTI、Python和通义千问大模型的低空无人机巡逻态势仿真系统。如图11.1所示,态势显示在浏览器中,地图为基于OpenSteetMap的简易可公开的长沙市地图。

        图中右上角显示仿真时间、在空的无人机数,以及Ollama的连接状态等;左侧为地图,中间有一个蓝色的围栏区域为无人机的巡逻区域,图中有4架无人机UAV-1、UAV-2、UAV-3和UAV-4。在开始加入仿真时,无人机为绿色表示电量足够;当电量低于50%时,变成橙色;当电量低于25%时,变成红色;当电量为0时退出仿真。右侧从上到下分别显示大模型建议、规则告警和机队状态三个面板,支持通过滚动鼠标查看更多信息。当无人机电量不正常时会向千问大模型询问接下来应该如何操作,千问大模型回答的信息显示在大模型建议面板处;规则告警分为上下两部分,上部分显示当前的报警信息,下部分显示之前的报警信息。机队状态显示参加仿真的无人机的位置(经度/纬度/高度)、电量、飞行状态。

图11.1 态势图

11.3 系统架构

        仿真系统的联邦名为UavPatrolDemo,包含一个Web态势大屏联邦成员和多个独立的无人机联邦成员。在开发该系统时,联邦成员与Web服务采用Python3;浏览器大屏采用HTML + JavaScript + Leaflet。如图11.2所示,主要的Python文件和数据流都标注在图中。

图11.2 系统架构

        数据流说明:

        (1)每架无人机(UAVAgent)周期性调用updateAttributeValues将自身位置、电量、状态等属性发布到RTI。

       (2)WebDashboard作为订购方,通过reflectAttributeValues回调接收所有无人机的属性更新,并更新机队状态。

        (3)规则告警引擎(位于web_server.py)在收到遥测数据后,根据电量阈值和围栏范围判断是否触发告警,并立即通过API推送到前端。

        (4)当规则告警产生时,若大模型功能启用,则异步调用llm_advisor.py请求本机Ollama部署的千问(Qwen2-0.5B)大模型生成处置建议,结果返回后在前端显示。

        (5)浏览器前端每500毫秒轮询/api/state接口,获取最新态势数据并更新地图标记及右侧面板。

11.4 环境准备与快速部署

11.4.1 环境要求

组件

说明

KY-RTI

支持Python的HLA/RTI软件

Python

开发语言

浏览器

态势显示,通过http://127.0.0.1/8765访问

Ollama

大模型框架

Qwen2-0.5B

国产大模型

11.4.2 启动步骤

        步骤1:启动KY-RTI

        $ cd RTI-1.3NGv6/Linux-os-opt-mt/bin

        $ ./RTIManager.sh

        启动KY-RTI控制台,如图11.3所示。KY-RTI在Windows和Linux下非常可靠,在启动之后用户就可以专心做仿真而不必再关心它了。

图11.3 KY-RTI控制台

        步骤2:启动Ollama大模型服务

        (1)若没有安装Ollama,则执行下列安装命令

        $ curl -fsSL https://ollama.com/install.sh | sudo sh

        (2)启动Ollama, 浏览器可看到: http://localhost:11434/

        $ ollama serve 或者

        $ sudo systemctl restart ollama

        (3)若没有千问大模型,则执行下列命令拉取大模型

        $ ollama pull qwen2:0.5b

        步骤3:启动态势大屏

        $ cd RTI-1.3NGv6/Linux-os-opt-mt/apps-python3/demos/uav-patrol

        $ python3 WebDashboard.py

        在浏览器中打开 http://127.0.0.1:8765。

        步骤4:启动无人机成员

        $ python3 UAVAgent.py

        启动一个或多个终端,本示例在演示时启动了4个终端,在每个终端上执行上述命令,输入无人机名称,譬如:UAV-1、UAV-2、UAV-3、UAV-4。地面指挥所在呼叫无人机时也以该名称作为呼号。

11.4.3 运行态势

        启动后应观察到以下现象:

        (1)WebDashboard终端每收到一次遥测则显示一行信息

        (2)浏览器地图显示参加的无人机(电量≥50%绿色、25%50%橙色、<25%红色),右侧机队状态面板显示每架无人机的实时位置、电量与飞行状态。

        (3)规则告警面板随电量与围栏实时更新:

        电量低于50%且高于25%,提示注意监控

        电量低于25%且大于0,提示建议返航

        电量降至0%,提示电量耗尽,已迫降

        飞出蓝色围栏(经度或纬度越界),提示经度越界纬度越界

        (4)在某类规则告警出现时,会询问千问模型接下来应该如何操作,并在大模型建议面板处显示对应的中文处置建议(每架飞机每类问题请求一次)。

        (5)在电量耗尽后,该无人机在终端打印降落离网信息,自动退出联邦;地图标记消失,地图右上角的“在空N架计数减1。

        下面显示仿真过程中的典型态势图。

        图11.4是启动态势大屏后的态势,此时还没有无人机加入,仿真时间为0,右侧3个面板为空。

图11.4 启动态势大屏

        图11.5是4个无人机加入仿真后的态势,此时4个无人机电量充足,颜色为绿色。

图11.5 4个无人机加入仿真

        图11.6是在仿真时间99时,无人机UAV-1电量为45.6%,低于50%变成橙色;右侧的大模型建议面板显示了一条大模型建议:“T=92 UAV-1 电量=49.9 xxx”;规则告警面板也显示了多条告警信息。

图11.6 无人机UAV-1变成橙色

        图11.7在仿真时间175时,无人机UAV-1和UAV-2电量低于25%变成红色,无人机UAV-3和UAV-4电量低于50%变成橙色;右侧的大模型建议面板有6条大模型建议;规则告警面板显示了更多的告警信息。

图11.7 2架橙色无人机和2架红色无人机

        图11.8在仿真时间192时,无人机UAV-1和UAV-2电量为0已经退出了仿真,只剩下红色的无人机UAV-3和UAV-4;右侧的大模型建议面板有10条大模型建议;规则告警面板显示了更多的告警信息。

图11.8 2架无人机退出仿真

        图11.9在仿真时间228时,4架无人机都因电量为0已经退出了仿真;右侧的大模型建议面板有12条大模型建议;规则告警面板显示了更多的告警信息。

图11.9 4架无人机退出仿真

11.5 大模型建议

        下面是在整个仿真期间的12条大模型建议,大家可以看看是否合适。本例使用的千问大模型参数较少,实际上还可以使用参数更多的其他大模型。

        (1)T=228 UAV-4 电量=0.0%【大模型建议】立即启动应急通信设备,通知地面指挥中心。同时,根据无人机的当前状况,评估可能存在的问题,并采取相应的措施进行处理。在确保安全的前提下,尽快完成任务并返回预定地点。

        (2)T=226 UAV-3 电量=0.0%【大模型建议】立即联系地面指挥中心,请求援助。尽快恢复无人机的正常运行,并采取相应的措施防止事故进一步恶化。同时,确保无人机的安全,避免不必要的风险和损害。

        (3)T=188 UAV-2 电量=0.0%【大模型建议】首先,检查电池是否已经充满电。如果电量不足,应立即进行充电操作。其次,查看无人机的飞行状态是否正常,是否有异常现象。最后,检查地面指挥中心的设备是否正常运行,确保信息传输和数据处理的有效性。

        (4)T=185 UAV-4 电量=24.9%【大模型建议】首先,检查电池电量是否已满。如果已经满,尝试充电或更换电池。如果没有问题,可以尝试使用其他方式(如GPS定位)来确定无人机的位置和飞行状态。如果问题仍然存在,可能需要联系地面指挥中心寻求帮助。

        (5)T=184 UAV-1 电量=0.0%【大模型建议】立即联系地面指挥中心,请求无人机降落并进行充电。同时,通过卫星地图查看无人机的位置和飞行状态,确保安全后再继续执行任务。

        (6)T=180 UAV-3 电量=24.3%【大模型建议】首先,检查电池是否已经充满电。如果电池已满,可以尝试更换新的电池。如果无法解决问题,可能需要联系无人机制造商或维修服务来解决这个问题。

        (7)T=144 UAV-2 电量=24.9%【大模型建议】建议无人机UAV-2立即启动电池充电程序,确保续航能力。同时,应尽快联系地面指挥中心或相关航空公司的技术支持人员,以确定问题的具体原因和解决方案。

        (8)T=140 UAV-4 电量=49.5%【大模型建议】首先,检查UAV-4的电池情况。如果电量不足,应立即联系制造商或航空公司寻求帮助。其次,如果飞行状态正常,可以尝试重新充电。最后,如果问题仍然存在,可能需要更换电池或者进行其他维护工作。

        (9)T=138 UAV-1 电量=24.9%【大模型建议】立即联系地面指挥中心,报告无人机状态并寻求技术支持。在等待地面指挥中心的指令时,可以尝试使用其他方式(如手机)与地面指挥中心保持联系,以便及时获取最新的信息和指导。同时,确保无人机处于安全飞行状态,并采取适当的措施防止电量过低导致的问题发生。

        (10)T=133 UAV-3 电量=49.3%【大模型建议】无人机UAV-3的状况为电量不足。应立即联系地面指挥中心,并采取以下措施: 1. 将电量降至50%以下。 2. 在空中使用备用电池或充电器补充电量。 3. 如果无法恢复,考虑飞行至最近的机场降落。 建议尽快处理好无人机状态和问题,确保安全飞行。

        (11)T=101 UAV-2 电量=49.7%【大模型建议】立即启动无人机紧急降落程序。尽快联系地面指挥中心,报告问题情况,并协调地面指挥中心做出相应处理措施。在等待地面指挥中心的指令时,可以尝试使用无人机上的GPS导航设备进行定位和飞行控制。同时,保持与地面指挥中心的沟通,及时反馈无人机运行状态和遇到的问题。

        (12)T=92 UAV-1 电量=49.9%【大模型建议】立即联系地面指挥中心,报告无人机状况,并请求紧急降落。在等待降落的过程中,确保无人机处于安全的飞行状态。

11.6 关键代码

11.6.1 联邦对象模型(uav_patrol.fed)

(FED

  (Federation UavPatrolFederation)

  (FEDversion v1.3

  )

  (objects

    (class ObjectRoot

      (attribute privilegeToDelete reliable timestamp)

      (class RTIprivate)

      (class uav

        (attribute latitude reliable timestamp)

        (attribute longitude reliable timestamp)

        (attribute altitude reliable timestamp)

        (attribute batteryPct reliable timestamp)

        (attribute flightStatus reliable timestamp)

      )

    )

  )

  (interactions

    (class InteractionRoot reliable receive

      (class RTIprivate reliable receive)

    )

  )

)

11.6.2 无人机成员发布遥测(UAVAgent.py)

#coding:utf-8

"""无人机联邦成员:注册 UAV 对象,每秒发布巡逻遥测。

land()用于 Ctrl+C 或电量耗尽时正常离网。

模拟随机游走、电量消耗;电量低于 20% 时状态切换为 RTB(返航);

电量降至 0% 时停止移动、上报 LANDED 后自动降落并退出联邦。

"""

from RTIambassador import *

from HwFederateAmbassador import *

import GlobalVariables

import sys

import os

import random

import time

federationName = "UavPatrolDemo"

federateName = input("UAV callsign [UAV-1]: ").strip() or "UAV-1"

rti = RTI_RTIambassador()

joined = False

def land(rti, name):

    """无人机正常降落:退出联邦并删除本机发布的 UAV 对象实例。

    Args:

        rti:  RTI实例

        name: 无人机呼号,用于打印提示

    Returns:

        True 表示已成功退出联邦

    """

    print("\nLanding " + name + "...")

    time.sleep(0.3)

    try:

        rti.resignFederationExecution(RTI.ResignAction.DELETEOBJECTSANDRELEASEATTRIBUTES)

        print("Left federation (UAV object removed).")

        return True

    except RTI.FederateOwnsAttributes:

        try:

            rti.resignFederationExecution(RTI.ResignAction.NOACTION)

            print("Left federation.")

            return True

        except:

            return False

    except:

        return False

try:

    rti.createFederationExecution(federationName, "uav_patrol.fed")

except RTI.FederationExecutionAlreadyExists:

    pass

except RTI.CouldNotOpenFED:

    print("Error: place uav_patrol.fed in this directory")

    sys.exit(-1)

except:

    print("Cannot create federation")

    sys.exit(-1)

try:

    rti.joinFederationExecution(federateName, federationName, HwFederateAmbassador())

    joined = True

except:

    print("Failed to join federation")

    sys.exit(-1)

try:

    hUav = rti.getObjectClassHandle("uav")

    GlobalVariables.g_hLat = rti.getAttributeHandle("latitude", hUav)

    GlobalVariables.g_hLon = rti.getAttributeHandle("longitude", hUav)

    GlobalVariables.g_hAlt = rti.getAttributeHandle("altitude", hUav)

    GlobalVariables.g_hBattery = rti.getAttributeHandle("batteryPct", hUav)

    GlobalVariables.g_hStatus = rti.getAttributeHandle("flightStatus", hUav)

    attrs = [GlobalVariables.g_hLat, GlobalVariables.g_hLon, GlobalVariables.g_hAlt,

             GlobalVariables.g_hBattery, GlobalVariables.g_hStatus]

    rti.publishObjectClass(hUav, attrs)

    GlobalVariables.g_hUavInstance = rti.registerObjectInstance(hUav, federateName)

    lat = 28.228 + random.uniform(-0.02, 0.02)

    lon = 112.938 + random.uniform(-0.02, 0.02)

    alt = 120.0

    battery = 100.0

    step = 0

    print("\n=== " + federateName + " airborne ===")

    print("1 step per " + str(int(GlobalVariables.g_step_interval)) + " sec. Ctrl+C to land.\n")

    while True:

        step += 1

        GlobalVariables.g_currentTime = float(step)

        battery = max(0.0, battery - random.uniform(0.3, 0.8))

        if battery <= 0:

            battery = 0.0

            status = "LANDED"

            lat_hold, lon_hold, alt_hold = lat, lon, alt

        else:

            lat += random.uniform(-0.001, 0.001)

            lon += random.uniform(-0.001, 0.001)

            alt = max(80.0, min(200.0, alt + random.uniform(-2, 2)))

            status = "PATROL" if battery > 20 else "RTB"

            lat_hold, lon_hold, alt_hold = lat, lon, alt

        s1 = RTI.HandleValuePair()

        s1.aHandle = GlobalVariables.g_hLat

        s1.aValue = str(round(lat_hold, 6))

        s2 = RTI.HandleValuePair()

        s2.aHandle = GlobalVariables.g_hLon

        s2.aValue = str(round(lon_hold, 6))

        s3 = RTI.HandleValuePair()

        s3.aHandle = GlobalVariables.g_hAlt

        s3.aValue = str(round(alt_hold, 1))

        s4 = RTI.HandleValuePair()

        s4.aHandle = GlobalVariables.g_hBattery

        s4.aValue = str(round(battery, 1))

        s5 = RTI.HandleValuePair()

        s5.aHandle = GlobalVariables.g_hStatus

        s5.aValue = status

        rti.updateAttributeValues(GlobalVariables.g_hUavInstance, [s1, s2, s3, s4, s5], federateName)

        print("Step " + str(step) + " T=" + str(step) + ".0 " + status +

              " batt=" + str(round(battery, 1)) + "%")

        if battery <= 0:

            print("Battery depleted — landing and leaving federation.")

            time.sleep(0.3)

            break

        time.sleep(GlobalVariables.g_step_interval)

except KeyboardInterrupt:

    pass

except:

    print("Runtime error")

    sys.exit(-1)

finally:

    if joined:

        joined = False

        if land(rti, federateName):

            # 正常退出后立刻结束进程,避免 RTI Python 绑定析构时段错误

            os._exit(0)

        电量降至 0 时,无人机将状态设置为LANDED,停止移动并退出联邦。

11.6.3 订购方回调处理(HwFederateAmbassador.py)

#coding:utf-8

"""HLA联邦成员回调:处理 UAV 发现、遥测反射、移除及时间管理事件。"""

import GlobalVariables

class HwFederateAmbassador(object):

    """RTI回调实现类,供WebDashboard、CommandCenter、UAVAgent共用。"""

    def discoverObjectInstance(self, theObject, theObjectClass, theObjectName):

        """RTI发现新UAV对象时调用:记录呼号并初始化机队条目。"""

        GlobalVariables.g_object_names[theObject] = theObjectName

        if GlobalVariables.g_web_lock is not None:

            with GlobalVariables.g_web_lock:

                GlobalVariables.g_fleet.setdefault(theObject, {"status": "JOINING"})

        print("[DISCOVER] UAV " + theObjectName + " handle=" + str(theObject))

    def reflectAttributeValuesWithTime(self, theObject, theAttributes, theTime, theTag, theHandle):

        """带时间戳的属性反射:更新仿真时间后转交 reflectAttributeValues。"""

        GlobalVariables.g_currentTime = theTime

        self.reflectAttributeValues(theObject, theAttributes, theTag)

    def reflectAttributeValues(self, theObject, theAttributes, theTag):

        """订阅方收到 UAV 属性更新:解析遥测、更新机队、推送到 Web 大屏。"""

        state = GlobalVariables.g_fleet.get(theObject, {})

        for item in theAttributes:

            if item.aHandle == GlobalVariables.g_hLat:

                state["lat"] = float(item.aValue)

            elif item.aHandle == GlobalVariables.g_hLon:

                state["lon"] = float(item.aValue)

            elif item.aHandle == GlobalVariables.g_hAlt:

                state["alt"] = float(item.aValue)

            elif item.aHandle == GlobalVariables.g_hBattery:

                state["battery"] = float(item.aValue)

            elif item.aHandle == GlobalVariables.g_hStatus:

                state["status"] = item.aValue

        if GlobalVariables.g_web_lock is not None:

            import web_server

            with GlobalVariables.g_web_lock:

                GlobalVariables.g_fleet[theObject] = state

                GlobalVariables.g_web_sim_time += 1.0

            web_server.push_telemetry(theObject, state)

        else:

            GlobalVariables.g_fleet[theObject] = state

        if state:

            print("[TELEMETRY] UAV#" + str(theObject) +

                  " lat=" + str(state.get("lat", "?")) +

                  " lon=" + str(state.get("lon", "?")) +

                  " alt=" + str(state.get("alt", "?")) + "m" +

                  " batt=" + str(state.get("battery", "?")) + "%" +

                  " status=" + str(state.get("status", "?")))

    def removeObjectInstance(self, theObject, theTag):

        """UAV 离开联邦时调用:从机队与名称表中移除。"""

        if GlobalVariables.g_web_lock is not None:

            with GlobalVariables.g_web_lock:

                GlobalVariables.g_fleet.pop(theObject, None)

                GlobalVariables.g_object_names.pop(theObject, None)

        else:

            GlobalVariables.g_fleet.pop(theObject, None)

            GlobalVariables.g_object_names.pop(theObject, None)

        print("[REMOVE] UAV handle=" + str(theObject))

    def timeRegulationEnabled(self, theFederateTime):

        """时间管理:获得时间管控权,记录当前仿真时间。"""

        GlobalVariables.g_currentTime = theFederateTime

        GlobalVariables.g_bRegulation = True

        print("timeRegulationEnabled: " + str(theFederateTime))

    def timeConstrainedEnabled(self, theFederateTime):

        """时间管理:获得时间约束,记录当前仿真时间。"""

        GlobalVariables.g_currentTime = theFederateTime

        GlobalVariables.g_bConstrained = True

        print("timeConstrainedEnabled: " + str(theFederateTime))

    def timeAdvanceGrant(self, theTime):

        """时间管理:收到时间推进授权,更新仿真时间并同步到大屏。"""

        GlobalVariables.g_currentTime = theTime

        GlobalVariables.g_granted = True

        if GlobalVariables.g_web_lock is not None:

            with GlobalVariables.g_web_lock:

                GlobalVariables.g_web_sim_time = theTime

            print("[TIME] T=" + str(theTime))

        discoverObjectInstance和removeObjectInstance分别处理无人机加入和退出。

11.6.4 规则告警引擎(web_server.py)

def _check_alerts(handle, state):

    """根据电量与围栏规则检测异常,返回告警字典列表。

    Args:

        handle: UAV 对象实例句柄

        state:  遥测字典,含 lat/lon/battery 等字段

    Returns:

        告警列表,每项含 level、source、text、issue 字段

    """

    import GlobalVariables as G

    alerts = []

    batt = state.get("battery", 100)

    lat = state.get("lat", 0)

    lon = state.get("lon", 0)

    name = G.g_object_names.get(handle, "UAV#" + str(handle))

    if batt <= 0:

        alerts.append({

            "level": "danger",

            "source": "rule",

            "text": name + " 电量耗尽,已迫降",

            "issue": "电量为 0%",

        })

    elif batt < 25:

        alerts.append({

            "level": "warn",

            "source": "rule",

            "text": name + " 电量 " + str(round(batt, 1)) + "%,建议返航",

            "issue": "电量低于 25%",

        })

    elif batt < 50:

        alerts.append({

            "level": "warn",

            "source": "rule",

            "text": name + " 电量 " + str(round(batt, 1)) + "%,注意监控",

            "issue": "电量低于 50%",

        })

    if lat < G.g_fence_min_lat or lat > G.g_fence_max_lat:

        alerts.append({

            "level": "danger",

            "source": "rule",

            "text": name + " 纬度越界",

            "issue": "纬度超出巡逻围栏",

        })

    if lon < G.g_fence_min_lon or lon > G.g_fence_max_lon:

        alerts.append({

            "level": "danger",

            "source": "rule",

            "text": name + " 经度越界",

            "issue": "经度超出巡逻围栏",

        })

    return alerts

        围栏范围定义在 GlobalVariables.py 中(默认长沙市范围)。

11.6.5 大模型调用(llm_advisor.py)

        当规则告警触发时,构造提示词并请求 Ollama 部署的千问(Qwen2-0.5B)大模型,每架无人机的每种问题独立请求,互不阻塞。

def generate_advice(name, state, issue):

    """同步调用 Ollama /api/generate,等待模型返回处置建议。

    Args:

        name:  无人机呼号

        state: 遥测字典

        issue: 问题描述

    Returns:

        (advice, error): 成功时 advice 为文本、error 为 None;失败时相反

    """

    prompt = _build_prompt(name, state, issue)

    body = json.dumps({

        "model": OLLAMA_MODEL,

        "prompt": prompt,

        "stream": False,

        "options": {"temperature": 0.3, "num_predict": 120},

    }, ensure_ascii=False).encode("utf-8")

    req = urllib.request.Request(

        OLLAMA_URL + "/api/generate",

        data=body,

        headers={"Content-Type": "application/json"},

        method="POST",

    )

    try:

        with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as resp:

            data = json.loads(resp.read().decode("utf-8"))

        text = (data.get("response") or "").strip()

        return text or None, None

    except urllib.error.HTTPError as exc:

        try:

            detail = exc.read().decode("utf-8", errors="replace")

        except Exception:

            detail = str(exc)

        return None, _friendly_error(detail)

    except Exception as exc:

        return None, str(exc)

11.6.6 前端态势显示(index.html)

        前端页面使用Leaflet地图库,每500ms通过AJAX轮询 /api/state 接口获取最新态势数据,更新地图上的无人机标记以及右侧三个面板的内容。无需任何前端构建工具。该文件有400多行代码,这里忽略。

11.7 项目文件说明

文件名

角色

uav_patrol.fed

HLA对象模型定义文件

UAVAgent.py

无人机联邦成员(发布方)

WebDashboard.py

态势大屏联邦成员,启动 Web 服务

HwFederateAmbassador.py

RTI回调

web_server.py

HTTP服务、规则告警、仿真时间管理

llm_advisor.py

Ollama大模型调用封装

index.html

浏览器前端页面

GlobalVariables.py

全局变量(句柄、围栏、机队字典等)

CommandCenter.py

可选的命令行版指挥所

11.8 扩展与改进方向

        本示例可在多方面进行扩展,让其更接近真实系统。

        (1)扩展对象模型

        在FED文件中增加速度、航向、载荷状态等属性,同步修改发布方和订购方的解析代码。

        (2)增加联邦成员

        加入指挥所、气象站、禁飞区服务等席位。

        (3)增强AI能力

        将规则告警替换为机器学习异常检测(如sklearn);或将大模型建议扩展为多机协同调度策略。

        (4)态势前端

        替换地图底图为商用地图(如高德等),接入真实GIS服务。

11.9 总结

        KY-RTI打开了传统HLA仿真走向AI大模型时代的大门。它独特的Python支持能力,让仿真专家不再纠结于语言适配,可以全力聚焦于AI仿真应用本身。本文通过低空无人机巡逻态势仿真这一完整示例,直观展示了KY-RTI + Python + 国产大模型的技术路径。读者可在此基础上根据实际需求进行扩展。希望本文能帮助开发者利用KY-RTI打造出属于自己的下一代智能仿真系统。

 KY-RTI的Linux、Windows版本和源码请联系作者:walt_lbq@163.com

KY-RTI分布仿真技术:前 言

KY-RTI分布仿真技术:第一章 简介

KY-RTI分布仿真技术:第二章 系统安装

KY-RTI分布仿真技术:第三章 KY-OMT对象模型模板工具

KY-RTI分布仿真技术:第四章 C++程序设计

KY-RTI分布仿真技术:第五章 Qt程序设计

KY-RTI分布仿真技术:第六章 Java程序设计

KY-RTI分布仿真技术:第七章 Visual C++程序设计

KY-RTI分布仿真技术:第八章 Visual C#程序设计

KY-RTI分布仿真技术:第九章 综合演示

KY-RTI分布仿真技术:第十章 Python程序设计

KY-RTI分布仿真技术:第十一章 人工智能大模型仿真--低空无人机巡逻实战示例

KY-RTI分布仿真技术:第十二章 HLA联邦保存与恢复--物流联合仿真中的热恢复与冷恢复

KY-RTI分布仿真技术:附录1 分组聊天(HLA数据分发管理的应用

KY-RTI分布仿真技术:附录2 大联邦(构建1000个成员的HLA/RTI仿真系统)

KY-RTI分布仿真技术:附录3 国产化(操作系统+CPUs)

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐