import sys
import re
import csv
from io import StringIO
import pickle
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
class DatabaseError(Exception):
    """Root of the exception hierarchy used by this module's database code."""
class TableExistsError(DatabaseError):
    """Raised when creating a table whose name is already in use."""
class TableNotFoundError(DatabaseError):
    """Raised when a table is looked up by a name that does not exist."""
class ColumnNotFoundError(DatabaseError):
    """Raised when a query names a column the table does not define."""
class DataTypeError(DatabaseError):
    """Raised when a value cannot be stored under a column's declared type."""
class Table:
    """In-memory table with a fixed, typed column schema.

    Attributes:
        name: Table name as given at construction.
        columns: Mapping of column name -> declared type name, in
            declaration order. The type name is stored exactly as given.
        records: Inserted rows, each a dict keyed by column name.
    """

    # Converters for the supported column types, keyed by lower-case type name.
    _CONVERTERS = {'int': int, 'str': str}

    def __init__(self, name: str, columns: List[Tuple[str, str]]):
        """Create an empty table.

        Args:
            name: Table name.
            columns: ``(column_name, type_name)`` pairs in display order.
        """
        self.name = name
        self.columns = {col[0]: col[1] for col in columns}
        self.records: List[Dict] = []

    def insert(self, values: List[str]):
        """Convert *values* to the column types and append them as one row.

        Type names are matched case-insensitively, so a column declared as
        ``INT`` behaves like ``int``.

        Args:
            values: One raw value per column, in column order.

        Raises:
            ValueError: If the number of values differs from the number of
                columns.
            DataTypeError: If a column has an unsupported type, or a value
                cannot be converted to its column's type.
        """
        if len(values) != len(self.columns):
            raise ValueError("Number of values doesn't match columns")
        # Build the whole row first so a failing value leaves the table untouched.
        record = {}
        for (col_name, col_type), value in zip(self.columns.items(), values):
            converter = self._CONVERTERS.get(str(col_type).lower())
            if converter is None:
                raise DataTypeError(f"Unsupported type: {col_type}")
            try:
                record[col_name] = converter(value)
            except ValueError as err:
                raise DataTypeError(
                    f"Invalid {col_type} value: {value}"
                ) from err
        self.records.append(record)

    def select(
        self,
        columns: Optional[List[str]] = None,
        order_by: Optional[str] = None,
        order_dir: str = 'ASC'
    ) -> List[Dict]:
        """Return rows projected onto *columns*, optionally sorted.

        Args:
            columns: Column names to include; ``None`` selects every column.
            order_by: Column to sort by; a falsy value keeps insertion order.
            order_dir: ``'DESC'`` (any letter case) sorts descending; every
                other value sorts ascending.

        Returns:
            A new list of new dicts; the stored records are not modified.

        Raises:
            ColumnNotFoundError: If a requested or ordering column is not
                part of the table.
        """
        if columns is None:
            columns = list(self.columns)
        else:
            for col in columns:
                if col not in self.columns:
                    raise ColumnNotFoundError(f"Column {col} not found")

        rows = self.records
        if order_by:
            if order_by not in self.columns:
                raise ColumnNotFoundError(f"Order column {order_by} not found")
            # Stored values already carry their column type, so they are
            # compared directly.
            rows = sorted(
                rows,
                key=lambda record: record[order_by],
                reverse=(order_dir.upper() == 'DESC'),
            )
        return [{col: record[col] for col in columns} for record in rows]

    def to_dict(self):
        """Return the schema and rows as a plain dict.

        ``'columns'`` is a fresh list of ``(name, type)`` pairs; ``'records'``
        is the table's own record list, not a copy.
        """
        return {
            'columns': list(self.columns.items()),
            'records': self.records
        }
