onQueue('stat'); $this->data = $data; $this->server = $server; $this->protocol = $protocol; $this->recordType = $recordType; } public function handle(): void { $recordAt = $this->recordType === 'm' ? strtotime(date('Y-m-01')) : strtotime(date('Y-m-d')); foreach ($this->data as $uid => $v) { try { $this->processUserStat($uid, $v, $recordAt); } catch (\Exception $e) { Log::error('StatUserJob failed for user ' . $uid . ': ' . $e->getMessage()); throw $e; } } } protected function processUserStat(int $uid, array $v, int $recordAt): void { if (config('database.default') === 'sqlite') { $this->processUserStatForSqlite($uid, $v, $recordAt); } else { $this->processUserStatForOtherDatabases($uid, $v, $recordAt); } } protected function processUserStatForSqlite(int $uid, array $v, int $recordAt): void { DB::transaction(function () use ($uid, $v, $recordAt) { $existingRecord = StatUser::where([ 'user_id' => $uid, 'server_rate' => $this->server['rate'], 'record_at' => $recordAt, 'record_type' => $this->recordType, ])->first(); if ($existingRecord) { $existingRecord->update([ 'u' => $existingRecord->u + ($v[0] * $this->server['rate']), 'd' => $existingRecord->d + ($v[1] * $this->server['rate']), 'updated_at' => time(), ]); } else { StatUser::create([ 'user_id' => $uid, 'server_rate' => $this->server['rate'], 'record_at' => $recordAt, 'record_type' => $this->recordType, 'u' => ($v[0] * $this->server['rate']), 'd' => ($v[1] * $this->server['rate']), 'created_at' => time(), 'updated_at' => time(), ]); } }, 3); } protected function processUserStatForOtherDatabases(int $uid, array $v, int $recordAt): void { StatUser::upsert( [ 'user_id' => $uid, 'server_rate' => $this->server['rate'], 'record_at' => $recordAt, 'record_type' => $this->recordType, 'u' => ($v[0] * $this->server['rate']), 'd' => ($v[1] * $this->server['rate']), 'created_at' => time(), 'updated_at' => time(), ], ['user_id', 'server_rate', 'record_at', 'record_type'], [ 'u' => DB::raw("u + VALUES(u)"), 'd' => DB::raw("d + VALUES(d)"), 'updated_at' => time(), ] ); } }