返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

基于Daft实现百万级文本Embedding

[复制链接]
链载Ai 显示全部楼层 发表于 4 小时前 |阅读模式 打印 上一主题 下一主题

    ingFang SC", system-ui, -apple-system, "system-ui", "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 17px;font-style: normal;font-variant-ligatures: normal;font-variant-caps: normal;font-weight: 400;letter-spacing: 0.544px;orphans: 2;text-align: justify;text-indent: 0px;text-transform: none;widows: 2;word-spacing: 0px;-webkit-text-stroke-width: 0px;white-space: normal;text-decoration-thickness: initial;text-decoration-style: initial;text-decoration-color: initial;list-style-type: disc;visibility: visible;" class="list-paddingleft-1">

在计算密集型任务中,GPU作为核心算力资源,其利用率的高低对任务效率与资源成本有着直接影响。因此,工程师们在优化性能时,其中一个重要目标便是让GPU资源尽可能 “跑满”,实现近乎100%的利用率。尤其在大规模数据处理场景下,如何避免GPU空转、充分发挥其并行计算能力,往往成为优化的关键所在。

近期,我们在运用Qwen3-Embedding-0.6B模型对百万级文本数据进行向量化操作时,达成了这一目标:整个任务运行过程中,GPU始终保持接近 100% 的使用率。这不仅显著缩短了数据处理耗时,也让我们对GPU资源的极致利用有了更具体的实践经验。接下来,我们将通过一个样例,分享一些心得体会。

另外,在撰写本文之际,我们还发现了另一种方法,能够将数据处理速度提升3倍,且该优化方法并非通过压榨剩余GPU资源实现。不过,这并非本文重点,我们将在后续分享这一内容。今天的目标主要还是借助一个Daft任务样例,分享实现GPU接近100%利用率的方法。


Daft任务描述


在Daft的日常开发过程中,我们留意到很多用户存在这样的需求:“期望借助一款高效、便捷且使用门槛低的工具,将海量文档数据存入向量数据库,以支撑下游业务的检索需求”。针对此业务场景,我们基于Daft数据处理框架设计了一个 作业样例,用以展示Daft如何快速高效地解决这类业务问题。整个样例涵盖以下步骤:
  • 从S3读取百万级的文档数据。

  • 运用spaCy将每个文档切分为多个语句。

  • 采用开源最优的embedding模型Qwen3-Embedding-0.6B为每一条语句生成对应的 Embedding向量。

  • 将结果写入TurboPuffer向量数据库,以备后续业务检索。



准备工作


在开展 Daft任务开发前,需创建一个虚拟环境,例如virtualenvuvconda等。以下以uv为例:

uvvenv--seed--python3.11source.venv/bin/activate

随后,需安装以下依赖包

pipinstall"daft[ray]"turbopuffertorchsentence-transformersspacyacceleratetransformerspython-mspacydownloaden_core_web_sm


导入依赖并配置常量


接着开始编写作业代码,首先引入必要的依赖,同时设置一些工作负载参数。用户可依据集群规模、数据大小、GPU 可用资源对上述参数进行调整,以提升处理效率。例如,使用en_core_web_sm这个spaCy 模型实现文档切分,使用intfloat/multilingual-e5-large-instruct模型生成Embedding向量,当然也可将这些模型替换为其他开源模型或企业级模型。
importtorchimportdaftfromdaftimportcolNUM_GPU_NODES=8#GPUnodesinyourclusterNLP_MODEL_NAME="en_core_web_sm"#spaCymodelforsentencedetectionCHUNKING_PARALLELISM=8#ParallelchunkingprocessespernodeEMBEDDING_MODEL_NAME="Qwen/Qwen3-Embedding-0.6B"#TextembeddingmodelENCODING_DIM=1024#EmbeddingdimensionsBATCH_SIZE=512#RecordsperembeddingbatchSENTENCE_TRANSFORMER_BATCH_SIZE=16#GPUbatchsizeforembeddings


步骤 1:文本切块


在为文本数据生成embedding向量之前,通常需要先对大文本进行切分,再作处理。在我们的业务场景中,文本数据主要源自文档,文档一般具有层级结构,如文档→章节→段落→句子→单词→字符,分别有不同的切分策略,常见的切分策略包括:
  • 语句级分块:适用于多数场景,尤其是文档结构不清晰时。

  • 段落级分块:适用于RAG(检索增强生成)应用。

  • 章节级分块:适用于结构清晰的长文档。

  • 固定长度分块:此方式实现最为简便,灵活性高,但可能破坏语义边界,适用于自定义逻辑切分场景。