class Database:
    """A set of named tables that can be saved to and loaded from a file.

    Attributes:
        tables: Mapping of table name -> Table.
    """

    # Magic prefix written before the pickled payload; load() rejects any
    # file that does not start with it.
    _HEADER = b'DB_VERSION_1'

    def __init__(self):
        self.tables: Dict[str, Table] = {}

    def create_table(self, name: str, columns: List[Tuple[str, str]]):
        """Register a new empty table.

        Raises:
            TableExistsError: If a table called *name* already exists.
        """
        if name in self.tables:
            raise TableExistsError(f"Table {name} already exists")
        self.tables[name] = Table(name, columns)

    def get_table(self, name: str) -> 'Table':
        """Return the table called *name*.

        Raises:
            TableNotFoundError: If no such table exists.
        """
        if name not in self.tables:
            raise TableNotFoundError(f"Table {name} not found")
        return self.tables[name]

    def save(self, filename: str):
        """Write every table to *filename* as the header plus a pickle."""
        data = {
            name: table.to_dict()
            for name, table in self.tables.items()
        }
        with open(filename, 'wb') as f:
            f.write(self._HEADER)
            pickle.dump(data, f)

    @classmethod
    def load(cls, filename: str) -> 'Database':
        """Build a database from a file previously written by ``save``.

        Raises:
            ValueError: If the header is wrong, or the payload is truncated,
                corrupt, or not shaped like saved table data.
            OSError: If the file cannot be opened or read.
        """
        db = cls()
        with open(filename, 'rb') as f:
            header = f.read(len(cls._HEADER))
            if header != cls._HEADER:
                raise ValueError("Invalid database format")
            # SECURITY: unpickling can execute arbitrary code. The header
            # check is not authentication; only load files you trust.
            try:
                data = pickle.load(f)
            except (pickle.UnpicklingError, EOFError, AttributeError,
                    ImportError, IndexError) as err:
                # Report a damaged payload the same way as a bad header.
                raise ValueError("Invalid database format") from err
        if not isinstance(data, dict):
            raise ValueError("Invalid database format")
        for name, table_data in data.items():
            try:
                columns = table_data['columns']
                records = table_data['records']
            except (KeyError, TypeError) as err:
                raise ValueError("Invalid database format") from err
            table = Table(name, columns)
            # Records are restored as saved, without re-running insert().
            table.records = records
            db.tables[name] = table
        return db
class OutputTable:
    """Renders query results as an ASCII grid."""

    @staticmethod
    def format(columns: List[str], records: List[Dict]) -> str:
        """Return *records* drawn as a bordered text table.

        Args:
            columns: Column names, in display order. A name may repeat; each
                occurrence gets its own cell.
            records: Rows to print; a key missing from a row prints as an
                empty cell.

        Returns:
            The rendered table, or ``"No results"`` when *records* is empty.
        """
        if not records:
            return "No results"

        # Each column is as wide as its header or its longest value.
        widths = {
            col: max(len(col), *(len(str(row.get(col, ''))) for row in records))
            for col in columns
        }

        def render(cells):
            return '|' + '|'.join(
                f' {text.ljust(widths[col])} '
                for col, text in zip(columns, cells)
            ) + '|'

        # Iterate over `columns`, not the width dict, so a repeated column
        # name still gets one separator segment per displayed cell.
        sep = '+' + '+'.join('-' * (widths[col] + 2) for col in columns) + '+'
        header = render(columns)
        rows = [
            render([str(row.get(col, '')) for col in columns])
            for row in records
        ]
        return '\n'.join([sep, header, sep] + rows + [sep])
