Skip to content

Commit

Permalink
Changes to how DeviceLinks are supervised
Browse files Browse the repository at this point in the history
- Partition supervisor for the DynamicSupervisor, with thousands of
  devices connected the single DynamicSupervisor could get overloaded,
  partition it to help reduce processing changing state quickly
- Start a DeviceLink with the ID only, to help reduce state inside the
  supervisor, only store the device's ID in the supervisor state (since
  it's part of the child_spec that's stored in the supervisor state)
- Remove the supervisor module to simplify, put the one function that
  matters in the DeviceLink module directly
- Set to transient for restart since we do want the DeviceLink to
  terminate and get cleared out of the child_spec state
  • Loading branch information
oestrich committed Jan 16, 2024
1 parent ebd21ae commit e298351
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 41 deletions.
3 changes: 2 additions & 1 deletion lib/nerves_hub/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ defmodule NervesHub.Application do
{Task.Supervisor, name: NervesHub.TaskSupervisor},
{Oban, Application.fetch_env!(:nerves_hub, Oban)},
NervesHub.Tracker,
NervesHub.Devices.Supervisor
{PartitionSupervisor,
child_spec: DynamicSupervisor, name: NervesHub.Devices.Supervisors}
] ++
deployments_supervisor(deploy_env()) ++
endpoints(deploy_env())
Expand Down
34 changes: 26 additions & 8 deletions lib/nerves_hub/devices/device_link.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ defmodule NervesHub.Devices.DeviceLink do
e.g. websockets, MQTT, etc
"""

use GenServer
# NOTE: revisit this restart strategy as we add in more
# transport layers for devices, such as MQTT
use GenServer, restart: :transient

alias NervesHub.Archives
alias NervesHub.AuditLogs
Expand Down Expand Up @@ -35,8 +37,8 @@ defmodule NervesHub.Devices.DeviceLink do
end

@spec start_link(Device.t()) :: GenServer.on_start()
def start_link(device) do
GenServer.start_link(__MODULE__, device, name: name(device))
def start_link(device_id) do
GenServer.start_link(__MODULE__, device_id, name: name(device_id))
end

@spec name(Device.t() | pos_integer()) ::
Expand Down Expand Up @@ -95,7 +97,7 @@ defmodule NervesHub.Devices.DeviceLink do
link =
case whereis(device) do
nil ->
{:ok, pid} = NervesHub.Devices.Supervisor.start_device(device)
{:ok, pid} = start_device(device)
pid

link ->
Expand All @@ -118,6 +120,19 @@ defmodule NervesHub.Devices.DeviceLink do
GenServer.call(link, {:connect, push_cb, params, monitor, :ctx})
end

defp start_device(device) do
case GenServer.whereis(name(device)) do
nil ->
DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {NervesHub.Devices.Supervisors, self()}},
{__MODULE__, device.id}
)

pid when is_pid(pid) ->
{:ok, pid}
end
end

@doc """
Mark device as disconnected
Expand All @@ -137,12 +152,14 @@ defmodule NervesHub.Devices.DeviceLink do
end

@impl GenServer
def init(device) do
{:ok, %State{device: device}, {:continue, :boot}}
def init(device_id) do
{:ok, %State{}, {:continue, {:boot, device_id}}}
end

@impl GenServer
def handle_continue(:boot, %{device: device} = state) do
def handle_continue({:boot, device_id}, state) do
device = Devices.get_device(device_id)

ref_id = Base.encode32(:crypto.strong_rand_bytes(2), padding: false)

deployment_channel =
Expand All @@ -163,7 +180,8 @@ defmodule NervesHub.Devices.DeviceLink do
updating: false
})

{:noreply, %{state | deployment_channel: deployment_channel, reference_id: ref_id}}
{:noreply,
%{state | device: device, deployment_channel: deployment_channel, reference_id: ref_id}}
end

@impl GenServer
Expand Down
24 changes: 0 additions & 24 deletions lib/nerves_hub/devices/supervisor.ex

This file was deleted.

24 changes: 16 additions & 8 deletions test/nerves_hub/devices/device_link_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@ defmodule NervesHub.DeviceLinkTest do
alias NervesHub.Tracker
alias Phoenix.Socket.Broadcast

test "device without deployment subscribes deployment:none" do
state = %DeviceLink.State{device: %Device{id: 1, deployment_id: nil}}
assert {:noreply, updated} = DeviceLink.handle_continue(:boot, state)
test "device without deployment subscribes deployment:none", context do
%{device: %{id: id}} = create_device(context)
assert {:noreply, updated} = DeviceLink.handle_continue({:boot, id}, %DeviceLink.State{})
assert updated.deployment_channel == "deployment:none"
end

test "device with deployment subscribes deployment:\#{id}" do
state = %DeviceLink.State{device: %Device{id: 1, deployment_id: 1}}
assert {:noreply, updated} = DeviceLink.handle_continue(:boot, state)
assert updated.deployment_channel == "deployment:1"
test "device with deployment subscribes deployment:\#{id}", context do
%{device: device, deployment: deployment} = create_device(context)

device =
device
|> Ecto.Changeset.change(%{deployment_id: deployment.id})
|> NervesHub.Repo.update!()

assert {:noreply, updated} =
DeviceLink.handle_continue({:boot, device.id}, %DeviceLink.State{})

assert updated.deployment_channel == "deployment:#{deployment.id}"
end

describe "connect/4" do
Expand Down Expand Up @@ -531,7 +539,7 @@ defmodule NervesHub.DeviceLinkTest do
end

defp start_device_link(context) do
link = start_supervised!({DeviceLink, context.device}, restart: :temporary)
link = start_supervised!({DeviceLink, context.device.id}, restart: :temporary)
Mox.allow(NervesHub.UploadMock, self(), link)
Map.put(context, :link, link)
end
Expand Down

0 comments on commit e298351

Please sign in to comment.