Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 64 additions & 71 deletions akagi_backend/akagi_ng/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,16 @@
from types import FrameType

from akagi_ng import AKAGI_VERSION
from akagi_ng.core.context import (
AppContext,
get_app_context,
set_app_context,
)
from akagi_ng.core.logging import (
configure_logging,
logger,
)
from akagi_ng.autoplay import AutoPlayManager, AutoPlayRuntime
from akagi_ng.core.context import AppContext, get_app_context, set_app_context
from akagi_ng.core.logging import configure_logging, logger
from akagi_ng.dataserver import DataServer
from akagi_ng.electron_client import create_electron_client
from akagi_ng.mitm_client import MitmClient
from akagi_ng.mjai_bot import Controller, StateTracker
from akagi_ng.mjai_bot.status import BotStatusContext
from akagi_ng.schema.constants import ServerConstants
from akagi_ng.schema.protocols import (
ControllerProtocol,
StateTrackerProtocol,
)
from akagi_ng.schema.constants import Platform, ServerConstants
from akagi_ng.schema.protocols import ControllerProtocol, StateTrackerProtocol
from akagi_ng.schema.types import (
AkagiEvent,
MJAIEventBase,
Expand All @@ -43,6 +34,7 @@
self._stop_event = threading.Event()
self.ds: DataServer | None = None
self.status: BotStatusContext | None = None
self.autoplay: AutoPlayManager | None = None
self.frontend_url = ""
self.message_queue: queue.Queue[AkagiEvent] = queue.Queue(maxsize=ServerConstants.MESSAGE_QUEUE_MAXSIZE)

Expand Down Expand Up @@ -78,29 +70,20 @@
electron_client=create_electron_client(settings.platform, shared_queue=self.message_queue),
shared_queue=self.message_queue,
)

set_app_context(app_context)
self.autoplay = AutoPlayManager(runtime_provider=self._build_autoplay_runtime)

def start(self):
self.ds.start()
logger.info(f"DataServer started at {self.frontend_url}")

app = get_app_context()

for source in filter(
None,
(
app.mitm_client if app.settings.mitm.enabled else None,
app.electron_client,
),
):
for source in filter(None, (app.mitm_client if app.settings.mitm.enabled else None, app.electron_client)):
source.start()

self._setup_signals()

def _setup_signals(self):
"""设置信号处理器以关闭程序"""

def signal_handler(signum: int, _frame: FrameType | None):
sig_name = signal.Signals(signum).name
logger.info(f"Received signal {sig_name} ({signum}), initiating shutdown...")
Expand All @@ -112,24 +95,63 @@
def stop(self):
self._stop_event.set()

def _get_active_bridge(self):
app = get_app_context()

Check warning on line 99 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L99

Added line #L99 was not covered by tests
if app.settings.mitm.enabled and app.mitm_client and app.mitm_client.addon:
addon = app.mitm_client.addon

Check warning on line 101 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L101

Added line #L101 was not covered by tests
if addon.activated_flows:
flow_id = addon.activated_flows[-1]
return addon.bridges.get(flow_id)

Check warning on line 104 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L103-L104

Added lines #L103 - L104 were not covered by tests
if addon.bridges:
return list(addon.bridges.values())[-1]

Check warning on line 106 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L106

Added line #L106 was not covered by tests
if app.electron_client:
return getattr(app.electron_client, "bridge", None)
return None

Check warning on line 109 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L108-L109

Added lines #L108 - L109 were not covered by tests

def _detect_autoplay_platform(self) -> Platform:
app = get_app_context()

Check warning on line 112 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L112

Added line #L112 was not covered by tests
if app.settings.platform != Platform.AUTO:
return app.settings.platform

Check warning on line 114 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L114

Added line #L114 was not covered by tests

bridge = self._get_active_bridge()
bridge_name = bridge.__class__.__name__.lower() if bridge else ""

Check warning on line 117 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L116-L117

Added lines #L116 - L117 were not covered by tests
if "tenhou" in bridge_name:
return Platform.TENHOU

Check warning on line 119 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L119

Added line #L119 was not covered by tests
if "riichi" in bridge_name:
return Platform.RIICHI_CITY

Check warning on line 121 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L121

Added line #L121 was not covered by tests
if "amatsuki" in bridge_name:
return Platform.AMATSUKI
return Platform.MAJSOUL

Check warning on line 124 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L123-L124

Added lines #L123 - L124 were not covered by tests

def _get_latest_operation_list(self) -> list[dict]:
bridge = self._get_active_bridge()
operation_list = getattr(bridge, "latest_self_operation_list", [])
return list(operation_list) if isinstance(operation_list, list) else []

Check warning on line 129 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L127-L129

Added lines #L127 - L129 were not covered by tests

def _get_latest_operation_step(self) -> int | None:
bridge = self._get_active_bridge()
step = getattr(bridge, "latest_operation_step", None)
return int(step) if step is not None else None

Check warning on line 134 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L132-L134

Added lines #L132 - L134 were not covered by tests

