随着通义千问大语言模型的发布以及API、SDK的发布,规划利用通义千问大模型开放赋能能力去搭建一个千问chat,说干就干。

1、后端功能接口整体设计:

1 系统登录、退出、修改密码 Login、LoginOut、

editpassword

2

创建新的回话

newsession

3

修改回话名称

editsession

4

删除回话

deletesession

5 清空会话

clearsession

6

会话列表

sessionlist

7

对话列表

messagelist

8

删除对话

deletemessage

9

提交问话

chat

2、系统工具类

from flask import Response
import json
import hashlib
import json
import dashscope
from dashscope import TextEmbedding
from dashvector import Client, Doc

#返回信息操作类
class ResultBody:

    def ok(msg='',data=''):
        return Response(json.dumps({'code':200,'msg':msg,'data':data},indent=6, ensure_ascii=False),content_type='application/json')

    def failed(msg='',data=''):
        return Response(json.dumps({'code':1001,'msg':msg,'data':data},indent=6, ensure_ascii=False),content_type='application/json')

    def error(msg='',data=''):
        return Response(json.dumps({'code':500,'msg':msg,'data':data},indent=6, ensure_ascii=False),content_type='application/json')


#加密操作类   
class HashDecEnc:
    def encrypt(data):  
        hashed_data = hashlib.sha256(data.encode()).hexdigest()  
        return hashed_data
    

#知识向量操作类
class Embeding:

    # 内容向量化
    def generate_embeddings(content):
        rsp = TextEmbedding.call(model=TextEmbedding.Models.text_embedding_v1,input=content)
        embeddings = [record['embedding'] for record in rsp.output['embeddings']]
        return embeddings if isinstance(content, list) else embeddings[0]
    
    # 写入阿里云向量
    def knowledge_upsert(knowledge):
        dashscope.api_key = ''   #apikey
        # 初始化 dashvector client
        client = Client(api_key='sk-',endpoint='vrs-cn-uqm3jv0rn0002h.dashvector.cn-hangzhou.aliyuncs.com')
        collection = client.get('keygrow')
        for Knowledgepoints in knowledge:
            Knowledgepoints_id=Knowledgepoints["id"]
            Knowledgepoints_title = Knowledgepoints["knowledge_title"]
            Knowledgepoints_content = Knowledgepoints["knowledge_content"]
            vectors = Embeding.generate_embeddings(Knowledgepoints_title)
            # 写入 dashvector 构建索引
            rsp = collection.upsert(
                [
                    Doc(id=str(Knowledgepoints_id), vector=vectors, fields={'id': Knowledgepoints_id, 'title': Knowledgepoints_title, 'content': Knowledgepoints_content}) 
                ]
            )


3、数据库连接类:

import pymysql
from dbutils.pooled_db import PooledDB

DB_CONFIG = {
'host':'',
'user':'',
'password':'',
'database':'',
'port':3306,
'charset':'utf8',
'cursorclass':pymysql.cursors.DictCursor
}

pool = PooledDB(
    creator=pymysql,
    maxconnections=30,  # 设置最大连接数  
    mincached=2,  # 设置最小空闲连接数  
    maxcached=5,  # 设置最大空闲连接数  
    maxshared=3,  # 设置最大共享连接数  
    blocking=True,  # 设置阻塞模式  
    maxusage=None,  # 设置最大使用时间  
    setsession=[],  # 设置会话参数  
    ping=0,  # 设置 ping 间隔 
    **DB_CONFIG 
)
import pymysql
from dbutils.pooled_db import PooledDB
import dbutils.pooled_db
import KDBPool

