|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 zhangchenyvn 于 2025-2-9 09:40 编辑
问题(prompt):
- 设计一个命令行版的Python数据库,集成在单一Python脚本文件中,实现类似SQLite的基础功能,同时支持内存数据库操作和简单的排序功能。具体要求如下:
- 1. 创建一个名为 `Database` 的类,用于管理数据库中的表。每个表由一个名为 `Table` 的类实例表示,包含列名和数据类型,以及一个列表来存储记录。
- 2. `Table` 类应包含常用的数据库操作的函数实现。
- 3. 使用字典和列表组合的数据结构来存储表和记录,实现内存数据库模式,允许在内存中创建和操作表,无需持久化存储。
- 4. 创建一个名为 `OutputTable` 的类,用于格式化输出查询结果,确保输出美观易读。
- 5. 实现基本的排序功能,例如`ORDER BY`,以便可以根据指定列对查询结果进行升序或降序排列。排序逻辑需考虑不同数据类型(如数字和字符串)的比较规则。对于字符串类型,使用Python内置的字符串比较方法;对于数字类型,使用数值比较。
- 6. 提供一个简单的命令行界面,用户可以输入SQL命令进行数据库操作,程序能够解析这些命令并返回相应的结果。命令行界面应包括帮助文档,介绍如何使用各个命令。帮助文档应包含所有支持的SQL命令及其语法说明。
- 7. 确保所有代码集成在同一个Python文件中,便于管理和测试。文件应包含导入所需的库的语句,例如 `import sys` 和 `import datetime`。
- 8. 提供良好的数据持久化机制,将所有数据存储在类似于.db的二进制文件中,该文件应有一个文件头和文件主体,用于标识数据版本和存储格式。文件头可以是一个固定的字符串,如"DB_VERSION_1",表示当前数据格式版本。
- 9. 提供完整的实现,禁止使用 SQLite 等现成的数据库实现。注意实现过程中应处理常见的错误情况,如表不存在、字段错误等,以增强程序的健壮性。错误处理应包括异常抛出和友好提示信息。
- **特别注意**:贯彻OOP思想,一切皆对象,将可能的数据结构作为 class 来编写。
- 示例输出格式:
- ```
- +----+--------------+---------------------------+-------+
- | id | name | url | alexa |
- +----+--------------+---------------------------+-------+
- ...
- | 2 | 淘宝 | https://www.taobao.com/ | 13 |
- | 3 | 菜鸟教程 | http://www.runoob.com/ | 4689 |
- | 4 | 微博 | http://weibo.com/ | 20 |
- ...
- +----+--------------+---------------------------+-------+
- ```
复制代码
他的答复:
- import sys
- import re
- import csv
- from io import StringIO
- import pickle
- from dataclasses import dataclass
- from typing import Dict, List, Tuple, Optional
- class DatabaseError(Exception):
- """Base exception class for database errors"""
- pass
- class TableExistsError(DatabaseError):
- pass
- class TableNotFoundError(DatabaseError):
- pass
- class ColumnNotFoundError(DatabaseError):
- pass
- class DataTypeError(DatabaseError):
- pass
- class Table:
- def __init__(self, name: str, columns: List[Tuple[str, str]]):
- self.name = name
- self.columns = {col[0]: col[1] for col in columns}
- self.records: List[Dict] = []
- def insert(self, values: List[str]):
- if len(values) != len(self.columns):
- raise ValueError("Number of values doesn't match columns")
-
- record = {}
- for (col_name, col_type), value in zip(self.columns.items(), values):
- try:
- if col_type == 'int':
- record[col_name] = int(value)
- elif col_type == 'str':
- record[col_name] = str(value)
- else:
- raise DataTypeError(f"Unsupported type: {col_type}")
- except ValueError:
- raise DataTypeError(f"Invalid {col_type} value: {value}")
-
- self.records.append(record)
- def select(
- self,
- columns: Optional[List[str]] = None,
- order_by: Optional[str] = None,
- order_dir: str = 'ASC'
- ) -> List[Dict]:
- if columns is None:
- columns = list(self.columns.keys())
- else:
- for col in columns:
- if col not in self.columns:
- raise ColumnNotFoundError(f"Column {col} not found")
- if order_by:
- if order_by not in self.columns:
- raise ColumnNotFoundError(f"Order column {order_by} not found")
-
- col_type = self.columns[order_by]
- def sort_key(record):
- val = record[order_by]
- return (val if col_type == 'str' else str(val)) if col_type == 'str' else val
-
- sorted_records = sorted(
- self.records,
- key=sort_key,
- reverse=(order_dir.upper() == 'DESC')
- )
- else:
- sorted_records = self.records.copy()
- return [{col: record[col] for col in columns} for record in sorted_records]
- def to_dict(self):
- return {
- 'columns': list(self.columns.items()),
- 'records': self.records
- }
- class Database:
- def __init__(self):
- self.tables: Dict[str, Table] = {}
- def create_table(self, name: str, columns: List[Tuple[str, str]]):
- if name in self.tables:
- raise TableExistsError(f"Table {name} already exists")
- self.tables[name] = Table(name, columns)
- def get_table(self, name: str) -> Table:
- if name not in self.tables:
- raise TableNotFoundError(f"Table {name} not found")
- return self.tables[name]
- def save(self, filename: str):
- data = {
- name: table.to_dict()
- for name, table in self.tables.items()
- }
- with open(filename, 'wb') as f:
- f.write(b'DB_VERSION_1')
- pickle.dump(data, f)
- @classmethod
- def load(cls, filename: str) -> 'Database':
- db = cls()
- with open(filename, 'rb') as f:
- header = f.read(12)
- if header != b'DB_VERSION_1':
- raise ValueError("Invalid database format")
- data = pickle.load(f)
- for name, table_data in data.items():
- columns = table_data['columns']
- records = table_data['records']
- table = Table(name, columns)
- table.records = records
- db.tables[name] = table
- return db
- class OutputTable:
- @staticmethod
- def format(columns: List[str], records: List[Dict]) -> str:
- if not records:
- return "No results"
-
- col_widths = {
- col: max(len(str(record.get(col, ''))) for record in records)
- for col in columns
- }
- for col in columns:
- col_widths[col] = max(col_widths[col], len(col))
-
- sep = '+' + '+'.join('-' * (w + 2) for w in col_widths.values()) + '+'
- header = '|' + '|'.join(f' {col.ljust(col_widths[col])} ' for col in columns) + '|'
- rows = []
- for record in records:
- row = '|' + '|'.join(
- f' {str(record.get(col, "")).ljust(col_widths[col])} '
- for col in columns
- ) + '|'
- rows.append(row)
-
- return '\n'.join([sep, header, sep] + rows + [sep])
- def parse_sql(sql: str):
- sql = re.sub(r'\s+', ' ', sql.strip(), flags=re.UNICODE)
- upper_sql = sql.upper()
-
- if upper_sql == 'HELP':
- return ('HELP', None)
-
- if upper_sql.startswith('CREATE TABLE'):
- match = re.match(r'CREATE TABLE (\w+) \((.+)\)', sql, re.IGNORECASE)
- if not match:
- raise ValueError("Invalid CREATE TABLE syntax")
- table_name = match.group(1)
- columns = [
- tuple(col.strip().rsplit(' ', 1))
- for col in match.group(2).split(',')
- ]
- return ('CREATE_TABLE', (table_name, columns))
-
- if upper_sql.startswith('INSERT INTO'):
- match = re.match(r'INSERT INTO (\w+) VALUES \((.+)\)', sql, re.IGNORECASE)
- if not match:
- raise ValueError("Invalid INSERT syntax")
- table_name = match.group(1)
- values_part = match.group(2)
- reader = csv.reader(
- StringIO(values_part.replace("'", '"')),
- quotechar='"',
- skipinitialspace=True
- )
- values = next(reader)
- return ('INSERT', (table_name, values))
-
- if upper_sql.startswith('SELECT'):
- match = re.match(
- r'SELECT (.+?) FROM (\w+)(?: ORDER BY (\w+)(?: (ASC|DESC))?)?$',
- sql,
- re.IGNORECASE
- )
- if not match:
- raise ValueError("Invalid SELECT syntax")
- columns_part = match.group(1).strip()
- table_name = match.group(2)
- order_by = match.group(3)
- order_dir = (match.group(4) or 'ASC').upper()
- columns = None if columns_part == '*' else [c.strip() for c in columns_part.split(',')]
- return ('SELECT', (table_name, columns, order_by, order_dir))
-
- if upper_sql.startswith('SAVE '):
- return ('SAVE', sql.split(' ', 1)[1].strip())
-
- if upper_sql.startswith('LOAD '):
- return ('LOAD', sql.split(' ', 1)[1].strip())
-
- if upper_sql == 'EXIT':
- return ('EXIT', None)
-
- raise ValueError(f"Unsupported command: {sql}")
- def main():
- db = Database()
- print("SimpleDB - Type 'HELP' for help")
-
- while True:
- try:
- command = input("SQL> ").strip()
- if not command:
- continue
-
- cmd_type, args = parse_sql(command)
-
- if cmd_type == 'HELP':
- print("""
- Supported commands:
- CREATE TABLE <table> (col1 type1, col2 type2...)
- INSERT INTO <table> VALUES (val1, val2...)
- SELECT [*|col1,col2...] FROM <table> [ORDER BY col [ASC|DESC]]
- SAVE <filename>
- LOAD <filename>
- EXIT
- HELP
- """.strip())
-
- elif cmd_type == 'CREATE_TABLE':
- table_name, columns = args
- db.create_table(table_name, columns)
- print(f"Table '{table_name}' created")
-
- elif cmd_type == 'INSERT':
- table_name, values = args
- table = db.get_table(table_name)
- table.insert(values)
- print(f"Inserted 1 row into {table_name}")
-
- elif cmd_type == 'SELECT':
- table_name, columns, order_by, order_dir = args
- table = db.get_table(table_name)
- records = table.select(columns, order_by, order_dir)
- output_columns = columns if columns else list(table.columns.keys())
- print(OutputTable.format(output_columns, records))
-
- elif cmd_type == 'SAVE':
- db.save(args)
- print(f"Database saved to {args}")
-
- elif cmd_type == 'LOAD':
- db = Database.load(args)
- print(f"Database loaded from {args}")
-
- elif cmd_type == 'EXIT':
- break
-
- except Exception as e:
- print(f"Error: {str(e)}")
- if __name__ == "__main__":
- main()
复制代码
就一个词:厉害!
测试:程序可用。
LICENSED under BSD3 |
|