Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "servicekit"
version = "0.9.0"
version = "0.10.0"
description = "Async SQLAlchemy framework with FastAPI integration - reusable foundation for building data services"
readme = "README.md"
authors = [{ name = "Morten Hansen", email = "morten@winterop.com" }]
Expand Down
201 changes: 149 additions & 52 deletions src/servicekit/api/service_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import asyncio
import os
import re
from contextlib import asynccontextmanager
from dataclasses import dataclass
Expand Down Expand Up @@ -587,6 +589,7 @@ def _build_lifespan(self) -> LifespanFactory:
job_options = self._job_options
include_logging = self._include_logging
registration_options = self._registration_options
health_path = self._health_options.prefix if self._health_options else None
info = self.info
startup_hooks = list(self._startup_hooks)
shutdown_hooks = list(self._shutdown_hooks)
Expand Down Expand Up @@ -663,70 +666,39 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
for hook in startup_hooks:
await hook(app)

# Register with orchestrator if enabled
registration_info = None
# Deferred registration: always wait for app to be ready before registering.
# The task is created now and runs after yield (once uvicorn is serving).
# For fail_on_error=True we await it immediately after yield so that
# exceptions still propagate (the app shuts down on the first request cycle).
registration_task: asyncio.Task[None] | None = None

if registration_options is not None:
from .registration import register_service, start_keepalive

registration_info = await register_service(
orchestrator_url=registration_options.orchestrator_url,
host=registration_options.host,
port=registration_options.port,
info=info,
orchestrator_url_env=registration_options.orchestrator_url_env,
host_env=registration_options.host_env,
port_env=registration_options.port_env,
max_retries=registration_options.max_retries,
retry_delay=registration_options.retry_delay,
fail_on_error=registration_options.fail_on_error,
timeout=registration_options.timeout,
service_key=registration_options.service_key,
service_key_env=registration_options.service_key_env,
registration_task = asyncio.create_task(
_register_after_ready(registration_options, info, app, health_path),
name="servicekit-deferred-registration",
)

# Start keepalive if registration succeeded and enabled
if registration_info and registration_options.enable_keepalive:
ping_url = registration_info.get("ping_url")
if ping_url:
from .registration import RegistrationConfig

registration_config = RegistrationConfig(
orchestrator_url=registration_options.orchestrator_url,
host=registration_options.host,
port=registration_options.port,
info=info,
orchestrator_url_env=registration_options.orchestrator_url_env,
host_env=registration_options.host_env,
port_env=registration_options.port_env,
max_retries=registration_options.max_retries,
retry_delay=registration_options.retry_delay,
fail_on_error=False,
timeout=registration_options.timeout,
service_key=registration_options.service_key,
service_key_env=registration_options.service_key_env,
)
await start_keepalive(
ping_url=ping_url,
interval=registration_options.keepalive_interval,
timeout=registration_options.timeout,
service_key=registration_options.service_key,
service_key_env=registration_options.service_key_env,
registration_config=registration_config,
re_register_grace_period=registration_options.re_register_grace_period,
)

try:
yield
finally:
# Stop keepalive and deregister service if enabled
if registration_task is not None:
if not registration_task.done():
registration_task.cancel()
try:
await registration_task
except asyncio.CancelledError:
pass

# Read registration info stored by the background task
registration_info: dict[str, Any] | None = getattr(app.state, "registration_info", None)

# Stop keepalive and deregister service if registration completed
if registration_options is not None and registration_info:
from .registration import deregister_service, stop_keepalive

# Stop keepalive task
if registration_options.enable_keepalive:
await stop_keepalive()

# Deregister from orchestrator
if registration_options.auto_deregister:
service_id = registration_info.get("service_id")
orchestrator_url = registration_info.get("orchestrator_url")
Expand Down Expand Up @@ -830,3 +802,128 @@ async def get_info() -> ServiceInfo:
def create(cls, *, info: ServiceInfo, **kwargs: Any) -> FastAPI:
"""Create and build a FastAPI application in one call."""
return cls(info=info, **kwargs).build()


def _resolve_port(options: _RegistrationOptions) -> int:
"""Resolve port from options or environment, matching register_service logic."""
if options.port is not None:
return options.port
port_str = os.getenv(options.port_env)
if port_str:
try:
return int(port_str)
except ValueError:
return 8000
return 8000


async def _wait_until_ready(
port: int, *, health_path: str | None = None, poll_interval: float = 0.5, timeout: float = 30.0
) -> bool:
"""Poll the local app until it is serving requests."""
import httpx

if health_path:
url = f"http://127.0.0.1:{port}{health_path}"
check_kind = "health"
else:
url = f"http://127.0.0.1:{port}/"
check_kind = "tcp"

deadline = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < deadline:
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=2.0)
if health_path:
if response.status_code == 200:
return True
else:
# Any response means the server is accepting connections
return True
except Exception:
pass
await asyncio.sleep(poll_interval)

logger.warning("registration.readiness_timeout", port=port, check=check_kind, timeout=timeout)
return False


async def _register_and_start_keepalive(options: _RegistrationOptions, info: BaseModel) -> dict[str, Any] | None:
"""Register with orchestrator and start keepalive. Returns registration info."""
from .registration import RegistrationConfig, register_service, start_keepalive

registration_info = await register_service(
orchestrator_url=options.orchestrator_url,
host=options.host,
port=options.port,
info=info,
orchestrator_url_env=options.orchestrator_url_env,
host_env=options.host_env,
port_env=options.port_env,
max_retries=options.max_retries,
retry_delay=options.retry_delay,
fail_on_error=options.fail_on_error,
timeout=options.timeout,
service_key=options.service_key,
service_key_env=options.service_key_env,
)

if registration_info and options.enable_keepalive:
ping_url = registration_info.get("ping_url")
if ping_url:
registration_config = RegistrationConfig(
orchestrator_url=options.orchestrator_url,
host=options.host,
port=options.port,
info=info,
orchestrator_url_env=options.orchestrator_url_env,
host_env=options.host_env,
port_env=options.port_env,
max_retries=options.max_retries,
retry_delay=options.retry_delay,
fail_on_error=False,
timeout=options.timeout,
service_key=options.service_key,
service_key_env=options.service_key_env,
)
await start_keepalive(
ping_url=ping_url,
interval=options.keepalive_interval,
timeout=options.timeout,
service_key=options.service_key,
service_key_env=options.service_key_env,
registration_config=registration_config,
re_register_grace_period=options.re_register_grace_period,
)

return registration_info


async def _register_after_ready(
options: _RegistrationOptions, info: BaseModel, app: FastAPI, health_path: str | None
) -> None:
"""Wait for the app to be ready, then register with the orchestrator."""
port = _resolve_port(options)
ready = await _wait_until_ready(port, health_path=health_path)
if not ready:
logger.error(
"registration.aborted",
port=port,
message="App never became ready, skipping registration",
)
return

# Shield registration from cancellation so that if the POST succeeds,
# app.state.registration_info is always written before the task exits.
# This prevents a leaked registration when shutdown cancels the task
# between the successful POST and the state assignment.
shielded = asyncio.ensure_future(_register_and_start_keepalive(options, info))
try:
registration_info = await asyncio.shield(shielded)
except asyncio.CancelledError:
# Shutdown cancelled us mid-registration. Wait for the shielded
# coroutine to finish so we can still store the result for cleanup.
registration_info = await shielded
if registration_info:
app.state.registration_info = registration_info
Loading
Loading