NZND 发表于 2022-6-21 14:54:03

sqlalchemy会长时间占用数据库锁吗?

如题。我的项目一开始采用的是sqlalchemy,后来在部分地方改为了pymysql。
结果发现,pymysql在执行删除语句的时候,一直在等待一个锁或者类似锁的东西释放,就卡在那了。用mysql直接删也没用,只有把程序停下来才正常。
是sqlalchemy在运行期间会把MySQL的删除功能上锁吗?应该怎么解决?要把sqlalchemy的代码全部换掉吗?

kogawananari 发表于 2022-6-21 20:41:45

得看你sqlalchemy代码

NZND 发表于 2022-6-22 11:03:33

kogawananari 发表于 2022-6-21 20:41
得看你sqlalchemy代码

sqlalchemy:
"""
Routes and views for the flask application.
"""

from flask import render_template, request
from HelloWorld_Controller import app
import os
import json
import pydub
import threading
import time
import pydub.exceptions
import copy
import re
from . import mysql_controller as mc
import datetime
import hashlib
import random
import base64
from flask_sqlalchemy import SQLAlchemy
import pymysql
import filetype

app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://RWUser:d774ab2befc390901bc738ed735ce256b7b07dd35175c2aace2bc3517b5264ec@localhost/HelloWorldServerDatabase"

app.config['SQLALCHEMY_COMMIT_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True

config = {}
necessaryConfigs = ["baseDIR", "DB", "DB.cleanDB", "firstStartup"]
try:
    with open("config.json") as f:
      text = f.read()
      text = text.replace("\n", "")
      config = json.loads(text)
      #print("dee")
except Exception:
   
    config = json.loads(r"""
{
    "baseDIR": "D:\\HelloWorld_Server\\",
    "DB": {
      "cleanDB": false,
      "create_new": false
    }
}""")

for each in necessaryConfigs:
    if ("." in each):
      splited = each.split(".")
      now = config
      for i in range(len(splited)):
            try:
                now = now]
            except KeyError:
                raise AssertionError("Cannot find necessary config named %s." % each)
    else:
      try:
            config
      except KeyError:
            raise AssertionError("Cannot find necessary config named %s." % each)
db = SQLAlchemy(app)

