Imagine running many instances of an algorithm service. If the load is unbalanced, some nodes get overwhelmed while others sit idle. The requests need to be distributed across the nodes so that each one runs at a reasonable load level; this is load balancing.
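To make the idea concrete, here is a minimal sketch of the simplest distribution strategy, round robin, in Python. The backend addresses are placeholders for illustration only:

```python
from itertools import cycle

# Hypothetical backend addresses, for illustration only.
backends = ["127.0.0.1:8001", "127.0.0.1:8002"]

def make_round_robin(servers):
    """Return a callable that hands out backends in turn."""
    it = cycle(servers)
    return lambda: next(it)

next_backend = make_round_robin(backends)
for i in range(4):
    print(f"request {i} -> {next_backend()}")
```

Each call advances to the next server and wraps around, which is exactly what a round-robin load balancer does with incoming requests.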
1. Introduction

Nginx is an open-source, high-performance web server that is also widely used as a reverse proxy, load balancer, and HTTP cache. It was designed to overcome the performance bottlenecks of traditional servers (such as Apache) under high concurrency, and it has become one of the most popular web servers in the world.

Key features:

- High performance: an event-driven, asynchronous architecture that supports tens of thousands of concurrent connections on a single machine.
- Flexible algorithms: Round Robin, Weighted Round Robin, IP Hash, Least Connections, and more.
2. Installing Nginx

Using Ubuntu as an example:

```shell
sudo apt install nginx
```

Edit the configuration file /etc/nginx/nginx.conf and add the following inside the http block:

```nginx
http {
    upstream backend {
        least_conn;                 # load-balancing algorithm
        server 127.0.0.1:8001;      # backend service 1
        server 127.0.0.1:8002;      # backend service 2
    }

    server {
        listen 80;
        location / {
            proxy_pass http://backend;
        }
    }

    log_format upstream_log '$remote_addr - $remote_user [$time_local] "$request" '
                            '$status $body_bytes_sent "$http_referer" '
                            '"$http_user_agent" "$http_x_forwarded_for" '
                            'to: $upstream_addr';
    access_log /var/log/nginx/upstream.log upstream_log;
}
```

The complete configuration:
```nginx
# cat /etc/nginx/nginx.conf
user www-data;
worker_processes auto;
pid /run/nginx.pid;
include /etc/nginx/modules-enabled/*.conf;

events {
    worker_connections 768;
    # multi_accept on;
}

http {
    ##
    # Basic Settings
    ##
    sendfile on;
    tcp_nopush on;
    types_hash_max_size 2048;
    # server_tokens off;
    # server_names_hash_bucket_size 64;
    # server_name_in_redirect off;

    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    ##
    # Logging Settings
    ##
    log_format upstream_log '$remote_addr:$remote_port $request_uri - $remote_user [$time_local] "$request" '
                            '$status $body_bytes_sent "$http_referer" '
                            '"$http_user_agent" "$http_x_forwarded_for" '
                            'to: $upstream_addr';
    access_log /var/log/nginx/access.log upstream_log;
    error_log /var/log/nginx/error.log;

    ##
    # SSL Settings
    ##
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3; # Dropping SSLv3, ref: POODLE
    ssl_prefer_server_ciphers on;

    ##
    # Gzip Settings
    ##
    gzip on;
    # gzip_vary on;
    # gzip_proxied any;
    # gzip_comp_level 6;
    # gzip_buffers 16 8k;
    # gzip_http_version 1.1;
    # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;

    ##
    # Upstream Servers
    ##
    upstream backend {
        least_conn;
        server 127.0.0.1:8001;  # backend service 1
        server 127.0.0.1:8002;  # backend service 2
    }

    ##
    # Server Blocks
    ##
    server {
        listen 80;
        server_name localhost;
        location / {
            proxy_pass http://backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            # proxy_http_version 1.1;        # ensure HTTP/1.1 is used
            # proxy_set_header Connection '';
            add_header X-Upstream $upstream_addr always;  # expose the backend address in the response headers
        }
    }

    ##
    # Virtual Host Configs
    ##
    include /etc/nginx/conf.d/*.conf;
    include /etc/nginx/sites-enabled/*;
}

#mail {
#   # See sample authentication script at:
#   # http://wiki.nginx.org/ImapAuthenticateWithApachePhpScript
#
#   # auth_http localhost/auth.php;
#   # pop3_capabilities "TOP" "USER";
#   # imap_capabilities "IMAP4rev1" "UIDPLUS";
#
#   server {
#       listen   localhost:110;
#       protocol pop3;
#       proxy    on;
#   }
#
#   server {
#       listen   localhost:143;
#       protocol imap;
#       proxy    on;
#   }
#}
```

After editing, run `nginx -t` to confirm the configuration is valid, then `systemctl reload nginx` to apply it.
3. Load-Balancing Algorithms

In the configuration above, `least_conn` selects the balancing algorithm. Several algorithms are available:
Round Robin

- The default algorithm; no explicit directive needed.
- How it works: distributes requests to the backend servers in turn.
- Weighted version: adjust the distribution ratio with the `weight` parameter.

```nginx
upstream backend {
    server 127.0.0.1:8001 weight=3;  # 60% of requests
    server 127.0.0.1:8002 weight=2;  # 40% of requests
}
```

Least Connections

- Syntax: `least_conn`
- How it works: sends each request to the backend with the fewest active connections.
- Use case: backends whose processing loads differ significantly.

```nginx
upstream backend {
    least_conn;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}
```

IP Hash

- Syntax: `ip_hash`
- How it works: hashes the client IP so each client is pinned to a fixed backend.
- Use case: session persistence.

```nginx
upstream backend {
    ip_hash;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}
```

Generic Hash

- Syntax: `hash key [consistent]`
- How it works: computes a hash over a custom key (such as the URI or a request parameter).
- `consistent`: enables consistent hashing, which reduces the disruption when backends are added or removed.

> Consistent hashing uses a hash ring with virtual nodes to solve the data-migration and load-imbalance problems that traditional hashing suffers in dynamic environments. Its core idea is to limit the impact radius of node changes, while virtual nodes keep the key distribution balanced; it is a key technique for high availability and scalability in distributed systems.

```nginx
upstream backend {
    hash $request_uri consistent;  # route by request URI
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}
```

All algorithms can be combined with per-server weights:
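The consistent-hashing idea quoted above can be illustrated with a small hash ring. This is a simplified sketch with virtual nodes (nginx's internal implementation differs; the replica count and node names here are arbitrary):

```python
import hashlib
from bisect import bisect

class HashRing:
    """Toy consistent-hash ring; `replicas` controls virtual nodes per backend."""

    def __init__(self, nodes, replicas=100):
        # Each backend is hashed `replicas` times onto the ring.
        self.ring = sorted(
            (self._hash(f"{node}#{i}"), node)
            for node in nodes
            for i in range(replicas)
        )
        self.keys = [h for h, _ in self.ring]

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get(self, key):
        # Walk clockwise to the first virtual node at or after the key's hash.
        idx = bisect(self.keys, self._hash(key)) % len(self.ring)
        return self.ring[idx][1]

ring = HashRing(["127.0.0.1:8001", "127.0.0.1:8002"])
print(ring.get("/chat"))  # the same URI always maps to the same backend
```

Because each backend owns many small arcs of the ring, removing one backend only remaps the keys that landed on its arcs; the rest keep their assignments.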
```nginx
server 127.0.0.1:8001 weight=5 max_fails=3 fail_timeout=30s;
```

4. Testing the Load Balancer

Start two HTTP services (ports 8001 and 8002) with the Python script server.py:
```python
# server.py
from http.server import BaseHTTPRequestHandler, HTTPServer

class DebugRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Print client info
        client_ip, client_port = self.client_address
        print(f"\n--- new request @{self.server.server_port} ---")
        print(f"[client]  {client_ip}:{client_port}")
        print(f"[method]  {self.command}")
        print(f"[path]    {self.path}")
        print(f"[headers] {self.headers}")
        # Send the response
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(f"response from: {self.server.server_port}".encode())

    def do_POST(self):
        # Handle POST requests
        content_length = int(self.headers["Content-Length"])
        post_data = self.rfile.read(content_length).decode("utf-8")
        print(f"\n--- POST request @{self.server.server_port} ---")
        print(f"[data] {post_data}")
        self.do_GET()  # reuse the GET response logic

def run(server_class=HTTPServer, handler_class=DebugRequestHandler, port=8001):
    server_address = ("0.0.0.0", port)  # bind to all interfaces
    httpd = server_class(server_address, handler_class)
    print(f"serving on 0.0.0.0:{port}...")
    httpd.serve_forever()

if __name__ == "__main__":
    # Start two service instances (on ports 8001 and 8002)
    import threading
    threading.Thread(target=run, kwargs={"port": 8001}).start()
    threading.Thread(target=run, kwargs={"port": 8002}).start()
```

Start it with `python3 server.py`, then run a stress test:
```shell
apt install apache2-utils
ab -n 1000 -c 10 http://localhost/
```

`ab` is short for Apache Bench, a performance-testing tool from the Apache HTTP Server project, used to put web servers under load.

Command breakdown:

- **ab**: the tool name (Apache Bench).
- **-n 1000**: total number of requests is 1000 (`-n` for number of requests).
- **-c 10**: concurrency level is 10 (`-c` for concurrency, the number of requests in flight at once).
- **http://localhost/**: the target URL (a local service or a remote one).

This simulates 10 concurrent users hitting http://localhost/, each sending requests back-to-back until the total reaches 1000, which measures the server's throughput and latency under load.
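For reference, the core of what `ab -n 1000 -c 10` does can be approximated in a few lines of Python. The `send` callable here is a placeholder that returns a fake status code; a real run would swap in an HTTP request (e.g. via `urllib.request.urlopen`) against nginx:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def bench(send, total=1000, concurrency=10):
    """Fire `total` calls using `concurrency` workers and report throughput."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        results = list(pool.map(lambda _: send(), range(total)))
    elapsed = time.perf_counter() - start
    return {"requests": len(results), "rps": len(results) / elapsed}

# Placeholder request; replace with a real HTTP call to benchmark nginx, e.g.
#   lambda: urllib.request.urlopen("http://localhost/").status
stats = bench(lambda: 200, total=100, concurrency=10)
print(f'{stats["requests"]} requests, {stats["rps"]:.0f} req/s')
```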
Interpreting the output

Running the command produces output like this:

```
This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests

Server Software:        nginx/1.18.0
Server Hostname:        localhost
Server Port:            80

Document Path:          /
Document Length:        18 bytes

Concurrency Level:      10
Time taken for tests:   1.201 seconds
Complete requests:      1000
Failed requests:        0
Total transferred:      178000 bytes
HTML transferred:       18000 bytes
Requests per second:    832.30 [#/sec] (mean)
Time per request:       12.015 [ms] (mean)
Time per request:       1.201 [ms] (mean, across all concurrent requests)
Transfer rate:          144.68 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.0      0       1
Processing:     2   12   1.9     12      26
Waiting:        1   12   1.9     12      25
Total:          2   12   1.9     12      26

Percentage of the requests served within a certain time (ms)
  50%     12
  66%     12
  75%     13
  80%     13
  90%     14
  95%     15
  98%     16
  99%     18
 100%     26 (longest request)
```

Key metrics
- Requests per second: 832.30 [#/sec] — the server handles about 832 requests per second (higher throughput means better performance).
- Time per request: 12.015 [ms] (mean) — average latency per request from the user's point of view; 1.201 [ms] (mean, across all concurrent requests) — the average time the server spends on each request.

Watch the access log while the test runs:

```shell
tail -f /var/log/nginx/access.log
```

```
127.0.0.1:48878 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
127.0.0.1:48882 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
127.0.0.1:48892 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
127.0.0.1:48904 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
```

Count the requests that went to each backend:

```shell
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 | grep ":8001" | wc -l
506
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 | grep ":8002" | wc -l
494
```

or:
```shell
root@MichaelMing:~# awk '{print $NF}' /var/log/nginx/access.log | sort | uniq -c
    300 "ApacheBench/2.3"
      3 "curl/7.81.0"
  93576 -
    984 127.0.0.1:8001
   1018 127.0.0.1:8002
```

Looking at the two ports, the request counts are essentially balanced (506 vs 494): Nginx is doing its load-balancing job.
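The same tally can be done in Python by splitting each line on the `to: ` suffix that the custom log_format appends; the sample lines below are abbreviated stand-ins for real access-log entries:

```python
from collections import Counter

# Abbreviated stand-ins for real access-log lines.
sample_lines = [
    '127.0.0.1:48878 / ... "GET / HTTP/1.0" 200 18 ... to: 127.0.0.1:8001',
    '127.0.0.1:48882 / ... "GET / HTTP/1.0" 200 18 ... to: 127.0.0.1:8002',
    '127.0.0.1:48892 / ... "GET / HTTP/1.0" 200 18 ... to: 127.0.0.1:8001',
]

def upstream_counts(lines):
    """Count requests per upstream, keyed on the trailing `to: <addr>` field."""
    return Counter(
        line.rsplit("to: ", 1)[1].strip()
        for line in lines
        if "to: " in line
    )

print(upstream_counts(sample_lines))
```

Point it at `open("/var/log/nginx/access.log")` to tally a real log.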
5. A Chat-Service Example

Suppose MichaelAI exposes two API endpoints, `chat` and `completions`:
```python
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib.parse import urlparse, parse_qs
from datetime import datetime

class MultiAPIHandler(BaseHTTPRequestHandler):
    def send_json_response(self, data, status=200):
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(data).encode("utf-8"))

    def do_POST(self):
        # Single entry point: route requests by path
        parsed_path = urlparse(self.path)
        path = parsed_path.path
        try:
            content_length = int(self.headers["Content-Length"])
            post_data = self.rfile.read(content_length).decode("utf-8")
            print(f"\n--- {path} request @{self.server.server_port} ---")
            print(f"[client] {self.client_address[0]}:{self.client_address[1]}")
            print(f"[data] {post_data}")
            if path == "/chat":
                self.handle_chat(post_data)
            elif path == "/completions":
                self.handle_completions(post_data)
            else:
                self.send_error(404, "API not found")
        except Exception as e:
            self.send_json_response({
                "error": str(e),
                "server": self.server.server_port,
                "timestamp": datetime.now().isoformat()
            }, 400)

    def handle_chat(self, data):
        # Simulated chat endpoint
        try:
            input_data = json.loads(data)
            message = input_data.get("message", "")
            response = {
                "original": message,
                "response": f"Processed by chat API: {message.upper()}",
                "server": self.server.server_port,
                "timestamp": datetime.now().isoformat()
            }
            self.send_json_response(response)
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON format")

    def handle_completions(self, data):
        # Text-completion endpoint
        response = {
            "completions": [
                {"text": f"Completion 1: {data[:5]}...", "index": 0},
                {"text": f"Completion 2: {data[-5:]}...", "index": 1}
            ],
            "server": self.server.server_port,
            "timestamp": datetime.now().isoformat()
        }
        self.send_json_response(response)

def run(server_class=HTTPServer, handler_class=MultiAPIHandler, port=8001):
    server_address = ("0.0.0.0", port)
    httpd = server_class(server_address, handler_class)
    print(f"multi-API service on 0.0.0.0:{port}...")
    httpd.serve_forever()

if __name__ == "__main__":
    import threading
    # Start two service instances
    threading.Thread(target=run, kwargs={"port": 8001}).start()
    threading.Thread(target=run, kwargs={"port": 8002}).start()
```

> chat_data.json example: {"message": "Hello, this is a test message"}
```shell
ab -n 100 -c 10 -T "application/json" -p json_data \
   http://localhost:8001/chat
```

Server log:

```
--- /chat request @8001 ---
[client] 127.0.0.1:54724
[data] {"message": "Hello, this is a test message"}
127.0.0.1 - - [02/Mar/2025 22:41:22] "POST /chat HTTP/1.0" 200 -
```

text.txt example: 你好,我是Michael阿明开发的智能助手!

```shell
ab -n 10000 -c 100 -T "text/plain" -p text.txt \
   http://localhost:8002/completions
```

Server log:

```
--- /completions request @8002 ---
[client] 127.0.0.1:47246
[data] 你好,我是Michael阿明开发的智能助手!
127.0.0.1 - - [02/Mar/2025 22:49:26] "POST /completions HTTP/1.0" 200 -
```

Stress both endpoints at once:

```shell
echo "chat benchmark:" && \
ab -n 500 -c 50 -T "application/json" -p json_data http://localhost:80/chat && \
echo "completions benchmark:" && \
ab -n 500 -c 50 -T "text/plain" -p text.txt http://localhost:80/completions
```

The completions calls are split almost evenly between the two backends:
```shell
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 | grep "completions" | grep 8001 | wc -l
747
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 | grep "completions" | grep 8002 | wc -l
755
```

A client for the two endpoints, with both synchronous and asynchronous versions:

```python
import requests
import asyncio
import aiohttp

# Synchronous client (based on requests)
class SyncAPIClient:
    def __init__(self, base_url):
        self.base_url = base_url

    def call_chat(self, message):
        url = f"{self.base_url}/chat"
        data = {"message": message}
        response = requests.post(url, json=data)
        return response.json()

    def call_completions(self, text):
        url = f"{self.base_url}/completions"
        response = requests.post(url, data=text)
        return response.json()

# Asynchronous client (based on aiohttp)
class AsyncAPIClient:
    def __init__(self, base_url):
        self.base_url = base_url

    async def call_chat(self, message):
        url = f"{self.base_url}/chat"
        data = {"message": message}
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data) as response:
                return await response.json()

    async def call_completions(self, text):
        url = f"{self.base_url}/completions"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=text) as response:
                return await response.json()

# Synchronous usage
sync_client = SyncAPIClient("http://localhost")
chat_response = sync_client.call_chat("Hello, sync chat!")
print("Chat response:", chat_response)
completion_response = sync_client.call_completions("test text")
print("Completions response:", completion_response)

# Asynchronous usage
async def main():
    async_client = AsyncAPIClient("http://localhost")
    # Call both endpoints concurrently
    chat_task = asyncio.create_task(async_client.call_chat("Hello, async chat!"))
    completion_task = asyncio.create_task(async_client.call_completions("async test text"))
    results = await asyncio.gather(chat_task, completion_task)
    print("Async chat response:", results[0])
    print("Async completions response:", results[1])

asyncio.run(main())
```

Output:
```
root@MichaelMing:~# python3 /mnt/d/opt/client.py
Chat response: {'original': 'Hello, sync chat!', 'response': 'Processed by chat API: HELLO, SYNC CHAT!', 'server': 8001, 'timestamp': '2025-03-02T23:11:54.426626'}
Completions response: {'completions': [{'text': 'Completion 1: test ...', 'index': 0}, {'text': 'Completion 2: text...', 'index': 1}], 'server': 8002, 'timestamp': '2025-03-02T23:11:54.430562'}
Async chat response: {'original': 'Hello, async chat!', 'response': 'Processed by chat API: HELLO, ASYNC CHAT!', 'server': 8001, 'timestamp': '2025-03-02T23:11:54.444508'}
Async completions response: {'completions': [{'text': 'Completion 1: async...', 'index': 0}, {'text': 'Completion 2: text...', 'index': 1}], 'server': 8002, 'timestamp': '2025-03-02T23:11:54.444973'}
```

Stress testing from code:
```python
# Concurrent stress test (using the async client)
async def stress_test():
    client = AsyncAPIClient("http://localhost")
    tasks = []
    for _ in range(100):
        tasks.append(client.call_chat("test message"))
        tasks.append(client.call_completions("test text"))
    responses = await asyncio.gather(*tasks)
    print(f"successfully handled {len(responses)} requests")

asyncio.run(stress_test())
```
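One caveat with the stress test above: `asyncio.gather` launches all 200 requests at once, which can exhaust local ports or file descriptors at larger counts. A common refinement, sketched here with a placeholder `fake_call` standing in for the real client methods, is to bound the number of in-flight requests with a semaphore:

```python
import asyncio

async def bounded_stress(call, total=200, limit=20):
    """Run `total` calls with at most `limit` in flight at any time."""
    sem = asyncio.Semaphore(limit)

    async def one(i):
        async with sem:
            return await call(i)

    # gather preserves order, so results line up with request indices
    return await asyncio.gather(*(one(i) for i in range(total)))

async def fake_call(i):
    # Placeholder standing in for client.call_chat / call_completions
    await asyncio.sleep(0)
    return i

results = asyncio.run(bounded_stress(fake_call, total=50, limit=5))
print(f"completed {len(results)} requests")
```

Replacing `fake_call` with a lambda around `client.call_chat` keeps the load steady instead of spiking, which gives the `least_conn` balancer a more realistic connection count to work with.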