|

楼主 |
发表于 2022-6-22 11:03:33
|
显示全部楼层
sqlalchemy:
- """
- Routes and views for the flask application.
- """
- from flask import render_template, request
- from HelloWorld_Controller import app
- import os
- import json
- import pydub
- import threading
- import time
- import pydub.exceptions
- import copy
- import re
- from . import mysql_controller as mc
- import datetime
- import hashlib
- import random
- import base64
- from flask_sqlalchemy import SQLAlchemy
- import pymysql
- import filetype
# Database connection settings for Flask-SQLAlchemy.
# The URI embeds the account password, so it can be overridden through the
# SQLALCHEMY_DATABASE_URI environment variable; the literal below is only the
# fallback that keeps existing deployments working unchanged.
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get(
    "SQLALCHEMY_DATABASE_URI",
    "mysql+pymysql://RWUser:d774ab2befc390901bc738ed735ce256b7b07dd35175c2aace2bc3517b5264ec@localhost/HelloWorldServerDatabase",
)
# NOTE(review): this key looks like a misspelling of
# SQLALCHEMY_COMMIT_ON_TEARDOWN (set below); kept in case something reads it.
app.config['SQLALCHEMY_COMMIT_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# The request handlers in this module add/delete rows without committing and
# rely on this setting to commit when the request ends.
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
def _lookup_config(settings, dotted_name):
    """Return the value stored under *dotted_name* in *settings*.

    *dotted_name* is a key path such as ``"DB.cleanDB"``; each component is
    looked up in the mapping produced by the previous one.

    Raises:
        AssertionError: if a component is missing, or if an intermediate
            value is not a mapping and therefore cannot contain it.
    """
    node = settings
    for part in dotted_name.split("."):
        try:
            node = node[part]
        except (KeyError, TypeError):
            raise AssertionError("Cannot find necessary config named %s." % dotted_name)
    return node


# Key paths that must be present in the configuration before start-up.
necessaryConfigs = ["baseDIR", "DB", "DB.cleanDB", "firstStartup"]

config = {}
try:
    with open("config.json") as f:
        text = f.read()
    text = text.replace("\n", "")
    config = json.loads(text)
except (OSError, ValueError):
    # config.json is missing, unreadable or not valid JSON: fall back to
    # built-in defaults.  They are written as a Python literal because the
    # previous embedded JSON text was itself malformed.
    # "firstStartup" is included (as False, so no tables are dropped) because
    # it is one of the required keys checked below.
    config = {
        "baseDIR": "D:\\HelloWorld_Server\\",
        "DB": {
            "cleanDB": False,
            "create_new": False,
        },
        "firstStartup": False,
    }

# Fail fast if any required setting is absent.
for each in necessaryConfigs:
    _lookup_config(config, each)
# Flask-SQLAlchemy handle bound to the application; it supplies the model base
# class (db.Model) and the session used by the handlers in this module.
db = SQLAlchemy(app)


class Token(db.Model):
    """Login token row: the token value, its owner and its expiry time."""

    id = db.Column(db.BigInteger, primary_key=True)
    username = db.Column(db.String(300), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    # Expiry timestamp.  Not unique: several tokens may legitimately expire at
    # the same instant, and a refreshed token inherits the expiry of the one
    # it replaces.  A table created with the old definition still carries the
    # unique index until it is dropped or migrated.
    vaildTime = db.Column(db.DateTime)
class User(db.Model):
    """Registered account: login name plus salted password hash."""

    id = db.Column(
        db.BigInteger,
        primary_key=True,
    )
    # Login name; at most one account per name.
    username = db.Column(
        db.String(300),
        unique=True,
        nullable=False,
    )
    # Hash of the password combined with `salt`, stored as text.
    password_hash = db.Column(
        db.String(128),
        nullable=False,
    )
    # Per-account salt mixed into the password before hashing.
    salt = db.Column(
        db.String(10),
        nullable=False,
    )
class Evidence(db.Model):
    """Evidence string issued to a user, with an optional expiry time."""

    id = db.Column(
        db.BigInteger,
        primary_key=True,
    )
    # Name of the user the evidence was issued for.
    username = db.Column(
        db.String(300),
        nullable=False,
    )
    # The evidence value itself; no two rows may share one.
    evidence = db.Column(
        db.String(150),
        unique=True,
        nullable=False,
    )
    # Moment after which the evidence is considered expired.
    vaildTime = db.Column(
        db.DateTime,
    )
# Directory prefix that getPhyhicsPath() prepends to request URIs.
BASE_PATH = config["baseDIR"]
temps = []

# "create_new" is optional in the configuration; treat a missing key as False.
config["DB"].setdefault("create_new", False)

if config["DB"]["cleanDB"] or config["firstStartup"]:
    # Rebuild the schema from scratch (this discards all existing rows).
    db.drop_all()
    db.create_all()
    if config["firstStartup"]:
        # Persist the cleared flag so the next start does not wipe the DB again.
        config["firstStartup"] = False
        with open("config.json", "w") as f:
            json.dump(config, f, indent=4)
elif config["DB"]["create_new"]:
    # Create any missing tables without touching existing ones.
    db.create_all()
def new_token(username : str):
    """Return a fresh, unpredictable login token for *username*.

    The user name and the current time are combined with 32 bytes from the
    operating system's secure random source and hashed with SHA-256.  The
    previous implementation drew its randomness from the ``random`` module,
    which is not suitable for security tokens.

    Args:
        username: Name of the account the token is issued for (may be empty).

    Returns:
        The SHA-256 digest written as a decimal integer string.
    """
    import secrets

    material = (username + str(time.time())).encode("utf-8")
    material += secrets.token_bytes(32)
    return str(int(hashlib.sha256(material).hexdigest(), 16))
def compute_evidence(username):
    """Create, store and return a new evidence string for *username*.

    The value is 128 hexadecimal characters taken from the operating system's
    secure random source (the same length as the SHA-512 hex digest used
    before, which was seeded from the predictable ``random`` module).  The row
    is added to the current session but not committed here.

    Args:
        username: Name of the user the evidence is issued for.

    Returns:
        The evidence string.
    """
    import secrets

    evidence = secrets.token_hex(64)
    # Evidence is valid for 25.5 minutes from now.
    expires_at = datetime.datetime.now() + datetime.timedelta(minutes=25.5)
    info = Evidence(evidence=evidence, username=username, vaildTime=expires_at)
    db.session.add(info)
    return evidence
def eff(db):
    """Commit ``db.session`` every five seconds, forever.

    Args:
        db: The Flask-SQLAlchemy handle whose session is committed.

    NOTE(review): the lock is created inside this call, so it is not shared
    with any other thread.  Flask-SQLAlchemy also scopes ``db.session`` per
    context, so this commit presumably does not see objects added by request
    handlers -- confirm before relying on it.
    """
    lock = threading.RLock()
    while True:
        with lock:
            db.session.commit()
        time.sleep(5)


# Daemon thread: the loop never returns, so a non-daemon thread would keep the
# interpreter alive after the server itself has stopped.
thread2 = threading.Thread(target=eff, args=(db,), daemon=True)
thread2.start()
@app.route("/python/account/loginState", methods=["GET", "POST"])
def LoginStateCheck():
    """Validate a login token and, if still valid, rotate it.

    The token is read from the ``token`` form field (POST) or query parameter
    (GET).

    Returns:
        ``"invaild"`` when the token is unknown or expired (an expired row is
        deleted); otherwise a JSON object with the replacement ``token``, the
        ``username`` and a freshly issued ``evidence``.
    """
    data = ""
    if request.method == "POST":
        data = request.form.get("token")
    elif request.method == "GET":
        data = request.args.get("token")

    record = Token.query.filter_by(token=data).first()
    if record is None:
        return "invaild"
    if record.vaildTime <= datetime.datetime.now():
        db.session.delete(record)
        return "invaild"

    _username = record.username
    _token = new_token(_username)
    # Rotate in place.  Deleting the old row and adding a new one in the same
    # flush makes SQLAlchemy emit the INSERT before the DELETE, which collides
    # with unique constraints on values copied from the old row.
    record.token = _token
    return json.dumps({"token": _token, "username": _username,
                       "evidence": compute_evidence(_username)})
class DictTool:
    """Stateless helper functions for dictionaries."""

    @staticmethod
    def reverse(dic):
        """Return a new dict mapping each value of *dic* to its key.

        When several keys share a value, the key that comes last in iteration
        order wins.  Values must be hashable.
        """
        return {value: key for key, value in dic.items()}

    @staticmethod
    def keys(dic):
        """Return the keys of *dic* as a list, in iteration order."""
        return list(dic.keys())

    @staticmethod
    def vals(dic):
        """Return the values of *dic* as a list, in iteration order."""
        return list(dic.values())
@app.route("/python/account/checkEvidence", methods=["GET", "POST"])
def CheckEvi():
    """Report whether an evidence string is known and not expired.

    The value is read from the ``evidence`` form field (POST) or query
    parameter (GET).

    Returns:
        ``"true"`` if a matching, unexpired row exists, otherwise ``"false"``.
    """
    data = ""
    if request.method == "POST":
        data = request.form.get("evidence")
    elif request.method == "GET":
        data = request.args.get("evidence")

    record = Evidence.query.filter_by(evidence=data).first()
    if record is None:
        return "false"
    # Reject rows past their expiry, mirroring the check on login tokens.
    # A row without an expiry time is treated as still valid.
    if record.vaildTime is not None and record.vaildTime <= datetime.datetime.now():
        return "false"
    return "true"
@app.route("/python/account/login", methods=["GET", "POST"])
def Login():
    """Authenticate a user and issue evidence (and optionally a token).

    Reads ``username`` and ``password`` (both base64-encoded) and
    ``withtoken`` from the query string on GET, otherwise from the form.

    Returns:
        ``"nouser"`` if the account does not exist, ``"wrongpassword"`` if the
        password does not match, otherwise a JSON object with ``username``,
        ``evidence`` and, when ``withtoken`` is ``"true"``, ``token``.
    """
    params = request.args if request.method == "GET" else request.form
    username = base64.decodebytes(params.get("username").encode()).decode()
    password = base64.decodebytes(params.get("password").encode()).decode()
    withToken = params.get("withtoken") == "true"

    account = User.query.filter_by(username=username).first()
    if account is None:
        return "nouser"

    # Must match the signup scheme: SHA-256 over password + salt, stored as a
    # decimal string.  (Hashing the user name here could never match it.)
    digest = hashlib.sha256((password + account.salt).encode()).hexdigest()
    if str(int(digest, 16)) != account.password_hash:
        return "wrongpassword"

    evidence = compute_evidence(username)
    if not withToken:
        return json.dumps({"username": username, "evidence": evidence})

    token = new_token(username)
    # Tokens issued at login stay valid for five days.
    expires_at = datetime.datetime.now() + datetime.timedelta(days=5)
    db.session.add(Token(username=username, token=token, vaildTime=expires_at))
    return json.dumps({"username": username, "token": token, "evidence": evidence})
@app.route("/python/account/signup", methods=["POST", "GET"])
def Signup():
    """Create a new account.

    Reads base64-encoded ``username`` and ``password`` from the form on POST,
    otherwise from the query string.

    Returns:
        ``"userexisted"`` if the name is taken, otherwise a completion
        message.
    """
    import secrets

    rf = request.form if request.method == "POST" else request.args
    username = base64.decodebytes(rf.get("username").encode()).decode()
    password = base64.decodebytes(rf.get("password").encode()).decode()
    if User.query.filter_by(username=username).first() is not None:
        return "userexisted"

    # Characters a salt may contain: letters, digits, punctuation and space.
    alphabet = "abcdefghijklmnopqrstuvwxyz"
    alphabet += alphabet.upper()
    alphabet += "1234567890!@#$%^&*()~-=_+{}[]\\|:\";'<>?,./ "
    # Salt of 4 to 10 characters, drawn from the OS secure random source.
    salt_length = 4 + secrets.randbelow(7)
    salt = "".join(secrets.choice(alphabet) for _ in range(salt_length))

    # NOTE(review): a single SHA-256 round is weak for password storage;
    # changing the scheme would invalidate existing hashes, so it is kept.
    hexhash = hashlib.sha256((password + salt).encode()).hexdigest()
    # Stored as a decimal string to match the String column type.
    decimalhash = str(int(hexhash, 16))
    db.session.add(User(username=username, salt=salt, password_hash=decimalhash))
    # NOTE(review): the response echoes the plain-text password; left unchanged
    # because clients may depend on this exact message.
    return "Completed.Username:%s, Password:%s" % (username, password)
def getPhyhicsPath(uri):
    """Map a request URI onto a Windows-style path under ``BASE_PATH``.

    The first character of *uri* is dropped and every remaining ``/`` is
    replaced by a backslash before the result is appended to ``BASE_PATH``.

    Args:
        uri: Request path, e.g. ``"/a/b.txt"``.

    Returns:
        The concatenated path string.
    """
    # NOTE(review): ".." segments are not filtered here; if uri comes from a
    # client, confirm that callers validate it.
    return BASE_PATH + uri[1:].replace("/", "\\")
复制代码
然后是pymysql的代码:
def _purge_and_compact(cur, table, value_column):
    """Delete expired rows from *table*, then renumber ids to 1..n.

    *table* and *value_column* are fixed identifiers supplied by this script
    (identifiers cannot be bound as query parameters); all row values are
    passed as bound parameters.
    """
    # vaildTime is a DATETIME column, so compare against NOW(); CURTIME()
    # yields only the time of day.
    cur.execute("delete from {0} where vaildTime <= NOW()".format(table))
    cur.execute("select id, {1} from {0} order by id".format(table, value_column))
    update_sql = "update {0} set id=%s where id=%s and {1}=%s".format(table, value_column)
    for expected_id, row in enumerate(cur.fetchall(), start=1):
        if row["id"] != expected_id:
            cur.execute(update_sql, (expected_id, row["id"], row[value_column]))


conn = pymysql.connect(host="127.0.0.1", user="recordCleaner", password="faf76827f1634a501544bb6f668efe4a6882101f9783f33ca790308173be0ec5", database="helloworldserverdatabase")
try:
    cur = conn.cursor(pymysql.cursors.DictCursor)
    while True:
        _purge_and_compact(cur, "evidence", "evidence")
        _purge_and_compact(cur, "token", "token")
        # PyMySQL connections do not autocommit by default: without this the
        # deletes and updates stay in an open transaction and are never
        # visible to the web application.
        conn.commit()
        time.sleep(2)
finally:
    conn.close()
复制代码
用的是flask。 |
|