class Token(db.Model):
    id = db.Column(db.BigInteger, primary_key=True)
    username = db.Column(db.String(300), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    vaildTime = db.Column(db.DateTime, unique=True)

class User(db.Model):
    id = db.Column(db.BigInteger, primary_key=True)
    username = db.Column(db.String(300), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    salt = db.Column(db.String(10), nullable=False)

class Evidence(db.Model):
    id = db.Column(db.BigInteger, primary_key=True)
    username = db.Column(db.String(300), nullable=False)
    evidence = db.Column(db.String(150), unique=True, nullable=False)
    vaildTime = db.Column(db.DateTime)
#db.create_all()

BASE_PATH = config["baseDIR"]
temps = []
try:
    config["DB"]["create_new"]
except:
    config["DB"]["create_new"] = False

if (config["DB"]["cleanDB"] or config["firstStartup"]):
    db.drop_all()
    db.create_all()
    if (config["firstStartup"]):
      config["firstStartup"] = False
      with open("config.json", "w") as f:
            json.dump(config, f, indent=4)
elif (config["DB"]["create_new"]):
    db.create_all()

def new_token(username : str):
    lst =
    res = ""
    for i in range(len(lst)):
      index = random.randint(0, len(lst) - 1)
      res += lst
      lst.pop(index)
      lst.reverse()
    res += str(time.time()) + "efj;"
    res += str(random.randint(0, 5023))
    res = res.encode(random.choice(['utf-32', 'utf-16', 'utf-8']))
    return str(int("0x" + hashlib.sha256(res).hexdigest(), 16))

def compute_evidence(username):
    evidence = hashlib.sha512(str(time.time() + random.randint(-100, 100) + random.random() * random.choice([-1, 1])).encode()).hexdigest()
    info = Evidence(evidence=evidence, username=username, vaildTime=datetime.datetime.now() + datetime.timedelta(0, 0, 0, 0, 25.5, 0, 0))
    db.session.add(info)
    return evidence

def eff(db):
    lock = threading.RLock()
    while True:
      lock.acquire()
      db.session.commit()
      lock.release()
      time.sleep(5)

thread2 = threading.Thread(target=lambda:eff(db))
thread2.start()

@app.route("/python/account/loginState", methods=["GET", "POST"])
def LoginStateCheck():
    data = ""
    if (request.method == "POST"):
      data = request.form.get("token")
    elif (request.method == "GET"):
      data = request.args.get("token")
    retsder = Token.query.filter_by(token=data).first()
    if (retsder == None):
      return "invaild"
    #retsder = retsder#.first()
    if (retsder.vaildTime <= datetime.datetime.now()):
            db.session.delete(retsder)
            return "invaild"
    else:
      _token = new_token(retsder.username)
      _username = retsder.username
      info = Token(username=_username, token=_token, vaildTime=retsder.vaildTime)
      db.session.delete(retsder)
      db.session.add(info)
      return json.dumps({"token": _token, "username": _username,
                           "evidence": compute_evidence(_username)})

class DictTool:
    @staticmethod
    def reverse(dic):
      nd = {}
      for k, v in dic.items():
            nd = k
      return nd
    @staticmethod
    def keys(dic):
      return
    @staticmethod
    def vals(dic):
      return

@app.route("/python/account/checkEvidence", methods=["GET", "POST"])
def CheckEvi():
    global evi
    data = ""
    if (request.method == "POST"):
      data = request.form.get("evidence")
    elif (request.method == "GET"):
      data = request.args.get("evidence")
    test = Evidence.query.filter_by(evidence=data).first()
    if (test == None):
      return "false"
    else:
      return "true"

@app.route("/python/account/login", methods=["GET", "POST"])
def Login():
    global evi, db
    username = ""
    password = ""
    withToken = False
    if (request.method == "GET"):
      username = request.args.get("username")
      password = request.args.get("password")
      withToken = request.args.get("withtoken")
    else:
      username = request.form.get("username")
      password = request.form.get("password")
      withToken = request.form.get("withtoken")
    username = base64.decodebytes(username.encode()).decode()
    password = base64.decodebytes(password.encode()).decode()
    withToken = True if withToken == "true" else False

    query = User.query.filter_by(username=username).first()
    if (query == None):
      return "nouser"
    elif (str(int("0x" + hashlib.sha256((username + query.salt).encode()).hexdigest(), 16)) != query.password_hash):
      return "wrongpassword"
    evidence = compute_evidence(username)
    if (withToken):
      token = new_token(username)
      info = Token(username=username, token=token, vaildTime=datetime.datetime.now() + datetime.timedelta(5, 0, 0, 0, 0, 0, 0))
      db.session.add(info)
      return json.dumps({"username": username, "token": token, "evidence": evidence})
    return json.dumps({"username": username, "evidence": evidence})

@app.route("/python/account/signup", methods=["POST", "GET"])
def Signup():
    global db
    rf = request.args
    if (request.method == "POST"):
      rf = request.form
    username = base64.decodebytes(rf.get("username").encode()).decode()
    password = base64.decodebytes(rf.get("password").encode()).decode()

    if (User.query.filter_by(username=username).first() != None):
      return "userexisted"
    d = "abcdefghijklmnopqrstuvwxyz"
    d += d.upper()
    d += "1234567890!@#$%^&*()~-=_+{}[]\\|:\";\'<>?,./ "
    d =
    salt = "".join()
    hashobj = hashlib.sha256((password + salt).encode())
    hexhash = hashobj.hexdigest()
    integerhash = int("0x" + hexhash, 16)
    decimalhash = str(integerhash)
    info = User(username=username, salt=salt, password_hash=integerhash)
    db.session.add(info)
    return "Completed.Username:%s, Password:%s" % (username, password)


def getPhyhicsPath(uri):
    global BASE_PATH
    return BASE_PATH + uri.replace("/", "\\")

然后是pymysql的代码:
conn = pymysql.connect(host="127.0.0.1", user="recordCleaner", password="faf76827f1634a501544bb6f668efe4a6882101f9783f33ca790308173be0ec5", database="helloworldserverdatabase")
cur = conn.cursor(pymysql.cursors.DictCursor)
while True:
    cur.execute("delete from evidence where vaildTime <= CURTIME()")
    cur.fetchall()
    cur.execute("select id, evidence from evidence order by id")
    ret = cur.fetchall()
    lastid = 0
    for each in ret:
      if (each["id"] - lastid != 1):
            cur.execute("update evidence set id=%d where id=%d and evidence='%s'" % (lastid + 1, each["id"], each['evidence']))
      lastid = lastid + 1
    cur.execute("delete from token where vaildTime <= CURTIME()")
    cur.fetchall()
    cur.execute("select id, token from token order by id")
    ret = cur.fetchall()
    lastid = 0
    for each in ret:
      if (each["id"] - lastid != 1):
            cur.execute("update token set id=%d where id=%d and token='%s'" % (lastid + 1, each["id"], each['token']))
      lastid = lastid + 1
    time.sleep(2)

用的是flask。

kogawananari 发表于 2022-6-22 20:03:34

没用过flask_sqlalchemy   只用过sqlalchemy 没遇到过占用的情况 看了一眼你的代码 似乎是你一种保持着会话并且默认开启了事务?
建议使用会话元类来生成会话 sessionmaker(bind=engine)每次执行完关闭会话 。

当然 还有一种解法 那就是不开启事务 。你需要开启事务的时候 手动with session.begin()就行了
页: [1]
查看完整版本: sqlalchemy会长时间占用数据库锁吗?