Scaling out Postgres, Part 1: Partitioning
At connect.me we started building our web application with one relational database implemented in Postgres. The idea was to quickly iterate on the feature set to solidify our data model. At a later stage we planned to work on a scale-out approach that would allow us to add server machines without major service disruption. The objective was to support the exponential growth we might face given the viral elements in our product. It turned out to be an interesting and challenging problem, and I want to share our thought process and the resulting design in this blog post.
The essence of the connect.me data model is to represent people's profiles and vouches. A vouch is a directed edge in the reputation graph—there is one giving and one receiving profile. Each vouch also has a specified tag, such as “guitarist” or “cyclist”. There are other aspects that make it more complicated, but for the purpose of this discussion this simplified model is sufficient. With a single relational database this model has a trivial implementation. The profiles table has a unique 64-bit integer id—we called it a puid—along with other profile attributes. A new puid is generated with a sequence starting at 1. The tags table also has a sequence-generated 64-bit integer id along with a tag name. The vouches table has a source puid, a target puid and a tag id.
When we started working on scaling out we asked ourselves the following question: what is our main scaling dimension? With the product potentially going viral we expected both profiles and vouches to grow very quickly. Tags would grow as well, but we wanted tags to be unambiguous by providing autocomplete in the user interface and various normalization rules, so their growth would slow over time. With these observations we decided to split the database into multiple partitions along the profiles dimension, with vouches collocated with their related profiles. We called such a partition a cell.
When a profile gets created in our system we always have an external key that identifies the user in the initial and subsequent authentications. This can be a Twitter handle, a Facebook id, an email address or something else. One user can have multiple profiles via different authentication systems. Our product allows profiles to be grouped under an “umbrella” profile by a claiming mechanism, which I won't cover here. We call such an external key a provider id or proid. The provider-proid pair is globally unique and is known when we need to quickly locate an existing profile or to create a new one.
This makes it possible to devise a hash function that will produce a cell pointer for a provider-proid pair or, as we call it, do the routing to a cell. Within a cell we would select from the profiles table to find an existing puid or insert into profiles to allocate a new one. Remember that a puid is generated by a per-cell sequence starting at 1. This would not produce unique puids, nor would it be possible to route to a cell by puid alone. The original single-database puid is a 64-bit integer—bigint type in Postgres—and it has a humongous range. If we assumed each cell to have a capacity of 2^32 profiles, we could have 2^32 cells. So we assigned each cell a puid base and made the per-cell sequence allocate puids from this base instead of 1. This way puids became unique and we gained a nice property: a puid can be routed to its cell by matching puid ranges.
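As a rough illustration, the two routing paths could look like the sketch below. The hash function, the cell count and the base-per-cell scheme here are all assumptions for the example, not our production configuration (how routing really evolves is the subject of part 2):

```python
import hashlib

CELL_CAPACITY = 2 ** 32  # assumed per-cell puid capacity, as in the text
NUM_CELLS = 4            # hypothetical number of cells for this sketch

def route_by_proid(provider: str, proid: str, num_cells: int = NUM_CELLS) -> int:
    """Hash a provider-proid pair to a cell index (SHA-256 is an assumption)."""
    digest = hashlib.sha256(f"{provider}:{proid}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_cells

def puid_base(cell: int) -> int:
    """The starting value of a cell's puid allocation range."""
    return cell * CELL_CAPACITY

def route_by_puid(puid: int) -> int:
    """Route a puid back to its cell by matching puid ranges."""
    return puid // CELL_CAPACITY
```

A puid allocated by cell 2's sequence falls in [2·2^32, 3·2^32) and therefore routes back to cell 2 without any lookup.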
Now the profiles are partitioned across cells, and both puids and provider-proid pairs can be routed directly to their cells by checking their values against the configuration of cells. The tags table is comparatively slow growing and can be replicated in its entirety on each cell. Inserting a new tag needs to generate a tag id coherently across all cells—independent sequences would easily get out of sync. To accomplish this we replaced the sequence with a 64-bit hash of the tag name. The vouches table is more interesting. Each vouch record relates two puids: source and target. We have queries that gather all vouches given by a puid and queries that gather all vouches received by a puid. This works great in the single database model. To make it work in the multiple database model we had to replicate each vouch record to both the source and the target puid's cells. With a large number of cells this nearly doubles the total number of records. What is important, this replication factor is independent of the number of cells and the number of vouches! And this is how we model other inter-cell relationships in our application. Now if we imagine for a moment having additional scaling dimensions, we would replicate inter-dimensional relationships with the same constant replication factor, independent of the number of objects in each dimension.
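The post doesn't name the hash function used for tag ids; assuming a truncated SHA-256, a coherent 64-bit id could be derived like this, so every cell computes the same id without coordinating sequences:

```python
import hashlib

def tag_id(tag_name: str) -> int:
    """Derive a stable 64-bit tag id from the tag name.
    SHA-256 truncated to 8 bytes is an assumption for this sketch."""
    digest = hashlib.sha256(tag_name.encode("utf-8")).digest()
    # signed=True keeps the result inside Postgres bigint range.
    return int.from_bytes(digest[:8], "big", signed=True)
```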
Let's briefly reflect on where we are in the design with multiple databases for connect.me. At authentication or signup the external provider-proid pair routes to the cell where the existing profile resides or where a new profile is created. Within the system puids are used to identify profiles and route directly to their cells. The profile's given/received vouches and tags queries are still local to the cell thanks to the full replication of the tags table and the partial replication of the vouches table. Queries that need to gather from many profiles are partitioned by mapping the input list of puids into per-cell lists—we can quickly tell which cell a puid belongs to. Then the query is executed in each mapped cell concurrently and the results are concatenated.
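The scatter-gather read can be sketched as follows. Here `query_cell` is a hypothetical per-cell query callback, and the per-cell capacity is the same illustrative assumption as before; a real implementation would issue the per-cell queries concurrently rather than in a loop:

```python
from collections import defaultdict

CELL_CAPACITY = 2 ** 32  # assumed per-cell puid capacity

def map_puids_to_cells(puids):
    """Partition an input list of puids into per-cell lists by puid range."""
    per_cell = defaultdict(list)
    for puid in puids:
        per_cell[puid // CELL_CAPACITY].append(puid)
    return per_cell

def gather(puids, query_cell):
    """Run query_cell(cell, puids) on each mapped cell and concatenate
    the results (sequentially here; concurrently in practice)."""
    results = []
    for cell, cell_puids in map_puids_to_cells(puids).items():
        results.extend(query_cell(cell, cell_puids))
    return results
```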
Profile writes are local to a profile's cell. But vouch and tag writes are not. This brings us to a major tradeoff point in our scale-out design. When writes are local to a particular cell they also inherit all the awesomeness of an ACID database. As soon as they span multiple cells we need to consider two major issues. The first is the consistency of the inter-cell database while multi-cell writes are in flight. For example, for a brief period of time a vouch may already be visible in the source puid's cell but still not visible in the target puid's cell. The second issue is error recovery. When a multi-cell write crashes it may leave the inter-cell database in an inconsistent state permanently. One approach to solving these issues is to run all multi-cell writes as distributed transactions. Distributed transactions extend ACID properties beyond a single database by means of a shared coordinator service and a consensus protocol such as two-phase commit. In my personal experience this is a suitable solution for coordinating between only a handful of ACID databases on a fast local interconnect. It quickly becomes a bottleneck with horizontal scaling, where the complexity of the protocol and the reliability of the interconnect are the limiting factors.
The alternative approach is to run multi-cell writes in the form of many independent ACID transactions. We established that we can tolerate short-term inter-cell inconsistencies, so we will just live with the first issue. To address the recovery issue we added a message queue (implemented in RabbitMQ) and required all multi-cell writes to be handled within the scope of a message. The message queue provides some basic guarantees. A message is received, processed and acknowledged by a worker. If the worker crashes after the message is received but before it is acknowledged, the message remains in the queue and is later received for processing by the next available worker. There are many complex details in making message queues scalable and reliable, but it is nice to be able to push this complexity down to a well-defined common service. The additional effect of making database writes run through the queue is that, from the application's perspective, these writes become asynchronous. Once control is returned from the write operation there is no guarantee that the effects will be immediately seen by the following read operations.
Scalability and reliability with message queues can be improved by relaxing some of the guarantees that would be expected of a local queue with one producer and one consumer. If messages are allowed to be received out of order as well as received multiple times, there are additional opportunities for such improvements. With multiple workers receiving messages concurrently we are giving up the ordering. With the possibility of a worker crashing or losing the network link just before acknowledging a message we are giving up strictly-once receiving. What this means for the individual cell writes is that they have to be idempotent—meaning the write operation can be applied multiple times with the same effect. Also, writes must depend only on immutable data—this guarantees that message effects do not change between replays. When there is a dependency on mutable data, a snapshot of the data must be copied into the message prior to submission. Let's see how this property looks in the vouching example. Consider a source vouch record that is inserted, and then the worker crashes before inserting the target vouch record. Another worker receives the same message. It has to ignore the existing source vouch and then insert the target vouch. This is why the vouch insert operation in the cell becomes an upsert or merge operation.
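To make the replay behaviour concrete, here is a toy model, not our actual implementation: each cell's vouches table is modeled as a set, so the insert is naturally an upsert and redelivering the same message has no extra effect. The cell layout and capacity are the same illustrative assumptions as before:

```python
CELL_CAPACITY = 2 ** 32  # assumed per-cell puid capacity

def apply_vouch_message(cells, source, target, tag):
    """Idempotently replicate one vouch record to both the source and
    target puids' cells. `cells` maps cell index -> set of vouch rows;
    set.add stands in for the upsert, so replays are harmless."""
    row = (source, target, tag)
    cells.setdefault(source // CELL_CAPACITY, set()).add(row)
    # A crash here leaves only the source copy; when the unacknowledged
    # message is redelivered, the first add is a no-op and the second
    # add completes the replication.
    cells.setdefault(target // CELL_CAPACITY, set()).add(row)
```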
There is still an issue of consistency when two or more messages in flight are changing the same data in different ways. They may overlap and produce a lasting inter-cell database inconsistency. To avoid this we use a variation of optimistic concurrency control. In the vouching example the conflict may happen between two overlapping messages: one setting a vouch and another removing the same vouch. We also assume that the giving and the receiving profiles are not collocated on the same cell. We start by reading the current vouching state on both involved cells before updating them. Then atomic compare-writes are performed that succeed only if the state to be updated is equal to the one previously read. This detects concurrently running writes and drops messages so that they may be replayed later to restore a consistent state. It is important to perform per-cell data writes in the same order to guarantee overall progress.
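A minimal model of the compare-write scheme is sketched below. The dict-per-cell state and the helper names are hypothetical; in Postgres the compare-write could be, for instance, an UPDATE guarded by a WHERE clause on the previously read value, so that it affects no rows when a concurrent write got there first:

```python
def compare_and_write(cell_state, key, expected, new):
    """Atomic compare-write against one cell: succeed only if the value
    read earlier is still current; otherwise report a conflict."""
    if cell_state.get(key) != expected:
        return False  # a concurrent write won; drop and replay later
    cell_state[key] = new
    return True

def set_vouch(cells_in_order, key, new):
    """Apply one change to every involved cell in a fixed order.
    Using the same order for all writers is what guarantees progress.
    On the first conflict the remaining writes are skipped and the
    message is considered dropped, to be replayed later."""
    snapshots = [cell.get(key) for cell in cells_in_order]  # read phase
    return all(compare_and_write(cell, snap_key, snap, new)
               for cell, snap, snap_key in
               ((c, s, key) for c, s in zip(cells_in_order, snapshots)))
```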
With multiple cells storing partitions of our data we still needed the reliability of a single cell. We made each cell database fully replicated on another server machine. This replication uses the same mechanism that is used for writing record replicas to multiple cells. Now in the example with the replicated vouch record we are writing into four databases: two source puid's cell replicas and two target puid's cell replicas. Reads can now be load balanced between cell replicas.
The combination of effects in the described design approach has interesting implications for application-level behaviour. With synchronous writes into a single ACID database or a distributed transaction system, the visibility of effects and the order of writes are very predictable and easy to rely upon. With asynchronous writes over multiple ACID databases and optimistic concurrency control we have a less predictable system—it aims at reaching consistency eventually and must not be expected to provide a perfectly consistent view at all times.
With the partitioned database working we had to decide on the initial number of cells to start with in production. We chose to collocate all cell databases on two replicated Postgres machines and, as they grew in size and workload, to split them until we had one database per machine. Now what? How can we add more cells? This brings us to the second part of this blog post, where I will describe cell generations and where the term routing will start making more sense.