refactor(UpcomingDeparturesLive): subscribe to data updates from a GenServer#3228
refactor(UpcomingDeparturesLive): subscribe to data updates from a GenServer#3228thecristen wants to merge 15 commits into
Conversation
joshlarson
left a comment
There was a problem hiding this comment.
Love this change! And poking around with it running, I tried to get it to break, and couldn't, so this feels very promisigin! I left a few code-style suggestions and questions, but none of them are blocking.
The thing that is blocking approval for me is the fact that something about this change caused half of the tests ScheduleFinderLiveTest to become super flaky - its pass rate is under 50%! I would be okay with just tagging the flaky tests as @flaky for now, or digging in a bit deeper to see what's actually going on with them.
Weirdly, the new tests are totally fine.
| Predictions.Supervisor, | ||
| Alerts.BusStopChangeSupervisor, | ||
| Alerts.CacheSupervisor, | ||
| {DynamicSupervisor, name: UpcomingDeparturesSupervisor}, |
There was a problem hiding this comment.
Suggestion (non-blocking): I know it doesn't really matter (hence non-blocking), but it was confusing me a bit that the supervisor wasn't namespaced where I was expecting Dotcom.UpcomingDepartures stuff to be.
| {DynamicSupervisor, name: UpcomingDeparturesSupervisor}, | |
| {DynamicSupervisor, name: Dotcom.UpcomingDepartures.Supervisor}, |
Also it kind of blows my mind that you can basically invent a module like this on the fly 🤯.
|
|
||
| send(self(), :refresh) | ||
|
|
||
| {:ok, %{departures_fn: departures_fn, topic: topic, subscribers: MapSet.new([])}} |
There was a problem hiding this comment.
Oooh I like departures_fn! It feels a lot more elegant than having route, stop_id, etc each tracked as part of state.
| end | ||
|
|
||
| defp topic_name(%{route_id: route_id, direction_id: direction_id, stop_id: stop_id}) do | ||
| {:ok, "departures:#{route_id}:#{direction_id}:#{stop_id}"} |
There was a problem hiding this comment.
Suggestion: Since this always returns {:ok, String.t()}, and all of its consumers know that and only handle that case, can we get rid of the :ok part, and just return the string?
|
|
||
| @impl GenServer | ||
| def init(topic) do | ||
| [_departures, route_id, direction_id, stop_id] = String.split(topic, ":") |
There was a problem hiding this comment.
Suggestion (non-blocking): Instead of constructing the topic string in UpcomingDepartures.subscribe/1 and then having to parse that string here, what if we passed the params in here, and constructed the topic by calling UpcomingDepartures.topic/1 from here.
That way, we wouldn't have to parse a string that we just constructed, and could instead just rely on the data that's already structured.
Suggestion 2/ Question: Given that the Server already knows who all of its subscribers are, do we actually need a PubSub, or could the Server just send messages to each of its subscribers directly?
If we got rid of the PubSub, then we wouldn't need to worry about topic strings, since the Server would just send messages directly to the relevant PID's, and my suggestion above wouldn't be relevant anymore.
(Another benefit of removing PubSub here is that then the subscriber only needs to handle one type of message, whereas now it needs to handle both PubSub-style messages and the plain-old :subscribed ones.)
There was a problem hiding this comment.
- I think that works fine! updated in fa4da52
There was a problem hiding this comment.
- It does work without the PubSub (one of my many iterations did just that)! Some aspects to mull over:
- (This one's the main one I consider important here) If we stored subscribers in the
Serverstate, we'd lose them in the event of aServerrestart (unless we did our own effort to save them somewhere else and restore them or something) - As far as I understand PubSub's pretty dang optimized and performant for what we're using it for here, as opposed to having to loop through however many subscribers in a given GenServer process (I know the non-PubSub version seems to perform okay enough, though)
- PubSub handles clearing dead processes from its subscribers without any work on our part
- If we ever dared create a setup where our nodes could talk to each other, PubSub would just work across them
| assert ^pid = GenServer.whereis({:global, topic}) | ||
| Process.exit(pid, :kill) | ||
| # need some time for it to restart | ||
| Process.sleep(100) |
There was a problem hiding this comment.
Question (non-blocking): Is there a way to do some kind of exponential backoff to check this at, say, 1ms, 10ms, and 100ms? I only ask because the Process.sleep introduces noticeable lag when running this test file.
It's not a huge deal for now - if it's not trivial, then I'd advocate for a follow-up investigation task, rather than blocking this PR on it.
There was a problem hiding this comment.
Adding a test helper function for this! 88dc7a0
| route: route, | ||
| stop_id: stop_id | ||
| }) do | ||
| now = @date_time.now() |
There was a problem hiding this comment.
Question/Suggestion (non-blocking): I don't oppose this on principle, but I thought we were migrating UpcomingDepartures.Processor towards being purely functional (e.g. a future task is passing predicted_schedules in as an argument, rather than calculating it in upcoming_departures/1). So I'm wondering if moving now into this function is something that we're going to wind up undoing in the next PR?
There was a problem hiding this comment.
I'm not going to undo it in the next PR, but maybe we might in the PR after that?
| def handle_cast({:subscribe, caller_pid}, state) do | ||
| Logger.notice("subscribing #{inspect(caller_pid)} to #{state.topic}") | ||
| new_state = add_subscriber(caller_pid, state) | ||
| send(caller_pid, {:subscribed, state.departures_fn.()}) |
There was a problem hiding this comment.
Suggestion: The LiveView that receives this update doesn't have to do anything special due to this being a :subscribed message, but they do need to know that it's upcoming departures.
(Imagine if a LiveView subscribed to more than one thing, but just got messages called :subscribe from each of them 😅)
| send(caller_pid, {:subscribed, state.departures_fn.()}) | |
| send(caller_pid, {:upcoming_departures, state.departures_fn.()}) |
Scope
Asana Ticket: 📅🔎🎭 Extract upcoming departures calculation from LiveView into a separate module (update: and 📅🔎🎭 De-duplicate Upcoming Departures GenServers with commit f46c55a)
The linked ticket describes the background and prior art nicely!
Implementation
Warning
There are a few other changes bundled into here, so I'll describe it commit-by-commit.
These changes felt needed after I ran into many surprises testing this PR. Because this PR makes
UpcomingDeparturesLivereceive data from another process, and tests run in their own process, and LiveView renders in a separate process, and test mocks (via Mox'sexpect/4orstub/3calls) individually run in other processes.... I quickly found myself in a brittle mess of race conditions. The first two commits helped me get through that mess.4722e98 refactor(UpcomingDeparturesLive): use behaviour
Tip
This makes
upcoming_departures/1able to be mocked in tests.This section of the Mox documentation describes a technique to prevent the test from finishing executing before it has a chance to call the mock and meet the expectations. It basically entails sending a message from the
expect/4function to the test, and asserting on receiving said message in the test.The
upcoming_departures/1function internally makes several different calls to the MBTA V3 API, which we can dutifully mock in the tests. But it proved hard to use this test technique with so many different mocks!I went back and forth on whether to make
upcoming_departures/1itself mockable or not. For tests which call code which callsupcoming_departures/1:now()mocks to produce the same output even if we change theupcoming_departures/1internalsupcoming_departures/1output (because of the addition of the Behaviour), and we are free to change the internals without adjusting other mocks. It doesn't capture how particular predictions/schedules/now()states create certain outputs (though that's covered directly in the function's own tests)I'm getting rambly. I decided to mock it, so that I'd only have one thing to mock for the associated LiveView's tests, and one place to fix Mox's treatment.
81826c6 refactor(UpcomingDepartures.Processor) inline date
Tip
This means we don't have to mock
Dotcom.Utils.DateTime.now()every single time we deal with callingupcoming_departures/1in tests.We were always calling
now/0right before callingupcoming_departures/1anyways, so I figure it doesn't make any particular performance impact to make that call from inside ofupcoming_departures/1🤷🏼♀️27c46de & 6da3ee7 feat(UpcomingDepartures.Server): add worker process + fix(UpcomingDeparturesLive): unbreak all the tests
Tip
This is the main part! Instead of calling
upcoming_departures/1in a LiveView, move that to a separate process.About
Dotcom.UpcomingDepartures.ServerIt's a
GenServerthat's configured to be a long-lived process handling sending upcoming departures to other processes!Phoenix.PubSub(via theDotcomWeb.Endpointsubscribe/1,unsubscribe/1, andbroadcast/3functions) ."departures:Orange:1:place-bbsta". (Phoenix.PubSubrequires the topic to be a string)Dotcom.UpcomingDepartures.Serverprocess is started for each unique topic.My theory was that because subscribing/unsubscribing to a topic is decoupled from the
Dotcom.UpcomingDepartures.Serverprocess, ifDotcom.UpcomingDepartures.Serverhappens to crash/ and get restarted, it could quickly resume broadcasting to subscribers sincePhoenix.Pubsubwill still know who's subscribed to which topic.However that'd mess up our subscriber count tracking!
So I also added some logic to the
Dotcom.UpcomingDepartures.Serverterminate/2callback, to send another broadcast to the topic indicating it's been terminated. InUpcomingDeparturesLivewe can detect that event and clear the results from the screen before the data gets stale (here it should switch to "There was a problem loading upcoming departures").Screenshots
There should be no visual changes from this! Compare side-to-side with a prod or staging environment of your choice.
How to test
I added some logging so we could see what's going on. We could remove this before going live if we'd like, as it'd get quite noisy :)

I hadn't quite gotten so far in my own testing, but I'd love to verify that terminating the server produces the expected output, I might probably come back and add that tomorrow.