在计算密集型任务中,GPU作为核心算力资源,其利用率的高低对任务效率与资源成本有着直接影响。因此,工程师们在优化性能时,其中一个重要目标便是让GPU资源尽可能 “跑满”,实现近乎100%的利用率。尤其在大规模数据处理场景下,如何避免GPU空转、充分发挥其并行计算能力,往往成为优化的关键所在。
近期,我们在运用Qwen3-Embedding-0.6B模型对百万级文本数据进行向量化操作时,达成了这一目标:整个任务运行过程中,GPU始终保持接近 100% 的使用率。这不仅显著缩短了数据处理耗时,也让我们对GPU资源的极致利用有了更具体的实践经验。接下来,我们将通过一个样例,分享一些心得体会。
另外,在撰写本文之际,我们还发现了另一种方法,能够将数据处理速度提升3倍,且该优化方法并非通过压榨剩余GPU资源实现。不过,这并非本文重点,我们将在后续分享这一内容。今天的目标主要还是借助一个Daft任务样例,分享实现GPU接近100%利用率的方法。
从S3读取百万级的文档数据。
运用spaCy将每个文档切分为多个语句。
采用开源最优的embedding模型Qwen3-Embedding-0.6B为每一条语句生成对应的 Embedding向量。
将结果写入TurboPuffer向量数据库,以备后续业务检索。
在开展 Daft任务开发前,需创建一个虚拟环境,例如virtualenv、uv、conda等。以下以uv为例:
uvvenv--seed--python3.11source.venv/bin/activate
随后,需安装以下依赖包
pipinstall"daft[ray]"turbopuffertorchsentence-transformersspacyacceleratetransformerspython-mspacydownloaden_core_web_sm
en_core_web_sm这个spaCy 模型实现文档切分,使用intfloat/multilingual-e5-large-instruct模型生成Embedding向量,当然也可将这些模型替换为其他开源模型或企业级模型。importtorchimportdaftfromdaftimportcolNUM_GPU_NODES=8#GPUnodesinyourclusterNLP_MODEL_NAME="en_core_web_sm"#spaCymodelforsentencedetectionCHUNKING_PARALLELISM=8#ParallelchunkingprocessespernodeEMBEDDING_MODEL_NAME="Qwen/Qwen3-Embedding-0.6B"#TextembeddingmodelENCODING_DIM=1024#EmbeddingdimensionsBATCH_SIZE=512#RecordsperembeddingbatchSENTENCE_TRANSFORMER_BATCH_SIZE=16#GPUbatchsizeforembeddings
语句级分块:适用于多数场景,尤其是文档结构不清晰时。
段落级分块:适用于RAG(检索增强生成)应用。
章节级分块:适用于结构清晰的长文档。
固定长度分块:此方式实现最为简便,灵活性高,但可能破坏语义边界,适用于自定义逻辑切分场景。
我们可依据具体的业务场景,按照不同粒度或策略进行切分,例如:
当文档结构未知,或文档内容类型多样时,优先采用语句级分块。
当计划构建RAG系统,需要跨语句上下文信息时,优先采用段落级分块。
当文本数据主要来自推文、短信、代码等非标准结构内容时,可考虑自定义分割。
在此样例中,我们采用语句级分块,借助自然语言处理库spaCy,相较于其他简单的基于标点分割的方法,它提供了更为健壮的语句边界检测能力,能够更好地处理边缘问题。
# Define the return type for chunked text# Here we'll keep both the chunked text and the chunk ID which# we'll later use for creating IDs for the sentenceschunked_type = daft.DataType.list(daft.DataType.struct({"text": daft.DataType.string(),"chunk_id": daft.DataType.int32()}))@daft.udf(return_dtype=chunked_type,concurrency=NUM_GPU_NODES * CHUNKING_PARALLELISM,batch_size=BATCH_SIZE // CHUNKING_PARALLELISM //2)classChunkingUDF:def__init__(self):importspacyself.nlp = spacy.load(NLP_MODEL_NAME)def__call__(self, text_col):results = []fortextintext_col:doc = self.nlp(text)sentence_texts = [{"text": sentence.text,"chunk_id": i}fori, sentenceinenumerate(doc.sents)]results.append(sentence_texts)returnresults
这里我们使用Daft提供的Class UDFs实现一个自定义ChunkingUDF,来实现文本切分的逻辑,具体内容如下:
ChunkingUDF实例初始化时,加载spaCy模型,每UDF实例仅加载一次模型。
通过批量处理的方式,处理文档数据(text_col),减少调用开销。
返回结果包含切分后的文本列表,列表中每个元素包含文本内容和对应的chunk ID。
整个ChunkingUDF支持分布式运行,该示例中会启动64个(NUM_GPU_NODES * CHUNKING_PARALLELISM)实例并行的处理数据。
MTEBLeaderboard:可参考 MTEB排行榜,查看不同模型在不同任务上的排名。
特定任务类型性能:不同模型擅长的任务类型各异,如语义搜索、聚类等。可根据具体业务场景确定任务类型,进而选择该类型中的最优模型。
多语言支持:需考量业务场景是否需处理多语言文本。若仅需单语言,应更关注该语言对应的专项排行榜,而非多语言排行榜。
以下为一些主流模型及其特点:
Qwen3-Embedding-0.6B:整体性价比高,在该级别大小的开源模型中性能最优,也是本示例所使用的模型。
all-MiniLM-L6-v2:Sentence Transformer的文档默认使用的模型,通常用于教程。
gemini-embedding-001:目前MTEB上的顶级多语言模型,需要Gemini API访问。
Seed1.6-Embedding:目前中国MTEB排行榜的顶级模型,需要Volcengine API访问。
确认使用的模型后,可通过修改EMBEDDING_MODEL_NAME变量的值轻松替换模型。
接下来,实现Embedding的逻辑:
#Definethereturntypeforembeddingsembedding_type=daft.DataType.embedding(daft.DataType.float32(),ENCODING_DIM)@daft.udf(return_dtype=embedding_type,concurrency=NUM_GPU_NODES,num_gpus=1,batch_size=BATCH_SIZE)classEncodingUDF:def__init__(self):fromsentence_transformersimportSentenceTransformerdevice='cuda'iftorch.cuda.is_available()else'cpu'self.model=SentenceTransformer(EMBEDDING_MODEL_NAME,device=device)self.model.compile()def__call__(self,text_col):embeddings=self.model.encode(text_col.to_pylist(),batch_size=SENTENCE_TRANSFORMER_BATCH_SIZE,convert_to_tensor=True,torch_dtype=torch.bfloat16,)returnembeddings.cpu().numpy()
与ChunkingUDF类似,我们实现一个自定义UDF(EncodingUDF)来执行Embedding操作:
EncodingUDF实例初始化时,判断是否存在GPU资源,若GPU资源可用,则将模型加载到 GPU 的内存中。
在生成文本Embedding向量时,使用bfloat16精度降低内存占用,同时通过批量处理文本的方式(SENTENCE_TRANSFORMER_BATCH_SIZE=128)优化GPU利用率。
最终将生成的Embedding向量以Numpy数组的形式返回。
在本示例中,我们在8个g5.2xlarge(每节点含 A10G GPU)的Ray集群上运行,配置如下:
#ConfigureDafttouseRaytoscheduleworkondifferentworkernodesdaft.context.set_runner_ray()#ConfigureS3accessforreadingdatadaft.set_planning_config(default_io_config=daft.io.IOConfig(s3=daft.io.S3Config.from_env()))
步骤 4:执行完整流程
以下展示整个端到端的流程:
(daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet").with_column("sentences",ChunkingUDF(col("text"))).explode("sentences").with_column("text",col("sentences")["text"]).with_column("chunk_id",col("sentences")["chunk_id"]).exclude("sentences").with_column("embedding",EncodingUDF(col("text"))).with_column("id",col("url").str.right(50)+"-"+col("chunk_id").cast(daft.DataType.string())).select("id","url","language","source","text","embedding").write_turbopuffer(namespace="desmond-scale-experiment6",region="aws-us-west-2",id_column="id",vector_column="embedding",distance_metric="cosine_distance"))流程解析:
读取数据:首先从S3高效地加载Parquet文件,读取文档内容。
文本分块:使用ChunkingUDF对text列的文本数据进行语句切分,新增sentences列存储切分后的数据。
展开列表:sentences列每个元素的类型是list,通过explode将list展开后,sentences列中每个元素类型变为Struct,其中包含具体的文本内容和chunk ID。
提取字段:从sentences列中提取文本和chunk ID,分别用text和chunk_id存储对应的内容,并从dataframe中移除sentences列。
生成嵌入:使用EncodingUDF为每个切分后的语句生成Embedding向量,将数据存储在embedding列中。
创建唯一ID:通过组合 url和chunk ID为每一个Embedding向量生成唯一标识符。
筛选列:筛选出必要字段。
写入 TurboPuffer:将生成的Embedding数据及关联数据写入向量数据库中。
当在集群上运行该脚本后,应能看到整个集群的网络 I/O、CPU/GPU资源被任务充分利用,特别是GPU负载持续保持在高水位,同时我们可以调整批处理大小,并发参数不断优化GPU资源使用率。
调整批大小:可以增大SENTENCE_TRANSFORMER_BATCH_SIZE提升吞吐量,同时提升GPU内存使用率。
扩展节点:根据集群规模调整NUM_GPU_NODES和CHUNKING_PARALLELISM。
更换模型:可以修改EMBEDDING_MODEL_NAME的值切换Sentence Transformers模型。
自定义分块:可以修改ChunkingUDF实现其他分块策略。
替换向量库:除了将生成的Embedding向量写到TurboPuffer之外,Daft还原生支持将Embedding向量写入其他向量数据库,比如LanceDB。
GPU内存:在任务运行过程中,最好结合监控系统,对GPU内存使用情况进行监控。若分配失败或超过模型最大序列长度,应减小SENTENCE_TRANSFORMER_BATCH_SIZE。
模型加载:UDF在初始化时会加载一次模型,首次加载时,往往需从远程下载模型。在带宽受限的场景下,模型加载操作可能耗时较长。此时,可考虑提前将模型缓存到本地。在容器化场景下,可提前下载模型并挂载到容器中。
模型量化:针对精度要求不高的场景,可使用bfloat16/float16精度类型进行量化,降低内存占用,提升吞吐量。
下一代计划
通过当前示例,我们已将GPU利用率提升至接近100%的水平,这是众多工程师所追求的 “圣杯”。但Daft的探索脚步不会停歇。我们正在尝试新方法,通过自定义GPU流水线,以及用vLLM替代Sentence Transformers,使整体处理速度提升3倍。我们正在逐步完善这一方案,并将在后续博客继续分享如何突破 “峰值利用率” 的限制,大幅提升吞吐量。
| 欢迎光临 链载Ai (https://www.lianzai.com/) | Powered by Discuz! X3.5 |