Discovery 模块¶
核心文件¶
src/by_framework/core/discovery.py- ServiceRegistry, DiscoveryClientsrc/by_framework/util/discovery_http_client.py- DiscoveryHttpClient
ServiceRegistry¶
基于 Redis 的服务注册中心。
主要方法¶
class ServiceRegistry:
def __init__(self, redis_client: Redis) -> None:
self._redis = redis_client
async def register(
self,
service_name: str,
endpoint: str,
metadata: Optional[dict] = None,
) -> None:
"""注册服务"""
async def heartbeat(
self,
service_name: str,
endpoint: str,
) -> None:
"""发送心跳"""
async def unregister(
self,
service_name: str,
endpoint: str,
) -> None:
"""注销服务"""
async def get_service_endpoints(
self,
service_name: str,
) -> List[Dict[str, Any]]:
"""获取服务 endpoints"""
DiscoveryClient¶
带缓存的服务发现客户端。
class DiscoveryClient:
def __init__(
self,
redis_client: Redis,
cache_ttl: int = 30,
) -> None:
self._redis = redis_client
self._cache_ttl = cache_ttl
async def discover(
self,
service_name: str,
use_cache: bool = True,
) -> List[str]:
"""发现服务 endpoints"""
DiscoveryHttpClient¶
结合服务发现的 HTTP 客户端,支持自动重试与故障切换。
class DiscoveryHttpClient:
def __init__(
self,
redis_client: Redis,
service_name: str,
max_retries: int = 3,
timeout: float = 30.0,
) -> None:
self._redis = redis_client
self._service_name = service_name
self._max_retries = max_retries
self._timeout = timeout
async def get(
self,
path: str,
**kwargs,
) -> httpx.Response:
"""GET 请求"""
async def post(
self,
path: str,
**kwargs,
) -> httpx.Response:
"""POST 请求"""
async def download(
self,
remote_path: str,
local_path: str,
**kwargs,
) -> None:
"""下载文件"""
async def upload(
self,
local_path: str,
remote_path: str,
**kwargs,
) -> None:
"""上传文件"""