def parse_sql(sql: str):
    """Parse one command line into a ``(command, arguments)`` pair.

    Keywords are matched case-insensitively. Returned shapes:

    * ``('HELP', None)`` and ``('EXIT', None)``
    * ``('CREATE_TABLE', (table_name, [(column, type), ...]))``
    * ``('INSERT', (table_name, [raw_value, ...]))``
    * ``('SELECT', (table_name, columns_or_None, order_by_or_None, 'ASC'|'DESC'))``
    * ``('SAVE', filename)`` and ``('LOAD', filename)``

    Raises:
        ValueError: If the command is not recognised, does not match its
            expected syntax, or declares a column without both a name and
            a type.
    """
    # NOTE(review): this collapses whitespace runs everywhere, including
    # inside quoted INSERT values ('a   b' is stored as 'a b').
    sql = re.sub(r'\s+', ' ', sql.strip())
    upper_sql = sql.upper()

    if upper_sql == 'HELP':
        return ('HELP', None)

    if upper_sql.startswith('CREATE TABLE'):
        match = re.match(r'CREATE TABLE (\w+) \((.+)\)', sql, re.IGNORECASE)
        if not match:
            raise ValueError("Invalid CREATE TABLE syntax")
        table_name = match.group(1)
        columns = []
        for raw in match.group(2).split(','):
            definition = raw.strip()
            # The last word is the type; everything before it is the name.
            parts = definition.rsplit(' ', 1)
            if len(parts) != 2 or not all(parts):
                # Reject "(id)" or a trailing comma here instead of letting
                # a malformed tuple reach table construction.
                raise ValueError(f"Invalid column definition: {definition!r}")
            columns.append((parts[0], parts[1]))
        return ('CREATE_TABLE', (table_name, columns))

    if upper_sql.startswith('INSERT INTO'):
        match = re.match(
            r'INSERT INTO (\w+) VALUES \((.+)\)', sql, re.IGNORECASE
        )
        if not match:
            raise ValueError("Invalid INSERT syntax")
        table_name = match.group(1)
        values_part = match.group(2)
        # Single quotes are rewritten to double quotes so the csv reader
        # accepts either style.
        # NOTE(review): as a result, a value containing an apostrophe or a
        # double quote is not preserved exactly.
        reader = csv.reader(
            StringIO(values_part.replace("'", '"')),
            quotechar='"',
            skipinitialspace=True
        )
        values = next(reader)
        return ('INSERT', (table_name, values))

    if upper_sql.startswith('SELECT'):
        match = re.match(
            r'SELECT (.+?) FROM (\w+)(?: ORDER BY (\w+)(?: (ASC|DESC))?)?$',
            sql,
            re.IGNORECASE
        )
        if not match:
            raise ValueError("Invalid SELECT syntax")
        columns_part = match.group(1).strip()
        table_name = match.group(2)
        order_by = match.group(3)
        order_dir = (match.group(4) or 'ASC').upper()
        if columns_part == '*':
            columns = None
        else:
            columns = [c.strip() for c in columns_part.split(',')]
        return ('SELECT', (table_name, columns, order_by, order_dir))

    if upper_sql.startswith('SAVE '):
        return ('SAVE', sql.split(' ', 1)[1].strip())

    if upper_sql.startswith('LOAD '):
        return ('LOAD', sql.split(' ', 1)[1].strip())

    if upper_sql == 'EXIT':
        return ('EXIT', None)

    raise ValueError(f"Unsupported command: {sql}")
def main():
    """Run the interactive prompt until EXIT is entered or input ends.

    Each line is parsed with ``parse_sql`` and applied to an in-memory
    ``Database``. A failing command prints ``Error: <message>`` and the
    prompt continues.
    """
    help_text = "\n".join((
        "Supported commands:",
        "CREATE TABLE <table> (col1 type1, col2 type2...)",
        "INSERT INTO <table> VALUES (val1, val2...)",
        "SELECT [*|col1,col2...] FROM <table> [ORDER BY col [ASC|DESC]]",
        "SAVE <filename>",
        "LOAD <filename>",
        "EXIT",
        "HELP",
    ))

    db = Database()
    print("SimpleDB - Type 'HELP' for help")
    while True:
        # Read outside the generic handler: once stdin is exhausted, input()
        # raises EOFError on every call, so retrying would loop forever.
        try:
            command = input("SQL> ").strip()
        except EOFError:
            print()
            break
        if not command:
            continue

        try:
            cmd_type, args = parse_sql(command)
            if cmd_type == 'HELP':
                print(help_text)
            elif cmd_type == 'CREATE_TABLE':
                table_name, columns = args
                db.create_table(table_name, columns)
                print(f"Table '{table_name}' created")
            elif cmd_type == 'INSERT':
                table_name, values = args
                table = db.get_table(table_name)
                table.insert(values)
                print(f"Inserted 1 row into {table_name}")
            elif cmd_type == 'SELECT':
                table_name, columns, order_by, order_dir = args
                table = db.get_table(table_name)
                records = table.select(columns, order_by, order_dir)
                output_columns = columns if columns else list(table.columns.keys())
                print(OutputTable.format(output_columns, records))
            elif cmd_type == 'SAVE':
                db.save(args)
                print(f"Database saved to {args}")
            elif cmd_type == 'LOAD':
                # Replaces the whole in-memory database with the loaded one.
                db = Database.load(args)
                print(f"Database loaded from {args}")
            elif cmd_type == 'EXIT':
                break
        except Exception as e:
            # Top-level boundary: report the failure and keep the session alive.
            print(f"Error: {e}")
# Start the interactive prompt only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()