|

楼主 |
发表于 2022-3-17 13:00:44
|
显示全部楼层
project-main code:
server.py:
- import socket
- import csv
- from User import User
- import pickle
- HOST = '127.0.0.1'
- PORT = 18737
def _reply(conn, text):
    """Send ``text`` to the client as UTF-8 bytes."""
    conn.send(bytes(text, 'utf-8'))


def checkFunction(conn, data):
    """Decode one client request and send the matching reply on ``conn``.

    The request is a UTF-8 string whose first whitespace-separated word is
    the command:

    * ``newuser <username> <password>`` - create the account unless the
      name is already taken.
    * ``login <username> <password>`` - on success reply with the bytes
      ``b"login"`` followed by the pickled User object.
    * ``send <username> <message...>`` - print and reply ``"<username>: <message>"``.
    * ``logout <username>`` - mark the user logged out and reply
      ``"<username> left"``.
    * ``retry`` - reply ``"retrying"``.

    Any other command is ignored without a reply. An empty request is
    ignored; a known command with too few arguments gets a
    "Denied. Malformed command." reply instead of raising IndexError.

    Args:
        conn: connected socket-like object; only ``send`` is used here.
        data: raw request bytes received from the client.
    """
    text = data.decode('utf-8')
    command = text.split()
    if not command:
        # Empty or whitespace-only request: nothing to dispatch.
        return
    action = command[0]

    if action == "newuser":
        if len(command) < 3:
            _reply(conn, "Denied. Malformed command.")
            return
        # checkUserExists returns True when the name is still free.
        if checkUserExists(command[1]):
            newUser(conn, command[1], command[2])
        else:
            _reply(conn, "Denied. User account already exists.")

    elif action == "login":
        if len(command) < 3:
            _reply(conn, "Denied. Malformed command.")
            return
        if login(conn, command[1], command[2]):
            updateUserState(command[1], True)
            # Header string tells the client the login succeeded; the
            # pickled User object follows it in the same message.
            header = bytes("login", 'utf-8')
            conn.send(header + pickle.dumps(getUser(command[1])))
        else:
            _reply(conn, "Denied. Username or password incorrect")

    elif action == "send":
        # Split on at most two spaces so the message content stays whole:
        # (send, username, message content).
        parts = text.split(" ", 2)
        if len(parts) < 3:
            _reply(conn, "Denied. Malformed command.")
            return
        msg = parts[1] + ": " + parts[2]
        print(msg)
        _reply(conn, msg)

    elif action == "logout":
        if len(command) < 2:
            _reply(conn, "Denied. Malformed command.")
            return
        name = command[1]
        # NOTE(review): forcing the state to True first means the check
        # below passes for every known user. Kept as in the original,
        # presumably because login state may not survive between
        # connections - confirm before removing.
        updateUserState(name, True)
        if getUserState(name):
            updateUserState(name, False)
            print(name + " logout")
            _reply(conn, name + " left")
        else:
            # Previously this message was built but never sent.
            _reply(conn, "Denied. Please login first")

    elif action == "retry":
        # Response for when the client had nothing valid to forward.
        _reply(conn, "retrying")
def checkUserExists(user):
    """Report whether ``user`` is still free to register.

    Note the inverted sense relative to the name: returns False when an
    entry in the module-level ``users`` list already has this username,
    and True when none does.
    """
    taken = any(entry.username == user for entry in users)
    return not taken
-
- #func to put a new user into the users.txt file
def newUser(conn, username, passw):
    """Append a new account to users.txt and to the in-memory user list.

    The entry is written as ``(<username>, <password>)`` on a new line,
    the same format that ``login`` and ``usersInit`` parse. A confirmation
    is printed on the server and sent to the client.

    Args:
        conn: connected socket-like object used for the reply.
        username: name of the account to create.
        passw: password for the account.
    """
    # NOTE(review): the password is stored in plain text.
    entry = "\n(" + username + ", " + passw + ")"
    # The context manager closes the file even if the write fails.
    with open("users.txt", "a") as userFile:
        userFile.write(entry)
    # Only register the user in memory once the entry is on disk.
    users.append(User(username, False))
    print("New user account created.")
    conn.send(bytes("New user account created. Please login.", 'utf-8'))
- #func to login a user
def login(conn, user, passw):
    """Check ``user``/``passw`` against the entries in users.txt.

    Each line of the file is expected to look like ``(<username>, <password>)``.
    Rows with fewer than two fields (for example blank lines) are skipped
    rather than raising IndexError.

    Args:
        conn: unused; kept for interface compatibility with callers.
        user: username supplied by the client.
        passw: password supplied by the client.

    Returns:
        True if some row matches both username and password, else False.
    """
    # The context manager closes the file on every path, including the
    # early return on success.
    with open("users.txt", "r") as userFile:
        for row in csv.reader(userFile, delimiter=','):
            if len(row) < 2:
                continue
            # Drop the leading "(" from the username field.
            username = row[0][1:]
            if username != user:
                continue
            # Drop the trailing ")" and the leading space from the password.
            recordPass = row[1][:-1][1:]
            if recordPass == passw:
                print(username, "login.")
                return True
    return False
- #func to change user logged in status
def updateUserState(user, state):
    """Set ``logStatus`` to ``state`` on every entry of ``users`` named ``user``.

    Does nothing when no entry has that username.
    """
    for entry in users:
        if entry.username != user:
            continue
        entry.logStatus = state
