Skip to content

Commit f1f18f2

Browse files
committed
Run queue message processing in short-lived task processes
Broadway processors are long-lived, so their heap memory accumulates across messages. By running each message's work in a task under Task.Supervisor, the task's entire heap is freed immediately on exit and TmpDir cleanup triggers automatically via the :DOWN monitor.
1 parent aebbea4 commit f1f18f2

File tree

1 file changed

+29
-21
lines changed

1 file changed

+29
-21
lines changed

lib/preview/queue.ex

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ defmodule Preview.Queue do
1717
queue_url: url,
1818
max_number_of_messages: concurrency,
1919
wait_time_seconds: 10,
20-
visibility_timeout: 120
20+
visibility_timeout: 300
2121
},
2222
concurrency: 1
2323
],
@@ -44,16 +44,19 @@ defmodule Preview.Queue do
4444
end
4545

4646
def handle_message(%{data: %{"Records" => records}} = message) do
47-
Enum.each(records, &handle_record/1)
47+
Enum.each(records, fn record ->
48+
run_in_task(fn -> handle_record(record) end)
49+
end)
50+
4851
message
4952
end
5053

5154
def handle_message(%{data: %{"preview:sitemap" => key}} = message) do
5255
Logger.info("#{key}: start")
5356

54-
case key_components(key) do
55-
{:ok, package, version} ->
56-
try do
57+
run_in_task(fn ->
58+
case key_components(key) do
59+
{:ok, package, version} ->
5760
case extract_package(package, version) do
5861
{:ok, _dir, file_paths} ->
5962
update_package_sitemap(package, file_paths)
@@ -62,13 +65,11 @@ defmodule Preview.Queue do
6265
:error ->
6366
:ok
6467
end
65-
after
66-
Preview.TmpDir.cleanup()
67-
end
6868

69-
:error ->
70-
Logger.info("#{key}: skip")
71-
end
69+
:error ->
70+
Logger.info("#{key}: skip")
71+
end
72+
end)
7273

7374
message
7475
end
@@ -190,17 +191,13 @@ defmodule Preview.Queue do
190191
end
191192

192193
def create_package(package, version) do
193-
try do
194-
case extract_package(package, version) do
195-
{:ok, dir, file_paths} ->
196-
Preview.Bucket.put_files(package, version, dir, file_paths)
197-
{:ok, file_paths}
194+
case extract_package(package, version) do
195+
{:ok, dir, file_paths} ->
196+
Preview.Bucket.put_files(package, version, dir, file_paths)
197+
{:ok, file_paths}
198198

199-
:error ->
200-
:error
201-
end
202-
after
203-
Preview.TmpDir.cleanup()
199+
:error ->
200+
:error
204201
end
205202
end
206203

@@ -254,6 +251,17 @@ defmodule Preview.Queue do
254251
end)
255252
end
256253

254+
defp run_in_task(fun) do
255+
task = Task.Supervisor.async(Preview.Tasks, fun)
256+
257+
case Task.yield(task, :timer.seconds(270)) || Task.shutdown(task) do
258+
{:ok, result} -> result
259+
{:exit, {exception, stacktrace}} -> reraise(exception, stacktrace)
260+
{:exit, reason} -> exit(reason)
261+
nil -> raise "task timeout"
262+
end
263+
end
264+
257265
defp purge_key(package, version) do
258266
Preview.CDN.purge_key(:fastly_repo, [
259267
"preview/sitemap",

0 commit comments

Comments
 (0)