fastapi开发的接口,客户端同时访问两个以上接口报错解决
最近开发了一个小项目,通过fastapi写的后台,遇到个很无语的情况。
浏览器异步同时调用两个接口的时候会将程序中断。主要是我手写的数据库连接程序,同时访问的话一个执行完会导致另一个连接丢失,有时候会直接程序中断,内存溢出,报错都没有。
找个各种办法,最后实在不行了手搓了个判断,等待一个执行完以后再执行下一个,这才解决了问题。
以下是我写的MySQL操作类,有用到的朋友可以试试。
import mysql.connector
from mysql.connector import Error
import pandas as pd
import time
is_locked = False
class MySQLDatabase:
"""MySQL 数据库操作类"""
def __init__(self, host, user, password, database=None):
"""初始化数据库连接参数"""
self.host = host
self.user = user
self.password = password
self.database = database
self.connection = None
self.cursor = None
def connect(self):
"""建立数据库连接"""
try:
self.connection = mysql.connector.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database
)
self.cursor = self.connection.cursor(dictionary=True)
print(f"成功连接到数据库: {self.database or '未指定'}")
return True
except Error as e:
print(f"数据库连接错误: {e}")
return False
def disconnect(self):
"""断开数据库连接"""
if self.connection and self.connection.is_connected():
self.cursor.close()
self.connection.close()
print("数据库连接已关闭")
def create_database(self, db_name):
"""创建数据库"""
try:
temp_conn = mysql.connector.connect(
host=self.host,
user=self.user,
password=self.password
)
temp_cursor = temp_conn.cursor()
temp_cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
print(f"数据库 {db_name} 创建成功")
temp_conn.close()
self.database = db_name
return True
except Error as e:
print(f"创建数据库错误: {e}")
return False
def execute_query(self, query, params=None):
"""执行增删改操作"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False
try:
self.cursor.execute(query, params or ())
self.connection.commit()
return True
except Error as e:
print(f"执行查询错误: {e}")
self.connection.rollback()
return False
def execute_read_query(self, query, params=None):
"""执行查询操作"""
global is_locked
try:
# 检查锁是否已被占用
while is_locked:
time.sleep(0.1) # 等待锁释放
if not is_locked:
is_locked = True
if not self.connection or not self.connection.is_connected():
if not self.connect():
return []
self.cursor.execute(query, params or ())
result = self.cursor.fetchall()
finally:
# 释放锁
is_locked = False
return result
def create_table(self, table_name, columns_definition):
"""创建表"""
query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_definition})"
return self.execute_query(query)
def insert(self, table_name, data):
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False
"""插入单条记录"""
columns = ', '.join(data.keys())
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
return self.execute_query(query, values)
def insert_many(self, table_name, columns, data_list):
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False
"""插入多条记录"""
columns_str = ', '.join(columns)
placeholders = ', '.join(['%s'] * len(columns))
query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"
try:
self.cursor.executemany(query, data_list)
self.connection.commit()
print(f"成功插入 {self.cursor.rowcount} 条记录")
return True
except Error as e:
print(f"批量插入错误: {e}")
self.connection.rollback()
return False
def select(self, table_name, columns="*", condition=None, order_by=None, limit=None):
"""查询记录"""
columns_str = ', '.join(columns) if isinstance(columns, list) else columns
query = f"SELECT {columns_str} FROM {table_name}"
if condition:
query += f" WHERE {condition}"
if order_by:
query += f" ORDER BY {order_by}"
if limit:
query += f" LIMIT {limit}"
return self.execute_read_query(query)
def update(self, table_name, data, condition):
"""更新记录"""
set_clause = ', '.join([f"{key} = %s" for key in data.keys()])
values = tuple(data.values())
query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"
return self.execute_query(query, values)
def delete(self, table_name, condition):
"""删除记录"""
query = f"DELETE FROM {table_name} WHERE {condition}"
return self.execute_query(query)
def df_data_query(self,query):
try:
connection = mysql.connector.connect(
host=self.host,
database=self.database,
user=self.user,
password=self.password
)
if connection.is_connected():
df = pd.read_sql(query, connection)
return df
except Error as e:
print(f"Error: {e}")
finally:
if connection.is_connected():
connection.close()
编程路漫漫,吾将遇到各种坑。