|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
Code
- import sqlite3
- from queue import Queue
- from typing import Any
- from flask import Flask, request, send_file
- from threading import Thread
- class SQLiteExecutor:
- """SQLite3 仅支持在同一线程内操作,但 flask 的机制使其并不在同一线程内,故封装此类以解决此问题"""
- def __init__(self, path: str):
- """
- :param path: SQLite3 文件路径
- """
- self.path = path
- # 每次需要执行的时候都往 Queue 里扔东西就行
- self.queue = Queue()
- # 执行完之后把结果放到这里
- self.results = Queue()
- # 把所有操作都封装到 _thread 方法里,这样就不会报错了
- Thread(target=self._thread, daemon=True).start()
-
- def _thread(self) -> None:
- """线程函数。
-
- `_thread()` 一直监听 `queue`,当 `queue` 发生改变时 `_thread()` 就会自动执行里面的函数。
-
- 执行完函数后,`_thread` 会立即把结果(如有)放到 `results` 里,所以理论上不存在等待时间。
- """
- while True:
- conn = sqlite3.connect(self.path)
- cur = conn.cursor()
- if (result := self.queue.get()(conn, cur)) is not None:
- self.results.put(result)
-
- def do(self, command: str, commit: bool = True) -> None:
- """对 `cur` 执行一条命令。
-
- `do()` 的原理是将 `_execute()` 的返回值放到 `queue` 里,`_thread` 在监听到有元素进入时就会自动执行它。
- 当 `_thread` 执行完毕后,它会把结果放到 `results` 里,只要监听 `results` 即可。
-
- :param command: SQLite3 命令
- :param commit: 命令执行完后是否要 commit
- :return: 无
- """
- def inner(conn: sqlite3.Connection, cur: sqlite3.Cursor):
- cur.execute(command)
- if commit:
- conn.commit()
- self.queue.put(inner)
-
- def cur(self, name: str) -> Any:
- """让 `cur` 执行 `name`。
-
- :param name: 例如 `name == 'fetchall'`,执行 `cur.fetchall()`。
- :return: 执行的结果
- """
- def inner(_: sqlite3.Connection, cur: sqlite3.Cursor):
- return getattr(cur, name)()
- self.queue.put(inner)
- return self.results.get()
- ext = SQLiteExecutor('data.db')
- ext.do('''CREATE TABLE IF NOT EXISTS users (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- pwd VARCHAR(64) NOT NULL
- )''')
- def add_route(route_path: str, file_path: str = '') -> None:
- """添加一条 route。
- 调用这个函数,可以不用单独为每个 route 写一个函数,程序使用变量 `_rid` 自动为每个 route 分配名称。
-
- :param route_path: 用户想要看到对应文件应该访问的***网址***
- :param file_path: 用户访问网住可以看到的文件的***路径***
- """
- global _rid
- file_path = (file_path or route_path).lstrip('/')
- exec(f'''@app.route({repr(route_path)})
- def route{_rid}():
- return send_file({repr(file_path)})
- ''')
- _rid += 1
- def read(path: str) -> str:
- """读取一个文本文件的内容。
-
- 调用此函数可以避免大量重复 with-as 语句,不仅让代码变多,还降低了可读性,敲起来也十分乏味。
-
- :param path: 文本文件的路径
- :return: 文本文件的内容
- """
- with open(path, encoding='utf-8') as f:
- return f.read()
- _rid = 0
- app = Flask(__name__)
- add_route('/favicon.ico')
- add_route('/base.css')
- add_route('/daohanglan.js')
- @app.route('/')
- def main():
- return read('index.html'), 200
- @app.route('/submit', methods=['POST'])
- def handler():
- json = request.get_json()
- try:
- # todo 代表用户要做的请求
- todo = json['todo']
- if todo == 'login':
- # 用户想要登录
- # 密码 pwd 是 SHA256 加密后的样子
- name, pwd = json['name'], json['pwd']
- ext.do(f"SELECT * FROM users", False)
- print(ext.cur('fetchall'))
- elif todo == 'register':
- # 注册
- # 密码 pwd 是 SHA256 加密后的样子
- name, pwd = json['name'], json['pwd']
- ext.do(f"INSERT INTO users (name, pwd) VALUES ('{name}', '{pwd}');")
- except KeyError as ex:
- return f'400 Bad Request (Key {repr(ex.args[0])} Not Found)', 400
- return str(), 200
- if __name__ == '__main__':
- app.run(port=22222, debug=True)
复制代码
输出的是空列表 [],数据库里已经有数据了 |
|