class KMySQL:
    
    def get_conn():
        return KeygrowDBPool.pool.connection()
    
    def get_cursor():
         return KeygrowDBPool.pool.connection().cursor()

    def execute(self,sql,params=None):
        conn = KeygrowDBPool.pool.connection()
        try:
            with conn.cursor() as cursor:
                if cursor:
                    if params:
                        cursor.execute(sql,params)
                    else:
                        cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            print(f"Error: {e}") 
            conn.rollback()
        finally:
            conn.close()

    def fetchall(self,sql,params=None):
        conn = KeygrowDBPool.pool.connection()
        try:
             with conn.cursor() as cursor:
                if cursor:
                    if params:
                        cursor.execute(sql,params)
                    else:
                        cursor.execute(sql)
                    conn.commit()
                    result = cursor.fetchall()
                    if result:
                        return result
                else:
                    print(f"fetchall查询游标错误: {e}") 
                    return None
                cursor.close()
                conn.close()
        except Exception as e:
            print(f"fetchallError: {e}") 
            return None
        finally:
            conn.close()

    def fetchone(self,sql,params=None):
        # self.connect()
        try:
            conn = KeygrowDBPool.pool.connection()
            with conn.cursor() as cursor:
                if cursor:
                    if params:
                        cursor.execute(sql,params)
                    else:   
                        cursor.execute(sql)  
                    conn.commit() 
                    result = cursor.fetchone()
                    if result:
                        return result
                else:
                    print(f"fetchone查询游标错误:") 
                    return None
                # cursor.close()
                # conn.close()
        except Exception as e:
            print(f"fetchoneError: {e}") 
            return None
        finally:
            conn.close()

4、业务功能api

from http import HTTPStatus
from dashscope import Generation
import dashscope
from dashscope import TextEmbedding
from dashvector import Client, Doc
import mdtex2html
import gradio as gr
from gradio.components import Textbox
from flask import Flask, request,Response,stream_with_context
from flask_cors import CORS
import time 
from datetime import datetime
import json
import uuid
from  ksql import KeygrowMySQL
from  ktils import ResultBody
from  ktils import HashDecEnc
from  ktils import Embeding


app = Flask(__name__)
CORS(app)  # 启用 CORS

app.secret_key = ''
app.config['SESSION_PERMANENT'] = False  
app.config['SESSION_TYPE'] = 'filesystem'  
app.config['SESSION_TIMEOUT'] = 60 * 60 * 24
dashscope.api_key=""
dashvector_api_key=""
dashvector_endpoint='vrs-cn-uqm3jv0rn0002h.dashvector.cn-hangzhou.aliyuncs.com'

chat_model="qwen-14b-chat"
# chat_model='qwen-7b-v1'

#定义返回Header
headers = {'Content-Type':'text/plain','Transfer-Encoding':'chunked'}

mysql = KeygrowMySQL()


#用户登录
@app.route('/login',methods=['POST'])  
def login():  
   try:
      userphone = request.form.get('userphone')
      userpassword = request.form.get('userpassword')
      if userphone:
         if userpassword:
              userpassword = HashDecEnc.encrypt(userpassword)
              querySql = "select phone,id from chat_user where phone=%s and userpassword=%s and enable=1"
              result = mysql.fetchone(querySql,(userphone,userpassword))
              if result:
                  if result["phone"]==userphone:
                     userid=result["id"]
                     updateSql="update chat_user set token=%s,logintime=%s where phone=%s"
                     token=str(uuid.uuid5(uuid.NAMESPACE_DNS,userphone+str(datetime.now())))
                     logintime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                     mysql.execute(updateSql,(token,logintime,userphone))
                     data = {"token": token, "userid": userid}
                     return ResultBody.ok("登录成功",data)
              else:
                  return ResultBody.failed("用户或密码错误",'')
         else:
            return ResultBody.failed("密码不能为空",'')
      else:
         return ResultBody.failed("用户手机号不能为空",'')
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#修改密码  
@app.route('/editpassword',methods=['POST'])
def editpassword():
   try:
       token = request.headers.get('token')
       flag,userid,username = validatetoken(token)
       if flag:
          oldpassword = request.form.get('oldpassword')
          newpassword = request.form.get('newpassword')
          if oldpassword:
             oldpassword = HashDecEnc.encrypt(oldpassword.strip())
             querySql = "select id from chat_user where id=%s and userpassword=%s and enable=1"
             result = mysql.fetchone(querySql,(userid,oldpassword))
             if result:
                  if int(result["id"])==userid: 
                     newpassword = HashDecEnc.encrypt(newpassword.strip())
                     updateSql = "update chat_user set userpassword=%s,token='' where id=%s"
                     mysql.execute(updateSql,(newpassword,userid))

                     return ResultBody.ok("修改密码成功",'')
                  else:
                     return ResultBody.failed("旧密码不正确",'')   
             else:
                return ResultBody.failed("旧密码不正确",'')
          else:
             return ResultBody.failed("旧密码不能为空",'')   
       else:  
          return ResultBody.failed("Token不正确",'')   
   except Exception as e:
       return ResultBody.error('服务器异常'+str(e))

