Sometimes our applications have asynchronous processes that require users to wait for results. Two examples in Groundwork are processing imagery so users can see map layers for their projects and exporting STAC catalogs of users’ imagery and labels. It’s nice for users to receive information about the status of these processes without having to refresh the page.
You can make this work with polling from a front-end application if you’re not worried that too many status checks will make your database sad. There are also plenty of third-party services specifically tailored to sending notifications or that look like notification services if you squint at them, like Intercom. In recent R&D though, I wanted to try out a different strategy. I wanted to figure out whether I could send notifications from the server using only PostgreSQL and Scala.
Here’s what the application does in GIF form:
The application backing the example above is available on GitHub.
This project used PostgreSQL as its database. The basic database schema was borrowed from the world countries database in the doobie documentation. I chose it because it has only a few tables, it’s publicly available, and it will be familiar to anyone who’s read through that documentation.
I added a trigger on the city table that runs a function to notify a channel matching the new city’s country code every time a row is added. The city table ends up looking like this:
pgsockets4s=# \d city
                      Table "public.city"
   Column    |       Type        | Collation | Nullable | Default
-------------+-------------------+-----------+----------+---------
 id          | integer           |           | not null |
 name        | character varying |           | not null |
 countrycode | character(3)      |           | not null |
 district    | character varying |           | not null |
 population  | integer           |           | not null |
Indexes:
    "city_pkey" PRIMARY KEY, btree (id)
Referenced by:
    TABLE "country" CONSTRAINT "country_capital_fkey" FOREIGN KEY (capital) REFERENCES city(id)
Triggers:
    new_city_trigger AFTER INSERT OR UPDATE OF id ON city FOR EACH ROW EXECUTE PROCEDURE new_city_notify()
The new_city_notify() function mentioned at the bottom is also pretty straightforward:
CREATE OR REPLACE FUNCTION new_city_notify()
RETURNS trigger AS $$
BEGIN
  -- Channel: the lowercased country code. Payload: the new row as JSON text.
  PERFORM pg_notify(lower(NEW.countrycode), row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
This function creates a JSON representation of the new city’s row and sends it to a channel matching the new city’s country code. NEW refers to the row that’s been inserted and has a schema matching the row’s table. row_to_json converts the inserted row into a JSON record, and ::text lets PostgreSQL know that I just care about the string containing that JSON.
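To make the shape of a notification concrete, here is a small standard-library-only Scala sketch of what a listener could expect for one inserted row. The City case class and the NewCityNotification object are hypothetical names made up for this illustration (they are not part of the example application), and the JSON produced here only approximates what row_to_json emits.

```scala
// Hypothetical stand-in for a row of the city table; fields follow the column names.
final case class City(id: Int, name: String, countrycode: String, district: String, population: Int)

object NewCityNotification {
  // Simplified JSON string quoting: only backslashes and double quotes are escaped.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Mirrors lower(NEW.countrycode): the channel is the lowercased country code.
  def channel(city: City): String = city.countrycode.toLowerCase

  // Approximates row_to_json(NEW)::text: one JSON object keyed by column name.
  def payload(city: City): String =
    s"""{"id":${city.id},"name":${quote(city.name)},"countrycode":${quote(city.countrycode)},"district":${quote(city.district)},"population":${city.population}}"""
}
```

For a row such as City(1, "Kabul", "AFG", "Kabol", 1780000), channel returns "afg", so only clients listening on the afg channel would receive the payload.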
The notification webservice
The crux of the example is that if I can produce a Stream[F, WebSocketFrame] from my PostgreSQL database, I can hook up the async notifications in the database to my webservice. For that task, I turned to skunk.
skunk is a new database library for Scala that targets only PostgreSQL, instead of any JDBC-compliant database. Scala developers and other people at Azavea who are interested have been reading Practical Functional Programming in Scala by Gabriel Volpe, so we have some recent experience with http4s that I thought would help me here.
skunk provides a way to listen to PostgreSQL notification channels. The return type is a Stream[F, Notification], which looks a lot like the Stream[F, WebSocketFrame] that I needed. Streams can be mapped, Notifications can be converted to Strings, and WebSocketFrames have a String constructor, so the integration was amazingly easy:
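import org.http4s.websocket.WebSocketFrame.Text
import skunk.Session
import skunk.data.Identifier

val session: Session[F] = ???
val identifier: Identifier = ???
// listen(1) queues at most one pending notification; each one becomes a text frame
session.channel(identifier).listen(1).map(notificationData => Text(s"Ping! $notificationData"))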
In the application, it looks a little more complex, but not much. The extra work around the call to listen is present to handle cases where a user provides a URL parameter that isn’t a valid PostgreSQL identifier.
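As a rough illustration of that kind of check, here is a standard-library-only sketch that rejects URL parameters that could not be unquoted PostgreSQL identifiers. It is not the code from the application, and skunk’s Identifier type performs its own validation. ChannelName and validate are hypothetical names, and the rule is deliberately simplified: ASCII only, no reserved-word check, and a 63-character cap standing in for PostgreSQL’s default 63-byte identifier limit.

```scala
object ChannelName {
  // Assumption: PostgreSQL's default identifier limit of 63 bytes, treated as characters.
  private val MaxLength = 63

  // Simplified shape of an unquoted identifier: a letter or underscore first,
  // then letters, digits, underscores, or dollar signs.
  private val Shape = "[A-Za-z_][A-Za-z0-9_$]*"

  // Returns the lowercased channel name, or a message explaining the rejection.
  def validate(raw: String): Either[String, String] =
    if (raw.isEmpty) Left("channel name is empty")
    else if (raw.length > MaxLength) Left(s"channel name is longer than $MaxLength characters")
    else if (!raw.matches(Shape)) Left(s"'$raw' is not a valid unquoted identifier")
    else Right(raw.toLowerCase)
}
```

A route handler could then answer a Left with a 400-style response and only open the websocket on a Right, so a malformed parameter never reaches the database.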
Putting it all together
The rest was configuration.
skunk needs a way to obtain sessions from a database. Fortunately, that’s easy to set up with my world countries database in docker-compose. Constructing a session pool in skunk requires a simple call to Session.pooled[F] for some effect type F with your standard database connection parameters (database name, user, password, and address), which were easy to match up with what I had in my docker-compose.yml file.
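One way to keep those parameters in step with docker-compose.yml is to read them from the same environment variables the database container uses. The sketch below uses only the standard library and stops short of calling skunk. DbConfig, fromEnv, and the POSTGRES_* variable names are assumptions for illustration, not names taken from the example repository.

```scala
// Hypothetical holder for the connection parameters a session pool needs.
final case class DbConfig(host: String, port: Int, database: String, user: String, password: Option[String])

object DbConfig {
  // Assumed variable names; substitute whatever your docker-compose.yml defines.
  def fromEnv(env: Map[String, String]): Either[String, DbConfig] =
    for {
      database <- env.get("POSTGRES_DB").toRight("POSTGRES_DB is not set")
      user     <- env.get("POSTGRES_USER").toRight("POSTGRES_USER is not set")
      port     <- env.getOrElse("POSTGRES_PORT", "5432").toIntOption
                    .toRight("POSTGRES_PORT is not a number")
    } yield DbConfig(
      host = env.getOrElse("POSTGRES_HOST", "localhost"), // default for local development
      port = port,
      database = database,
      user = user,
      password = env.get("POSTGRES_PASSWORD")
    )
}
```

Calling DbConfig.fromEnv(sys.env) at startup fails fast with a readable message when a required variable is missing, and the resulting fields can then be passed along to the session pool constructor.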
What’s missing here?
Should you clone the repo, throw it into a container, and deploy it to your managed Kubernetes service right now? Probably not!
Since this was an experiment, I didn’t handle some important concerns that you’d probably have with a production-grade notification service.
First, there’s no concept of dead letters in PostgreSQL as far as I’m aware, so if PostgreSQL sends a notification that no one is listening for, nothing cares and no one finds out.
Second, I have no idea how well this setup performs under a more substantial load than the tests I ran on my computer. It held up fine when I inserted a country every quarter of a second and notified 14 consumers, but that doesn’t tell me whether it could handle twenty-five, or one hundred, or one thousand. I assume it’s not one thousand? But I don’t know.
Third, there are a number of caveats in the WebSocketBuilder class that make it seem like a server running for long enough would slowly accumulate connections that it doesn’t know it should close.
But it worked! It is 100% possible to service asynchronous notifications over websockets using PostgreSQL and Scala. If you’d like to try it out locally, you can clone the repository and follow the instructions in the README.