Merge remote-tracking branch 'upstream/master'

# Conflicts:
#	public/assets/admin
This commit is contained in:
yinjianm
2026-04-18 00:35:04 +08:00
27 changed files with 1604 additions and 354 deletions
+96 -3
View File
@@ -13,25 +13,55 @@ class NodeRegistry
/** @var array<int, TcpConnection> nodeId → connection */
private static array $connections = [];
/** @var array<int, TcpConnection> machineId → connection */
private static array $machineConnections = [];
public static function add(int $nodeId, TcpConnection $conn): void
{
// Close existing connection for this node (if reconnecting)
if (isset(self::$connections[$nodeId])) {
if (isset(self::$connections[$nodeId]) && self::$connections[$nodeId] !== $conn) {
self::$connections[$nodeId]->close();
}
self::$connections[$nodeId] = $conn;
}
public static function remove(int $nodeId): void
public static function addMachine(int $machineId, TcpConnection $conn): void
{
if (isset(self::$machineConnections[$machineId]) && self::$machineConnections[$machineId] !== $conn) {
self::$machineConnections[$machineId]->close();
}
self::$machineConnections[$machineId] = $conn;
}
/**
* Remove a node mapping only if it still points to the given connection.
* Passing null removes unconditionally (backward compat for single-node mode).
*/
public static function remove(int $nodeId, ?TcpConnection $conn = null): void
{
if ($conn !== null && isset(self::$connections[$nodeId]) && self::$connections[$nodeId] !== $conn) {
return; // already replaced by a newer connection
}
unset(self::$connections[$nodeId]);
}
public static function removeMachine(int $machineId, ?TcpConnection $conn = null): void
{
if ($conn !== null && isset(self::$machineConnections[$machineId]) && self::$machineConnections[$machineId] !== $conn) {
return;
}
unset(self::$machineConnections[$machineId]);
}
public static function get(int $nodeId): ?TcpConnection
{
return self::$connections[$nodeId] ?? null;
}
public static function getMachine(int $machineId): ?TcpConnection
{
return self::$machineConnections[$machineId] ?? null;
}
/**
* Send a JSON message to a specific node.
*/
@@ -42,6 +72,55 @@ class NodeRegistry
return false;
}
// Machine-mode connections multiplex multiple node IDs through the same
// socket, so node-scoped events must carry node_id for the client mux.
if (!empty($conn->machineNodeIds) && $event !== 'sync.nodes' && !array_key_exists('node_id', $data)) {
$data['node_id'] = $nodeId;
}
$payload = json_encode([
'event' => $event,
'data' => $data,
'timestamp' => time(),
]);
$conn->send($payload);
return true;
}
/**
* Update in-memory registry when a machine's node set changes.
* Called from the WS process when a sync.nodes event is dispatched.
*/
public static function refreshMachineNodes(int $machineId, array $newNodeIds): void
{
$conn = self::getMachine($machineId);
if (!$conn) {
return;
}
$oldNodeIds = $conn->machineNodeIds ?? [];
// Remove nodes no longer on this machine
foreach (array_diff($oldNodeIds, $newNodeIds) as $removedId) {
self::remove($removedId, $conn);
}
// Add newly assigned nodes (via add() to close any stale standalone connection)
foreach ($newNodeIds as $nodeId) {
self::add($nodeId, $conn);
}
$conn->machineNodeIds = $newNodeIds;
}
public static function sendMachine(int $machineId, string $event, array $data): bool
{
$conn = self::getMachine($machineId);
if (!$conn) {
return false;
}
$payload = json_encode([
'event' => $event,
'data' => $data,
@@ -74,4 +153,18 @@ class NodeRegistry
{
return count(self::$connections);
}
/**
* @return int[]
*/
public static function getConnectedMachineIds(): array
{
return array_keys(self::$machineConnections);
}
public static function machineCount(): int
{
return count(self::$machineConnections);
}
}
+42 -1
View File
@@ -3,6 +3,7 @@
namespace App\Services;
use App\Models\Server;
use App\Models\ServerMachine;
use App\Models\User;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
@@ -30,7 +31,6 @@ class NodeSyncService
if (!$node)
return;
self::push($nodeId, 'sync.config', ['config' => ServerService::buildNodeConfig($node)]);
}
@@ -122,6 +122,28 @@ class NodeSyncService
self::push($nodeId, 'sync.users', ['users' => $users]);
}
/**
* Notify machine that its node set has changed.
* Always publishes via Redis so the WS process can update its in-memory registry.
*/
public static function notifyMachineNodesChanged(int $machineId): void
{
$machine = ServerMachine::find($machineId);
$nodeList = [];
if ($machine) {
$nodes = ServerService::getMachineNodes($machine);
$nodeList = $nodes->map(fn($n) => [
'id' => $n->id,
'type' => $n->type,
'name' => $n->name,
])->values()->toArray();
}
// Always publish via Redis so the WS process can update its in-memory registry
self::pushMachine($machineId, 'sync.nodes', ['nodes' => $nodeList]);
}
/**
* Publish a push command to Redis — picked up by the Workerman WS server
*/
@@ -140,4 +162,23 @@ class NodeSyncService
]);
}
}
/**
* Publish a machine-level push command to Redis — picked up by the Workerman WS server
*/
public static function pushMachine(int $machineId, string $event, array $data): void
{
try {
Redis::publish('node:push', json_encode([
'machine_id' => $machineId,
'event' => $event,
'data' => $data,
]));
} catch (\Throwable $e) {
Log::warning("[NodePush] Redis machine publish failed: {$e->getMessage()}", [
'machine_id' => $machineId,
'event' => $event,
]);
}
}
}
+18 -9
View File
@@ -8,7 +8,6 @@ use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\File;
use Illuminate\Support\Facades\View;
use Illuminate\Support\Facades\Route;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Str;
@@ -219,6 +218,20 @@ class PluginManager
return $defaultValues;
}
/**
* 获取 Migrator 实例并确保迁移仓库存在
*/
protected function getMigrator(): \Illuminate\Database\Migrations\Migrator
{
$migrator = app('migrator');
if (!$migrator->repositoryExists()) {
$migrator->getRepository()->createRepository();
}
return $migrator;
}
/**
* 运行插件数据库迁移
*/
@@ -227,10 +240,8 @@ class PluginManager
$migrationsPath = $this->getPluginPath($pluginCode) . '/database/migrations';
if (File::exists($migrationsPath)) {
Artisan::call('migrate', [
'--path' => "plugins/" . Str::studly($pluginCode) . "/database/migrations",
'--force' => true
]);
$migrator = $this->getMigrator();
$migrator->run([$migrationsPath]);
}
}
@@ -242,10 +253,8 @@ class PluginManager
$migrationsPath = $this->getPluginPath($pluginCode) . '/database/migrations';
if (File::exists($migrationsPath)) {
Artisan::call('migrate:rollback', [
'--path' => "plugins/" . Str::studly($pluginCode) . "/database/migrations",
'--force' => true
]);
$migrator = $this->getMigrator();
$migrator->rollback([$migrationsPath]);
}
}
+136 -14
View File
@@ -3,10 +3,13 @@
namespace App\Services;
use App\Models\Server;
use App\Models\ServerMachine;
use App\Models\ServerRoute;
use App\Models\User;
use App\Services\Plugin\HookManager;
use App\Utils\CacheKey;
use App\Utils\Helper;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Collection;
class ServerService
@@ -33,6 +36,17 @@ class ServerService
]);
}
/**
* 获取机器下所有已启用节点
*/
public static function getMachineNodes(ServerMachine $machine): Collection
{
return Server::where('machine_id', $machine->id)
->where('enabled', true)
->orderBy('sort', 'ASC')
->get();
}
/**
* 获取指定用户可用的服务器列表
* @param User $user
@@ -75,8 +89,12 @@ class ServerService
*/
public static function getAvailableUsers(Server $node)
{
$groupIds = $node->group_ids ?? [];
if (empty($groupIds)) {
return collect();
}
$users = User::toBase()
->whereIn('group_id', $node->group_ids)
->whereIn('group_id', $groupIds)
->whereRaw('u + d < transfer_enable')
->where(function ($query) {
$query->where('expired_at', '>=', time())
@@ -100,6 +118,100 @@ class ServerService
return $routes;
}
/**
* 处理节点流量数据汇报
*/
public static function processTraffic(Server $node, array $traffic): void
{
$data = array_filter($traffic, fn($item) =>
is_array($item) && count($item) === 2
&& is_numeric($item[0]) && is_numeric($item[1])
);
if (empty($data)) {
return;
}
$nodeType = strtoupper($node->type);
$nodeId = $node->id;
Cache::put(CacheKey::get("SERVER_{$nodeType}_ONLINE_USER", $nodeId), count($data), 3600);
Cache::put(CacheKey::get("SERVER_{$nodeType}_LAST_PUSH_AT", $nodeId), time(), 3600);
(new UserService())->trafficFetch($node, $node->type, $data);
}
/**
* 处理节点在线设备汇报
*/
public static function processAlive(int $nodeId, array $alive): void
{
$service = app(DeviceStateService::class);
foreach ($alive as $uid => $ips) {
$service->setDevices((int) $uid, $nodeId, (array) $ips);
}
}
/**
* 处理节点连接数汇报
*/
public static function processOnline(Server $node, array $online): void
{
$cacheTime = max(300, (int) admin_setting('server_push_interval', 60) * 3);
$nodeType = $node->type;
$nodeId = $node->id;
foreach ($online as $uid => $conn) {
$cacheKey = CacheKey::get("USER_ONLINE_CONN_{$nodeType}_{$nodeId}", $uid);
Cache::put($cacheKey, (int) $conn, $cacheTime);
}
}
/**
* 处理节点负载状态汇报
*/
public static function processStatus(Server $node, array $status): void
{
$nodeType = strtoupper($node->type);
$nodeId = $node->id;
$statusData = [
'cpu' => (float) ($status['cpu'] ?? 0),
'mem' => [
'total' => (int) ($status['mem']['total'] ?? 0),
'used' => (int) ($status['mem']['used'] ?? 0),
],
'swap' => [
'total' => (int) ($status['swap']['total'] ?? 0),
'used' => (int) ($status['swap']['used'] ?? 0),
],
'disk' => [
'total' => (int) ($status['disk']['total'] ?? 0),
'used' => (int) ($status['disk']['used'] ?? 0),
],
'updated_at' => now()->timestamp,
'kernel_status' => $status['kernel_status'] ?? null,
];
$cacheTime = max(300, (int) admin_setting('server_push_interval', 60) * 3);
cache([
CacheKey::get("SERVER_{$nodeType}_LOAD_STATUS", $nodeId) => $statusData,
CacheKey::get("SERVER_{$nodeType}_LAST_LOAD_AT", $nodeId) => now()->timestamp,
], $cacheTime);
}
/**
* 标记节点心跳
*/
public static function touchNode(Server $node): void
{
Cache::put(
CacheKey::get('SERVER_' . strtoupper($node->type) . '_LAST_CHECK_AT', $node->id),
time(),
3600
);
}
/**
* Update node metrics and load status
*/
@@ -129,8 +241,8 @@ class ServerService
'kernel_status' => (bool) ($metrics['kernel_status'] ?? false),
];
\Illuminate\Support\Facades\Cache::put(
\App\Utils\CacheKey::get('SERVER_' . $nodeType . '_METRICS', $nodeId),
Cache::put(
CacheKey::get('SERVER_' . $nodeType . '_METRICS', $nodeId),
$metricsData,
$cacheTime
);
@@ -166,17 +278,21 @@ class ServerService
'vmess' => [
...$baseConfig,
'tls' => (int) $protocolSettings['tls'],
'tls_settings' => $protocolSettings['tls_settings'],
'multiplex' => data_get($protocolSettings, 'multiplex'),
],
'trojan' => [
...$baseConfig,
'host' => $host,
'server_name' => $protocolSettings['server_name'],
'server_name' => data_get($protocolSettings, 'tls_settings.server_name') ?? $protocolSettings['server_name'],
'multiplex' => data_get($protocolSettings, 'multiplex'),
'tls' => (int) $protocolSettings['tls'],
'tls_settings' => match ((int) $protocolSettings['tls']) {
2 => $protocolSettings['reality_settings'],
default => null,
default => array_merge($protocolSettings['tls_settings'] ?? [], [
'server_name' => data_get($protocolSettings, 'tls_settings.server_name') ?? $protocolSettings['server_name'],
'allow_insecure' => data_get($protocolSettings, 'tls_settings.allow_insecure', $protocolSettings['allow_insecure']),
]),
},
],
'vless' => [
@@ -199,6 +315,7 @@ class ServerService
'version' => (int) $protocolSettings['version'],
'host' => $host,
'server_name' => $protocolSettings['tls']['server_name'],
'tls_settings' => $protocolSettings['tls'],
'up_mbps' => (int) $protocolSettings['bandwidth']['up'],
'down_mbps' => (int) $protocolSettings['bandwidth']['down'],
...match ((int) $protocolSettings['version']) {
@@ -216,7 +333,7 @@ class ServerService
'server_port' => (int) $serverPort,
'server_name' => $protocolSettings['tls']['server_name'],
'congestion_control' => $protocolSettings['congestion_control'],
'tls_settings' => data_get($protocolSettings, 'tls_settings'),
'tls_settings' => $protocolSettings['tls'],
'auth_timeout' => '3s',
'zero_rtt_handshake' => false,
'heartbeat' => '3s',
@@ -225,11 +342,14 @@ class ServerService
...$baseConfig,
'server_port' => (int) $serverPort,
'server_name' => $protocolSettings['tls']['server_name'],
'tls_settings' => $protocolSettings['tls'],
'padding_scheme' => $protocolSettings['padding_scheme'],
],
'socks' => [
...$baseConfig,
'server_port' => (int) $serverPort,
'tls' => (int) data_get($protocolSettings, 'tls', 0),
'tls_settings' => data_get($protocolSettings, 'tls_settings'),
],
'naive' => [
...$baseConfig,
@@ -248,16 +368,10 @@ class ServerService
'server_port' => (int) $serverPort,
'transport' => data_get($protocolSettings, 'transport', 'TCP'),
'traffic_pattern' => $protocolSettings['traffic_pattern'],
// 'multiplex' => data_get($protocolSettings, 'multiplex'),
],
default => [],
};
// $response = array_filter(
// $response,
// static fn ($value) => $value !== null
// );
if (!empty($node['route_ids'])) {
$response['routes'] = self::getRoutes($node['route_ids']);
}
@@ -270,8 +384,16 @@ class ServerService
$response['custom_routes'] = $node['custom_routes'];
}
if (!empty($node['cert_config']) && data_get($node['cert_config'],'cert_mode') !== 'none' ) {
$response['cert_config'] = $node['cert_config'];
if (!empty($node['cert_config'])) {
$certConfig = $node['cert_config'];
// Normalize: accept both "mode" and "cert_mode" from the database
if (isset($certConfig['mode']) && !isset($certConfig['cert_mode'])) {
$certConfig['cert_mode'] = $certConfig['mode'];
unset($certConfig['mode']);
}
if (data_get($certConfig, 'cert_mode') !== 'none') {
$response['cert_config'] = $certConfig;
}
}
return $response;