
refactor(UpcomingDeparturesLive): subscribe to data updates from a GenServer #3228

Open
thecristen wants to merge 15 commits into main from cbj/upcoming-departures-server

Collaborator

@thecristen thecristen commented Jun 3, 2026

Scope

Asana Ticket: 📅🔎🎭 Extract upcoming departures calculation from LiveView into a separate module (update: and 📅🔎🎭 De-duplicate Upcoming Departures GenServers with commit f46c55a)

The linked ticket describes the background and prior art nicely!

Implementation

Warning

There are a few other changes bundled into here, so I'll describe it commit-by-commit.

These changes felt necessary after I ran into many surprises while testing this PR. This PR makes UpcomingDeparturesLive receive data from another process, tests run in their own process, LiveView renders in a separate process, and test mocks (via Mox's expect/4 or stub/3 calls) individually run in other processes. As a result, I quickly found myself in a brittle mess of race conditions. The first two commits helped me get through that mess.

4722e98 refactor(UpcomingDeparturesLive): use behaviour

Tip

This makes upcoming_departures/1 able to be mocked in tests.

This section of the Mox documentation describes a technique to prevent the test from finishing executing before it has a chance to call the mock and meet the expectations. It basically entails sending a message from the expect/4 function to the test, and asserting on receiving said message in the test.
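A minimal sketch of that message-passing technique, using only the standard library. The anonymous function below is a hypothetical stand-in for the callback that would be handed to Mox's expect/4, and the spawned process plays the role of whichever process ends up calling the mock; the message shape is an assumption for illustration.

```elixir
# The test process and a unique reference, captured by the stand-in mock.
test_pid = self()
ref = make_ref()

# Hypothetical stand-in for a mocked upcoming_departures/1.
mocked_departures = fn _params ->
  # Tell the test process that the mock was actually invoked.
  send(test_pid, {ref, :departures_called})
  []
end

# Another process (standing in for the LiveView or GenServer) calls the mock.
spawn(fn -> mocked_departures.(%{route_id: "Orange"}) end)

# Wait until the call has happened, giving up after 500 ms.
called? =
  receive do
    {^ref, :departures_called} -> true
  after
    500 -> false
  end
```

In an ExUnit test, the receive block would typically be written as `assert_receive {^ref, :departures_called}`, so the test cannot finish before the mock has been called.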

The upcoming_departures/1 function internally makes several different calls to the MBTA V3 API, which we can dutifully mock in the tests. But it proved hard to use this test technique with so many different mocks!

I went back and forth on whether to make upcoming_departures/1 itself mockable or not. For tests which call code which calls upcoming_departures/1:

  • If we don't mock it, those tests can use the same predictions/schedules/now() mocks to produce the same output even if we change the upcoming_departures/1 internals
  • If we do mock it, our tests can help enforce changes to the shape of the upcoming_departures/1 output (because of the addition of the Behaviour), and we are free to change the internals without adjusting other mocks. It doesn't capture how particular predictions/schedules/now() states create certain outputs (though that's covered directly in the function's own tests)

I'm getting rambly. I decided to mock it, so that I'd only have one thing to mock for the associated LiveView's tests, and one place to fix Mox's treatment.

81826c6 refactor(UpcomingDepartures.Processor) inline date

Tip

This means we don't have to mock Dotcom.Utils.DateTime.now() every single time we deal with calling upcoming_departures/1 in tests.

We were always calling now/0 right before calling upcoming_departures/1 anyways, so I figure it doesn't make any particular performance impact to make that call from inside of upcoming_departures/1 🤷🏼‍♀️

27c46de & 6da3ee7 feat(UpcomingDepartures.Server): add worker process + fix(UpcomingDeparturesLive): unbreak all the tests

Tip

This is the main part! Instead of calling upcoming_departures/1 in a LiveView, move that to a separate process.

About Dotcom.UpcomingDepartures.Server

It's a GenServer that's configured to be a long-lived process handling sending upcoming departures to other processes!

  • The subscribing/unsubscribing/broadcasting-to-subscribers pieces are ultimately handled by Phoenix.PubSub (via the DotcomWeb.Endpoint subscribe/1, unsubscribe/1, and broadcast/3 functions).
  • Each route/direction/stop is associated with a topic, e.g. the Orange Line inbound from Back Bay is associated with the topic name "departures:Orange:1:place-bbsta". (Phoenix.PubSub requires the topic to be a string.)
  • A single Dotcom.UpcomingDepartures.Server process is started for each unique topic.
    • On process startup, it stores the function we'll call to get upcoming departures, the topic name, and a list of unique subscribers (initialized to empty).
    • It handles periodically calling the stored function and broadcasting its output to the topic's subscribers
    • When someone new subscribes, it replies immediately with the current upcoming departures
    • While it does internally keep an updated list of current subscribers to a topic, it's only used for checking when we have no subscribers left. When that happens, we stop the process.
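The lifecycle described in the list above can be sketched roughly as follows. This is not the PR's code: the module name, the one-second refresh interval, and the `{topic, departures_fn}` start argument are assumptions, and direct `send/2` calls stand in for the Phoenix.PubSub broadcast so that the sketch runs without Phoenix.

```elixir
defmodule DeparturesServerSketch do
  use GenServer

  # Assumed refresh interval; the real value is not stated in the PR description.
  @refresh_ms 1_000

  def start_link({topic, departures_fn}) do
    GenServer.start_link(__MODULE__, {topic, departures_fn})
  end

  def subscribe(server, pid \\ self()), do: GenServer.cast(server, {:subscribe, pid})
  def unsubscribe(server, pid \\ self()), do: GenServer.cast(server, {:unsubscribe, pid})

  @impl GenServer
  def init({topic, departures_fn}) do
    # Kick off the first refresh right after startup.
    send(self(), :refresh)
    {:ok, %{departures_fn: departures_fn, topic: topic, subscribers: MapSet.new()}}
  end

  @impl GenServer
  def handle_cast({:subscribe, pid}, state) do
    # A new subscriber immediately receives the current departures.
    send(pid, {:upcoming_departures, state.departures_fn.()})
    {:noreply, %{state | subscribers: MapSet.put(state.subscribers, pid)}}
  end

  def handle_cast({:unsubscribe, pid}, state) do
    subscribers = MapSet.delete(state.subscribers, pid)
    new_state = %{state | subscribers: subscribers}

    # The subscriber set is only used to decide when to stop.
    if MapSet.size(subscribers) == 0 do
      {:stop, :normal, new_state}
    else
      {:noreply, new_state}
    end
  end

  @impl GenServer
  def handle_info(:refresh, state) do
    departures = state.departures_fn.()
    # Stand-in for the PubSub broadcast to the topic.
    Enum.each(state.subscribers, &send(&1, {:upcoming_departures, departures}))
    Process.send_after(self(), :refresh, @refresh_ms)
    {:noreply, state}
  end
end
```

Starting the sketch with a topic and a zero-arity function, then calling `subscribe/1`, delivers one `{:upcoming_departures, departures}` message to the caller right away; unsubscribing the last subscriber stops the process with reason `:normal`.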

My theory was that because subscribing/unsubscribing to a topic is decoupled from the Dotcom.UpcomingDepartures.Server process, if Dotcom.UpcomingDepartures.Server happens to crash and get restarted, it could quickly resume broadcasting to subscribers, since Phoenix.PubSub will still know who's subscribed to which topic.

However that'd mess up our subscriber count tracking!

So I also added some logic to the Dotcom.UpcomingDepartures.Server terminate/2 callback, to send another broadcast to the topic indicating it's been terminated. In UpcomingDeparturesLive we can detect that event and clear the results from the screen before the data gets stale (here it should switch to "There was a problem loading upcoming departures").
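A sketch of the terminate/2 idea under stated assumptions: the module name and the `:upcoming_departures_terminated` message are hypothetical, and a plain list of pids with `send/2` stands in for the topic broadcast. One thing worth keeping in mind is that terminate/2 is not invoked when a process is killed with `Process.exit(pid, :kill)`, and on a supervisor shutdown it only runs if the process traps exits.

```elixir
defmodule TerminateNoticeSketch do
  use GenServer

  def start_link(subscribers), do: GenServer.start_link(__MODULE__, subscribers)

  @impl GenServer
  def init(subscribers) do
    # Trapping exits lets terminate/2 run when a supervisor or linked
    # process shuts this process down.
    Process.flag(:trap_exit, true)
    {:ok, %{subscribers: subscribers}}
  end

  @impl GenServer
  def terminate(_reason, state) do
    # Stand-in for broadcasting a "terminated" event to the topic.
    Enum.each(state.subscribers, &send(&1, :upcoming_departures_terminated))
  end
end
```

A subscriber such as a LiveView could match on that message in handle_info/2 and clear its departures before they go stale.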

Screenshots

There should be no visual changes from this! Compare side by side with a prod or staging environment of your choice.

How to test

I added some logging so we could see what's going on. We could remove this before going live if we'd like, as it'd get quite noisy :)

I haven't gotten that far in my own testing yet, but I'd love to verify that terminating the server produces the expected output. I'll probably come back and add that tomorrow.

@thecristen thecristen marked this pull request as ready for review June 3, 2026 21:00
@thecristen thecristen requested a review from a team as a code owner June 3, 2026 21:00
@thecristen thecristen requested a review from jlucytan June 3, 2026 21:00
Contributor

@joshlarson joshlarson left a comment

Love this change! And poking around with it running, I tried to get it to break, and couldn't, so this feels very promising! I left a few code-style suggestions and questions, but none of them are blocking.

The thing that is blocking approval for me is the fact that something about this change caused half of the tests in ScheduleFinderLiveTest to become super flaky - its pass rate is under 50%! I would be okay with just tagging the flaky tests as @flaky for now, or digging in a bit deeper to see what's actually going on with them.

Weirdly, the new tests are totally fine.

Comment thread lib/dotcom/application.ex Outdated
Predictions.Supervisor,
Alerts.BusStopChangeSupervisor,
Alerts.CacheSupervisor,
{DynamicSupervisor, name: UpcomingDeparturesSupervisor},
Contributor

Suggestion (non-blocking): I know it doesn't really matter (hence non-blocking), but it was confusing me a bit that the supervisor wasn't namespaced where I was expecting Dotcom.UpcomingDepartures stuff to be.

Suggested change
- {DynamicSupervisor, name: UpcomingDeparturesSupervisor},
+ {DynamicSupervisor, name: Dotcom.UpcomingDepartures.Supervisor},

Also it kind of blows my mind that you can basically invent a module like this on the fly 🤯.

Collaborator Author

Updated in 197c9b7


send(self(), :refresh)

{:ok, %{departures_fn: departures_fn, topic: topic, subscribers: MapSet.new([])}}
Contributor

Oooh I like departures_fn! It feels a lot more elegant than having route, stop_id, etc each tracked as part of state.

Comment thread lib/dotcom/upcoming_departures.ex Outdated
end

defp topic_name(%{route_id: route_id, direction_id: direction_id, stop_id: stop_id}) do
{:ok, "departures:#{route_id}:#{direction_id}:#{stop_id}"}
Contributor

Suggestion: Since this always returns {:ok, String.t()}, and all of its consumers know that and only handle that case, can we get rid of the :ok part, and just return the string?

Collaborator Author

Fair enough! Changed in a245ad8
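For illustration, a topic helper that returns the bare string might look like the sketch below. The module and function names are hypothetical and not taken from the commit; only the string format comes from the snippet under review.

```elixir
defmodule TopicSketch do
  # Hypothetical helper: builds the PubSub topic string for a
  # route/direction/stop and returns it directly, with no {:ok, _} wrapper.
  def topic(%{route_id: route_id, direction_id: direction_id, stop_id: stop_id}) do
    "departures:#{route_id}:#{direction_id}:#{stop_id}"
  end
end
```

Callers can then use the result as-is instead of pattern matching on an `{:ok, topic}` tuple that never has an error counterpart.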


@impl GenServer
def init(topic) do
[_departures, route_id, direction_id, stop_id] = String.split(topic, ":")
Contributor

@joshlarson joshlarson Jun 5, 2026

Suggestion (non-blocking): Instead of constructing the topic string in UpcomingDepartures.subscribe/1 and then having to parse that string here, what if we passed the params in here, and constructed the topic by calling UpcomingDepartures.topic/1 from here?

That way, we wouldn't have to parse a string that we just constructed, and could instead just rely on the data that's already structured.


Suggestion 2 / Question: Given that the Server already knows who all of its subscribers are, do we actually need a PubSub, or could the Server just send messages to each of its subscribers directly?

If we got rid of the PubSub, then we wouldn't need to worry about topic strings, since the Server would just send messages directly to the relevant PIDs, and my suggestion above wouldn't be relevant anymore.

(Another benefit of removing PubSub here is that then the subscriber only needs to handle one type of message, whereas now it needs to handle both PubSub-style messages and the plain-old :subscribed ones.)

Collaborator Author

  1. I think that works fine! updated in fa4da52

Collaborator Author

  1. It does work without the PubSub (one of my many iterations did just that)! Some aspects to mull over:
  • (This one's the main one I consider important here) If we stored subscribers in the Server state, we'd lose them in the event of a Server restart (unless we did our own effort to save them somewhere else and restore them or something)
  • As far as I understand, PubSub's pretty dang optimized and performant for what we're using it for here, as opposed to having to loop through however many subscribers in a given GenServer process (I know the non-PubSub version seems to perform okay enough, though)
  • PubSub handles clearing dead processes from its subscribers without any work on our part
  • If we ever dared create a setup where our nodes could talk to each other, PubSub would just work across them

assert ^pid = GenServer.whereis({:global, topic})
Process.exit(pid, :kill)
# need some time for it to restart
Process.sleep(100)
Contributor

Question (non-blocking): Is there a way to do some kind of exponential backoff to check this at, say, 1ms, 10ms, and 100ms? I only ask because the Process.sleep introduces noticeable lag when running this test file.

It's not a huge deal for now - if it's not trivial, then I'd advocate for a follow-up investigation task, rather than blocking this PR on it.

Collaborator Author

Adding a test helper function for this! 88dc7a0
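One possible shape for such a helper, sketched with the 1 ms, 10 ms, and 100 ms steps from the question. This is not the helper added in the commit; the module name, function name, and return values are assumptions.

```elixir
defmodule WaitSketch do
  # Calls `check` and, while it returns nil or false, sleeps for each delay
  # (in milliseconds) in turn before trying again.
  # Returns {:ok, result} on the first truthy result, or :timeout once the
  # delays are used up.
  def wait_until(check, delays \\ [1, 10, 100]) do
    if result = check.() do
      {:ok, result}
    else
      case delays do
        [] ->
          :timeout

        [delay | rest] ->
          Process.sleep(delay)
          wait_until(check, rest)
      end
    end
  end
end
```

In the restart test, the check function could look up the globally registered name and return the pid only once it is non-nil and differs from the killed one, so the test waits no longer than it needs to.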

route: route,
stop_id: stop_id
}) do
now = @date_time.now()
Contributor

Question/Suggestion (non-blocking): I don't oppose this on principle, but I thought we were migrating UpcomingDepartures.Processor towards being purely functional (e.g. a future task is passing predicted_schedules in as an argument, rather than calculating it in upcoming_departures/1). So I'm wondering if moving now into this function is something that we're going to wind up undoing in the next PR?

Collaborator Author

I'm not going to undo it in the next PR, but maybe we might in the PR after that?

def handle_cast({:subscribe, caller_pid}, state) do
Logger.notice("subscribing #{inspect(caller_pid)} to #{state.topic}")
new_state = add_subscriber(caller_pid, state)
send(caller_pid, {:subscribed, state.departures_fn.()})
Contributor

Suggestion: The LiveView that receives this update doesn't have to do anything special due to this being a :subscribed message, but it does need to know that it's upcoming departures.

(Imagine if a LiveView subscribed to more than one thing, but just got messages called :subscribed from each of them 😅)

Suggested change
- send(caller_pid, {:subscribed, state.departures_fn.()})
+ send(caller_pid, {:upcoming_departures, state.departures_fn.()})

Collaborator Author

Renamed in 0a76709


2 participants