sqlalchemy会长时间占用数据库锁吗?
如题。我的项目一开始采用的是sqlalchemy,后来在部分地方改为了pymysql。结果发现,pymysql在执行删除语句的时候,一直在等待一个锁或者类似锁的东西释放,就卡在那了。用mysql直接删也没用,只有把程序停下来才正常。
是sqlalchemy在运行期间会把MySQL的删除功能上锁吗?应该怎么解决?要把sqlalchemy的代码全部换掉吗? 得看你sqlalchemy代码 kogawananari 发表于 2022-6-21 20:41
得看你sqlalchemy代码
sqlalchemy:
"""
Routes and views for the flask application.
"""
from flask import render_template, request
from HelloWorld_Controller import app
import os
import json
import pydub
import threading
import time
import pydub.exceptions
import copy
import re
from . import mysql_controller as mc
import datetime
import hashlib
import random
import base64
from flask_sqlalchemy import SQLAlchemy
import pymysql
import filetype
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://RWUser:d774ab2befc390901bc738ed735ce256b7b07dd35175c2aace2bc3517b5264ec@localhost/HelloWorldServerDatabase"
app.config['SQLALCHEMY_COMMIT_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
config = {}
necessaryConfigs = ["baseDIR", "DB", "DB.cleanDB", "firstStartup"]
try:
with open("config.json") as f:
text = f.read()
text = text.replace("\n", "")
config = json.loads(text)
#print("dee")
except Exception:
config = json.loads(r"""
{
"baseDIR": "D:\\HelloWorld_Server\\",
"DB": {
"cleanDB": false,
"create_new": false
}
}""")
for each in necessaryConfigs:
if ("." in each):
splited = each.split(".")
now = config
for i in range(len(splited)):
try:
now = now]
except KeyError:
raise AssertionError("Cannot find necessary config named %s." % each)
else:
try:
config
except KeyError:
raise AssertionError("Cannot find necessary config named %s." % each)
db = SQLAlchemy(app)
class Token(db.Model):
id = db.Column(db.BigInteger, primary_key=True)
username = db.Column(db.String(300), nullable=False)
token = db.Column(db.String(100), unique=True, nullable=False)
vaildTime = db.Column(db.DateTime, unique=True)
class User(db.Model):
id = db.Column(db.BigInteger, primary_key=True)
username = db.Column(db.String(300), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
salt = db.Column(db.String(10), nullable=False)
class Evidence(db.Model):
id = db.Column(db.BigInteger, primary_key=True)
username = db.Column(db.String(300), nullable=False)
evidence = db.Column(db.String(150), unique=True, nullable=False)
vaildTime = db.Column(db.DateTime)
#db.create_all()
BASE_PATH = config["baseDIR"]
temps = []
try:
config["DB"]["create_new"]
except:
config["DB"]["create_new"] = False
if (config["DB"]["cleanDB"] or config["firstStartup"]):
db.drop_all()
db.create_all()
if (config["firstStartup"]):
config["firstStartup"] = False
with open("config.json", "w") as f:
json.dump(config, f, indent=4)
elif (config["DB"]["create_new"]):
db.create_all()
def new_token(username : str):
lst =
res = ""
for i in range(len(lst)):
index = random.randint(0, len(lst) - 1)
res += lst
lst.pop(index)
lst.reverse()
res += str(time.time()) + "efj;"
res += str(random.randint(0, 5023))
res = res.encode(random.choice(['utf-32', 'utf-16', 'utf-8']))
return str(int("0x" + hashlib.sha256(res).hexdigest(), 16))
def compute_evidence(username):
evidence = hashlib.sha512(str(time.time() + random.randint(-100, 100) + random.random() * random.choice([-1, 1])).encode()).hexdigest()
info = Evidence(evidence=evidence, username=username, vaildTime=datetime.datetime.now() + datetime.timedelta(0, 0, 0, 0, 25.5, 0, 0))
db.session.add(info)
return evidence
def eff(db):
lock = threading.RLock()
while True:
lock.acquire()
db.session.commit()
lock.release()
time.sleep(5)
thread2 = threading.Thread(target=lambda:eff(db))
thread2.start()
@app.route("/python/account/loginState", methods=["GET", "POST"])
def LoginStateCheck():
data = ""
if (request.method == "POST"):
data = request.form.get("token")
elif (request.method == "GET"):
data = request.args.get("token")
retsder = Token.query.filter_by(token=data).first()
if (retsder == None):
return "invaild"
#retsder = retsder#.first()
if (retsder.vaildTime <= datetime.datetime.now()):
db.session.delete(retsder)
return "invaild"
else:
_token = new_token(retsder.username)
_username = retsder.username
info = Token(username=_username, token=_token, vaildTime=retsder.vaildTime)
db.session.delete(retsder)
db.session.add(info)
return json.dumps({"token": _token, "username": _username,
"evidence": compute_evidence(_username)})
class DictTool:
@staticmethod
def reverse(dic):
nd = {}
for k, v in dic.items():
nd = k
return nd
@staticmethod
def keys(dic):
return
@staticmethod
def vals(dic):
return
@app.route("/python/account/checkEvidence", methods=["GET", "POST"])
def CheckEvi():
global evi
data = ""
if (request.method == "POST"):
data = request.form.get("evidence")
elif (request.method == "GET"):
data = request.args.get("evidence")
test = Evidence.query.filter_by(evidence=data).first()
if (test == None):
return "false"
else:
return "true"
@app.route("/python/account/login", methods=["GET", "POST"])
def Login():
global evi, db
username = ""
password = ""
withToken = False
if (request.method == "GET"):
username = request.args.get("username")
password = request.args.get("password")
withToken = request.args.get("withtoken")
else:
username = request.form.get("username")
password = request.form.get("password")
withToken = request.form.get("withtoken")
username = base64.decodebytes(username.encode()).decode()
password = base64.decodebytes(password.encode()).decode()
withToken = True if withToken == "true" else False
query = User.query.filter_by(username=username).first()
if (query == None):
return "nouser"
elif (str(int("0x" + hashlib.sha256((username + query.salt).encode()).hexdigest(), 16)) != query.password_hash):
return "wrongpassword"
evidence = compute_evidence(username)
if (withToken):
token = new_token(username)
info = Token(username=username, token=token, vaildTime=datetime.datetime.now() + datetime.timedelta(5, 0, 0, 0, 0, 0, 0))
db.session.add(info)
return json.dumps({"username": username, "token": token, "evidence": evidence})
return json.dumps({"username": username, "evidence": evidence})
@app.route("/python/account/signup", methods=["POST", "GET"])
def Signup():
global db
rf = request.args
if (request.method == "POST"):
rf = request.form
username = base64.decodebytes(rf.get("username").encode()).decode()
password = base64.decodebytes(rf.get("password").encode()).decode()
if (User.query.filter_by(username=username).first() != None):
return "userexisted"
d = "abcdefghijklmnopqrstuvwxyz"
d += d.upper()
d += "1234567890!@#$%^&*()~-=_+{}[]\\|:\";\'<>?,./ "
d =
salt = "".join()
hashobj = hashlib.sha256((password + salt).encode())
hexhash = hashobj.hexdigest()
integerhash = int("0x" + hexhash, 16)
decimalhash = str(integerhash)
info = User(username=username, salt=salt, password_hash=integerhash)
db.session.add(info)
return "Completed.Username:%s, Password:%s" % (username, password)
def getPhyhicsPath(uri):
global BASE_PATH
return BASE_PATH + uri.replace("/", "\\")
然后是pymysql的代码:
conn = pymysql.connect(host="127.0.0.1", user="recordCleaner", password="faf76827f1634a501544bb6f668efe4a6882101f9783f33ca790308173be0ec5", database="helloworldserverdatabase")
cur = conn.cursor(pymysql.cursors.DictCursor)
while True:
cur.execute("delete from evidence where vaildTime <= CURTIME()")
cur.fetchall()
cur.execute("select id, evidence from evidence order by id")
ret = cur.fetchall()
lastid = 0
for each in ret:
if (each["id"] - lastid != 1):
cur.execute("update evidence set id=%d where id=%d and evidence='%s'" % (lastid + 1, each["id"], each['evidence']))
lastid = lastid + 1
cur.execute("delete from token where vaildTime <= CURTIME()")
cur.fetchall()
cur.execute("select id, token from token order by id")
ret = cur.fetchall()
lastid = 0
for each in ret:
if (each["id"] - lastid != 1):
cur.execute("update token set id=%d where id=%d and token='%s'" % (lastid + 1, each["id"], each['token']))
lastid = lastid + 1
time.sleep(2)
用的是flask。 没用过flask_sqlalchemy 只用过sqlalchemy 没遇到过占用的情况 看了一眼你的代码 似乎是你一种保持着会话并且默认开启了事务?
建议使用会话元类来生成会话 sessionmaker(bind=engine)每次执行完关闭会话 。
当然 还有一种解法 那就是不开启事务 。你需要开启事务的时候 手动with session.begin()就行了
页:
[1]