# 加载必要的库和环境变量import openaiimport osfrom math import *from icecream import icimport jsonfrom pdfminer.high_level import extract_pagesfrom pdfminer.layout import LTTextContainerfrom elasticsearch7 import Elasticsearch, helpersfrom nltk.stem import PorterStemmerfrom nltk.tokenize import word_tokenizefrom nltk.corpus import stopwordsimport nltkimport reimport warnings
# 加载 .env 文件from dotenv import load_dotenv, find_dotenv_ = load_dotenv(find_dotenv())
# 屏蔽一些警告warnings.simplefilter("ignore")
# 初始化 OpenAI 和 Elasticsearch 配置openai.api_key = os.getenv('OPENAI_API_KEY')openai.api_base = os.getenv('OPENAI_API_URL')model = os.getenv('MODEL')ES_HOSTS = os.getenv('ES_HOSTS')ES_USERNAME = os.getenv('ES_USERNAME')ES_PASSWORD = os.getenv('ES_PASSWORD')
def extract_text_from_pdf(filename,page_numbers=None,min_line_length=1):'''从 PDF 文件中提取文本。
:param filename: PDF 文件名:param page_numbers: 需要提取文本的页码列表,可选:param min_line_length: 最小行长度,用于过滤短行,可选:return: 提取的文本段落列表''''''从 PDF 文件中(按指定页码)提取文字'''paragraphs = []buffer = ''full_text = ''# 提取全部文本for i, page_layout in enumerate(extract_pages(filename)):# 如果指定了页码范围,跳过范围外的页if page_numbers is not None and i not in page_numbers:continuefor element in page_layout:if isinstance(element, LTTextContainer):full_text += element.get_text() + '\n'# 按空行分隔,将文本重新组织成段落lines = full_text.split('\n')for text in lines:if len(text) >= min_line_length:buffer += (' '+text) if not text.endswith('-') else text.strip('-')elif buffer: paragraphs.append(buffer)buffer = ''if buffer:paragraphs.append(buffer)return paragraphs
def fenchi_init():'''初始化自然语言处理工具,下载必要的资源。'''nltk.download('punkt') # 英文切词、词根、切句等方法nltk.download('stopwords') # 英文停用词库
def to_keywords(input_string):'''将输入字符串转换为关键词列表。
:param input_string: 输入的字符串:return: 关键词字符串''''''(英文)文本只保留关键字'''# 使用正则表达式替换所有非字母数字的字符为空格no_symbols = re.sub(r'[^a-zA-Z0-9\s]', ' ', input_string)word_tokens = word_tokenize(no_symbols)stop_words = set(stopwords.words('english'))ps = PorterStemmer()# 去停用词,取词根filtered_sentence = [ps.stem(w) for w in word_tokens if not w.lower() in stop_words]return ' '.join(filtered_sentence)
def init_index(paragraphs):'''初始化 Elasticsearch 索引,将文本段落索引到 Elasticsearch 中。
:param paragraphs: 文本段落列表'''# 1. 创建Elasticsearch连接es = Elasticsearch(hosts=[ ES_HOSTS ],# 服务地址与端口http_auth=(ES_USERNAME, ES_PASSWORD), # 用户名,密码)
# 2. 定义索引名称index_name = "string_index"
# 3. 如果索引已存在,删除它(仅供演示,实际应用时不需要这步)if es.indices.exists(index=index_name):es.indices.delete(index=index_name)
# 4. 创建索引es.indices.create(index=index_name)
# 5. 灌库指令actions = [{"_index": index_name,"_source": {"keywords": to_keywords(para),"text": para}}for para in paragraphs]
# 6. 文本灌库helpers.bulk(es, actions)
def search(query_string, top_n=3):'''在 Elasticsearch 中搜索匹配的文本段落。
:param query_string: 搜索查询字符串:param top_n: 返回的匹配段落数量:return: 匹配的文本段落列表'''# 1. 创建Elasticsearch连接es = Elasticsearch(hosts=[ ES_HOSTS ],# 服务地址与端口http_auth=(ES_USERNAME, ES_PASSWORD), # 用户名,密码)# ES 的查询语言search_query = {"match": {"keywords": to_keywords(query_string)}}index_name = "string_index"res = es.search(index=index_name, query=search_query, size=top_n)return [hit["_source"]["text"] for hit in res["hits"]["hits"]]
def get_completion(prompt, model=model):'''使用 OpenAI 完成聊天对话。
:param prompt: 聊天的提示信息:param model: 使用的 OpenAI 模型:return: 聊天的回复'''messages = [{"role": "user", "content": prompt}]
response = openai.ChatCompletion.create(model=model,messages=messages,temperature=0,# 模型输出的随机性,0 表示随机性最小)
return response.choices[0].message["content"]
def build_prompt(prompt_template, **kwargs):'''根据模板和参数构建提示信息。
:param prompt_template: 提示信息的模板字符串:param kwargs: 用于替换模板中的变量的键值对:return: 构建后的提示信息''''''将 Prompt 模板赋值'''prompt = prompt_templatefor k, v in kwargs.items():if isinstance(v,str):val = velif isinstance(v, list) and all(isinstance(elem, str) for elem in v):val = '\n'.join(v)else:val = str(v)prompt = prompt.replace(f"__{k.upper()}__",val)return prompt
def test_promopt():'''测试构建和使用提示信息的函数。'''prompt_template = """你是一个问答机器人。你的任务是根据下述给定的已知信息回答用户问题。确保你的回复完全依据下述已知信息。不要编造答案。如果下述已知信息不足以回答用户的问题,请直接回复"我无法回答您的问题"。
已知信息:__INFO__
用户问:__QUERY__
请用中文回答用户问题。"""user_query="how many parameters does llama 2 have?"
# 1. 检索search_results = search(user_query,2)
# 2. 构建 Promptprompt = build_prompt(prompt_template, info=search_results, query=user_query)print("===Prompt===")print(prompt)
# 3. 调用 LLMresponse = get_completion(prompt)#response = get_completion_ernie(prompt)print("===回复===")print(response)
if __name__ == '__main__':
test_promopt()