defsummarize_agent(state: AgentState)-> Dict[str, Any]: print("Summarization Agent: Running") start_time = time.time() try: text = state["text"] ifnottext.strip(): return{ "summary":"No text provided for summarization.", "summary_time":0.0 } time.sleep(2) sentences = re.split(r'(?<=[.!?]) +', text.strip()) scored_sentences = [(s, len(s.split()))forsinsentencesifs] top_sentences = [sfors, _insorted(scored_sentences, key=lambdax: x[1], reverse=True)[:2]] summary =" ".join(top_sentences)iftop_sentenceselse"Text too short to summarize." processing_time = time.time() - start_time print(f"Summarization Agent: Completed in{processing_time:.2f}seconds") return{ "summary": summary, "summary_time": processing_time } exceptExceptionase: return{ "summary":f"Error in summarization:{str(e)}", "summary_time":0.0 }
deftranslate_agent(state: AgentState)-> Dict[str, Any]: print("Translation Agent: Running") start_time = time.time() try: text = state["text"] ifnottext.strip(): return{ "translation":"No text provided for translation.", "translation_time":0.0 } time.sleep(3) translation = ( "El nuevo parque en la ciudad es una maravillosa adición. " "Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. " "Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña." ) processing_time = time.time() - start_time print(f"Translation Agent: Completed in{processing_time:.2f}seconds") return{ "translation": translation, "translation_time": processing_time } exceptExceptionase: return{ "translation":f"Error in translation:{str(e)}", "translation_time":0.0 }
defmain(): text = ( "The new park in the city is a wonderful addition. Families are enjoying the open spaces, " "and children love the playground. However, some people think the parking area is too small." ) initial_state: AgentState = { "text": text, "summary":"", "translation":"", "sentiment":"", "summary_time":0.0, "translation_time":0.0, "sentiment_time":0.0 } print("\nBuilding new graph...") app = build_parallel_graph() print("\nStarting parallel processing...") start_time = time.time() config = RunnableConfig(parallel=True) result = app.invoke(initial_state, config=config) total_time = time.time() - start_time print("\n=== Parallel Task Results ===") print(f"Input Text:\n{text}\n") print(f"Summary:\n{result['summary']}\n") print(f"Translation (Spanish):\n{result['translation']}\n") print(f"Sentiment Analysis:\n{result['sentiment']}\n") print("\n=== Processing Times ===") processing_times = { "summary": result["summary_time"], "translation": result["translation_time"], "sentiment": result["sentiment_time"] } foragent, time_takeninprocessing_times.items(): print(f"{agent.capitalize()}:{time_taken:.2f}seconds") print(f"\nTotal Wall Clock Time:{total_time:.2f}seconds") print(f"Sum of Individual Processing Times:{sum(processing_times.values()):.2f}seconds") print(f"Time Saved by Parallel Processing:{sum(processing_times.values()) - total_time:.2f}seconds")
if__name__ =="__main__": main()
输出:
Building new graph... Starting parallel processing... Sentiment Agent: Running Summarization Agent: Running Translation Agent: Running Sentiment Agent: Completedin1.50 seconds Summarization Agent: Completedin2.00 seconds Translation Agent: Completedin3.00 seconds === Parallel Task Results === Input Text: The new parkinthe city is a wonderful addition. Families are enjoying the open spaces, and children love the playground. However, some people think the parking area is too small. Summary: Families are enjoying the open spaces, and children love the playground. The new parkinthe city is a wonderful addition. Translation (Spanish): El nuevo parque en la ciudad es una maravillosa adición. Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña. Sentiment Analysis: Positive (Polarity: 0.31, Subjectivity: 0.59) === Processing Times === Summary: 2.00 seconds Translation: 3.00 seconds Sentiment: 1.50 seconds Total Wall Clock Time: 3.01 seconds Sum of Individual Processing Times: 6.50 seconds Time Saved by Parallel Processing: 3.50 seconds
并行模式特点:
并行性:三个任务(总结、翻译、情感分析)同时运行,减少了总处理时间。
独立性:每个代理独立处理输入文本,执行过程中无需代理间通信。
协调性:队列确保结果被安全收集并按顺序显示。
实际用例:总结、翻译和情感分析是常见的自然语言处理(NLP)任务,并行处理对较大文本特别有益。
2.1.2 顺序
任务按顺序依次处理,前一个代理的输出成为下一个代理的输入。
示例:多步审批流程。
代码:
fromtypingimportDict fromlanggraph.graphimportStateGraph, MessagesState, END fromlangchain_core.runnablesimportRunnableConfig fromlangchain_core.messagesimportHumanMessage, AIMessage importjson
defteam_lead_agent(state: MessagesState, config: RunnableConfig)-> Dict: print("Agent (Team Lead): Starting review") messages = state["messages"] proposal = json.loads(messages[0].content) title = proposal.get("title","") amount = proposal.get("amount",0.0) ifnottitleoramount <=0: status ="Rejected" comment ="Team Lead: Proposal rejected due to missing title or invalid amount." goto = END else: status ="Approved by Team Lead" comment ="Team Lead: Proposal is complete and approved." goto ="dept_manager" print(f"Agent (Team Lead): Review complete -{status}") messages.append(AIMessage( content=json.dumps({"status": status,"comment": comment}), additional_kwargs={"agent":"team_lead","goto": goto} )) return{"messages": messages}
defdept_manager_agent(state: MessagesState, config: RunnableConfig)-> Dict: print("Agent (Department Manager): Starting review") messages = state["messages"] team_lead_msg = next((mforminmessagesifm.additional_kwargs.get("agent") =="team_lead"),None) proposal = json.loads(messages[0].content) amount = proposal.get("amount",0.0) ifjson.loads(team_lead_msg.content)["status"] !="Approved by Team Lead": status ="Rejected" comment ="Department Manager: Skipped due to Team Lead rejection." goto = END elifamount >100000: status ="Rejected" comment ="Department Manager: Budget exceeds limit." goto = END else: status ="Approved by Department Manager" comment ="Department Manager: Budget is within limits." goto ="finance_director" print(f"Agent (Department Manager): Review complete -{status}") messages.append(AIMessage( content=json.dumps({"status": status,"comment": comment}), additional_kwargs={"agent":"dept_manager","goto": goto} )) return{"messages": messages}
deffinance_director_agent(state: MessagesState, config: RunnableConfig)-> Dict: print("Agent (Finance Director): Starting review") messages = state["messages"] dept_msg = next((mforminmessagesifm.additional_kwargs.get("agent") =="dept_manager"),None) proposal = json.loads(messages[0].content) amount = proposal.get("amount",0.0) ifjson.loads(dept_msg.content)["status"] !="Approved by Department Manager": status ="Rejected" comment ="Finance Director: Skipped due to Dept Manager rejection." elifamount >50000: status ="Rejected" comment ="Finance Director: Insufficient budget." else: status ="Approved" comment ="Finance Director: Approved and feasible." print(f"Agent (Finance Director): Review complete -{status}") messages.append(AIMessage( content=json.dumps({"status": status,"comment": comment}), additional_kwargs={"agent":"finance_director","goto": END} )) return{"messages": messages}
defcode_writer_agent(state: EvaluationState, config: RunnableConfig)-> Dict[str, Any]: print(f"Iteration{state['iteration'] +1}- Code Writer: Generating code") print(f"Iteration{state['iteration'] +1}- Code Writer: Received feedback:{state['feedback']}") iteration = state["iteration"] +1 feedback = state["feedback"] ifiteration ==1: code = textwrap.dedent(""" def factorial(n): result = 1 for i in range(1, n + 1): result *= i return result """) writer_feedback ="Initial code generated." elif"factorial(0)"infeedback.lower(): code = textwrap.dedent(""" def factorial(n): if n == 0: return 1 result = 1 for i in range(1, n + 1): result *= i return result """) writer_feedback ="Fixed handling for n=0." elif"factorial(-1)"infeedback.lower()or"negative"infeedback.lower(): code = textwrap.dedent(""" def factorial(n): if n < 0: raise ValueError("Factorial not defined for negative numbers") if n == 0: return 1 result = 1 for i in range(1, n + 1): result *= i return result """) writer_feedback ="Added error handling for negative inputs." else: code = state["code"] writer_feedback ="No further improvements identified." print(f"Iteration{iteration}- Code Writer: Code generated") return{ "code": code, "feedback": writer_feedback, "iteration": iteration }
defmain(): test_tickets = [ "I have a billing issue with my last invoice. It seems I was overcharged.", "My app keeps crashing with a technical error. Please help!", "I have a general question about your services. Can you provide more info?", "I need assistance with something unrelated to billing or technical issues." ] forticket_textintest_tickets: initial_state: TicketState = { "ticket_text": ticket_text, "category":"", "resolution":"", "processing_time":0.0 } print(f"\n=== Processing Ticket: '{ticket_text}' ===") app = build_router_graph() start_time = time.time() result = app.invoke(initial_state, config=RunnableConfig()) total_time = time.time() - start_time print("\n=== Ticket Results ===") print(f"Category:{result['category']}") print(f"Resolution:{result['resolution']}") print(f"Total Processing Time:{result['processing_time']:.2f}seconds") print(f"Total Wall Clock Time:{total_time:.2f}seconds") print("-"*50)
if__name__ =="__main__": main()
输出:
=== Processing Ticket:'I have a billing issue with my last invoice. It seems I was overcharged.'=== Router Agent: Analyzing ticket... Router Agent: Categorized as'Billing'in0.00 seconds Routing: Ticket category is'Billing' Billing Team Agent: Processing ticket... Billing Team Agent: Completedin0.00 seconds === Ticket Results === Category: Billing Resolution: Billing Team: Reviewed ticket'I have a billing issue with my last invoice. It seems I was overcharged.'. Please check your invoice details or contact our billing departmentforfurther assistance. Total Processing Time: 0.00 seconds Total Wall Clock Time: 1.03 seconds -------------------------------------------------- === Processing Ticket:'My app keeps crashing with a technical error. Please help!'=== Router Agent: Analyzing ticket... Router Agent: Categorized as'Technical'in0.00 seconds Routing: Ticket category is'Technical' Technical Team Agent: Processing ticket... Technical Team Agent: Completedin0.00 seconds === Ticket Results === Category: Technical Resolution: Technical Team: Reviewed ticket'My app keeps crashing with a technical error. Please help!'. Please try restarting your device or submit a detailed errorlogforfurther investigation. Total Processing Time: 0.00 seconds Total Wall Clock Time: 1.50 seconds -------------------------------------------------- === Processing Ticket:'I have a general question about your services. Can you provide more info?'=== Router Agent: Analyzing ticket... Router Agent: Categorized as'General'in0.00 seconds Routing: Ticket category is'General' General Team Agent: Processing ticket... General Team Agent: Completedin0.00 seconds === Ticket Results === Category: General Resolution: General Team: Reviewed ticket'I have a general question about your services. Can you provide more info?'. For more information, please refer to our FAQ or contact us via email. Total Processing Time: 0.00 seconds Total Wall Clock Time: 0.80 seconds -------------------------------------------------- === Processing Ticket:'I need assistance with something unrelated to billing or technical issues.'=== Router Agent: Analyzing ticket... Router Agent: Categorized as'Billing'in0.00 seconds Routing: Ticket category is'Billing' Billing Team Agent: Processing ticket... Billing Team Agent: Completedin0.00 seconds === Ticket Results === Category: Billing Resolution: Billing Team: Reviewed ticket'I need assistance with something unrelated to billing or technical issues.'. Please check your invoice details or contact our billing departmentforfurther assistance. Total Processing Time: 0.00 seconds Total Wall Clock Time: 1.00 seconds --------------------------------------------------