fastapi开发的接口,客户端同时访问两个以上接口报错解决

2天前学习15

最近开发了一个小项目,通过fastapi写的后台,遇到个很无语的情况。

浏览器异步同时调用两个接口的时候会将程序中断。主要是我手写的数据库连接程序,同时访问的话一个执行完会导致另一个连接丢失,有时候会直接程序中断,内存溢出,报错都没有。

找个各种办法,最后实在不行了手搓了个判断,等待一个执行完以后再执行下一个,这才解决了问题。

以下是我写的MySQL操作类,有用到的朋友可以试试。

import mysql.connector
from mysql.connector import Error
import pandas as pd
import time
is_locked = False

class MySQLDatabase:
    """MySQL 数据库操作类"""

    def __init__(self, host, user, password, database=None):
        """初始化数据库连接参数"""
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.connection = None
        self.cursor = None

    def connect(self):
        """建立数据库连接"""
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database
            )
            self.cursor = self.connection.cursor(dictionary=True)
            print(f"成功连接到数据库: {self.database or '未指定'}")
            return True
        except Error as e:
            print(f"数据库连接错误: {e}")
            return False

    def disconnect(self):
        """断开数据库连接"""
        if self.connection and self.connection.is_connected():
            self.cursor.close()
            self.connection.close()
            print("数据库连接已关闭")

    def create_database(self, db_name):
        """创建数据库"""
        try:
            temp_conn = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password
            )
            temp_cursor = temp_conn.cursor()
            temp_cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
            print(f"数据库 {db_name} 创建成功")
            temp_conn.close()
            self.database = db_name
            return True
        except Error as e:
            print(f"创建数据库错误: {e}")
            return False

    def execute_query(self, query, params=None):
        """执行增删改操作"""
        if not self.connection or not self.connection.is_connected():
            if not self.connect():
                return False

        try:
            self.cursor.execute(query, params or ())
            self.connection.commit()
            return True
        except Error as e:
            print(f"执行查询错误: {e}")
            self.connection.rollback()
            return False

    def execute_read_query(self, query, params=None):
        """执行查询操作"""
        global is_locked
        try:
            # 检查锁是否已被占用
            while is_locked:
                time.sleep(0.1)  # 等待锁释放

            if not is_locked:
                is_locked = True

            if not self.connection or not self.connection.is_connected():
                if not self.connect():
                    return []
            self.cursor.execute(query, params or ())
            result = self.cursor.fetchall()
        finally:
            # 释放锁
            is_locked = False

        return result

    def create_table(self, table_name, columns_definition):
        """创建表"""
        query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_definition})"
        return self.execute_query(query)

    def insert(self, table_name, data):
        if not self.connection or not self.connection.is_connected():
            if not self.connect():
                return False

        """插入单条记录"""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['%s'] * len(data))
        values = tuple(data.values())
        query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
        return self.execute_query(query, values)

    def insert_many(self, table_name, columns, data_list):
        if not self.connection or not self.connection.is_connected():
            if not self.connect():
                return False

        """插入多条记录"""
        columns_str = ', '.join(columns)
        placeholders = ', '.join(['%s'] * len(columns))
        query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"
        try:
            self.cursor.executemany(query, data_list)
            self.connection.commit()
            print(f"成功插入 {self.cursor.rowcount} 条记录")
            return True
        except Error as e:
            print(f"批量插入错误: {e}")
            self.connection.rollback()
            return False

    def select(self, table_name, columns="*", condition=None, order_by=None, limit=None):
        """查询记录"""
        columns_str = ', '.join(columns) if isinstance(columns, list) else columns
        query = f"SELECT {columns_str} FROM {table_name}"

        if condition:
            query += f" WHERE {condition}"
        if order_by:
            query += f" ORDER BY {order_by}"
        if limit:
            query += f" LIMIT {limit}"

        return self.execute_read_query(query)

    def update(self, table_name, data, condition):
        """更新记录"""
        set_clause = ', '.join([f"{key} = %s" for key in data.keys()])
        values = tuple(data.values())
        query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"
        return self.execute_query(query, values)

    def delete(self, table_name, condition):
        """删除记录"""
        query = f"DELETE FROM {table_name} WHERE {condition}"
        return self.execute_query(query)

    def df_data_query(self,query):
        try:
            connection = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password
            )
            if connection.is_connected():
                df = pd.read_sql(query, connection)
                return df
        except Error as e:
            print(f"Error: {e}")
        finally:
            if connection.is_connected():
                connection.close()

编程路漫漫,吾将遇到各种坑。

扫描二维码推送至手机访问。

版权声明:本文由星光下的赶路人发布,如需转载请注明出处。

本文链接:https://forstyle.cc/zblog/post/77.html

分享给朋友: