Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 22 additions & 35 deletions app/Jobs/IngestActivity.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Carbon\CarbonImmutable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Collection;

final class IngestActivity implements ShouldQueue
{
Expand All @@ -31,9 +32,10 @@ public function handle(): void
{
$events = $this->activity->events;

// Group events by URL and bucket to process them together
collect($events)
->each(function (Event $event): void {
$path = $this->urlToPath($event->payload['url']);
->groupBy(fn (Event $event) => $this->urlToPath($event->payload['url']))
->each(function (Collection $urlEvents, string $path): void {
$bucket = $this->bucket->setTime($this->bucket->hour, 0, 0);

/** @var Page $page */
Expand All @@ -45,48 +47,33 @@ public function handle(): void
'average_time' => 0,
]);

match ($event->type) {
EventType::View => $this->handleView($page),
EventType::ViewDuration => $this->handleViewDuration($page, $event),
};
// Count views and sum durations
$viewCount = $urlEvents->filter(fn (Event $event) => $event->type === EventType::View)->count();
$durationCollection = $urlEvents
->filter(fn (Event $event) => $event->type === EventType::ViewDuration)
->map(fn (Event $event) => (int) $event->payload['seconds']);

// Update page with aggregated stats
$this->updatePageStats($page, $viewCount, $durationCollection);
});

$this->activity->delete();
}

/**
* Handle the view event.
*/
private function handleView(Page $page): void
{
$averageTime = $page->average_time;
$views = $page->views + 1;

$page->update([
'views' => $views,
'average_time' => $averageTime === 0
? 0
: $averageTime / $views,
]);
}

/**
* Handle the view duration event.
*/
private function handleViewDuration(Page $page, Event $event): void
private function updatePageStats(Page $page, int $newViews, Collection $durationCollection): void
{
$views = $page->views;
$oldViews = $page->views;
$oldAverage = $page->average_time;
$totalViews = $oldViews + $newViews;

if ($views === 0) {
return;
}
// Calculate new average time
$newAverage = $totalViews > 0
? (($oldAverage * $oldViews) + $durationCollection->sum()) / $totalViews
: 0;

$averageTime = $page->average_time;
$seconds = (int) $event->payload['seconds'];
$page->update([
'average_time' => $averageTime === 0
? $seconds
: ($averageTime * $views + $seconds) / $views,
'views' => $totalViews,
'average_time' => $newAverage,
]);
}

Expand Down