#用户退出
@app.route('/loginout',methods=['POST'])  
def loginout():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         updateSql="update chat_user set token='' where token=%s"
         mysql.execute(updateSql,(token))
         return ResultBody.ok("退出成功")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#验证Token,并返回用户ID、用户名称
def validatetoken(token):
   querySql = "select id,realname,logintime from chat_user where token=%s and enable=1"
   result = mysql.fetchone(querySql,(token))
   if result:
         lastlogintime = datetime.strptime(str(result["logintime"]),"%Y-%m-%d %H:%M:%S")
         currenttime = datetime.now()
         if abs(currenttime - lastlogintime).total_seconds() > 60 * 60 * 24:
            return False,0,''
         else:
            return True,result["id"],result["realname"]
   else:
      return False,0,''

#验证Session,并返回SessionID、用户ID
def validatesession(sessionid,userid):
   querySql = "select sessionid,userid from  chat_session  where sessionid=%s and userid=%s and enable=1"
   result = mysql.fetchone(querySql,(sessionid,userid))
   if result:
      return True
   else:
      return False
    
#验证Session是否可以创建
def validatesessioncreate(userid): 
   querySql = "select count(*) as sessionnum from  chat_session  where userid=%s and enable=1"
   result = mysql.fetchone(querySql,(userid))
   if int(result["sessionnum"])<50:
      return True
   else:
      return False

#验证会话是否可以创建
def validatemessagecreate(sessionid):
   querySql = "select count(*) as messagenum from  chat_message  where sessionid=%s and enable=1"
   result = mysql.fetchone(querySql,(sessionid))
   if int(result["messagenum"])<100:
      return True
   else:
      return False
   

#验证Session,并返回SessionID、用户ID
def validateknowledgesession(sessionid,userid):
   querySql = "select sessionid,userid from  chat_knowledge_session  where sessionid=%s and userid=%s and enable=1"
   result = mysql.fetchone(querySql,(sessionid,userid))
   if result:
      return True
   else:
      return False
    
#验证Session是否可以创建
def validateknowledgesessioncreate(userid): 
   querySql = "select count(*) as sessionnum from  chat_knowledge_session  where userid=%s and enable=1"
   result = mysql.fetchone(querySql,(userid))
   if int(result["sessionnum"])<50:
      return True
   else:
      return False

#验证会话是否可以创建
def validateknowledgemessagecreate(sessionid):
   querySql = "select count(*) as messagenum from  chat_knowledge_message  where sessionid=%s and enable=1"
   result = mysql.fetchone(querySql,(sessionid))
   if int(result["messagenum"])<100:
      return True
   else:
      return False
   
                   
#创建新的回话
@app.route('/newsession',methods=['POST'])  
def newsession(): 
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         flag=validatesessioncreate(userid)
         if flag:
            sessionid=str(uuid.uuid5(uuid.NAMESPACE_DNS,token+str(datetime.now())))
            createtime=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            if sessionid:
               sessiontitle = request.form.get('sessiontitle')
               if sessiontitle:
                  insertSql="insert into chat_session (sessionid,userid,createtime,sessiontitle) values (%s,%s,%s,%s)"
                  mysql.execute(insertSql,(sessionid,userid,createtime,sessiontitle))
                  data = {"sessionid": sessionid, "createtime": createtime}
                  return ResultBody.ok('新建会话成功',data)
               else:
                  return ResultBody.failed('新建会话失败','会话标题不正确')
            else:
               return ResultBody.failed('新建会话失败','会话ID不正确')
         else:
               return ResultBody.failed('会话最大支持50个','会话最大支持50个')
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))
   
