total_records = len(cnn_dm_dataset["train"]) + len(cnn_dm_dataset["validation"]) + len(cnn_dm_dataset["test"])

# Print the total and a sample record
print(f"Total number of records in the dataset: {total_records}\n")
print("Sample record from the training dataset:")
print(cnn_dm_dataset["train"][0])

#### OUTPUT ####
Total number of records in the dataset: 311971

Sample record from the training dataset:
{'article': 'LONDON, England (Reuters) -- Harry Potter star Daniel ...'}
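The filtering step below relies on an ACQUISITION_KEYWORDS list that is not shown in this excerpt. The exact keywords are an assumption; an illustrative definition could look like this:

# Hypothetical keyword list -- adjust to the acquisition-related terms you care about
ACQUISITION_KEYWORDS = [
    "acquisition", "acquires", "acquired", "merger", "merges with",
    "buyout", "takeover", "agreed to buy", "purchased by"
]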
cnn_dm_dataset_train = cnn_dm_dataset['train']

# Initialize an empty list to store filtered articles
filtered_articles = []

# Loop through the dataset and filter articles based on keywords
for record in cnn_dm_dataset_train:
    # Check if any of the keywords appear in the article text
    found_keyword = False
    for keyword in ACQUISITION_KEYWORDS:
        if keyword.lower() in record['article'].lower():
            found_keyword = True
            break  # Stop once a keyword is found

    # If a keyword was found, append the article to the filtered list
    if found_keyword:
        filtered_articles.append(record)
Now that the articles are filtered, let's check the total number of filtered articles and look at a sample.
# Print the total number of filtered articles
print(f"Total number of filtered articles: {len(filtered_articles)}")

# Print a sample of one filtered article
print("\nSample of a filtered article:")
print(filtered_articles[0]['article'])

#### OUTPUT ####
Total number of filtered articles: 65249

Sample of a filtered article:
SAN DIEGO, California (CNN) -- You must know whats really driving the immigration debate ...
# Download and load the English language model for spaCy
# (only needs to be run once)
spacy.cli.download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")

# Initialize a counter to hold entity label counts (e.g., PERSON, ORG, DATE)
entity_counts = Counter()

# Loop through each article and apply spaCy's Named Entity Recognition
for article in cleaned_articles:
    text = article['cleaned_text']  # Get the cleaned text
    doc = nlp(text)                 # Process text with spaCy

    # Count each entity label found in the text
    for ent in doc.ents:
        entity_counts[ent.label_] += 1
def call_llm(system_prompt, user_prompt, model_name):
    """
    Sends a request to a language model (LLM) to extract entities based on provided prompts.

    Args:
        system_prompt (str): Instructions or context for the LLM (e.g., how to behave).
        user_prompt (str): The user input containing text to extract entities from.
        model_name (str): The identifier of the LLM model to use (e.g., "gpt-4").

    Returns:
        str: The JSON-formatted string response from the LLM, or None if the client is unavailable.
    """
    # Construct and send the chat completion request to the LLM
    response = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": system_prompt},  # System-level instructions
            {"role": "user", "content": user_prompt}       # User-provided input
        ],
    )

    # Extract and return the response content (JSON string)
    return response.choices[0].message.content.strip()
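call_llm assumes an OpenAI-compatible client object has already been created earlier in the notebook. If it hasn't, a minimal sketch could look like the following (the base URL and environment variable names are assumptions; point it at whichever provider serves your model):

import os
from openai import OpenAI

# Assumed setup: any OpenAI-compatible endpoint that hosts the chat model used later
client = OpenAI(
    base_url=os.environ.get("LLM_BASE_URL", "https://api.example.com/v1"),  # hypothetical endpoint
    api_key=os.environ.get("LLM_API_KEY"),
)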
relevant_entity_labels_for_llm = [label for label, count in entity_counts.most_common(TOP_N_ENTITY_TYPES)]
entity_types_string_for_prompt = ", ".join(relevant_entity_labels_for_llm)

# System prompt for the LLM
# We are instructing it to return a JSON object with a key "entities"
# whose value is a list of entity objects.
llm_ner_system_prompt = (
    f"You are an expert Named Entity Recognition system. "
    f"From the provided news article text, identify and extract entities. "
    f"The entity types to focus on are: {entity_types_string_for_prompt}. "
    f"For each identified entity, provide its exact text span from the article and its type (use one of the provided types). "
    f"Output ONLY a valid JSON object with a single key 'entities'. The value of 'entities' MUST be a list of JSON objects, "
    f"where each object has 'text' and 'type' keys. "
    f"Example: {{\"entities\": [{{\"text\": \"United Nations\", \"type\": \"ORG\"}}, {{\"text\": \"Barack Obama\", \"type\": \"PERSON\"}}]}} "
    f"If no entities of the specified types are found, the 'entities' list should be empty: {{\"entities\": []}}."
)
This system prompt instructs the model to return the entity data as valid JSON.
Before writing the main loop, we need a parsing function that converts the model's text output into a usable JSON structure.
def parse_llm_entity_json_output(llm_output_str):
    """
    Parses the JSON string from the LLM and returns a list of entities.
    Assumes the format: {"entities": [{"text": "...", "type": "..."}]}

    Args:
        llm_output_str (str): JSON string from the LLM.

    Returns:
        list: Extracted entities or empty list if parsing fails.
    """
    if not llm_output_str:
        return []  # Return empty list if no output

    # Remove markdown code block if present
    if llm_output_str.startswith("```json"):
        llm_output_str = llm_output_str[7:].rstrip("```").strip()

    try:
        data = json.loads(llm_output_str)
        return data.get("entities", [])  # Return entities list, or empty if not found
    except json.JSONDecodeError:
        return []  # Return empty list on JSON error
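As a quick sanity check, the parser handles both raw JSON and markdown-wrapped JSON. The sample string below is purely illustrative:

sample_output = '```json\n{"entities": [{"text": "Microsoft", "type": "ORG"}]}\n```'
print(parse_llm_entity_json_output(sample_output))
# Expected: [{'text': 'Microsoft', 'type': 'ORG'}]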
Next, create a loop that applies this system prompt to each article in the dataset.
# Define the LLM used for entity extraction
TEXT_GEN_MODEL_NAME = "microsoft/phi-4"

# Loop through a limited number of cleaned articles to
# extract entities using the LLM
# List to collect articles together with their LLM-extracted entities
articles_with_llm_entities = []

for i, article_data in enumerate(cleaned_articles):
    article_id = article_data['id']
    article_text = article_data['cleaned_text']

    # Call the LLM to extract entities
    llm_response_content = call_llm(
        llm_ner_system_prompt,
        article_text,
        TEXT_GEN_MODEL_NAME
    )

    # Parse the LLM's response into a list of entities
    extracted_llm_entities = []
    if llm_response_content:
        extracted_llm_entities = parse_llm_entity_json_output(llm_response_content)

    # Store the results with the article
    articles_with_llm_entities.append({
        "id": article_id,
        "cleaned_text": article_text,
        "summary": article_data['summary'],
        "llm_extracted_entities": extracted_llm_entities
    })
# We're asking for a JSON object with a "relationships" key.
llm_re_system_prompt = (
    "You are an expert system for extracting relationships between entities from text, "
    "specifically focusing on technology company acquisitions. "
    "Given an article text and a list of pre-extracted named entities (each with 'text' and 'type'), "
    "your task is to identify and extract relationships. "
    "The 'subject_text' and 'object_text' in your output MUST be exact text spans of entities found in the provided 'Extracted Entities' list. "
    "The 'subject_type' and 'object_type' MUST correspond to the types of those entities from the provided list. "
    "Output ONLY a valid JSON object with a single key 'relationships'. The value of 'relationships' MUST be a list of JSON objects. "
    "Each relationship object must have these keys: 'subject_text', 'subject_type', 'predicate' (one of the types listed above), 'object_text', 'object_type'. "
    "Example: {\"relationships\": [{\"subject_text\": \"Innovatech Ltd.\", \"subject_type\": \"ORG\", \"predicate\": \"ACQUIRED\", \"object_text\": \"Global Solutions Inc.\", \"object_type\": \"ORG\"}]} "
    "If no relevant relationships of the specified types are found between the provided entities, the 'relationships' list should be empty: {\"relationships\": []}."
)
def parse_llm_relationship_json_output(llm_output_str_rels):
    """
    Parses the JSON string from the LLM to extract relationships.
    Expected format: {"relationships": [{"subject_text": ..., "predicate": ..., "object_text": ...}]}

    Args:
        llm_output_str_rels (str): JSON string from the LLM.

    Returns:
        list: Extracted relationships or empty list if parsing fails.
    """
    if not llm_output_str_rels:
        return []  # Return empty list if no output

    # Remove markdown code block if present
    if llm_output_str_rels.startswith("```json"):
        llm_output_str_rels = llm_output_str_rels[7:].rstrip("```").strip()

    try:
        data = json.loads(llm_output_str_rels)
        return data.get("relationships", [])  # Return relationships list, or empty if not found
    except json.JSONDecodeError:
        return []  # Return empty list on JSON error
# List to collect articles together with their LLM-extracted relationships
articles_with_llm_relations = []

for i, article_entity_data in enumerate(articles_with_llm_entities):
    # Extract article id, cleaned text, and extracted entities from the article data
    article_id_rels = article_entity_data['id']
    article_text_rels = article_entity_data['cleaned_text']
    current_entities = article_entity_data['llm_extracted_entities']

    # Serialize the list of entities into a JSON string for inclusion in the prompt
    entities_json_for_prompt = json.dumps(current_entities)

    # Construct the user prompt to request relationship extraction from the LLM
    user_prompt_for_re = (
        f"Article Text:\n`\n{article_text_rels}\n`\n\n"
        f"Extracted Entities (use these exact texts for subjects/objects of relationships):\n`json\n{entities_json_for_prompt}\n`\n\n"
        "Identify and extract relationships between these entities based on the system instructions."
    )

    # Call the LLM to get the relationship extraction based on the prompt
    llm_response_rels_content = call_llm(llm_re_system_prompt, user_prompt_for_re, TEXT_GEN_MODEL_NAME)

    # Initialize an empty list to store the extracted relationships
    extracted_llm_rels = []

    # If the LLM response is not empty, parse the extracted relationships from the JSON response
    if llm_response_rels_content:
        extracted_llm_rels = parse_llm_relationship_json_output(llm_response_rels_content)

    # Append the original article data along with the extracted relationships to the results list
    articles_with_llm_relations.append({
        **article_entity_data,  # Keep the original article data (id, text, entities, etc.)
        "llm_extracted_relationships": extracted_llm_rels  # Add the extracted relationships
    })
def normalize_entity_text_for_uri(entity_text, entity_type):
    """
    Normalizes entity text, primarily by stripping common suffixes for organizations.
    """
    normalized_text = entity_text.strip()

    if entity_type == 'ORG':
        # List of common suffixes to remove from organization names
        # This list can be expanded based on your data
        suffixes_to_remove = [
            'Inc.', 'Incorporated', 'Ltd.', 'Limited', 'LLC', 'L.L.C.',
            'Corp.', 'Corporation', 'PLC', 'Co.', 'Company', 'Group',
            'Holdings', 'Solutions', 'Technologies', 'Systems'
        ]
        # Sort by length to remove longer matches first (e.g., "Corp." before "Co.")
        suffixes_to_remove.sort(key=len, reverse=True)

        for suffix in suffixes_to_remove:
            # Case-insensitive check if the text ends with the suffix
            if normalized_text.lower().endswith(" " + suffix.lower()) or normalized_text.lower() == suffix.lower():
                # Find the start of the suffix in the original cased string
                suffix_start_index = normalized_text.lower().rfind(suffix.lower())
                # Slice the string to remove the suffix
                normalized_text = normalized_text[:suffix_start_index].strip()
                # Once a suffix is removed, break to avoid over-stripping,
                # e.g. "The The Co." -> "The The", not "The"
                break

    # Remove any trailing commas or periods that might be left
    normalized_text = re.sub(r'[,.]+$', '', normalized_text).strip()

    # Remove possessives like 's or s' which are sometimes caught by NER
    if normalized_text.endswith("'s") or normalized_text.endswith("s'"):
        normalized_text = normalized_text[:-2].strip()

    # If normalization results in an empty string, revert to the original (should be rare)
    return normalized_text if normalized_text else entity_text
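A quick illustration of what the normalizer does (the expected outputs are shown as comments):

print(normalize_entity_text_for_uri("Nuance Communications Inc.", "ORG"))  # -> 'Nuance Communications'
print(normalize_entity_text_for_uri("Microsoft's", "ORG"))                 # -> 'Microsoft'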
# Final output list to store articles with processed entity info
articles_with_normalized_entities_and_uris = []

# Dictionary to track unique entities and assign stable URIs
unique_entities_map = {}

# Define a base namespace for Knowledge Graph URIs
EX = Namespace("http://example.org/kg/")
print("KG Namespace EX defined.")

# Process each article
for article in tqdm(articles_with_llm_relations, desc="Normalizing & URI Gen"):
    processed = []

    # Extract and process each entity if available
    for ent in article.get('llm_extracted_entities', []):
        text = ent['text']
        type_raw = ent['type']

        # Normalize the type (e.g., "ORG (Organization)" -> "ORG")
        type_simple = type_raw.split()[0].upper()

        # Normalize the entity text for URI generation
        norm_text = normalize_entity_text_for_uri(text, type_simple)
        key = (norm_text, type_simple)

        # Assign a unique URI if it's a new entity
        if key not in unique_entities_map:
            # Clean and truncate the text to make it URI-safe
            safe_text = re.sub(r'[^a-zA-Z0-9_]', '_', norm_text.replace(' ', '_'))[:50]
            # If cleaning makes the name empty, fall back to a hash
            if not safe_text:
                safe_text = f"entity_{hashlib.md5(norm_text.encode()).hexdigest()[:8]}"
            # Generate the full URI
            unique_entities_map[key] = EX[f"{safe_text}_{type_simple}"]

        # Add normalized fields and URI to the entity
        processed.append({
            **ent,
            'normalized_text': norm_text,
            'simple_type': type_simple,
            'uri': unique_entities_map[key]
        })

    # Add the processed entity list to the article
    articles_with_normalized_entities_and_uris.append({
        **article,
        "processed_entities": processed
    })
This loop takes a while to run because it normalizes every entity and generates a URI for each one. Now, let's print a sample result.
# Display the first 3 processed entities from a sample article
for ent in articles_with_normalized_entities_and_uris[2222]['processed_entities'][:3]:
    print(f"  Original: '{ent['text']}' ({ent['type']})")                                   # Original entity text and raw type
    print(f"  Normalized: '{ent['normalized_text']}' (Simple Type: {ent['simple_type']})")  # Cleaned text and type
    print(f"  URI: <{ent['uri']}>")                                                         # The generated URI for the entity

#### OUTPUT ####
Example of processed entities from a sample article:
  Original: 'Inabix Corp.' (ORG)
  Normalized: 'Inabix' (Simple Type: ORG)
  URI: <http://example.org/kg/Inabix_ORG>
  Original: 'Nuance Communications Inc.' (ORG)
  Normalized: 'Nuance Communications' (Simple Type: ORG)
  URI: <http://example.org/kg/Nuance_Communications_ORG>
  Original: '$73.1 billion' (MONEY)
  Normalized: '$73.1 billion' (Simple Type: MONEY)
  URI: <http://example.org/kg/73_1_billion_MONEY>
def get_rdf_class_for_entity_type(simple_entity_type_str):
    """
    Maps a simple entity type string (e.g., 'ORG') to an RDF Class URI.
    Uses Schema.org where possible, otherwise defaults to our custom EX namespace.
    """
    type_to_rdf_class_map = {
        'ORG': SCHEMA.Organization,
        'PERSON': SCHEMA.Person,
        'MONEY': SCHEMA.PriceSpecification,  # Schema.org uses this for monetary amounts
        'DATE': SCHEMA.Date,                 # Represents a date
        'PRODUCT': SCHEMA.Product,
        'GPE': SCHEMA.Place,                 # Geopolitical Entity (maps well to Place)
        'LOC': SCHEMA.Place,                 # General location
        'EVENT': SCHEMA.Event,
        'NORP': SCHEMA.Nationality,          # Nationalities, religious, or political groups
        'CARDINAL': XSD.integer,             # Cardinal numbers are often just literal integers,
                                             # or could be mapped to schema:QuantitativeValue if more context is available.
                                             # Typing a node this way is less common; cardinal numbers usually
                                             # become literal values of properties.
        # Add more mappings if your LLM identified other relevant 'simple_type's
    }

    # Use .get() to provide a fallback if the type isn't in our map:
    # if not in the map, create a class in our EX namespace
    rdf_class = type_to_rdf_class_map.get(simple_entity_type_str.upper(), EX[simple_entity_type_str.upper()])
    return rdf_class
Let's test this function with a few examples to see which RDF classes our entity types map to.
print("Example RDF Class mappings for our entity types:")

sample_type1 = 'ORG'
rdf_class1 = get_rdf_class_for_entity_type(sample_type1)
print(f"  Entity Type '{sample_type1}' maps to RDF Class: <{rdf_class1}>")

sample_type2 = 'MONEY'
rdf_class2 = get_rdf_class_for_entity_type(sample_type2)
print(f"  Entity Type '{sample_type2}' maps to RDF Class: <{rdf_class2}>")

sample_type3 = 'INVESTMENT_ROUND'  # A hypothetical custom type
rdf_class3 = get_rdf_class_for_entity_type(sample_type3)
print(f"  Entity Type '{sample_type3}' (custom) maps to RDF Class: <{rdf_class3}>")

#### OUTPUT ####
Custom KG Namespace EX re-defined for clarity.
RDF Namespace: http://www.w3.org/1999/02/22-rdf-syntax-ns#
RDFS Namespace: http://www.w3.org/2000/01/rdf-schema#
SCHEMA Namespace (Schema.org): http://schema.org/
EX Namespace (Custom): http://example.org/kg/

Example RDF Class mappings for our entity types:
  Entity Type 'ORG' maps to RDF Class: <http://schema.org/Organization>
  Entity Type 'MONEY' maps to RDF Class: <http://schema.org/PriceSpecification>
  Entity Type 'INVESTMENT_ROUND' (custom) maps to RDF Class: <http://example.org/kg/INVESTMENT_ROUND>
• Object: the value or other thing related to the subject (either another entity's URI, or a literal value such as a name, date, or number).
It reads like a simple sentence:
# Original form
<Subject> <Predicate> <Object>

# Example
<ex:Microsoft_ORG> <rdf:type> <schema:Organization>   (Microsoft is an Organization)
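To make this concrete, here is a small, self-contained rdflib sketch (separate from the pipeline code) that asserts exactly this triple:

from rdflib import Graph, Namespace, RDF

EX = Namespace("http://example.org/kg/")
SCHEMA = Namespace("http://schema.org/")

g = Graph()
# Subject: the Microsoft entity URI; predicate: rdf:type; object: schema:Organization
g.add((EX.Microsoft_ORG, RDF.type, SCHEMA.Organization))
print(g.serialize(format="n3"))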
def get_rdf_predicate_uri(predicate_string_from_llm):
    """
    Converts a predicate string (e.g., 'ACQUIRED', 'HAS_PRICE') into a proper
    RDF Property URI in our EX namespace.
    """
    # Sanitize: uppercase, replace spaces with underscores
    sanitized_predicate = predicate_string_from_llm.strip().replace(" ", "_").upper()
    return EX[sanitized_predicate]
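For example (expected URIs shown as comments):

print(get_rdf_predicate_uri("acquired"))   # http://example.org/kg/ACQUIRED
print(get_rdf_predicate_uri("has price"))  # http://example.org/kg/HAS_PRICE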
Now, let's create and populate our graph!
# Initialize RDF graph and namespaces
kg = Graph()
SKOS = Namespace("http://www.w3.org/2004/02/skos/core#")
kg.bind("ex", EX)
kg.bind("schema", SCHEMA)
kg.bind("rdfs", RDFS)
kg.bind("skos", SKOS)

total_triples_added = 0

for article in tqdm(articles_with_normalized_entities_and_uris):
    # Create URI for the article
    article_uri = EX[f"article_{article['id'].replace('-', '_')}"]
    kg.add((article_uri, RDF.type, SCHEMA.Article))

    # Add summary or fallback label
    label = article.get('summary') or f"Article {article['id']}"
    pred = SCHEMA.headline if article.get('summary') else RDFS.label
    kg.add((article_uri, pred, Literal(label, lang='en')))
    total_triples_added += 2

    entity_map = {}

    # Process entities
    for e in article.get('processed_entities', []):
        uri = e['uri']
        kg.add((uri, RDF.type, get_rdf_class_for_entity_type(e['simple_type'])))
        kg.add((uri, RDFS.label, Literal(e['normalized_text'], lang='en')))
        if e['text'] != e['normalized_text']:
            kg.add((uri, SKOS.altLabel, Literal(e['text'], lang='en')))
        kg.add((article_uri, SCHEMA.mentions, uri))
        total_triples_added += 4
        entity_map[e['text']] = uri

    # Process relationships
    for r in article.get('llm_extracted_relationships', []):
        s_uri = entity_map.get(r.get('subject_text'))
        o_uri = entity_map.get(r.get('object_text'))
        p_uri = get_rdf_predicate_uri(r.get('predicate'))
        if s_uri and o_uri:
            kg.add((s_uri, p_uri, o_uri))
            total_triples_added += 1
This processes the entities and their relationships and starts creating triples. Let's print a sample of the results to see what they look like.
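The print-out code itself isn't shown above; one way to produce the sample below is a sketch like this (the exact formatting of the original output may differ slightly):

print("Sample of first 5 triples from the Knowledge Graph (N3 format):")
for s, p, o in list(kg)[:5]:
    print(s.n3(kg.namespace_manager), p.n3(kg.namespace_manager), o.n3(kg.namespace_manager), ".")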
Sample of first 5 triples from the Knowledge Graph (N3 format):
ex:article_02002614879655690596592a07ba827b1651f065 rdf:type schema:Article .
ex:article_02002614879655690596592a07ba827b1651f065 schema:headline "SAN DIEGO, California (CNN) -- You must know whats really driving the immigration debate because its not what you hear on TV Dont be fooled This has nothing to do with national security It has nothing to do with Mexican immigrants It has nothing to do with illegal immigrants The real driver of this debate is the insecurity of American working-class white men All the other stuff is just window dressing..." .
ex:Microsoft_ORG rdf:type schema:Organization .
ex:Microsoft_ORG rdfs:label "Microsoft"@en .
ex:Microsoft_ORG ex:ACQUIRED ex:Nuance_Communications_ORG .
EMBEDDING_MODEL_NAME = "BAAI/bge-multilingual-gemma2"

def get_text_embeddings(list_of_texts_to_embed, embedding_model):
    """
    Generates embeddings for a list of text strings using the specified model.
    """
    # Remove invalid or empty texts
    valid_texts = [str(text).strip() for text in list_of_texts_to_embed if str(text).strip()]

    # Return empty embeddings if no valid texts
    if not valid_texts:
        return {text: [] for text in list_of_texts_to_embed}

    # Generate embeddings using the specified model
    response = client.embeddings.create(model=embedding_model, input=valid_texts)

    # Map the embeddings to their corresponding texts
    embeddings_map = {valid_texts[i]: data_item.embedding for i, data_item in enumerate(response.data)}

    # Ensure all original texts are included in the result, even if they were empty
    for text in list_of_texts_to_embed:
        if text.strip() not in embeddings_map:
            embeddings_map[text.strip()] = []

    return embeddings_map
# Dictionary to store entity URI -> embedding vector
entity_uri_to_embedding_vector = {}

# Extract unique normalized texts from the entity map
unique_normalized_texts = list(set([key[0] for key in unique_entities_map.keys() if key[0].strip()]))

# Get embeddings for these texts
text_to_embedding_result_map = get_text_embeddings(unique_normalized_texts, EMBEDDING_MODEL_NAME)

# Map entity URIs to their corresponding embeddings
for (norm_text, simple_type), entity_uri in unique_entities_map.items():
    if norm_text in text_to_embedding_result_map and text_to_embedding_result_map[norm_text]:
        entity_uri_to_embedding_vector[entity_uri] = text_to_embedding_result_map[norm_text]
def calculate_cosine_similarity(embedding_vector_1, embedding_vector_2):
    """
    Calculates the cosine similarity between two embedding vectors.
    """
    # Convert to numpy arrays and reshape to 2D arrays as expected by sklearn
    vec1 = np.array(embedding_vector_1).reshape(1, -1)
    vec2 = np.array(embedding_vector_2).reshape(1, -1)

    similarity_score = cosine_similarity(vec1, vec2)
    return similarity_score[0][0]  # The result is a 2D array, get the single value
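A quick check with toy vectors (purely illustrative):

print(calculate_cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0 -- identical direction
print(calculate_cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 -- orthogonal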
uris = [uri for uri, emb in entity_uri_to_embedding_vector.items() if isinstance(emb, (list, np.ndarray)) and len(emb) > 0]

# Find at least two ORG entities, falling back to any two entities
org_entities = [uri for uri in uris if (uri, RDF.type, SCHEMA.Organization) in kg]
entity1_uri, entity2_uri = (org_entities[:2] if len(org_entities) >= 2 else uris[:2]) if len(uris) >= 2 else (None, None)

if entity1_uri and entity2_uri:
    emb1, emb2 = entity_uri_to_embedding_vector[entity1_uri], entity_uri_to_embedding_vector[entity2_uri]
    label1 = kg.value(subject=entity1_uri, predicate=RDFS.label, default=str(entity1_uri))
    label2 = kg.value(subject=entity2_uri, predicate=RDFS.label, default=str(entity2_uri))

    # Calculate similarity and print interpretation
    similarity = calculate_cosine_similarity(emb1, emb2)
    print(f"\nSimilarity between '{label1}' and '{label2}': {similarity:.4f}")
    if similarity > 0.75:
        print("Highly similar.")
    elif similarity > 0.5:
        print("Moderately similar.")
    else:
        print("Not very similar.")
Let's see what the output looks like.
Semantic Similarity between 'Microsoft' and 'Google': 0.8234
Interpretation: These entities are quite similar based on their name embeddings. This could suggest they operate in similar domains or have related functions.
It can generate interactive HTML network visualizations that can be viewed in a browser or explored directly inside a Jupyter Notebook.
Let's define a function that takes our kg graph and generates a visualization for a subset of its triples.
def visualize_kg(graph, filename="kg_viz.html", num_triples=50):
    # Create a pyvis network for interactive visualization
    net = Network(height="600px", width="100%", directed=True)

    # Collect up to `num_triples` where both subject and object are URIs
    triples = [(s, p, o) for s, p, o in graph if isinstance(s, URIRef) and isinstance(o, URIRef)][:num_triples]

    nodes = set()  # To avoid adding duplicate nodes

    # Add nodes and edges to the visualization
    for s, p, o in tqdm(triples, desc="Visualizing"):
        for node in (s, o):
            if node not in nodes:
                # Get label and type for each node
                label = graph.label(node) or node.n3(graph.namespace_manager)
                ntype = graph.value(node, RDF.type)
                group = ntype.n3(graph.namespace_manager).split(":")[-1] if ntype else "Unknown"

                # Add node to graph
                net.add_node(str(node), label=str(label), group=group)
                nodes.add(node)

        # Add edge with predicate label
        label = p.n3(graph.namespace_manager).split(":")[-1]
        net.add_edge(str(s), str(o), label=label, title=label)

    # Save the visualization to an HTML file
    net.save_graph(filename)
Now, let's generate the visualization for our kg. The function will create a file named tech_acquisitions_kg_sample.html in the current working directory.
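The call itself is not shown above; a minimal sketch would be (the number of triples to render is an assumption):

# Render a sample of the graph to an interactive HTML file
visualize_kg(kg, filename="tech_acquisitions_kg_sample.html", num_triples=75)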