我们可依据具体的业务场景,按照不同粒度或策略进行切分,例如:

  • 当文档结构未知,或文档内容类型多样时,优先采用语句级分块

  • 当计划构建RAG系统,需要跨语句上下文信息时,优先采用段落级分块

  • 当文本数据主要来自推文、短信、代码等非标准结构内容时,可考虑自定义分割

在此样例中,我们采用语句级分块,借助自然语言处理库spaCy,相较于其他简单的基于标点分割的方法,它提供了更为健壮的语句边界检测能力,能够更好地处理边缘问题。

# Define the return type for chunked text# Here we'll keep both the chunked text and the chunk ID which# we'll later use for creating IDs for the sentenceschunked_type = daft.DataType.list(  daft.DataType.struct({   "text": daft.DataType.string(),   "chunk_id": daft.DataType.int32()  }))@daft.udf(  return_dtype=chunked_type,  concurrency=NUM_GPU_NODES * CHUNKING_PARALLELISM,  batch_size=BATCH_SIZE // CHUNKING_PARALLELISM //2)classChunkingUDF: def__init__(self):   importspacy    self.nlp = spacy.load(NLP_MODEL_NAME)
def__call__(self, text_col): results = [] fortextintext_col: doc = self.nlp(text) sentence_texts = [ {"text": sentence.text,"chunk_id": i} fori, sentenceinenumerate(doc.sents) ] results.append(sentence_texts) returnresults

这里我们使用Daft提供的Class UDFs实现一个自定义ChunkingUDF,来实现文本切分的逻辑,具体内容如下:

  • ChunkingUDF实例初始化时,加载spaCy模型,每UDF实例仅加载一次模型。

  • 通过批量处理的方式,处理文档数据(text_col),减少调用开销。

  • 返回结果包含切分后的文本列表,列表中每个元素包含文本内容和对应的chunk ID。

  • 整个ChunkingUDF支持分布式运行,该示例中会启动64个(NUM_GPU_NODES * CHUNKING_PARALLELISM)实例并行的处理数据。


步骤 2:GPU加速Embedding向量生成


Embedding向量的质量与模型选择紧密相关。在选择具体模型时,可考虑以下因素:
  • MTEBLeaderboard:可参考 MTEB排行榜,查看不同模型在不同任务上的排名。

  • 特定任务类型性能:不同模型擅长的任务类型各异,如语义搜索、聚类等。可根据具体业务场景确定任务类型,进而选择该类型中的最优模型。

  • 多语言支持:需考量业务场景是否需处理多语言文本。若仅需单语言,应更关注该语言对应的专项排行榜,而非多语言排行榜。

以下为一些主流模型及其特点:

  • Qwen3-Embedding-0.6B:整体性价比高,在该级别大小的开源模型中性能最优,也是本示例所使用的模型。

  • all-MiniLM-L6-v2:Sentence Transformer的文档默认使用的模型,通常用于教程。

  • gemini-embedding-001:目前MTEB上的顶级多语言模型,需要Gemini API访问。

  • Seed1.6-Embedding:目前中国MTEB排行榜的顶级模型,需要Volcengine API访问。

确认使用的模型后,可通过修改EMBEDDING_MODEL_NAME变量的值轻松替换模型。

接下来,实现Embedding的逻辑:

#Definethereturntypeforembeddingsembedding_type=daft.DataType.embedding(daft.DataType.float32(),ENCODING_DIM)@daft.udf(return_dtype=embedding_type,concurrency=NUM_GPU_NODES,num_gpus=1,batch_size=BATCH_SIZE)classEncodingUDF:def__init__(self):fromsentence_transformersimportSentenceTransformerdevice='cuda'iftorch.cuda.is_available()else'cpu'self.model=SentenceTransformer(EMBEDDING_MODEL_NAME,device=device)self.model.compile()def__call__(self,text_col):embeddings=self.model.encode(text_col.to_pylist(),batch_size=SENTENCE_TRANSFORMER_BATCH_SIZE,convert_to_tensor=True,torch_dtype=torch.bfloat16,)returnembeddings.cpu().numpy()

ChunkingUDF类似,我们实现一个自定义UDF(EncodingUDF)来执行Embedding操作:

  • EncodingUDF实例初始化时,判断是否存在GPU资源,若GPU资源可用,则将模型加载到 GPU 的内存中。

  • 在生成文本Embedding向量时,使用bfloat16精度降低内存占用,同时通过批量处理文本的方式(SENTENCE_TRANSFORMER_BATCH_SIZE=128)优化GPU利用率。

  • 最终将生成的Embedding向量以Numpy数组的形式返回。