#创建新的知识回话
@app.route('/knowledge/newsession',methods=['POST'])  
def knowledgenewsession(): 
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         flag=validateknowledgesessioncreate(userid)
         if flag:
            sessionid=str(uuid.uuid5(uuid.NAMESPACE_DNS,token+str(datetime.now())))
            createtime=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            if sessionid:
               sessiontitle = request.form.get('sessiontitle')
               if sessiontitle:
                  insertSql="insert into chat_knowledge_session (sessionid,userid,createtime,sessiontitle) values (%s,%s,%s,%s)"
                  mysql.execute(insertSql,(sessionid,userid,createtime,sessiontitle))
                  data = {"sessionid": sessionid, "createtime": createtime}
                  return ResultBody.ok('新建会话成功',data)
               else:
                  return ResultBody.failed('新建知识会话失败','会话标题不正确')
            else:
               return ResultBody.failed('新建只是会话失败','会话ID不正确')
         else:
               return ResultBody.failed('知识会话最大支持50个','会话最大支持50个')
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#修改回话名称
@app.route('/editsession',methods=['POST'])  
def editsession():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         sessionid = request.form.get('sessionid')
         sessiontitle = request.form.get('sessiontitle')
         flag = validatesession(sessionid,userid)
         if flag:
            if sessiontitle:
               editSql="update chat_session set sessiontitle=%s  where userid=%s and sessionid=%s"
               mysql.execute(editSql,(sessiontitle,userid,sessionid))
               return ResultBody.ok('会话修改成功')
            else:
               return ResultBody.failed('会话标题不能为空')
         else:
            return ResultBody.failed('SessionID不正确')
      else:
            return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))
   

#删除回话
@app.route('/knowledge/deletesession',methods=['POST'])  
def knowledgedeletesession():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         sessionid = request.form.get('sessionid')
         flag = validateknowledgesession(sessionid,userid)
         if flag:
            deleteSql="update chat_knowledge_session set enable=0  where userid=%s and sessionid=%s"
            mysql.execute(deleteSql,(userid,sessionid))
            return ResultBody.ok("会话删除成功")
         else:
            return ResultBody.failed("SessionID不正确")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))
   
#删除知识回话
@app.route('/deletesession',methods=['POST'])  
def deletesession():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         sessionid = request.form.get('sessionid')
         flag = validatesession(sessionid,userid)
         if flag:
            deleteSql="update chat_session set enable=0  where userid=%s and sessionid=%s"
            mysql.execute(deleteSql,(userid,sessionid))
            return ResultBody.ok("知识会话删除成功")
         else:
            return ResultBody.failed("SessionID不正确")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))
   
#清空当前回话
@app.route('/clearsession',methods=['POST'])  
def clearsession():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         sessionid = request.form.get('sessionid')
         flag = validatesession(sessionid,userid)
         if flag:
            deleteSql="update chat_message set enable=0  where  sessionid=%s"
            mysql.execute(deleteSql,(sessionid))
            return ResultBody.ok("会对话清空成功")
         else:
            return ResultBody.failed("SessionID不正确")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))
   
#清空当前知识回话
@app.route('/knowledge/clearsession',methods=['POST'])  
def knowledgeclearsession():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         sessionid = request.form.get('sessionid')
         flag = validateknowledgesession(sessionid,userid)
         if flag:
            deleteSql="update chat_knowledge_message set enable=0  where  sessionid=%s"
            mysql.execute(deleteSql,(sessionid))
            return ResultBody.ok("知识会对话清空成功")
         else:
            return ResultBody.failed("SessionID不正确")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))


#知识会话列表
@app.route('/knowledge/sessionlist',methods=['POST'])  
def knowledgesessionlist():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         selectSql="select sessionid,sessiontitle,userid,createtime from  chat_knowledge_session  where userid=%s and enable=1 order by id desc limit 50"
         data_list=mysql.fetchall(selectSql,(userid))
         return ResultBody.ok("知识会话列表查询成功",data_list)
      else:
         return ResultBody.failed("Token不正确")    
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#会话列表
@app.route('/sessionlist',methods=['POST'])  
def sessionlist():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         selectSql="select sessionid,sessiontitle,userid,createtime from  chat_session  where userid=%s and enable=1 order by id desc limit 50"
         data_list=mysql.fetchall(selectSql,(userid))
         return ResultBody.ok("会话列表查询成功",data_list)
      else:
         return ResultBody.failed("Token不正确")    
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#知识对话列表
@app.route('/knowledge/messagelist',methods=['POST'])  
def knowledgemessagelist():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         sessionid = request.form.get('sessionid')
         flag = validateknowledgesession(sessionid,userid)
         if flag:
            selectSql="select sessionid,messageid,content,sendertype,createtime from  chat_knowledge_message  where sessionid=%s and enable=1 order by id asc limit 100"
            data_list = mysql.fetchall(selectSql,(sessionid))
            return ResultBody.ok("知识对话列表查询成功",data_list)
         else:
            return ResultBody.failed("SessionID不正确")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#对话列表
