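The Higress AI Gateway Challenge is in full swing. The Higress community invited Yang Beining, a contestant currently in the top 5 on the leaderboard, to share his experience. Below is the guide he put together:
Background
We need to write a WebAssembly (Wasm) plugin for the Higress gateway so that, at each stage of an HTTP request (requestHeader, requestBody, responseHeader, responseBody), the request or response can be captured and run through business logic. For this competition specifically, the main task is to cache requests to the large language model (in OpenAI API format) locally or in a cloud database, and to design semantic-level cache-hit logic that cuts down on the requests the model has to answer and reduces token costs.
AI Cache example
Taking the figure above as an example, the main problems in this competition can be summarized as: (1) how to generate a suitable query vector from the query string ⇒ choice of vector generator; (2) how to do a semantic-level lookup with the query vector and quickly find a suitable cached vector ⇒ design of the cache-hit logic; (3) how to manage a large number of cache entries ⇒ choice of vector database and handling of repeated initialization.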
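Question (2) comes down to comparing a query vector with cached vectors. The following is a minimal sketch of that comparison, assuming cosine distance as the metric (the article does not state which metric the vector store is configured with). The names `cosineDistance` and `nearest` are hypothetical helpers for illustration, and the brute-force scan stands in for the ANN search that a real vector database would perform.

```go
package main

import (
	"fmt"
	"math"
)

// cosineDistance returns 1 minus the cosine similarity of a and b.
// 0 means the vectors point in the same direction; larger means less similar.
// It returns 1 when the lengths differ, the input is empty, or a vector is all zeros.
func cosineDistance(a, b []float64) float64 {
	if len(a) != len(b) || len(a) == 0 {
		return 1
	}
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 1
	}
	return 1 - dot/(math.Sqrt(na)*math.Sqrt(nb))
}

// nearest scans every cached vector and returns the key with the smallest
// distance to query. ok is false when the cache is empty.
func nearest(query []float64, cached map[string][]float64) (key string, dist float64, ok bool) {
	dist = math.Inf(1)
	for k, v := range cached {
		if d := cosineDistance(query, v); d < dist {
			key, dist, ok = k, d, true
		}
	}
	return key, dist, ok
}

func main() {
	// Toy 3-dimensional "embeddings"; real ones have hundreds of dimensions.
	cached := map[string][]float64{
		"what is higress":   {1, 0, 0},
		"how to bake bread": {0, 1, 0},
	}
	key, dist, ok := nearest([]float64{0.9, 0.1, 0}, cached)
	fmt.Println(key, dist < 0.1, ok)
}
```

A linear scan like this costs time proportional to the number of cached entries, which is why problem (3) points toward a dedicated vector database once the cache grows.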
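In fact, Redis also has Vector Store capability, so the Cache Store and the Vector Store here could be merged. This demo keeps them separate: the Cache Store uses Redis and the Vector Store uses the Alibaba Cloud DashVector service.
1. Setting Up the Gateway Environment
The gateway needs the AI Proxy plugin to support AI requests; we can use the existing ai-proxy plugin from the plugin marketplace. The commands for building from source, along with the deployment configuration, are shown below. Finally, the configuration step requires the LLM provider's API endpoint and API key. Note that the competition requires the Tongyi Qianwen qwen_long model.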
git clone https://github.com/alibaba/higress.git
cd higress/plugins/wasm-go
PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100 make build

version: '3.9'
networks:
  higress-net:
    external: false
services:
  higress:
    image: registry.cn-hangzhou.aliyuncs.com/ztygw/aio-redis:1.4.1-rc.1
    environment:
      - GATEWAY_COMPONENT_LOG_LEVEL=misc:error,wasm:debug # important: enables logging
      - CONFIG_TEMPLATE=ai-proxy
      - DEFAULT_AI_SERVICE=qwen
      - DASHSCOPE_API_KEY=[YOUR_KEY]
    networks:
      - higress-net
    ports:
      - "9080:8080/tcp"
      - "9001:8001/tcp"
    volumes:
      - [LOCAL_DATA_DIR]:/data
      - [LOCAL_LOG_DIR]:/var/log/higress/ # important: lets you read the logs after the container restarts
    restart: always
  lobechat:
    image: lobehub/lobe-chat
    environment:
      - CODE=123456ed
      - OPENAI_API_KEY=unused
      - OPENAI_PROXY_URL=http://higress:8080/v1
    networks:
      - higress-net
    ports:
      - "3210:3210/tcp"
    restart: always
The main changes are to the Higress image, environment, and volumes settings. Start the stack with docker compose up -d and restart it with docker compose restart.
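cd ${workspaceFolder}/higress/plugins/wasm-go
PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100 make build
# Update the version number (version.txt)
export cur_version=$(cat ${workspaceFolder}/version.txt) && docker build -t [YOUR_IMAGE_BASE_URL]:$cur_version -f Dockerfile . && docker push [YOUR_IMAGE_BASE_URL]:$cur_version
# Update the image version in the local test environment configuration
sudo bash -c "sed -i 's|oci://registry.cn-hangzhou.aliyuncs.com/XXX:[0-9]*\.[0-9]*\.[0-9]*|oci://registry.cn-hangzhou.aliyuncs.com/XXX:$(cat version.txt)|g' data/wasmplugins/ai-cache-1.0.0.yaml"

The cache-hit logic works as follows:
1. When a query arrives, it is matched against the keys stored in Redis (`redisSearchHandler`). If a key matches exactly, the result is returned directly (`handleCacheHit`).
2. If nothing matches, the `text_embedding` API is called to convert the query into a `query_embedding` (`fetchAndProcessEmbeddings`).
3. The `query_embedding` is used for an ANN search against the vectors in the vector database, which returns the closest key; the result is then filtered by a threshold (`performQueryAndRespond`).
4. If the result is empty or the distance is above the threshold, the result is discarded and this round counts as a cache miss; finally the `query_embedding` is stored in the vector database (`uploadQueryEmbedding`).
5. If the distance is below the threshold, Redis is queried again with the most similar key (`redisSearchHandler`).
6. In the response phase, a new key-value pair is written to Redis, with the query as the key and the LLM response as the value.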
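The steps above can be sketched as a small in-memory model. This is only an illustration of the control flow, not the plugin itself: the `SemanticCache` type, the `Embedder` function type, the `toyEmbed` letter-count embedding, and the squared Euclidean distance are all assumptions made so the example is self-contained. In the real plugin each step is an asynchronous call to Redis, the embedding service, or the vector database.

```go
package main

import (
	"fmt"
	"strings"
)

// Embedder turns a query string into a vector (a remote API call in the plugin).
type Embedder func(query string) []float64

// SemanticCache is a toy stand-in for Redis plus a vector store.
type SemanticCache struct {
	answers   map[string]string    // exact-match store: query -> answer (Redis role)
	vectors   map[string][]float64 // vector store: query -> embedding
	embed     Embedder
	threshold float64 // maximum distance accepted as a semantic hit
}

func NewSemanticCache(embed Embedder, threshold float64) *SemanticCache {
	return &SemanticCache{
		answers:   map[string]string{},
		vectors:   map[string][]float64{},
		embed:     embed,
		threshold: threshold,
	}
}

// sqDist is the squared Euclidean distance between equal-length vectors.
func sqDist(a, b []float64) float64 {
	var s float64
	for i := range a {
		d := a[i] - b[i]
		s += d * d
	}
	return s
}

// Lookup mirrors steps 1 to 5 of the flow.
func (c *SemanticCache) Lookup(query string) (string, bool) {
	// Step 1: exact match on the query string.
	if a, ok := c.answers[query]; ok {
		return a, true
	}
	// Step 2: vectorize the query.
	q := c.embed(query)
	// Step 3: find the closest stored vector (brute force instead of ANN).
	bestKey, bestDist, found := "", 0.0, false
	for k, v := range c.vectors {
		if d := sqDist(q, v); !found || d < bestDist {
			bestKey, bestDist, found = k, d, true
		}
	}
	// Step 5: close enough, so look up the answer stored under the similar key.
	if found && bestDist < c.threshold {
		a, ok := c.answers[bestKey]
		return a, ok
	}
	// Step 4: miss, so remember this query's embedding for future lookups.
	c.vectors[query] = q
	return "", false
}

// Store mirrors step 6: save the LLM answer under the query during the response phase.
func (c *SemanticCache) Store(query, answer string) { c.answers[query] = answer }

// toyEmbed counts letters a-z, ignoring case and punctuation (illustration only).
func toyEmbed(q string) []float64 {
	v := make([]float64, 26)
	for _, r := range strings.ToLower(q) {
		if r >= 'a' && r <= 'z' {
			v[r-'a']++
		}
	}
	return v
}

func main() {
	c := NewSemanticCache(toyEmbed, 0.5)
	_, hit := c.Lookup("what is higress")
	fmt.Println("first lookup hit:", hit)
	c.Store("what is higress", "Higress is a cloud-native API gateway.")
	ans, hit := c.Lookup("What is Higress?")
	fmt.Println("second lookup hit:", hit, ans)
}
```

Note that a semantic hit still goes back to the exact-match store for the answer, so the vector store only ever needs to hold the query text and its embedding.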
As you can see, besides the Redis service we also need to call a text-embedding service and a vector database service. Here we chose the Alibaba DashScope (Lingji) general text-embedding API [2] as the vector generator and the Alibaba vector retrieval service DashVector [3] as the vector database.
DashVectorClient wrapper.HttpClient  `yaml:"-" json:"-"`
DashScopeClient  wrapper.HttpClient  `yaml:"-" json:"-"`
redisClient      wrapper.RedisClient `yaml:"-" json:"-"`

Then register the external services in the ParseConfig function:

c.DashVectorInfo.DashVectorClient = wrapper.NewClusterClient(wrapper.DnsCluster{
	ServiceName: c.DashVectorInfo.DashVectorServiceName,
	Port:        443,
	Domain:      c.DashVectorInfo.DashVectorAuthApiEnd,
})
c.DashVectorInfo.DashScopeClient = wrapper.NewClusterClient(wrapper.DnsCluster{
	ServiceName: c.DashVectorInfo.DashScopeServiceName,
	Port:        443,
	Domain:      "dashscope.aliyuncs.com",
})

The corresponding plugin configuration:

Dash:
  dashScopeKey: "YOUR_DASHSCOPE_KEY" # key for the text-embedding service
  dashScopeServiceName: "qwen" # important: must match the service name registered for DashScope
  dashVectorCollection: "YOUR_CLUSTER_NAME"
  dashVectorEnd: "YOUR_VECTOR_END"
  dashVectorKey: "YOUR_DASHVECTOR_KEY" # key for DashVector
  dashVectorServiceName: "DashVector.dns" # important: a DNS service must be created for DashVector
  sessionID: "XXX" # optional, mainly used for the repeated-initialization logic
redis: # important
  serviceName: "redis.static"
  timeout: 2000
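Since several of these fields are marked as important, it can help to fail fast when one is missing. The following is a hedged sketch of such a check, assuming the configuration reaches the plugin as JSON with the same field names as above. The `CacheConfig` types, the `parseCacheConfig` function, and the 2000 ms default timeout are hypothetical, and the standard `encoding/json` package is used here only to keep the example self-contained; it is not the parsing code the plugin actually uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// defaultRedisTimeoutMs is an assumed fallback, matching the example config.
const defaultRedisTimeoutMs = 2000

// DashConfig holds the embedding and vector-store settings (hypothetical type).
type DashConfig struct {
	DashScopeKey          string `json:"dashScopeKey"`
	DashScopeServiceName  string `json:"dashScopeServiceName"`
	DashVectorCollection  string `json:"dashVectorCollection"`
	DashVectorEnd         string `json:"dashVectorEnd"`
	DashVectorKey         string `json:"dashVectorKey"`
	DashVectorServiceName string `json:"dashVectorServiceName"`
	SessionID             string `json:"sessionID"` // optional
}

// RedisConfig holds the cache-store settings (hypothetical type).
type RedisConfig struct {
	ServiceName string `json:"serviceName"`
	Timeout     int    `json:"timeout"` // assumed to be milliseconds
}

type CacheConfig struct {
	Dash  DashConfig  `json:"Dash"`
	Redis RedisConfig `json:"redis"`
}

// parseCacheConfig decodes the config and rejects it if a required field is empty.
func parseCacheConfig(raw []byte) (*CacheConfig, error) {
	var c CacheConfig
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, fmt.Errorf("invalid plugin config: %w", err)
	}
	required := []struct{ name, value string }{
		{"Dash.dashScopeKey", c.Dash.DashScopeKey},
		{"Dash.dashScopeServiceName", c.Dash.DashScopeServiceName},
		{"Dash.dashVectorCollection", c.Dash.DashVectorCollection},
		{"Dash.dashVectorEnd", c.Dash.DashVectorEnd},
		{"Dash.dashVectorKey", c.Dash.DashVectorKey},
		{"Dash.dashVectorServiceName", c.Dash.DashVectorServiceName},
		{"redis.serviceName", c.Redis.ServiceName},
	}
	for _, f := range required {
		if f.value == "" {
			return nil, fmt.Errorf("missing required field %s", f.name)
		}
	}
	if c.Redis.Timeout <= 0 {
		c.Redis.Timeout = defaultRedisTimeoutMs
	}
	return &c, nil
}

func main() {
	raw := []byte(`{
		"Dash": {
			"dashScopeKey": "YOUR_DASHSCOPE_KEY",
			"dashScopeServiceName": "qwen",
			"dashVectorCollection": "YOUR_CLUSTER_NAME",
			"dashVectorEnd": "YOUR_VECTOR_END",
			"dashVectorKey": "YOUR_DASHVECTOR_KEY",
			"dashVectorServiceName": "DashVector.dns"
		},
		"redis": {"serviceName": "redis.static"}
	}`)
	cfg, err := parseCacheConfig(raw)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(cfg.Dash.DashVectorServiceName, cfg.Redis.ServiceName, cfg.Redis.Timeout)
}
```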
// ===================== Main logic begins =====================
// Main handler: fetch the value from Redis by key. On a miss, first call the
// text-embedding API to vectorize the query, then call the vector search API to
// find the most similar key seen before, and finally query Redis again for the result.
// The handlers could each be moved into their own file; they are kept together
// with the main logic here so that readers can copy them easily.
//
// 1. The incoming query is matched against the keys stored in Redis (redisSearchHandler); on an exact match the result is returned directly (handleCacheHit)
// 2. Otherwise the text_embedding API is called to convert the query into query_embedding (fetchAndProcessEmbeddings)
// 3. query_embedding is used for an ANN search in the vector database, returning the closest key, filtered by a threshold (performQueryAndRespond)
// 4. If the result is empty or above the threshold, it is discarded and this round is a cache miss; finally query_embedding is stored in the vector database (uploadQueryEmbedding)
// 5. If it is below the threshold, Redis is queried again with the most similar key (redisSearchHandler)
// 6. In the response phase, Redis is asked to add the key / LLM result pair
func redisSearchHandler(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool, ifUseEmbedding bool) error {
	err := config.redisClient.Get(config.CacheKeyPrefix+key, func(response resp.Value) {
		if err := response.Error(); err == nil && !response.IsNull() {
			log.Warnf("cache hit, key:%s", key)
			handleCacheHit(key, response, stream, ctx, config, log)
		} else {
			log.Warnf("cache miss, key:%s", key)
			if ifUseEmbedding {
				handleCacheMiss(key, err, response, ctx, config, log, key, stream)
			} else {
				proxywasm.ResumeHttpRequest()
				return
			}
		}
	})
	return err
}

// Cache hit: once the value has been read from Redis, return it directly.
func handleCacheHit(key string, response resp.Value, stream bool, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) {
	log.Warnf("cache hit, key:%s", key)
	ctx.SetContext(CacheKeyContextKey, nil)
	if !stream {
		proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "application/json; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnResponseTemplate, response.String())), -1)
	} else {
		proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "text/event-stream; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnStreamResponseTemplate, response.String())), -1)
	}
}

// Cache miss: call fetchAndProcessEmbeddings to vectorize the query.
func handleCacheMiss(key string, err error, response resp.Value, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {
	if err != nil {
		log.Warnf("redis get key:%s failed, err:%v", key, err)
	}
	if response.IsNull() {
		log.Warnf("cache miss, key:%s", key)
	}
	fetchAndProcessEmbeddings(key, ctx, config, log, queryString, stream)
}

// Call the text-embedding API to vectorize the query; on success, hand the
// result to processFetchedEmbeddings.
func fetchAndProcessEmbeddings(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {
	Emb_url, Emb_requestBody, Emb_headers := ConstructTextEmbeddingParameters(&config, log, []string{queryString})
	config.DashVectorInfo.DashScopeClient.Post(
		Emb_url,
		Emb_headers,
		Emb_requestBody,
		func(statusCode int, responseHeaders http.Header, responseBody []byte) {
			// log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))
			log.Infof("Successfully fetched embeddings for key: %s", key)
			if statusCode != 200 {
				log.Errorf("Failed to fetch embeddings, statusCode: %d, responseBody: %s", statusCode, string(responseBody))
				ctx.SetContext(QueryEmbeddingKey, nil)
				proxywasm.ResumeHttpRequest()
			} else {
				processFetchedEmbeddings(key, responseBody, ctx, config, log, stream)
			}
		},
		10000)
}

// Store the embedding in the ctx context first, then issue the vector search request.
func processFetchedEmbeddings(key string, responseBody []byte, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {
	text_embedding_raw, _ := ParseTextEmbedding(responseBody)
	text_embedding := text_embedding_raw.Output.Embeddings[0].Embedding
	// ctx.SetContext(CacheKeyContextKey, text_embedding)
	ctx.SetContext(QueryEmbeddingKey, text_embedding)
	ctx.SetContext(CacheKeyContextKey, key)
	performQueryAndRespond(key, text_embedding, ctx, config, log, stream)
}

// Call the vector search API to find the most similar key; on success, call
// redisSearchHandler to fetch the result stored under that key.
func performQueryAndRespond(key string, text_embedding []float64, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {
	vector_url, vector_request, vector_headers, err := ConstructEmbeddingQueryParameters(config, text_embedding)
	if err != nil {
		log.Errorf("Failed to perform query, err: %v", err)
		proxywasm.ResumeHttpRequest()
		return
	}
	config.DashVectorInfo.DashVectorClient.Post(
		vector_url,
		vector_headers,
		vector_request,
		func(statusCode int, responseHeaders http.Header, responseBody []byte) {
			log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))
			query_resp, err_query := ParseQueryResponse(responseBody)
			if err_query != nil {
				// Log the parse error itself (the original logged the outer err here).
				log.Errorf("Failed to parse response: %v", err_query)
				proxywasm.ResumeHttpRequest()
				return
			}
			if len(query_resp.Output) < 1 {
				log.Warnf("query response is empty")
				uploadQueryEmbedding(ctx, config, log, key, text_embedding)
				return
			}
			most_similar_key := query_resp.Output[0].Fields["query"].(string)
			log.Infof("most similar key:%s", most_similar_key)
			most_similar_score := query_resp.Output[0].Score
			if most_similar_score < 0.1 {
				ctx.SetContext(CacheKeyContextKey, nil)
				redisSearchHandler(most_similar_key, ctx, config, log, stream, false)
			} else {
				log.Infof("the most similar key's score is too high, key:%s, score:%f", most_similar_key, most_similar_score)
				uploadQueryEmbedding(ctx, config, log, key, text_embedding)
				proxywasm.ResumeHttpRequest()
				return
			}
		},
		100000)
}

// Cache miss: store the new query embedding and its key in the vector database.
func uploadQueryEmbedding(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, key string, text_embedding []float64) error {
	vector_url, vector_body, err := ConsturctEmbeddingInsertParameters(&config, log, text_embedding, key)
	if err != nil {
		log.Errorf("Failed to construct embedding insert parameters: %v", err)
		proxywasm.ResumeHttpRequest()
		return nil
	}
	err = config.DashVectorInfo.DashVectorClient.Post(
		vector_url,
		[][2]string{
			{"Content-Type", "application/json"},
			{"dashvector-auth-token", config.DashVectorInfo.DashVectorKey},
		},
		vector_body,
		func(statusCode int, responseHeaders http.Header, responseBody []byte) {
			if statusCode != 200 {
				log.Errorf("Failed to upload query embedding: %s", responseBody)
			} else {
				log.Infof("Successfully uploaded query embedding for key: %s", key)
			}
			proxywasm.ResumeHttpRequest()
		},
		10000,
	)
	if err != nil {
		log.Errorf("Failed to upload query embedding: %v", err)
		proxywasm.ResumeHttpRequest()
		return nil
	}
	return nil
}
// ===================== Main logic ends =====================
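The handlers call `ParseTextEmbedding` and `ParseQueryResponse`, whose definitions are not shown in the article. The following is one possible shape for them, inferred only from the fields the handlers read (`Output.Embeddings[0].Embedding`, `Output[0].Fields["query"]`, `Output[0].Score`). The JSON tag names are assumptions about the service responses, and `mostSimilarQuery` is a hypothetical helper added to show a checked alternative to the bare `.(string)` type assertion.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// TextEmbeddingResponse mirrors only the fields the handlers read.
// The JSON tag names are assumptions, not taken from the article.
type TextEmbeddingResponse struct {
	Output struct {
		Embeddings []struct {
			TextIndex int       `json:"text_index"`
			Embedding []float64 `json:"embedding"`
		} `json:"embeddings"`
	} `json:"output"`
}

// QueryResponse mirrors only the fields read from the vector search result.
type QueryResponse struct {
	Output []struct {
		ID     string                 `json:"id"`
		Fields map[string]interface{} `json:"fields"`
		Score  float64                `json:"score"`
	} `json:"output"`
}

// ParseTextEmbedding decodes the body and rejects responses with no embedding,
// so that callers can index Embeddings[0] safely.
func ParseTextEmbedding(body []byte) (*TextEmbeddingResponse, error) {
	var r TextEmbeddingResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	if len(r.Output.Embeddings) == 0 {
		return nil, errors.New("embedding response contains no embeddings")
	}
	return &r, nil
}

// ParseQueryResponse decodes the vector search result; an empty Output is valid
// and simply means nothing similar has been stored yet.
func ParseQueryResponse(body []byte) (*QueryResponse, error) {
	var r QueryResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	return &r, nil
}

// mostSimilarQuery extracts the top result's "query" field and score.
// ok is false if there is no result or the field is not a string.
func mostSimilarQuery(r *QueryResponse) (query string, score float64, ok bool) {
	if r == nil || len(r.Output) == 0 {
		return "", 0, false
	}
	query, ok = r.Output[0].Fields["query"].(string)
	return query, r.Output[0].Score, ok
}

func main() {
	emb, err := ParseTextEmbedding([]byte(`{"output":{"embeddings":[{"text_index":0,"embedding":[0.1,0.2]}]}}`))
	if err != nil {
		fmt.Println("embedding error:", err)
		return
	}
	fmt.Println("dimensions:", len(emb.Output.Embeddings[0].Embedding))

	qr, err := ParseQueryResponse([]byte(`{"output":[{"id":"1","fields":{"query":"what is higress"},"score":0.05}]}`))
	if err != nil {
		fmt.Println("query error:", err)
		return
	}
	key, score, ok := mostSimilarQuery(qr)
	fmt.Println(key, score, ok)
}
```

With parsers like these, `processFetchedEmbeddings` could check the returned error instead of discarding it, which would avoid an index-out-of-range panic when the embedding service returns an unexpected body.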
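In addition, this logic can only be used in functions whose return value is types.Action; streaming handlers such as onHttpResponseBody cannot be handled in a similar way. Although the request is guaranteed to be sent, there is no blocking operation there, so the callback function cannot be invoked. If you need this, refer to wasm-go/pkg/wrapper/http_wrapper.go and add a signal variable to adapt it.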