Files
Xboard/app/WebSocket/NodeEventHandlers.php
yinjianm 1739f7a2f9 feat(api): 新增节点流量悬浮详情与即时自动上线同步
为 server/manage/getNodes 返回节点级今日、本月与累计流量统计,
并在节点管理页名称悬浮层展示上行、下行和合计流量。

同时为自动上线补齐单节点同步入口,在管理端保存、
批量更新以及 REST/WS 心跳后立即同步 show 状态,
避免复制节点后开启自动上线仍需等待定时任务。

另优化管理端前端 Docker 发布流程,默认仅构建 amd64,
并收敛 BuildKit 缓存导出以缩短发布时间
2026-04-28 16:51:35 +08:00

145 lines
4.1 KiB
PHP

<?php
namespace App\WebSocket;
use App\Models\Server;
use App\Services\DeviceStateService;
use App\Services\NodeRegistry;
use App\Services\ServerService;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
use Workerman\Connection\TcpConnection;
class NodeEventHandlers
{
/**
* Handle pong heartbeat
*/
public static function handlePong(TcpConnection $conn, int $nodeId, array $data = []): void
{
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
}
/**
* Handle node status update
*/
public static function handleNodeStatus(TcpConnection $conn, int $nodeId, array $data): void
{
$node = Server::find($nodeId);
if (!$node) return;
ServerService::touchNode($node);
ServerService::updateMetrics($node, $data);
Log::debug("[WS] Node#{$nodeId} status updated");
}
/**
* Handle device report from node
*
* 数据格式: {"event": "report.devices", "data": {userId: [ip1, ip2, ...], ...}}
*/
public static function handleDeviceReport(TcpConnection $conn, int $nodeId, array $data): void
{
$service = app(DeviceStateService::class);
if (isset($data['devices']) && is_array($data['devices'])) {
$data = $data['devices'];
}
// Get old data
$oldDevices = $service->getNodeDevices($nodeId);
// Calculate diff
$removedUsers = array_diff_key($oldDevices, $data);
$newDevices = [];
foreach ($data as $userId => $ips) {
if (is_numeric($userId) && is_array($ips)) {
$newDevices[(int) $userId] = $ips;
}
}
// Handle removed users
foreach ($removedUsers as $userId => $ips) {
$service->removeNodeDevices($nodeId, $userId);
$service->notifyUpdate($userId);
}
// Handle new/updated users
foreach ($newDevices as $userId => $ips) {
$service->setDevices($userId, $nodeId, $ips);
}
// Mark for push
Redis::sadd('device:push_pending_nodes', $nodeId);
Log::debug("[WS] Node#{$nodeId} synced " . count($newDevices) . " users, removed " . count($removedUsers));
}
/**
* Handle device state request from node
*/
public static function handleDeviceRequest(TcpConnection $conn, int $nodeId, array $data = []): void
{
$node = Server::find($nodeId);
if (!$node) return;
$users = ServerService::getAvailableUsers($node);
$userIds = $users->pluck('id')->toArray();
$service = app(DeviceStateService::class);
$devices = $service->getUsersDevices($userIds);
NodeRegistry::send($nodeId, 'sync.devices', [
'users' => $devices,
]);
Log::debug("[WS] Node#{$nodeId} requested devices, sent " . count($devices) . " users");
}
/**
* Push device state to node
*/
public static function pushDeviceStateToNode(int $nodeId, DeviceStateService $service): void
{
$node = Server::find($nodeId);
if (!$node) return;
$users = ServerService::getAvailableUsers($node);
$userIds = $users->pluck('id')->toArray();
$devices = $service->getUsersDevices($userIds);
NodeRegistry::send($nodeId, 'sync.devices', [
'users' => $devices
]);
Log::debug("[WS] Pushed device state to node#{$nodeId}: " . count($devices) . " users");
}
/**
* Push full config + users to newly connected node
*/
public static function pushFullSync(TcpConnection $conn, Server $node): void
{
$nodeId = (int) $node->id;
// Push config
$config = ServerService::buildNodeConfig($node);
NodeRegistry::send($nodeId, 'sync.config', [
'config' => $config,
]);
// Push users
$users = ServerService::getAvailableUsers($node)->toArray();
NodeRegistry::send($nodeId, 'sync.users', [
'users' => $users,
]);
Log::info("[WS] Full sync pushed to node#{$nodeId}", [
'users' => count($users),
]);
}
}