ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;display: table;padding: 0px 0.2em;color: rgb(255, 255, 255);background: rgb(15, 76, 129);">本章介绍ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;color: rgb(63, 63, 63);" class="list-paddingleft-1"> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;text-indent: -1em;display: block;margin: 0.2em 8px;color: rgb(63, 63, 63);"> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;text-indent: -1em;display: block;margin: 0.2em 8px;color: rgb(63, 63, 63);">• 借助白泽上一期开源的 Eino 编写的基于 Redis 文档向量检索系统,梳理 Eino 框架的各个组件模块,以及交互、编排方式。 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;display: table;padding: 0px 0.2em;color: rgb(255, 255, 255);background: rgb(15, 76, 129);">Eino 框架生态 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;color: rgb(63, 63, 63);"> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;margin: 0.1em auto 0.5em;border-radius: 4px;" title="null"/>ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;color: rgb(63, 63, 63);" class="list-paddingleft-1"> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;text-indent: -1em;display: block;margin: 0.2em 8px;color: rgb(63, 63, 63);">•Eino(主代码仓库):包含类型定义、流处理机制、组件抽象、编排功能、切面机制等。 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;text-indent: -1em;display: block;margin: 0.2em 8px;color: rgb(63, 63, 63);">•EinoExt:组件实现、回调处理程序实现、组件使用示例,以及各种工具,如评估器、提示优化器等。 •Eino Devops:可视化开发、可视化调试等。 •EinoExamples:是包含示例应用程序和最佳实践的代码仓库。 •Eino 用户手册:快速理解 Eino 中的概念,掌握基于 Eino 开发设计 AI 应用的技能。(Eino 开源不满一年,文档仍在完善) Redis 文档向量检索系统(RAG) 接下来将通过这个案例,介绍一下 Eino 框架的各个组件,以及如何使用组件进行编排构建 Agent,同时带你熟悉一下 Eino 本身的代码结构。
https://github.com/BaiZe1998/go-learning/tree/main/eino_assistant
项目架构图:
系统架构回答生成阶段查询检索阶段索引构建阶段Markdown文件文件加载器文档分割器嵌入模型文档向量Redis向量数据库用户问题嵌入模型查询向量KNN向量搜索TopK相关文档提示构建增强提示大语言模型生成回答检索器\nRetrieverRAG系统生成器\nGenerator参数配置\ntopK等 整个项目包含三个阶段,索引构建、检查索引、回答生成、接下来以索引构建阶段为例 ,介绍一下用上了 Eino 哪些组件,以及组件之间的关系,完整的项目讲解可以看往期的文章。
? 整个过程中我们的项目中会同时引入 Eino库 和 Eino-Ext 库的内容,希望你能体会 Eino 生态将稳定的类型定义、组件抽象、编排逻辑放置在 Eino 主库中,而将可扩展的组件、工具实现拆分到 Eino-Ext 库中的好处。
一、组件初始化 Eino 组件大全 • chatmodel:对接各家大模型的调用接口。 • callbacks:一些工具的 hook 能力的实现。 • chattemplate:提示词工程相关,处理和格式化提示模板的组件。 • indexer:Indexer 为把文本进行索引存储,一般使用Embedding做语义化索引,也可做分词索引等,以便于Retriever中召回使用。 • retriver:Retriever 用于把Indexer构建索引之后的内容进行召回,在 AI 应用中,一般使用Embedding进行语义相似性召回。 索引构建本质上也是一个局部完整的工作流,可以借助编辑器插件 Eino Dev 完成可视化的编辑工作流,在可视化的编辑窗口,编排工作流。
点击 generate 直接生成如下5个文件,然后手动替换内部的业务逻辑。
Eino Dev 插件的使用将在组件讲解篇完成后,单出一期讲解。
接下来我们看一下五个文件的内容,特别是关注 import 的库的来源。
packageknowledgeindexing import( "context" "github.com/cloudwego/eino-ext/components/document/loader/file" "github.com/cloudwego/eino/components/document" ) // newLoader component initialization function of node 'FileLoader' in graph 'KnowledgeIndexing' funcnewLoader(ctx context.Context)(ldr document.Loader, errerror) { // TODO Modify component configuration here. config := &file.FileLoaderConfig{} ldr, err = file.NewFileLoader(ctx, config) iferr !=nil{ returnnil, err } returnldr,nil }document.Loader:
返回值类型是一个接口,定义在 Eino 主库的 components/document 目录下。
typeLoaderinterface{ Load(ctx context.Context, src Source, opts ...LoaderOption) ([]*schema.Document,error) }file.NewFileLoader:
返回一个具体的文件加载的实现,定义在 Eino-Ext 库的 components/document 目录下,是对应关系。
unc NewFileLoader(ctx context.Context, config *FileLoaderConfig) (*FileLoader,error) { ifconfig ==nil{ config = &FileLoaderConfig{} } ifconfig.Parser ==nil{ parser, err := parser.NewExtParser(ctx, &parser.ExtParserConfig{ FallbackParser: parser.TextParser{}, }, ) iferr !=nil{ returnnil, fmt.Errorf("new file parser fail: %w", err) } config.Parser = parser } return&FileLoader{FileLoaderConfig: *config},nil }• transformer.go 创建 markdown 文件分割组件 import( "context" "github.com/cloudwego/eino-ext/components/document/transformer/splitter/markdown" "github.com/cloudwego/eino/components/document" ) // newDocumentTransformer component initialization function of node 'MarkdownSplitter' in graph 'KnowledgeIndexing' funcnewDocumentTransformer(ctx context.Context)(tfr document.Transformer, errerror) { // TODO Modify component configuration here. config := &markdown.HeaderConfig{ Headers:map[string]string{ "#":"title", }, TrimHeaders:false} tfr, err = markdown.NewHeaderSplitter(ctx, config) iferr !=nil{ returnnil, err } returntfr,nil }document.Transformer:
返回值类型是一个接口,定义在 Eino 主库的 components/document 目录下,定义文档的过滤和分割。
// Transformer is to convert documents, such as split or filter. typeTransformerinterface{ Transform(ctx context.Context, src []*schema.Document, opts ...TransformerOption) ([]*schema.Document,error) }markdown.NewHeaderSplitter:
创建一个基于 # 标签进行分割的 markdown 组件,定义在 Eino-Ext 扩展库的 components/document/transformer/splitter/markdown 目录下。
funcNewHeaderSplitter(ctx context.Context, config *HeaderConfig)(document.Transformer,error) { iflen(config.Headers) ==0{ returnnil, fmt.Errorf("no headers specified") } fork :=rangeconfig.Headers { for_, c :=rangek { ifc !='#'{ returnnil, fmt.Errorf("header can only consist of '#': %s", k) } } } return&headerSplitter{ headers: config.Headers, trimHeaders: config.TrimHeaders, },nil }到这一步你应该有了大致的感受,Eino 和 Eino-Ext 是相辅相成的。
看一下 Eino 库的组件目录结构。
看一下 Eino-Ext 的组件目录结构。
文档向量化,需要在初始化的时候,指定一个向量化的模型,用于将文档数据向量化之后,存入 Redis 向量索引中(也可以使用其他向量数据库),这里使用了字节的 doubao-embedding-large-text-240915 模型。
packageknowledgeindexing import( "context" "os" "github.com/cloudwego/eino-ext/components/embedding/ark" "github.com/cloudwego/eino/components/embedding" ) funcnewEmbedding(ctx context.Context)(eb embedding.Embedder, errerror) { // TODO Modify component configuration here. config := &ark.EmbeddingConfig{ BaseURL:"https://ark.cn-beijing.volces.com/api/v3", APIKey: os.Getenv("ARK_API_KEY"), Model: os.Getenv("ARK_EMBEDDING_MODEL"), } eb, err = ark.NewEmbedder(ctx, config) iferr !=nil{ returnnil, err } returneb,nil }• indexer.go(这一步需要你本地通过启动一个 redis) Redis向量索引(通过RediSearch模块实现)是一种高性能的向量数据库功能,它允许:
2. 语义搜索: 基于向量相似度进行搜索(而非简单的关键词匹配) 3. KNN查询: 使用K-Nearest Neighbors算法找到最接近的向量 Redis向量索引的核心概念:
1. 哈希结构: 使用Redis Hash存储文档内容、元数据和向量 2. 向量字段: 特殊字段类型,支持高效的向量操作 3. 相似度计算: 支持多种距离度量方式(如余弦相似度、欧氏距离) import( "context" "encoding/json" "fmt" "log" "os" "github.com/cloudwego/eino-ext/components/indexer/redis" "github.com/cloudwego/eino/components/indexer" "github.com/cloudwego/eino/schema" "github.com/google/uuid" redisCli"github.com/redis/go-redis/v9" redispkg"eino_assistant/pkg/redis" ) funcinit(){ // 初始化索引 err := redispkg.Init() iferr !=nil{ log.Fatalf("failed to init redis index: %v", err) } } // newIndexer component initialization function of node 'RedisIndexer' in graph 'KnowledgeIndexing' funcnewIndexer(ctx context.Context)(idr indexer.Indexer, errerror) { // TODO Modify component configuration here. redisAddr := os.Getenv("REDIS_ADDR") redisClient := redisCli.NewClient(&redisCli.Options{ Addr: redisAddr, Protocol:2, }) // 文档向量转换配置 config := &redis.IndexerConfig{ Client: redisClient, KeyPrefix: redispkg.RedisPrefix, BatchSize:1, // 文档到 hash 的逻辑转换 DocumentToHashes:func(ctx context.Context, doc *schema.Document)(*redis.Hashes,error) { ifdoc.ID ==""{ doc.ID = uuid.New().String() } key := doc.ID metadataBytes, err := json.Marshal(doc.MetaData) iferr !=nil{ returnnil, fmt.Errorf("failed to marshal metadata: %w", err) } return&redis.Hashes{ Key: key, Field2Value:map[string]redis.FieldValue{ redispkg.ContentField: {Value: doc.Content, EmbedKey: redispkg.VectorField}, redispkg.MetadataField: {Value: metadataBytes}, }, },nil }, } // 配置 doubao 嵌入模型(文档向量化) embeddingIns11, err := newEmbedding(ctx) iferr !=nil{ returnnil, err } config.Embedding = embeddingIns11 idr, err = redis.NewIndexer(ctx, config) iferr !=nil{ returnnil, err } returnidr,nil }二、组件编排 orchestration.go
文档索引构建阶段,上文的代码文件连同 orchestration.go 都是通过插件生成的,编排完 ui 工作流,就会为你生成组件之间的流式代码。
import( "context" "github.com/cloudwego/eino/components/document" "github.com/cloudwego/eino/compose" ) funcBuildKnowledgeIndexing(ctx context.Context)(r compose.Runnable[document.Source, []string], errerror) { const( FileLoader ="FileLoader" MarkdownSplitter ="MarkdownSplitter" RedisIndexer ="RedisIndexer" ) g := compose.NewGraph[document.Source, []string]() fileLoaderKeyOfLoader, err := newLoader(ctx) iferr !=nil{ returnnil, err } _ = g.AddLoaderNode(FileLoader, fileLoaderKeyOfLoader) markdownSplitterKeyOfDocumentTransformer, err := newDocumentTransformer(ctx) iferr !=nil{ returnnil, err } _ = g.AddDocumentTransformerNode(MarkdownSplitter, markdownSplitterKeyOfDocumentTransformer) redisIndexerKeyOfIndexer, err := newIndexer(ctx) iferr !=nil{ returnnil, err } // 编排的核心:通过点和边的概念,顺序处理数据 _ = g.AddIndexerNode(RedisIndexer, redisIndexerKeyOfIndexer) _ = g.AddEdge(compose.START, FileLoader) _ = g.AddEdge(RedisIndexer, compose.END) _ = g.AddEdge(FileLoader, MarkdownSplitter) _ = g.AddEdge(MarkdownSplitter, RedisIndexer) r, err = g.Compile(ctx, compose.WithGraphName("KnowledgeIndexing"), compose.WithNodeTriggerMode(compose.AllPredecessor)) iferr !=nil{ returnnil, err } returnr, err }? 通过 import 的库可以看到,编排的流程抽象和数据传输类型,都是定义在 Eino 主库当中的,这里使用了范型来动态定义输入和输出类型,此外 Eino 允许上下游之间通过流式或者非流失的形式交换数据,这都是框架的能力。
// Runnable is the interface for an executable object. Graph, Chain can be compiled into Runnable. // runnable is the core conception of eino, we do downgrade compatibility for four data flow patterns, // and can automatically connect components that only implement one or more methods. // eg, if a component only implements Stream() method, you can still call Invoke() to convert stream output to invoke output. typeRunnable[I, O any]interface{ Invoke(ctx context.Context, input I, opts ...Option) (output O, errerror) Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], errerror) Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, errerror) Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], errerror) }Eino 提供了两组用于编排的 API:
我们来创建一个简单的 chain: 一个模版(ChatTemplate)接一个大模型(ChatModel)。
chain, _ := NewChain[map[string]any, *Message](). AppendChatTemplate(prompt). AppendChatModel(model). Compile(ctx) chain.Invoke(ctx,map[string]any{"query":"what's your name?"})现在,我们来创建一个 Graph,先用一个 ChatModel 生成回复或者 Tool 调用指令,如生成了 Tool 调用指令,就用一个 ToolsNode 执行这些 Tool。
graph := NewGraph[map[string]any, *schema.Message]() _ = graph.AddChatTemplateNode("node_template", chatTpl) _ = graph.AddChatModelNode("node_model", chatModel) _ = graph.AddToolsNode("node_tools", toolsNode) _ = graph.AddLambdaNode("node_converter", takeOne) _ = graph.AddEdge(START,"node_template") _ = graph.AddEdge("node_template","node_model") _ = graph.AddBranch("node_model", branch) _ = graph.AddEdge("node_tools","node_converter") _ = graph.AddEdge("node_converter", END) compiledGraph, err := graph.Compile(ctx) iferr !=nil{ returnerr } out, err := r.Invoke(ctx,map[string]any{"query":"Beijing's weather this weekend"}