@app.route('/messagelist',methods=['POST'])  
def messagelist():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         sessionid = request.form.get('sessionid')
         flag = validatesession(sessionid,userid)
         if flag:
            selectSql="select sessionid,messageid,content,sendertype,createtime from  chat_message  where sessionid=%s and enable=1 order by id asc limit 100"
            data_list = mysql.fetchall(selectSql,(sessionid))
            return ResultBody.ok("对话列表查询成功",data_list)
         else:
            return ResultBody.failed("SessionID不正确")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))  

#删除对话
@app.route('/deletemessage',methods=['POST'])  
def deletemessage():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         messageid = request.form.get('messageid')
         sessionid = request.form.get('sessionid')
         flag = validatesession(sessionid,userid)
         if flag:
            querySql = "select a.messageid,b.sessionid,c.id,c.token from chat_message a,chat_session b,chat_user c where  a.sessionid=b.sessionid  and b.userid=c.id and b.sessionid=%s and c.token=%s and a.messageid=%s"
            result=mysql.fetchone(querySql,(sessionid,token,messageid))
            if result:
               if messageid==result["messageid"]:
                  updateSql="update chat_message set enable=0  where messageid=%s"
                  mysql.execute(updateSql,(messageid))
                  return ResultBody.ok("对话删除删除成功")
               else:
                  return ResultBody.failed("MessageID不正确")
            else:
               return ResultBody.failed("对话不存在")
         else:
               return ResultBody.failed("对话不存在")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))


#删除知识对话
@app.route('/knowledge/deletemessage',methods=['POST'])  
def knowledgedeletemessage():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         messageid = request.form.get('messageid')
         sessionid = request.form.get('sessionid')
         flag = validateknowledgesession(sessionid,userid)
         if flag:
            querySql = "select a.messageid,b.sessionid,c.id,c.token from chat_knowledge_message a,chat_knowledge_session b,chat_user c where  a.sessionid=b.sessionid  and b.userid=c.id and b.sessionid=%s and c.token=%s and a.messageid=%s"
            result=mysql.fetchone(querySql,(sessionid,token,messageid))
            if result:
               if messageid==result["messageid"]:
                  updateSql="update chat_knowledge_message set enable=0  where messageid=%s"
                  mysql.execute(updateSql,(messageid))
                  return ResultBody.ok("知识对话删除删除成功")
               else:
                  return ResultBody.failed("MessageID不正确")
            else:
               return ResultBody.failed("知识对话不存在")
         else:
               return ResultBody.failed("知识对话不存在")
      else:
         return ResultBody.failed("Token不正确")
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#知识类别列表
@app.route('/knowledge/classlist',methods=['POST'])  
def knowledgeclasslist():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         selectSql="select id,fatherid,classname from chat_knowledge_classinfo where enable=1 order by id asc"
         data_list=mysql.fetchall(selectSql)
         return ResultBody.ok("知识类别列表查询成功",data_list)
      else:
         return ResultBody.failed("Token不正确")    
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))


#知识类别管理--添加
@app.route('/manager/classadd',methods=['POST'])
def knowledgeclassadd():
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         if int(userid)==1: #当前用户为管理员时
            classname=request.form.get('classname')
            fatherid=request.form.get('fatherid')
            if classname and fatherid:
               insertSql="insert into chat_knowledge_classinfo (classname,fatherid,enable) values(%s,%s,1)"
               mysql.execute(insertSql,(classname,fatherid))   
               return ResultBody.ok("知识类别添加成功")   
            else:
               return ResultBody.failed("参数不完整")
         else:
            return ResultBody.failed("无权限")
      else: 
         return ResultBody.failed("Token不正确")   
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))
   

#知识类别管理--添加
@app.route('/manager/classedit',methods=['POST'])
def knowledgeclassedit():
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         if int(userid)==1: #当前用户为管理员时
            classname=request.form.get('classname')
            classid=request.form.get('classid')
            if classname and classid:
               updateSql="update chat_knowledge_classinfo set classname = %s where id = %s"
               mysql.execute(updateSql,(classname,classid))   
               return ResultBody.ok("知识类别修改成功")   
            else:
               return ResultBody.failed("参数不完整")
         else:
            return ResultBody.failed("无权限")
      else: 
         return ResultBody.failed("Token不正确")   
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))


