|
没有AI ready数据,AI就是个笑话。 那到底什么是AI ready数据?两层含义。 第一类是用于大模型训练的高质量语料数据。 第二类是喂给大模型推理的Context上下文数据。 目前AI在企业落地卡点主要是在第二类数据。市面上的RAG和KAG效果都不太理想。 第二类数据更重要,是复杂分散海量动态变化的,是企业私域的,是带语义语境的,直接决定了大模型推理结果是否靠谱。 Palantir直接用pipeline builder+本体OAG+Logic解决了第二类数据问题。 因此Pipeline builder是创建AI ready数据的核心关键之一。 Palantir Pipeline Builder提供了一个算子库,是一个涵盖了从基础算术到复杂地理空间分析的庞大武库。数百种算子,任由发挥,处理数据,生产AI ready数据,灌给本体。 特别是有了Use LLM大模型算子之后,能力更加爆表。
对于业务分析师,Pipeline Builder提供了LeftJoin,Pivot,if/else等直观的工具,使其能完成以前必须依赖IT部门才能做的数据清洗。 对于数据工程师,Pipeline Builder提供了WindowFunctions,Explode,Regex,Struct等高级操作,使其能在不写一行Scala代码的情况下构建高性能的Spark任务。 有了Pipeline Builder可以通过无代码可视化的方式提供等同于编写Spark代码的强大数据处理能力。 Pipeline Builder的后端基于Apache Spark,因此它几乎支持SparkSQL的所有函数,加上Palantir自定义的处理逻辑。 Pipeline Builder的算子主要分为表级变换(Table Transforms)和列表达式(Column Expressions)两大类。总数超过百种。 Pipeline Builder打破了传统数据工程中业务分析师与数据开发工程师之间的壁垒。用户不再需要精通Python或Scala,只需通过拖拉拽和配置表达式,即可构建出生产级的、基于Spark的大数据处理管道。 Pipeline Builder之所以强大,核心在于其内置的庞大算子库(Library of Operators)。这些算子不仅覆盖了标准SQL的所有功能,还集成了复杂的数组处理、地理空间计算甚至机器学习预处理功能。
一、架构基础与算子概览
Pipeline Builder中算子存在的两种形态。
一是变换节点(Transforms):这是宏观层面的算子。它们改变数据的形状(Shape)或结构。例如:将两张表合并、过滤行、透视表等。
二是表达式(Expressions):这是微观层面的算子。它们存在于变换节点内部,用于计算新列的值。例如:字符串截取、日期加减、正则匹配等。
Pipeline Builder的后端引擎是Apache Spark。因此每一个算子,最终都会被编译成Spark Catalyst优化器可执行的逻辑计划。这意味着,在界面上点的每一个算子,其性能理论上等同于手写的高质量Spark代码。
二、表级变换算子(Table Transforms)
表级算子决定了数据流的走向和结构。目前Pipeline Builder提供了约50+种核心表级变换,涵盖了数据工程的方方面面。
在实际构建 Pipeline 时,90% 的时间只会用到以下 7 个 Transform: - Add Column (写表达式处理逻辑) - Filter (清洗数据) - Join (拼表) - Group By (统计指标) - Window (计算复杂逻辑如 YoY, MoM) - Union (追加数据) - Pivot/Unpivot (调整报表结构)
下面标绿色背景的是我们会常用的一些算子。
数据合并与关联 1.Join(关联):支持Left,Right,Inner,FullOuter,CrossJoin。Pipeline Builder提供了一个可视化的连线视图,允许用户在不同列名之间建立键值关系。将不同来源的数据(如CRM数据和ERP数据)根据ID拼接到一起。 2. Cross Join: 笛卡尔积(每一行与另一表的每一行匹配)。通常用于生成日期序列或测试数据。 3.Union(并集):支持UnionAll(保留重复)和UnionDistinct(去重)。将结构相同的多张表(例如“1月销售”和“2月销售”)上下堆叠。 4. Intersect: 用于集合运算的交集。保留两个数据集中共有的行。 5. Lookup: 类似于 Excel VLOOKUP 的简化版 Join。通常用于将“代码”替换为“名称”(例如将 country_code 替换为 country_name)。
形状与结构变换 (Shape & Structure) 改变数据的物理结构(行数或列结构)。 6.Group By: 分组聚合。这是使用频率最高的算子之一。通常配合 Sum, Count, Max, Avg, Collect List 等聚合逻辑。 7.Pivot: 透视(行转列)。示例:将“月份”列的值(1月、2月...)变为独立的列头。将“日期、产品、销量”三列数据,转换为以“日期”为行,“产品”为列的宽表。 8.Unpivot: 逆透视(列转行)。是处理Excel类宽表的利器,将非结构化的宽表压回标准的数据库长表格式。示例:将“1月销量”、“2月销量”多列压缩为“月份”和“销量”两列。 9.Explode(数组展开):针对数组(Array)类型的算子。功能是将一行中的数组列拆分成多行。例如,一行订单包含多个商品ID,Explode后会变成多行,每行一个商品ID。对JSON数据处理特别有用,比如在PDF文档结构化变换时总用到这个算子。 10.Window: 窗口函数。不改变行数的情况下计算排名、移动平均、累计求和等。包含Rank, Dense Rank, Row Number, Lead, Lag, Running Total.这是高级数据开发的分水岭。Pipeline Builder允许用户定义Partition By和Order By,然后应用Rank,RowNumber,Lead,Lag等算子。主要场景包括计算每个用户按时间排序的“下一次购买时间”,或计算“过去7天的移动平均值”。
行级操作 (Row Operations) 针对行的筛选和排序。 11.Filter: 过滤。使用条件表达式保留或剔除行。支持复杂的布尔逻辑(AND/OR嵌套)。 12.Drop Duplicates去重:可以针对全行去重,也可以指定特定列(Subset)去重。可基于“所有列”或“特定列子集”进行去重。支持保留“第一行”或“最后一行”(通常配合 Sort 使用)。 13.Sort / Order By: 排序。 14. Limit / Sample: 限制行数或随机采样。Top N: 获取前 N 行。Random Sample: 按百分比随机抽取。 15. Difference (Except): 用于集合运算的差集。保留在表 A 中但不在表 B 中的行。
列级操作 (Column Operations) 对字段进行增删改查。 16.Add Column: 添加列。这是最通用的算子,内部可以写任何 Spark SQL 表达式(Regex, Math, String logic 等)。 17.Select Columns: 选择列。仅保留选中的列,丢弃其他。 18.Drop Columns: 删除列。 19.Rename Columns: 重命名。支持批量重命名(例如:给所有列加前缀)。 20.Clean Column Names: 清洗列名。自动将 Column A 转换为 column_a(小写、下划线),去除非法字符。 21.Cast: 类型转换。将列强制转换为 String, Integer, Date, Double 等。
清洗与智能处理 (Cleaning & Smart Transforms) Palantir 特有的或封装的高级清洗功能。 22. Smart Cast: 智能类型转换。自动推断列的最佳类型。例如,它能识别 "2023-01-01" 和 "01/01/2023" 混合的列并将其统一转为 Date 类型。 23. Impute / Fill Nulls: 填充空值。支持用固定值、平均值、中位数或前/后值填充 Null。 24. Trim / Whitespace: 批量去除字符串列的首尾空格。 25. Normalize: 文本标准化(例如统一转小写,去除重音符号)。
地理空间 (Geospatial) Foundry 强项,处理地图数据。 26. Geohash Encode/Decode: 将经纬度转换为 Geohash 字符串。 27. H3 Index: 将经纬度转换为 Uber H3 六边形网格索引(常用于空间聚合)。 28. Distance: 计算两点间的地理距离(Haversine formula)。 29. Point in Polygon: 判断点是否在多边形内(通常通过 Join 或 UDF 实现,但在某些版本有封装算子)。
特殊功能算子 除了上述基础的SQL映射算子,Pipeline Builder还包含一些AI算子。 30. SmartCast(智能类型转换) 不仅仅是简单的cast(colasint)。Pipeline Builder提供智能推断算子,能够扫描列中的数据模式,自动建议并将String转换为Date,Integer或Boolean,并处理异常格式(如"2023/01/01"和"2023-01-01"混杂的情况)。 31. Lookups(查找/字典映射) 类似于Excel的VLOOKUP,但更高效。用户可以上传一个CSV字典,使用Lookup算子将代码(如"CN")替换为全称(如"China"),而无需编写繁琐的Join逻辑。 32. User Defined Functions(UDFs)-扩展算子 尽管内置了数百种算子,Pipeline Builder还允许逃生舱模式。如果真的遇到了内置算子无法解决的逻辑(例如极复杂的专有数学模型),用户可以在Pipeline Builder中嵌入PythonUDF(需权限允许),自定义一个算子节点。
Use LLM大模型算子 这是个万能算子,负责利用大模型处理生成、分析、提取、翻译等所有逻辑任务。 这是 Pipeline Builder 中最核心、最强大的 GenAI 算子。它本质上是一个通过 API 批量调用 LLM(如 GPT-4, Claude 3等)的通用接口。它接收管道中的一列或多列数据作为输入(Context),结合用户定义的提示词(Prompt),将 LLM 的生成结果输出到新的一列。为了简化操作,这个算子内部预置了多种“任务模式”,用户无需从零编写 Prompt,只需填空即可: 33. Sentiment Analysis (情感分析):自动将文本评价(如客户反馈)标记为 Positive, Negative, Neutral,还可以输出具体的情感分数。 34. Classification (分类):根据预定义的类别列表(如“产品缺陷”、“物流问题”、“客服态度”),自动对文本行进行归类。支持单选或多选。 35. Translation (翻译):利用 LLM 强大的多语言能力,将任意语言的文本列翻译为目标语言。 36. Summarization (摘要):针对长文本(如一篇新闻、一段会议记录),生成简短的总结。 37. Entity Extraction (实体提取):从非结构化文本中提取特定信息(如人名、合同金额、日期、法律条款),并将其结构化为 JSON 或单独的列。 38.自定义模式 (Custom Prompt):如果上述模版不满足需求,用户可以使用“Free-form”模式,编写完全自定义的 Prompt,甚至结合 Few-shot Learning(给模型几个示例)来处理复杂逻辑。
向量化算子:Text to Embeddings (文本转向量) 这是构建 RAG (Retrieval-Augmented Generation) 和 语义搜索 (Semantic Search) 应用的基础算子。 39.文本转换为向量 (Vector/Embeddings):转换后的向量通常是一个浮点数数组(如 [0.12, -0.98, ...])。这些向量会被存入 Ontology(本体)中,用于支持后续的语义搜索(例如:用户搜“哪里有好吃的面条”,系统能匹配到“推荐这家拉面馆”的记录,即使没有关键词匹配)。 模型通常支持 OpenAI 的 text-embedding-ada-002 或 Palantir 内部托管的其他 Embedding 模型。
辅助开发功能包含AIP Assist / Generate (AI 辅助生成) 这不是一个严格的数据处理算子,而是嵌入在 Pipeline Builder 界面中的 AI 助手,但它极大地改变了算子的使用方式。 40. Regex Generation (正则生成):不再需要自己写复杂的正则表达式。只需对 AI 说:“提取这段话里的所有邮箱地址”,它会自动生成一个标准的 Regex Extract 算子配置。 41. SQL/Expression Generation (表达式生成):可以用自然语言描述逻辑(例如:“把日期格式化为 YYYY-MM,如果为空则填入 'Unknown'”),AI 会自动编写出对应的 Spark SQL 表达式。
非结构化数据算子 (Vision & Document Intelligence) 42.Extract Text from PDF / Image (OCR):使用OCR识别或视觉模型从 PDF 文档和图片中提取文字。这是将非结构化文件喂给 Use LLM 算子的前置步骤。 43. Extract Layout-aware Content (版面分析):更高级的提取,能理解文档的标题、段落和表格结构,通常用于处理复杂的金融报表或合同。
输出与变更捕获 (Output & CDC) 44. Change Data Capture (CDC): 变更数据捕获。虽然通常是通过 Join 逻辑构建的,但在某些模版中,会有专门的逻辑来计算 Added, Removed, Modified 的行。 45. Output / Writeback: 输出节点。定义数据最终保存到 Foundry 的位置、数据集名称、分区策略(Partitioning)以及是否增量更新。
三、列表达式算子(Expressions)
在Pipeline Builder的“AddColumn”或“Filter”面板中,隐藏着一个包含200-300+种函数的表达式库。这些算子支持名为Foundry Expression Language的语法(高度兼容SQL和Spark语法)。
包括字符串处理算子(String Manipulation),处理脏数据最常用的类别如下 基础处理: 46. lower(col)/upper(col):大小写转换。 47. trim(col)/ltrim/rtrim:去除空格。 48. length(col):计算长度。 49. reverse(col):字符串反转。 50. initcap(col): 首字母大写 (如 "john doe" -> "John Doe")。 高级提取与替换: 51. substring(col,start,length):截取子串。 52. replace(col,search,replace):文本替换。 53. split(col,delimiter):将字符串按分隔符切割成数组。 54. concat(col1,col2,...)或col1||col2:字符串拼接。 正则表达式(Regex): 55. regex_extract(col,pattern,index):这是最强大的文本清洗算子。允许用户编写Regex提取特定模式(如提取邮件地址、电话号码)。 56. regex_replace(col,pattern,replacement):按模式替换。 57. rlike(col,pattern):正则匹配判断(返回Boolean)。 日期与时间算子,Foundry有极其强调时区处理,因此这部分算子非常丰富。 当前时间: 58. current_date():当前日期。 59. current_timestamp():当前时间戳。 格式转换: 60. to_date(col,format):字符串转日期。 61. date_format(col,format):日期转字符串(如"yyyy-MM-dd")。 62. unix_timestamp(col):转为Unix秒数。 日期计算: 63. date_add(col,days)/date_sub(col,days):加减天数。 64. datediff(end,start):计算两个日期的差值。 65. months_between(date1,date2):计算月数差。 66. add_months(date,num):加减月份。 67. datediff(end_date, start_date): 计算相差天数。 68. months_between(date1, date2): 计算相差月数。 69. next_day(date, dayOfWeek): 下一个周几的日期。 70. last_day(date): 当月最后一天。 部分提取: 71. year(col),month(col),dayofmonth(col) 72. hour(col),minute(col),second(col)。 73. dayofweek(col):返回星期几(通常周日为1)。 74. trunc(date,format):日期截断(例如截断到月初)。 75. weekofyear(col): 一年中的第几周。 数学与统计算子类 基础运算:+,-,*,/,%(取模)。 舍入与截断: 76. round(col,scale):四舍五入。 77. floor(col):向下取整。 78. ceil(col):向上取整。 高级数学: 79. abs(col):绝对值。 80. pow(base,exponent):幂运算。 81. sqrt(col):开方。 82. log(col)/exp(col):对数与指数。 83. exp(col): e 的指数。 空值处理数学: 84. nanvl(col,default):处理NaN值。 85. 极值: 86. greatest(col1, col2, ...): 返回一行中多列的最大值。 87. least(col1, col2, ...): 返回一行中多列的最小值。 逻辑与条件控制类 这是编写业务逻辑的核心。 条件判断: 88. CASE WHEN condition THEN result ELSE other END:这是SQL逻辑的灵魂,Pipeline Builder完美支持。 89. if(condition,true_val,false_val):简化的IF逻辑。 空值处理: 90. coalesce(col1,col2,...):返回第一个非空值。非常重要,用于填充默认值。 91. isnull(col)/isnotnull(col):判断是否为空。 92. nvl(col,default):类似于coalesce的简化版。 复杂类型算子 Pipeline Builder区别于Excel的地方在于它对非结构化数据的支持。 数组操作: 93. array_contains(col,value):判断数组是否包含某值 (True/False)。 94. size(col):计算数组长度。 95. array_join(col,delimiter):将数组合并为字符串。 96. element_at(col,index):获取数组特定位置的元素。(Spark 中索引通常从 1 开始)。 97. array_distinct(col):数组内去重。 98. array_intersect(col1,col2):计算数组交集。 99. array_sort(array_col): 数组排序。 100. array_union(arr1, arr2): 数组并集。 101. slice(array_col, start, length): 数组切片。 结构体(Struct)操作: 102. struct(col1,col2):将多列打包成一个对象列。创建结构体。 103. col.field:点号语法访问结构体内部字段。 JSON处理: 104. get_json_object(col,path):从JSON字符串中提取值。 105. from_json(col,schema):解析JSON。 聚合函数类 通常用于Group By节点,但也可用在Window函数中。 基础: 106. count_distinct。 107. count(*) / count(col) 108. sum(col) 109. avg(col) 110. min(col) / max(col) 111. first(col) / last(col) 统计: 112. stddev(标准差) 113. variance(方差) 114. corr(相关系数)。 集合: 115. collect_list(聚合为列表,不去重) 116. collect_set(聚合为列表,去重)。 哈希与加密 (Hashing & Cryptography) 117. md5(col) 118. sha1(col) 119. sha2(col, 256) 120. hash(col): 生成一个整数哈希值。 地理空间算子 这是Palantir的特色强项,用于处理地图数据。 121. geo_distance(lat1,long1,lat2,long2):计算两点距离。 122. h3_token_to_lat_long/lat_long_to_h3:UberH3索引转换(用于六边形网格聚合)。 123. wkt_parse:解析Well-Known Text地理格式。
总结来看,Palantir Pipeline Builder 的强大精髓在于:将 Apache Spark 的硬核算力封装于极致的低代码可视化界面中,把复杂、异构的数据加工逻辑抽象成可视化、可追溯、可协同的数据生产线,让业务和工程团队像搭积木一样快速构建、调试并治理企业级数据流程。
|