码力全开 / Dify二次开发-使用MCP协议查询数据库数据

Created Fri, 13 Jun 2025 20:27:30 +0800 Modified Fri, 13 Jun 2025 22:57:11 +0800
1472 Words 2 min

这应该是Dify二次开发最后一篇文章了,主要是觉得没什么可以写的了。而且Dify调试起来很不方便,不想花费太多心思在这上面。而且要通过该技能找到工作还是蛮难的,现在的企业都想找熟手的,问题哪里可以快速培养出这么多的人来。

说是月薪15K,但很多企业连10K都给不到,实际上Dify开发只要1-3年Python开发经验即可。一边是大模型无所不能的需求,一边是薪资对不起就业的环境。最近才遇到一个企业的领导,觉得我Dify不是很深入。这平台出来3年都不好,你要资深不得一出来就开始玩。既要你懂还要经验丰富,薪资就15-30K。后面那个数字基本不用看了,结果还是一时兴起想找个人做这个项目,但又不确定能让你干多久,还要求会Java。敢问会Java能进行Dify二次开发?更有意思的是说他也是搞技术的,哈哈。

这里Dify版本必须大于1.0.0才能使用其插件功能,这里使用的版本是1.0.1。首先是对应插件的安装:

image

接着是MCP服务器代码的编写,这里使用的是FastMCP库,可以直接使用pip进行安装。但是需要确保Python的版本大于3.10。其对应的代码如下:

import psycopg2
from fastmcp import FastMCP

mcp = FastMCP("postgresql-mcp",port=9000)

conn_params = {
    "dbname": "xxx",
    "user": "xxx",
    "password": "xxx",
    "host": "localhost",      # 默认本地
    "port": "5432"           # PostgreSQL默认端口
}

@mcp.tool()
def execute_sql(query:str) -> list:
    """
    执行SQL查询语句

    参数:
        query (str): 要执行的SQL语句,支持多条语句以分号分隔

    返回:
        list: 包含查询结果的TextContent列表
        - 对于SELECT查询:返回CSV格式的结果,包含列名和数据
        - 对于SHOW TABLES:返回数据库中的所有表名
        - 对于其他查询:返回执行状态和影响行数
        - 多条语句的结果以"---"分隔

    """
    try:
        with psycopg2.connect(**conn_params) as conn:
            with conn.cursor() as cursor:
                statements = [stmt.strip() for stmt in query.split(";") if stmt.strip()]
                results = []
                for statement in statements:
                    try:
                        print("SQL:{}".format(statement))
                        cursor.execute(statement)
                        # 检查语句是否返回了结果集 (SELECT, SHOW, EXPLAIN, etc.)
                        if cursor.description:
                            columns = [desc[0] for desc in cursor.description]
                            rows = cursor.fetchall()
                            # 将每一行的数据转换为字符串,特殊处理None值
                            formatted_rows = []
                            for row in rows:
                                formatted_row = [
                                    "NULL" if value is None else str(value)
                                    for value in row
                                ]
                                formatted_rows.append(",".join(formatted_row))

                            # 将列名和数据合并为CSV格式
                            results.append(
                                "\n".join([",".join(columns)] + formatted_rows)
                            )

                        # 如果语句没有返回结果集 (INSERT, UPDATE, DELETE, etc.)
                        else:
                            conn.commit()  # 只有在非查询语句时才提交
                            results.append(f"查询执行成功。影响行数: {cursor.rowcount}")
                    except Error as stmt_error:
                        # 单条语句执行出错时,记录错误并继续执行
                        results.append(
                            f"执行语句 '{statement}' 出错: {str(stmt_error)}"
                        )
                        # 可以在这里选择是否继续执行后续语句,目前是继续
                return ["\n---\n".join(results)]

    except Exception as e:
        print(f"Error for '{query}': {e}")
        return [f"Error for: {str(e)}"]

if __name__ == '__main__':
    mcp.run(transport="sse")

之后是对MCP SSE插件进行授权,其内容类似如下:

{  "postgresql-mcp": {    "url": "http://127.0.0.1:9000/sse",    "headers": {}, "timeout": 500,    "sse_read_timeout": 50  }}

可以根据实际的IP进行修改。需要注意的是其2个插件版本要求必须是0.06才能正常调用。 最后就可以创建应用了,选择Chatflow即可。按照要求填写相关的内容。其中MCP服务器地址为:

{  "postgresql-mcp": {    "url": "http://127.0.0.1:9000/sse",    "headers": {}, "timeout": 150,    "sse_read_timeout": 50  }}

这里使用MCP对PostgreSQL中的数据进行查询,只涉及单表数据的查询,因此效果会很好。 而指令中内容如下:

使用中文回复。
当用户提问中涉及漏洞实体时,需要使用postgresql-mcp进行数据查询和操作,表结构说明如下:

##   漏洞信息表(vulnerability_information)

|     字段名    |          类型          |  描述   |
|--------------|------------------------|---------|
| id           | integer                | ID |
| bug_name     | character varying(200) | 漏洞名称 |
| cnnvd_id     | character varying(20)  | CNNVD号 |
| publish_date | date                   | 发布时间 |
| bug_desc     | text                   | 漏洞描述 |
| cve_id       | character varying(20)  | CVE号   |
| severity     | character varying(2)   | 风险级别 |

其最终效果如下:

可以看到输入内容后,对应的结果就开始不断输出了。而在MCP服务器这边可以看到如下的输出:

SQL:SELECT * FROM vulnerability_information WHERE publish_date >= '2025-01-01' AND publish_date < '2026-01-01' LIMIT 10

可以清楚看到大模型很好的理解其需求,并输出了正确的SQL语句并执行。而对于多表复杂的条件,其效果则会大打折扣。因此实际商用还面临很多挑战急需解决。

参考文章:

https://www.cnblogs.com/xiao987334176/p/18827261

https://www.cnblogs.com/xiao987334176/p/18822444

如果喜欢这篇文章或对您有帮助,可以:[☕] 请我喝杯咖啡 | [💓] 小额赞助