#知识添加
@app.route('/manager/infoadd',methods=['POST'])
def knowledgeinfoadd():
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         if int(userid)==1: #当前用户为管理员时
            knowledgetitle=request.form.get('knowledgetitle')
            knowledgedes=request.form.get('knowledgedes')
            knowledgecontent=request.form.get('knowledgecontent')
            classid=request.form.get('classid')
            fatherclassid=request.form.get('fatherclassid')
            createtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            if knowledgetitle and knowledgedes and classid and fatherclassid:
               knowledgeid=str(uuid.uuid5(uuid.NAMESPACE_DNS,classid+str(datetime.now())))
               insertSql="insert into chat_knowledge_info (knowledge_id,knowledge_title,knowledge_des,knowledge_content,classid,fatherclassid,createtime,enable) values (%s,%s,%s,%s,%s,%s,%s,1)"
               mysql.execute(insertSql,(knowledgeid,knowledgetitle,knowledgedes,knowledgecontent,classid,fatherclassid,createtime))
               return ResultBody.ok("知识添加成功")   
            else:
               return ResultBody.failed("参数不完整")
         else:
            return ResultBody.failed("无权限")   
      else: 
         return ResultBody.failed("token过期")
   except Exception as e:  
      return ResultBody.error('Token不正确'+str(e))


#知识修改
@app.route('/manager/infoedit',methods=['POST'])
def knowledgeinfoedit():
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         if int(userid)==1: #当前用户为管理员时
            knowledgetitle=request.form.get('knowledgetitle')
            knowledgedes=request.form.get('knowledgedes')
            knowledgecontent=request.form.get('knowledgecontent')
            classid=request.form.get('classid')
            fatherclassid=request.form.get('fatherclassid')
            knowledgeid=request.form.get('knowledgeid')
            if knowledgetitle and knowledgedes and classid and fatherclassid and knowledgeid and knowledgecontent:
               updateSql="update chat_knowledge_info set knowledge_title=%s,knowledge_des=%s,knowledge_content=%s,classid=%s,fatherclassid=%s where id=%s"
               mysql.execute(updateSql,(knowledgetitle,knowledgedes,knowledgecontent,classid,fatherclassid,knowledgeid))
               return ResultBody.ok("知识修改成功")   
            else:
               return ResultBody.failed("参数不完整")
         else:
            return ResultBody.failed("无权限")   
      else: 
         return ResultBody.failed("token过期")
   except Exception as e:  
      return ResultBody.error('Token不正确'+str(e))


#知识集合向量化入库
@app.route('/manager/infoembeding',methods=['POST'])  
def knowledgeinfoembeding():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
         if int(userid)==1: #当前用户为管理员时
            selectSql="select id,knowledge_title,knowledge_des,knowledge_content,classid,fatherclassid,usenum from chat_knowledge_info where enable=1  order by id desc"
            data_list=mysql.fetchall(selectSql)
            Embeding.knowledge_upsert(data_list)
            return ResultBody.ok("知识向量入库成功")   
         else:
             return ResultBody.failed("无权限")
      else:
         return ResultBody.failed("Token不正确")    
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))


#知识列表
@app.route('/knowledge/infolist',methods=['POST'])  
def knowledgeinfolist():  
   try:
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      classid =request.json.get('classid')
      fatherclassid =request.json.get('fatherclassid')
      if flag:
         if classid and fatherclassid:
            selectSql="select id,knowledge_id,knowledge_title,knowledge_des,classid,knowledge_content,fatherclassid,usenum from chat_knowledge_info where enable=1  and  classid=%s and fatherclassid=%s order by usenum desc"
            data_list=mysql.fetchall(selectSql,(classid,fatherclassid))
            return ResultBody.ok("知识列表查询成功",data_list)
         elif fatherclassid:
             selectSql="select id,knowledge_id,knowledge_title,knowledge_des,knowledge_content,classid,fatherclassid,usenum from chat_knowledge_info where enable=1  and  fatherclassid=%s order by usenum desc"
             data_list=mysql.fetchall(selectSql,(fatherclassid))
             return ResultBody.ok("知识列表查询成功",data_list)
         else:
             return ResultBody.failed("知识分类传值错误")
      else:
         return ResultBody.failed("Token不正确")    
   except Exception as e:
      return ResultBody.error('服务器异常'+str(e))


