前言
有一段时间没有更新文章了,最近决定梳理一下这几个月一直在忙的聊天记录问答项目,分享一下作者的心得。
实际上,整个项目不止是基于聊天记录的问答,这里以聊天记录问答为例,是为了各位读者便于理解。
其它信息在这里不便多说,希望各位读者能够理解。
话不多说,咱们直接进入正题。
整个项目打算分为两块来讲:
一、数据样例
在这篇文章中,我们只需要关注右侧的聊天记录部分。
二、分析
从上图的数据样例可以看出,我们有几个可选的方案:
- 根据聊天记录,做实体、关系、三元组抽取,构建知识图谱,基于知识图谱做问答
具体的方案,读者可以自行思考优劣,咱们在这里不展开叙述(下一篇讲)。
很显然,我们首先可以排除 “将每条消息embedding,直接做RAG” 的方案,因为这样做,我们检索出来的消息,几乎都长得一样,并且非常碎片化,基于这些数据生成的回答,往往不够全面。
那么,剩下的方案,看起来都是有效的,事实也确实如此,我们最终的方案便是三者结合。
NaiveRAG和GraphRAG有一个共同点:由于LLM最大上下文限制,当两人的聊天记录非常多时,需要对数据分块。
实际上,我们的业务不仅需要做问答,还需要对聊天记录摘要,因此,合理的数据分块,是非常重要的。
这便引出了我们这篇博客主题。
三、如何做
我们的核心原则:不能把同一个话题划分到多个窗口,要尽量保持话题的完整性。
结合我们平时的聊天习惯很容易想到的是:当两条消息的发送时间间隔不长时,那他们大概属于同一个话题。
很显然,这是对的。
我们的做法:
- 当两条消息的发送时间间隔小于某个阈值时,被认为属于同一个窗口
- 这个窗口中,可能包括了多个话题,但这些话题,没有被拆分到多个窗口,因此一个窗口中有多个话题也是是没关系的
- 通过大量数据分析发现,这个阈值在 120 分钟时,效果是比较好的
通过时间间隔划分好窗口后我们发现,有些话题,还是被划分到多个窗口去了,比如:有些消息没有得到及时回复,可能间隔了三五个小时,但他们确实属于同一个话题,应该被划分到同一个窗口。
- 当前一个窗口的末尾部分的内容与当前窗口的开始部分的内容相似度大于某个阈值时,被认为属于同一个窗口
- 目前LLM的上下文主流的都在32768左右,但我们实验发现,当上下文超过8k时,LLM的效果下降得厉害
- 因此,我们对于超长的窗口,将其划分为长度在8k token左右的多个窗口,并且每个窗口保持一定长度的首尾消息冗余
很多同学可能会有疑问,前面不是把长窗口划分成短窗口了吗,为什么又要合并起来
这里的短窗口,是指只有三五条消息的窗口,比如:两人虽然聊了很多个话题,但每个话题的消息数量都很少,也就是加起来的token很少,并且信息含量较低,如果把这种信息含量较低的内容都丢给LLM处理一次的话,很容易出现很多没什么用的总结,并且增加了LLM处理时间,因此,我们将这些短的窗口合并起来,只要不超过某一长度阈值,就都合并到同一个窗口
至此,我们便得到了划分比较合理的多个聊天窗口了。
四、结合代码理解
代码不一定完全实现了以上逻辑,为了帮助大家理解,可以参考下
classSplitChatWindow:
def__init__(self):
self.simalarity_endpoint = os.getenv(
"SIMALARITY_ENDPOINT")
defget_simalarity(self, a, b):
payload = json.dumps(
{"inputs": {"sentences": [b],"source_sentence": a}})
headers = {
'Content-Type':'application/json',
'Authorization':'Bearer '+ os.getenv("OPENAI_API_KEY","empty")
}
response = requests.request(
"
OST", self.simalarity_endpoint, headers=headers, data=payload
)
simalarity = response.json()["data"][0]
returnsimalarity
defsplit_chat_by_time(self, chat_records, time_threshold_minutes):
chat_records.sort(
key=lambdax: parse(x["Time"].strip())
)
topic_windows = []
current_window = [chat_records[0]]
foriinrange(1, len(chat_records)):
current_time = parse(
chat_records[i]["Time"].strip()
)
prev_time = parse(
chat_records[i -1]["Time"].strip()
)
time_diff = (current_time - prev_time).total_seconds() / \
60# in minutes
iftime_diff <= time_threshold_minutes:
current_window.append(chat_records[i])
else:
topic_windows.append(current_window)
current_window = [chat_records[i]]
topic_windows.append(current_window)
returntopic_windows
defmerge_similar_window_batch_with_sbert(self, windows, similarity_threshold):
merged_windows = [windows[0]]
window_content_list = [
"\n".join([msg["Content"].strip()formsginmerged_windows[-1]])
]
forwindowinwindows[1:]:
new_window_content ="\n".join(
[msg["Content"].strip()formsginwindow])
window_content_list.append(new_window_content)
foriinrange(1, len(windows)):
ifwindow_content_list[i -1].strip() !=""andwindow_content_list[i].strip() !="":
similarity = self.get_simalarity(
window_content_list[i -
1][-256:], window_content_list[i][:256]
)
ifsimilarity > similarity_threshold:
merged_windows[-1].extend(windows[i])
else:
merged_windows.append(windows[i])
else:
merged_windows.append(windows[i])
returnmerged_windows
defmerge_short_window(self, windows, max_length=8192):
merged_windows = [windows[0]]
foridx, windowinenumerate(windows[1:]):
current_window_content ="\n".join(
[
msg["SendNickName"]
+" -> "
+ msg["ReceiveNickName"]
+": "
+ msg["Content"].strip()
formsginmerged_windows[-1]
]
)
new_window_content ="\n".join(
[
msg["SendNickName"]
+" -> "
+ msg["ReceiveNickName"]
+": "
+ msg["Content"].strip()
formsginwindow
]
)
total_length = len(current_window_content) + \
len(new_window_content)
iftotal_length < max_length:
merged_windows[-1] += window
else:
merged_windows.append(window)
returnmerged_windows
defsplit_long_window(self, windows, max_length=32000):
new_windows = []
forwindowinwindows:
current_window_content ="\n".join(
[
msg["SendNickName"]
+" -> "
+ msg["ReceiveNickName"]
+": "
+ msg["Content"].strip()
formsginwindow
]
)
window_content_length = len(current_window_content)
ifwindow_content_length > max_length:
# 计算需要拆分成多少个子窗口
num_splits = (window_content_length // max_length) +1
messages_per_window = len(window) // num_splits
# 按照消息数量平均拆分
foriinrange(0, len(window), messages_per_window):
new_windows.append(window[i:i + messages_per_window])
logger.info(
f"window 长度为:{window_content_length}, 拆分成:{num_splits}个window"
)
else:
new_windows.append(window)
returnnew_windows
defsplit_window_by_length(self, message_list, max_length=512):
merged_windows = [[message_list[0]]]
formsginmessage_list[1:]:
current_window_content ="\n".join(
[
msg["SendNickName"]
+" -> "
+ msg["ReceiveNickName"]
+": "
+ msg["Content"].strip()
formsginmerged_windows[-1]
]
)
new_window_content = (
msg["SendNickName"]
+" -> "
+ msg["ReceiveNickName"]
+": "
+ msg["Content"].strip()
)
length = len(current_window_content) + len(new_window_content)
iflength < max_length:
merged_windows[-1].append(msg)
else:
merged_windows.append([msg])
returnmerged_windows