步骤 3:配置分布式处理


完成步骤 2 后,整个Embedding的业务处理逻辑就完成了。此时,可先在本地环境基于小批量数据进行测试。当处理大规模数据时,通常需将任务提交到分布式集群上执行,详细内容可参考 扩展指南。

在本示例中,我们在8个g5.2xlarge(每节点含 A10G GPU)的Ray集群上运行,配置如下:

#ConfigureDafttouseRaytoscheduleworkondifferentworkernodesdaft.context.set_runner_ray()#ConfigureS3accessforreadingdatadaft.set_planning_config(default_io_config=daft.io.IOConfig(s3=daft.io.S3Config.from_env()))


步骤 4:执行完整流程

以下展示整个端到端的流程:

(daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet").with_column("sentences",ChunkingUDF(col("text"))).explode("sentences").with_column("text",col("sentences")["text"]).with_column("chunk_id",col("sentences")["chunk_id"]).exclude("sentences").with_column("embedding",EncodingUDF(col("text"))).with_column("id",col("url").str.right(50)+"-"+col("chunk_id").cast(daft.DataType.string())).select("id","url","language","source","text","embedding").write_turbopuffer(namespace="desmond-scale-experiment6",region="aws-us-west-2",id_column="id",vector_column="embedding",distance_metric="cosine_distance"))

流程解析:

  • 读取数据:首先从S3高效地加载Parquet文件,读取文档内容。

  • 文本分块:使用ChunkingUDFtext列的文本数据进行语句切分,新增sentences列存储切分后的数据。

  • 展开列表sentences列每个元素的类型是list,通过explode将list展开后,sentences列中每个元素类型变为Struct,其中包含具体的文本内容和chunk ID。

  • 提取字段:从sentences列中提取文本和chunk ID,分别用textchunk_id存储对应的内容,并从dataframe中移除sentences列。

  • 生成嵌入:使用EncodingUDF为每个切分后的语句生成Embedding向量,将数据存储在embedding列中。

  • 创建唯一ID:通过组合 url和chunk ID为每一个Embedding向量生成唯一标识符。

  • 筛选列:筛选出必要字段。

  • 写入 TurboPuffer:将生成的Embedding数据及关联数据写入向量数据库中。

当在集群上运行该脚本后,应能看到整个集群的网络 I/O、CPU/GPU资源被任务充分利用,特别是GPU负载持续保持在高水位,同时我们可以调整批处理大小,并发参数不断优化GPU资源使用率。


自定义处理逻辑


我们可依据具体业务场景对本示例内容进行调整,包括:
  • 调整批大小:可以增大SENTENCE_TRANSFORMER_BATCH_SIZE提升吞吐量,同时提升GPU内存使用率。

  • 扩展节点:根据集群规模调整NUM_GPU_NODESCHUNKING_PARALLELISM

  • 更换模型:可以修改EMBEDDING_MODEL_NAME的值切换Sentence Transformers模型。

  • 自定义分块:可以修改ChunkingUDF实现其他分块策略。

  • 替换向量库:除了将生成的Embedding向量写到TurboPuffer之外,Daft还原生支持将Embedding向量写入其他向量数据库,比如LanceDB。


性能优化实践


除自定义处理逻辑外,还可对相关参数进行精细化调优,以实现高效处理百万级文本的目标。以下为几点实践建议:
  • GPU内存:在任务运行过程中,最好结合监控系统,对GPU内存使用情况进行监控。若分配失败或超过模型最大序列长度,应减小SENTENCE_TRANSFORMER_BATCH_SIZE

  • 模型加载:UDF在初始化时会加载一次模型,首次加载时,往往需从远程下载模型。在带宽受限的场景下,模型加载操作可能耗时较长。此时,可考虑提前将模型缓存到本地。在容器化场景下,可提前下载模型并挂载到容器中。

  • 模型量化:针对精度要求不高的场景,可使用bfloat16/float16精度类型进行量化,降低内存占用,提升吞吐量。


下一代计划

通过当前示例,我们已将GPU利用率提升至接近100%的水平,这是众多工程师所追求的 “圣杯”。但Daft的探索脚步不会停歇。我们正在尝试新方法,通过自定义GPU流水线,以及用vLLM替代Sentence Transformers,使整体处理速度提升3倍。我们正在逐步完善这一方案,并将在后续博客继续分享如何突破 “峰值利用率” 的限制,大幅提升吞吐量。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