- #func to get logged in state of user
def getUserState(user):
    """Return the ``logStatus`` of the first entry in ``users`` named ``user``.

    Returns None when no entry has that username.
    """
    states = (entry.logStatus for entry in users if entry.username == user)
    return next(states, None)
- #get a specific User obj
def getUser(user):
    """Return the first entry in ``users`` whose username equals ``user``.

    Returns None when no entry has that username.
    """
    matches = (entry for entry in users if entry.username == user)
    return next(matches, None)
- #create the User obj list
def usersInit():
    """Build the list of User objects from users.txt.

    Each line is expected to look like ``(<username>, <password>)``; only
    the username is read. Every user starts with ``logStatus`` False.
    Blank lines are skipped rather than raising IndexError.

    Returns:
        A new list with one User per non-empty row of the file.
    """
    users = []
    # The context manager closes the file even if a row fails to parse.
    with open("users.txt", "r") as userFile:
        for row in csv.reader(userFile, delimiter=','):
            if not row:
                continue
            # Drop the leading "(" from the username field.
            users.append(User(row[0][1:], False))
    return users
print("My chat room server. Version one.")

# Build the user list once at startup so that login state recorded while
# serving one connection is still there for the next connection.
users = usersInit()

# Create and bind the listening socket once instead of once per
# connection; SO_REUSEADDR lets the server restart on the same port
# without waiting for old connections to time out.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((HOST, PORT))
    s.listen()
    while True:
        # One request is handled per accepted connection.
        conn, addr = s.accept()
        with conn:
            data = conn.recv(1024)
            if not data:
                # Client closed without sending a request.
                continue
            # Work out which command was sent and reply to it.
            checkFunction(conn, data)
            # NOTE(review): this echoes the raw request (including any
            # password) back after the real reply - confirm it is intended.
            conn.sendall(data)
复制代码
client.py:
- import socket
- import pickle
- from User import User
- HOST = '127.0.0.1'
- PORT = 18737 # The port used by the server
def _sendRetry(sock):
    """Send the placeholder "retry" request and read the server's reply.

    Used whenever the user's input is not forwarded to the server, so the
    connection opened for this prompt still carries one request.
    """
    sock.send(bytes("retry", 'utf-8'))
    sock.recv(1024)


print("My chat room client. Version One.\n")

# Placeholder user until a login succeeds; logStatus False means "not logged in".
userLogged = User("init", False)

while True:
    # A fresh connection is opened for every command the user types.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        command = input()
        commandParts = command.split()
        # A blank line has no command word; treat it as an invalid command
        # instead of raising IndexError.
        action = commandParts[0] if commandParts else ""

        if action == "login":
            if len(commandParts) == 3:
                s.send(bytes(command, 'utf-8'))
                response = s.recv(1024)
                if response[:5] == b"login":
                    # The bytes after the 5-byte header are the pickled User.
                    # NOTE(review): pickle.loads on data received from the
                    # network can execute arbitrary code; only safe against
                    # a trusted server.
                    userLogged = pickle.loads(response[5:])
                    print("login confirmed")
                else:
                    # Header does not match: the reply is an error message.
                    print(response.decode('utf-8'))
            else:
                print("Command usage: login <user> <pass>")
                _sendRetry(s)

        elif action == "logout":
            if len(commandParts) != 1:
                print("Command usage: logout")
                _sendRetry(s)
            elif userLogged and userLogged.logStatus:
                # Tell the server which user to log out.
                s.send(bytes("logout " + userLogged.username, 'utf-8'))
                print(s.recv(1024).decode('utf-8'))
                # Leaving the loop ends the client; the with-block closes
                # the socket (the original `s.close` was a no-op).
                break
            else:
                print("Denied. Please login first.")
                _sendRetry(s)

        elif action == "send":
            if userLogged and userLogged.logStatus:
                # Split once so the message content stays in one piece.
                sendMsg = command.split(" ", 1)
                if len(sendMsg) == 2 and sendMsg[0] == "send":
                    # Insert the username: "send <username> <message>".
                    # Joining avoids the stray trailing space that used to
                    # be appended to every message.
                    sendMsg.insert(1, userLogged.username)
                    s.send(bytes(" ".join(sendMsg), 'utf-8'))
                    print(s.recv(1024).decode('utf-8'))
                else:
                    print("Command usage: send <message>")
                    _sendRetry(s)
            else:
                print("Denied. Please login first.")
                _sendRetry(s)

        elif action == "newuser":
            if len(commandParts) == 3:
                s.send(bytes(command, 'utf-8'))
                print(s.recv(1024).decode('utf-8'))
            else:
                print("Command usage: newuser <username> <password>")
                _sendRetry(s)

        else:
            # Unknown command: list the commands valid in the current state.
            print("Invalid Command. Potential commands are:")
            if userLogged.logStatus == True:
                print("send <message>\nlogout")
            elif userLogged.logStatus == False:
                print("newuser <username> <password>\nlogin <username> <password>")
            _sendRetry(s)
-
-
复制代码
User.py:
class User:
    """Account record used by both the chat server and the chat client.

    Attributes are set directly from the constructor arguments:
    ``username`` is the account name and ``logStatus`` is the logged-in flag.
    """

    def __init__(self, username, logStatus):
        self.username, self.logStatus = username, logStatus
复制代码 |
|