Merge remote-tracking branch 'upstream/master'
# Conflicts: # app/Services/UserOnlineService.php # public/assets/admin
This commit is contained in:
@@ -3,12 +3,14 @@
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\Server;
|
||||
use App\Models\StatServer;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Carbon;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
@@ -59,12 +61,23 @@ class StatServerJob implements ShouldQueue
|
||||
|
||||
try {
|
||||
$this->processServerStat($u, $d, $recordAt);
|
||||
$this->updateServerTraffic($u, $d);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('StatServerJob failed for server ' . $this->server['id'] . ': ' . $e->getMessage());
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
protected function updateServerTraffic(int $u, int $d): void
|
||||
{
|
||||
DB::table('v2_server')
|
||||
->where('id', $this->server['id'])
|
||||
->incrementEach(
|
||||
['u' => $u, 'd' => $d],
|
||||
['updated_at' => Carbon::now()]
|
||||
);
|
||||
}
|
||||
|
||||
protected function processServerStat(int $u, int $d, int $recordAt): void
|
||||
{
|
||||
$driver = config('database.default');
|
||||
|
||||
@@ -90,8 +90,8 @@ class StatUserJob implements ShouldQueue
|
||||
|
||||
if ($existingRecord) {
|
||||
$existingRecord->update([
|
||||
'u' => $existingRecord->u + ($v[0] * $this->server['rate']),
|
||||
'd' => $existingRecord->d + ($v[1] * $this->server['rate']),
|
||||
'u' => $existingRecord->u + intval($v[0] * $this->server['rate']),
|
||||
'd' => $existingRecord->d + intval($v[1] * $this->server['rate']),
|
||||
'updated_at' => time(),
|
||||
]);
|
||||
} else {
|
||||
@@ -102,8 +102,8 @@ class StatUserJob implements ShouldQueue
|
||||
'server_type' => $serverType,
|
||||
'record_at' => $recordAt,
|
||||
'record_type' => $this->recordType,
|
||||
'u' => ($v[0] * $this->server['rate']),
|
||||
'd' => ($v[1] * $this->server['rate']),
|
||||
'u' => intval($v[0] * $this->server['rate']),
|
||||
'd' => intval($v[1] * $this->server['rate']),
|
||||
'created_at' => time(),
|
||||
'updated_at' => time(),
|
||||
]);
|
||||
@@ -124,8 +124,8 @@ class StatUserJob implements ShouldQueue
|
||||
'server_type' => $serverType,
|
||||
'record_at' => $recordAt,
|
||||
'record_type' => $this->recordType,
|
||||
'u' => ($v[0] * $this->server['rate']),
|
||||
'd' => ($v[1] * $this->server['rate']),
|
||||
'u' => intval($v[0] * $this->server['rate']),
|
||||
'd' => intval($v[1] * $this->server['rate']),
|
||||
'created_at' => time(),
|
||||
'updated_at' => time(),
|
||||
],
|
||||
|
||||
@@ -8,6 +8,7 @@ use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Redis;
|
||||
|
||||
class TrafficFetchJob implements ShouldQueue
|
||||
{
|
||||
@@ -19,11 +20,6 @@ class TrafficFetchJob implements ShouldQueue
|
||||
public $tries = 1;
|
||||
public $timeout = 20;
|
||||
|
||||
/**
|
||||
* Create a new job instance.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function __construct(array $server, array $data, $protocol, int $timestamp)
|
||||
{
|
||||
$this->onQueue('traffic_fetch');
|
||||
@@ -35,6 +31,8 @@ class TrafficFetchJob implements ShouldQueue
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
$userIds = array_keys($this->data);
|
||||
|
||||
foreach ($this->data as $uid => $v) {
|
||||
User::where('id', $uid)
|
||||
->incrementEach(
|
||||
@@ -45,5 +43,9 @@ class TrafficFetchJob implements ShouldQueue
|
||||
['t' => time()]
|
||||
);
|
||||
}
|
||||
|
||||
if (!empty($userIds)) {
|
||||
Redis::sadd('traffic:pending_check', ...$userIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,108 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\User;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Cache;
|
||||
use App\Services\UserOnlineService;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
class UserAliveSyncJob implements ShouldQueue
|
||||
{
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
private const CACHE_PREFIX = 'ALIVE_IP_USER_';
|
||||
private const CACHE_TTL = 120;
|
||||
private const NODE_DATA_EXPIRY = 100;
|
||||
|
||||
public function __construct(
|
||||
private readonly array $data,
|
||||
private readonly string $nodeType,
|
||||
private readonly int $nodeId
|
||||
) {
|
||||
$this->onQueue('user_alive_sync');
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
try {
|
||||
$updateAt = time();
|
||||
$nowTs = time();
|
||||
$now = now();
|
||||
$nodeKey = $this->nodeType . $this->nodeId;
|
||||
$userUpdates = [];
|
||||
|
||||
foreach ($this->data as $uid => $ips) {
|
||||
$cacheKey = self::CACHE_PREFIX . $uid;
|
||||
$ipsArray = Cache::get($cacheKey, []);
|
||||
$ipsArray = [
|
||||
...collect($ipsArray)
|
||||
->filter(fn(mixed $value): bool => is_array($value) && ($updateAt - ($value['lastupdateAt'] ?? 0) <= self::NODE_DATA_EXPIRY)),
|
||||
$nodeKey => [
|
||||
'aliveips' => $ips,
|
||||
'lastupdateAt' => $updateAt,
|
||||
],
|
||||
];
|
||||
|
||||
$count = UserOnlineService::calculateDeviceCount($ipsArray);
|
||||
$ipsArray['alive_ip'] = $count;
|
||||
Cache::put($cacheKey, $ipsArray, now()->addSeconds(self::CACHE_TTL));
|
||||
|
||||
$userUpdates[] = [
|
||||
'id' => (int) $uid,
|
||||
'count' => (int) $count,
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($userUpdates)) {
|
||||
$allIds = collect($userUpdates)
|
||||
->pluck('id')
|
||||
->filter()
|
||||
->map(fn($v) => (int) $v)
|
||||
->unique()
|
||||
->values()
|
||||
->all();
|
||||
|
||||
if (!empty($allIds)) {
|
||||
$existingIds = User::query()
|
||||
->whereIn('id', $allIds)
|
||||
->pluck('id')
|
||||
->map(fn($v) => (int) $v)
|
||||
->all();
|
||||
|
||||
if (!empty($existingIds)) {
|
||||
collect($userUpdates)
|
||||
->filter(fn($row) => in_array((int) ($row['id'] ?? 0), $existingIds, true))
|
||||
->chunk(1000)
|
||||
->each(function ($chunk) use ($now) {
|
||||
collect($chunk)->each(function ($update) use ($now) {
|
||||
$id = (int) ($update['id'] ?? 0);
|
||||
$count = (int) ($update['count'] ?? 0);
|
||||
if ($id > 0) {
|
||||
User::query()
|
||||
->whereKey($id)
|
||||
->update([
|
||||
'online_count' => $count,
|
||||
'last_online_at' => $now,
|
||||
]);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
Log::error('UserAliveSyncJob failed', [
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
$this->fail($e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user