def _build_autoplay_runtime(self) -> AutoPlayRuntime:
app = get_app_context()
return AutoPlayRuntime(

Check warning on line 138 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L137-L138

Added lines #L137 - L138 were not covered by tests
platform=self._detect_autoplay_platform(),
window_keyword=app.settings.autoplay.window_keyword,
get_operation_list=self._get_latest_operation_list,
get_operation_step=self._get_latest_operation_step,
)

def _handle_message(
self, msg: AkagiEvent, tracker: StateTrackerProtocol | None, controller: ControllerProtocol | None
) -> tuple[str | None, bool, bool]:
"""统一处理消息分发的 match-case 逻辑。

Returns:
(notification, handled, is_sync)
"""
match msg:
# 1. 纯系统级别的管理事件 (不流向 Game Logic)
case SystemShutdownEvent():
logger.info("Received shutdown signal.")
self.stop()
return None, True, False
case SystemEvent(code=code):
return code, True, False

# 2. 属于 Game Logic / MJAI 范畴的协议事件
case MJAIEventBase(sync=is_sync):
pass
case _:
Expand All @@ -139,115 +161,86 @@
controller.react(msg)
if tracker:
tracker.react(msg)
if self.autoplay and isinstance(msg, MJAIEventBase):
self.autoplay.observe_event(msg)

Check warning on line 165 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L165

Added line #L165 was not covered by tests
return None, False, is_sync

def _process_event(
self, msg: AkagiEvent, tracker: StateTrackerProtocol | None, controller: ControllerProtocol | None
) -> ProcessResult:
"""
处理单条 MJAI 消息
这是 Reactor 模式的 PROCESS 阶段
"""
response: MJAIResponse | None = None
notifications: list[Notification] = []
is_sync = False

try:
msg_code, handled, is_sync = self._handle_message(msg, tracker, controller)

# 收集结果:决策响应(从 Controller 拉取)
if controller and not handled:
response = controller.last_response

if msg_code:
notifications.append(Notification(code=msg_code))

# 每一条消息处理后,统一从 Context 中采集当前累积的标志
if not handled and self.status and self.status.flags:
notifications.extend(Notification(code=code) for code in self.status.flags)
self.status.clear_flags()

except Exception:
logger.exception(f"Unexpected error processing MJAI message: {msg}")

return ProcessResult(
response=response,
notifications=notifications,
is_sync=is_sync,
)
return ProcessResult(response=response, notifications=notifications, is_sync=is_sync)

def _emit_outputs(self, result: ProcessResult, tracker: StateTrackerProtocol | None):
"""
将处理结果发送到 DataServer
这是 Reactor 模式的 OUTPUT 阶段
"""
if notifications := result.notifications:
self.ds.send_notifications(notifications)

# 同步期间屏蔽推荐输出,仅保留通知发送。
if result.is_sync:
if result.is_sync or tracker is None:
return

response = result.response or MJAIResponse(type="none")
payload = tracker.build_recommendations(response)
if self.autoplay:
self.autoplay.execute(result.response, tracker)

Check warning on line 201 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L201

Added line #L201 was not covered by tests

if payload:
self.ds.send_recommendations(payload)

def run(self) -> int:
"""
使用 Reactor 模式的主应用循环。

循环分三个阶段:
1. message_queue.get() - 从事件队列收集消息
2. _process_event() - 处理消息并生成响应
3. _emit_outputs() - 发送结果到 DataServer
"""
# 启动主循环
logger.info("Starting main loop...")
# 捕获引用以减少全局上下文访问
app = get_app_context()
tracker = app.state_tracker
controller = app.controller

try:
while not self._stop_event.is_set():
# 阶段 1:INPUT - 从事件队列获取消息 (阻塞、100ms超时)
try:
msg = self.message_queue.get(block=True, timeout=ServerConstants.MAIN_LOOP_POLL_TIMEOUT_SECONDS)
except queue.Empty:
continue

try:
# 阶段 2:PROCESS - 处理事件
result = self._process_event(msg, tracker, controller)

# 阶段 3:OUTPUT - 分发结果
self._emit_outputs(result, tracker)

except Exception as e:
logger.exception(f"Critical error in main loop dispatch: {e}")
self._stop_event.wait(1.0)

finally:
self.cleanup()

return 0

def cleanup(self):
"""清理资源并记录详细的关闭日志"""
logger.info("Stopping Akagi-NG...")
app = get_app_context()

# 停止消息源
if self.autoplay:
self.autoplay.stop()

Check warning on line 235 in akagi_backend/akagi_ng/application.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/application.py#L235

Added line #L235 was not covered by tests

for source in filter(None, (app.mitm_client, app.electron_client)):
try:
logger.info(f"Stopping {source.__class__.__name__}...")
source.stop()
except Exception as e:
logger.error(f"Error stopping {source.__class__.__name__}: {e}")

# 停止 DataServer
if self.ds:
try:
logger.info("Stopping DataServer...")
Expand Down
3 changes: 3 additions & 0 deletions akagi_backend/akagi_ng/autoplay/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from akagi_ng.autoplay.manager import AutoPlayManager, AutoPlayRuntime

__all__ = ["AutoPlayManager", "AutoPlayRuntime"]
Loading
Loading