#保存Message
def messagesave(sessionid,content,sendertype,messageid): 
      createtime=datetime.now().strftime('%Y-%m-%d %H:%M:%S') 
      querySql = "insert into chat_message(sessionid,messageid,content,sendertype,createtime) values(%s,%s,%s,%s,%s)"
      mysql.execute(querySql,(sessionid,messageid,content,sendertype,createtime))

 #保存知识Message
def knowledgemessagesave(sessionid,content,sendertype,messageid): 
      createtime=datetime.now().strftime('%Y-%m-%d %H:%M:%S') 
      querySql = "insert into chat_knowledge_message(sessionid,messageid,content,sendertype,createtime) values(%s,%s,%s,%s,%s)"
      mysql.execute(querySql,(sessionid,messageid,content,sendertype,createtime))     

#提交问话
@app.route('/chat', methods=['POST'])
def chat():
    try:   
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
            sessionid = request.form.get('sessionid')
            messageid = request.form.get('messageid')
            flag=validatesession(sessionid,userid)
            if flag:
                  flag = validatemessagecreate(sessionid)
                  if flag:
                     question = request.form.get('question')
                     reload=request.form.get('reload')
                     openhistory=request.form.get('openhistory')
                     if question:  
                        return Response(stream_with_context(predict(question,sessionid,messageid,reload,openhistory)), mimetype="text/event-stream")  
                     else:
                        return  ResultBody.failed("提问不能为空")
                  return  ResultBody.failed("对话最多支持50对")
            return  ResultBody.failed("SessionID不正确")
      return ResultBody.failed("Token不正确")
    except Exception as e:
      return ResultBody.error('服务器异常'+str(e))
    
#提交知识问话
@app.route('/knowledge/chat', methods=['POST'])
def knowledgechat():
    try:   
      token = request.headers.get('token')
      flag,userid,username = validatetoken(token)
      if flag:
            sessionid = request.form.get('sessionid')
            messageid = request.form.get('messageid')
            flag=validateknowledgesession(sessionid,userid)
            if flag:
                  flag = validateknowledgemessagecreate(sessionid)
                  if flag:
                     question = request.form.get('question')
                     reload=request.form.get('reload')
                     openhistory=request.form.get('openhistory')
                     if question:  
                        return Response(stream_with_context(knowledgepredict(question,sessionid,messageid,reload,openhistory)), mimetype="text/event-stream")  
                     else:
                        return  ResultBody.failed("提问不能为空")
                  return  ResultBody.failed("对话最多支持50对")
            return  ResultBody.failed("SessionID不正确")
      return ResultBody.failed("Token不正确")
    except Exception as e:
      return ResultBody.error('服务器异常'+str(e))

#大模型回答
def predict(input,sessionid,messageid,reload,openhistory):
    top_p=float(0.8)
    messages=createmessage(sessionid)
    if openhistory=='0':
       messages=[]
    if input:
      #   context,flag = search_relevant_news(input)
      #   if flag:
      #      responses = answer_question(input, context,top_p,messages)
      #   else:
        responses=Generation.call(model=chat_model,prompt=input,messages=messages,stream=True,top_p=top_p)
        for response in responses:  
            if response.status_code==HTTPStatus.OK:
                last_response = response
                chat=json.dumps(response.output, indent=6, ensure_ascii=False)
                yield chat
            else:
                if response.code=="DataInspectionFailed":
                   chat={"text":"输入或者输出包含疑似敏感内容被绿网拦截", "finish_reason": "stop", "choices":"null"}
                   chat=json.dumps(chat,indent=6, ensure_ascii=False)
                   yield chat
                else:
                   chat='Code: %d, status: %s, message: %s' % (response.status_code, response.code, response.message)
                   yield chat
        if last_response:
            if last_response.code!="DataInspectionFailed":
               content = last_response.output['text']
               if reload=='0':
                  # messageid=str(uuid.uuid5(uuid.NAMESPACE_DNS,sessionid+str(datetime.now())))
                  messagesave(sessionid,input,1,messageid)
                  messagesave(sessionid,content,2,messageid)

