Flask+Sqlite3相关问题求助
Codeimport sqlite3
from queue import Queue
from typing import Any
from flask import Flask, request, send_file
from threading import Thread
class SQLiteExecutor:
"""SQLite3 仅支持在同一线程内操作,但 flask 的机制使其并不在同一线程内,故封装此类以解决此问题"""
def __init__(self, path: str):
"""
:param path: SQLite3 文件路径
"""
self.path = path
# 每次需要执行的时候都往 Queue 里扔东西就行
self.queue = Queue()
# 执行完之后把结果放到这里
self.results = Queue()
# 把所有操作都封装到 _thread 方法里,这样就不会报错了
Thread(target=self._thread, daemon=True).start()
def _thread(self) -> None:
"""线程函数。
`_thread()` 一直监听 `queue`,当 `queue` 发生改变时 `_thread()` 就会自动执行里面的函数。
执行完函数后,`_thread` 会立即把结果(如有)放到 `results` 里,所以理论上不存在等待时间。
"""
while True:
conn = sqlite3.connect(self.path)
cur = conn.cursor()
if (result := self.queue.get()(conn, cur)) is not None:
self.results.put(result)
def do(self, command: str, commit: bool = True) -> None:
"""对 `cur` 执行一条命令。
`do()` 的原理是将 `_execute()` 的返回值放到 `queue` 里,`_thread` 在监听到有元素进入时就会自动执行它。
当 `_thread` 执行完毕后,它会把结果放到 `results` 里,只要监听 `results` 即可。
:param command: SQLite3 命令
:param commit: 命令执行完后是否要 commit
:return: 无
"""
def inner(conn: sqlite3.Connection, cur: sqlite3.Cursor):
cur.execute(command)
if commit:
conn.commit()
self.queue.put(inner)
def cur(self, name: str) -> Any:
"""让 `cur` 执行 `name`。
:param name: 例如 `name == 'fetchall'`,执行 `cur.fetchall()`。
:return: 执行的结果
"""
def inner(_: sqlite3.Connection, cur: sqlite3.Cursor):
return getattr(cur, name)()
self.queue.put(inner)
return self.results.get()
ext = SQLiteExecutor('data.db')
ext.do('''CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
pwd VARCHAR(64) NOT NULL
)''')
def add_route(route_path: str, file_path: str = '') -> None:
"""添加一条 route。
调用这个函数,可以不用单独为每个 route 写一个函数,程序使用变量 `_rid` 自动为每个 route 分配名称。
:param route_path: 用户想要看到对应文件应该访问的***网址***
:param file_path: 用户访问网住可以看到的文件的***路径***
"""
global _rid
file_path = (file_path or route_path).lstrip('/')
exec(f'''@app.route({repr(route_path)})
def route{_rid}():
return send_file({repr(file_path)})
''')
_rid += 1
def read(path: str) -> str:
"""读取一个文本文件的内容。
调用此函数可以避免大量重复 with-as 语句,不仅让代码变多,还降低了可读性,敲起来也十分乏味。
:param path: 文本文件的路径
:return: 文本文件的内容
"""
with open(path, encoding='utf-8') as f:
return f.read()
_rid = 0
app = Flask(__name__)
add_route('/favicon.ico')
add_route('/base.css')
add_route('/daohanglan.js')
@app.route('/')
def main():
return read('index.html'), 200
@app.route('/submit', methods=['POST'])
def handler():
json = request.get_json()
try:
# todo 代表用户要做的请求
todo = json['todo']
if todo == 'login':
# 用户想要登录
# 密码 pwd 是 SHA256 加密后的样子
name, pwd = json['name'], json['pwd']
ext.do(f"SELECT * FROM users", False)
print(ext.cur('fetchall'))
elif todo == 'register':
# 注册
# 密码 pwd 是 SHA256 加密后的样子
name, pwd = json['name'], json['pwd']
ext.do(f"INSERT INTO users (name, pwd) VALUES ('{name}', '{pwd}');")
except KeyError as ex:
return f'400 Bad Request (Key {repr(ex.args)} Not Found)', 400
return str(), 200
if __name__ == '__main__':
app.run(port=22222, debug=True)
输出的是空列表 [],数据库里已经有数据了 @FishC 求救 player-none 发表于 2025-05-19 18:31
@FishC 求救
Python反转字符串的两种常用方法
这里提供两种简单易懂的实现方式,根据需求选择即可:
[*]切片法(推荐)
直接通过切片操作[::-1]实现反转:
s = "FishC"
reversed_str = s[::-1]
print(reversed_str)# 输出:ChsiF
[*]reversed函数法
使用内置函数生成反转后的迭代器,再拼接字符串:
s = "hello"
reversed_str = ''.join(reversed(s))
print(reversed_str)# 输出:olleh
注意:
1. 两种方法都不会修改原字符串(字符串不可变)
2. 切片法性能更优,适合简单场景
3. reversed函数在处理复杂对象时更灵活
以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。 FishC 发表于 2025-5-19 18:33
Python反转字符串的两种常用方法
这里提供两种简单易懂的实现方式,根据需求选择即可:
sb 读问题
页:
[1]