非常好用,快去试试吧 from openai import OpenAI
MASSAGES = []
CLIENT = OpenAI(api_key="", base_url="https://api.siliconflow.cn/v1") # 将 api_key 替换成你自己的
def stream_deepseek_answer():
"""一轮对话"""
while True:
try:
contents = input("输入你的问题:\n")
MASSAGES.append({"role": "user", "content": contents})
stream = CLIENT.chat.completions.create(
model="Pro/deepseek-ai/DeepSeek-R1",
messages=MASSAGES,
temperature=1,
stream=True
)
reasoning_buffer = ""
content_buffer = ""
has_printed_thinking = False # 控制思考过程分隔线
has_printed_answer = False # 控制正式回答分隔线
print("\n=== DeepSeek的答案 ===")
for chunk in stream:
# 处理思考过程
if hasattr(chunk.choices[0].delta, 'reasoning_content'):
reasoning = chunk.choices[0].delta.reasoning_content or ""
reasoning_buffer += reasoning
# 首次收到思考内容时打印分隔线
if not has_printed_thinking and reasoning.strip():
print("-------- Think -------->")
has_printed_thinking = True
# 实时打印思考内容(替换换行为空格保持流畅)
print(reasoning, end="", flush=True)
# 处理正式回答
if hasattr(chunk.choices[0].delta, 'content'):
contents = chunk.choices[0].delta.content or ""
content_buffer += contents
# 首次收到正式内容时打印分隔线
if not has_printed_answer and contents.strip():
print("<------- Think ---------")
has_printed_answer = True
# 实时打印正式内容
print(contents, end="", flush=True)
print("\n========================")
except Exception as e:
print(f"发生错误: {e}\n服务器可能过载,请稍后再试。")
if __name__ == "__main__":
stream_deepseek_answer()
|