#知识大模型回答
def knowledgepredict(input,sessionid,messageid,reload,openhistory):
    top_p=float(0.8)
    messages=knowledgecreatemessage(sessionid)
    if openhistory=='0':
       messages=[]
    if input:
        context,flag = search_relevant_knowledge(input)
      #   print(context[0].filed["raw"])
        if flag:
           responses = answer_question(input, context,top_p,messages)
        else:
           responses=Generation.call(model=chat_model,prompt=input,messages=messages,stream=True,top_p=top_p)
        for response in responses:  
            # print(response)
            if response.status_code==HTTPStatus.OK:
                last_response = response
                chat=json.dumps(response.output, indent=6, ensure_ascii=False)
                yield chat
            else:
                 if response.code=="DataInspectionFailed":
                   chat={"text":"输入或者输出包含疑似敏感内容被绿网拦截", "finish_reason": "stop", "choices":"null"}
                   chat=json.dumps(chat,indent=6, ensure_ascii=False)
                   yield chat
                 else:
                  chat='Code: %d, status: %s, message: %s' % (response.status_code, response.code, response.message)
                  yield chat

        if last_response:
            if last_response.code!="DataInspectionFailed":
               content = last_response.output['text']
               if reload=='0':
                  # messageid=str(uuid.uuid5(uuid.NAMESPACE_DNS,sessionid+str(datetime.now())))
                  knowledgemessagesave(sessionid,input,1,messageid)
                  knowledgemessagesave(sessionid,content,2,messageid)
         

#组装Messages
def createmessage(sessionid):
     messages=[]
     selectSql="select sessionid,messageid,content,sendertype from  chat_message  where sessionid=%s and enable=1 order by id asc"
     data_list = mysql.fetchall(selectSql,(sessionid))
     if data_list:
      for data in data_list:
            if data["sendertype"]==1:
               messages.append({'role':'user','content':data["content"]})
            else:
               messages.append({'role':'assistant','content':data["content"]})
     return messages

#组装Messages
def knowledgecreatemessage(sessionid):
     messages=[]
     selectSql="select sessionid,messageid,content,sendertype from  chat_knowledge_message  where sessionid=%s and enable=1 order by id asc"
     data_list = mysql.fetchall(selectSql,(sessionid))
     if data_list:
      for data in data_list:
            if data["sendertype"]==1:
               messages.append({'role':'user','content':data["content"]})
            else:
               messages.append({'role':'assistant','content':data["content"]})
     return messages

#向量编码
def generate_embeddings(question):
    rsp = TextEmbedding.call(model=TextEmbedding.Models.text_embedding_v1,input=question)
    embeddings = [record['embedding'] for record in rsp.output['embeddings']]
    return embeddings if isinstance(question, list) else embeddings[0]

#定义搜索向量知识库
def search_relevant_news(question):
    # 初始化 dashvector client
    client = Client(api_key=dashvector_api_key)
    # 获取刚刚存入的集合
    collection = client.get('news_embedings')
    # 向量检索:指定 topk = 1 
    # print(generate_embeddings(question))
    rsp = collection.query(generate_embeddings(question), output_fields=['raw'],topk=1)
    # print(rsp.output[0])
    if rsp.output[0].score<0.5:
        return rsp.output[0].fields['raw'],True
    else:
        return "",False
    

#定义搜索向量知识库
def search_relevant_knowledge(question):
    # 初始化 dashvector client
    client = Client(api_key=dashvector_api_key,endpoint=dashvector_endpoint)
    # 获取刚刚存入的集合
    collection = client.get('growknowledge_embedings')
    # 向量检索:指定 topk = 1 
    # print(generate_embeddings(question))
    rsp = collection.query(generate_embeddings(question), output_fields=['raw'],topk=1)
    outputs = rsp.output
   #  print(outputs)
    outputstr = ""
   #  return "",False
    if outputs:
      # for output in outputs:
      #   outputstr.append(output.fields['raw'])
      #   outputstr+=output.fields['raw']
      #   print(str(outputstr))
      return outputs,True
    else:
        return "",False


#定义回答prompt
def answer_question(question, context,top_p, messages):
    prompt = f'''请基于```内的内容回答问题。"
	```
	{context}
	```
	我的问题是:{question}。
    '''

    rsp = Generation.call(model=chat_model,prompt=prompt,messages=messages,stream=True,top_p=top_p)
    return rsp

# 设置一个全局的异常处理函数 
def handle_exception(e):  
   def decorator(func):  
       def wrapper(*args, **kwargs):  
           try:  
               return func(*args, **kwargs)  
           except:  
               return None
       return wrapper
   return decorator

# if __name__ == '__main__':  
#    # app.run(host='192.168.2.31', port=9999,debug=True)  
#    app.run(host='127.0.0.1', port=9999,debug=True)  

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