Poor man’s distributed counter

etki
7 min readMay 15, 2016

--

Imagine you have to maintain some kind of counter — clicks, views, likes, whatever you want that represents a single integer value waving up so fast you can tolerate incorrect values. How do you store a thing like that?

Well, in case of plain old single-server world it’s an easy task:

INSERT INTO counter_table (counter_key, counter_value) VALUES (:key, :delta) 
ON DUPLICATE KEY UPDATE counter_value = counter_value + :delta;

However, the sample above is quite synchronous. You physically can’t issue new update if the last one hasn’t finished; you have a block over record because you can’t compute next value while you haven’t computed previous one. This is easily solved with delayed batched requests — client have to aggregate delta for some time before issuing a request as long as you can tolerate loss of that aggregated value, so database itself isn’t overloaded. So far so good, but single-server database is a SPOF which you may want to eliminate, and there comes the hard part.

The first thing to be aware of is race condition. If your distributed database doesn’t have counters by design and doesn’t have constructs like `SET x = x + :delta`, then you’re entering first circle of hell, because whenever you issue an update, you need to perform read-update-write process on client side, and while your bytes are traversing wast space of network, somebody else may have updated your counter, so your update part of process is already processing the stale data.

This perdition may be solved by at least two approaches (those are just the ones i know). The first one is bluntly straightforward: store every delta with unique identifier (say, UUID), store counter value in separate table, and whenever you think that counter value is stale (say, if it was generated more than a second ago), simply go over the delta table and recount value:

CREATE TABLE counter_delta (counter_key varchar, id uuid, value integer);
CREATE TABLE counter (counter_key varchar PARTITION KEY, value integer, updated_at timestamp)
-- time to count! depending on storage used, this may be run directly in database or in your client application (which is bad due to amount of network transmissions)
UPDATE counter SET value = (SELECT SUM(value) AS value FROM counter_delta WHERE counter_key = :counter_key), timestamp = NOW() WHERE counter_key = :counter_key
-------------------------------map<string, future> update_processescounter = get_counter(key)
if counter.get_timestamp() > threshold:
return counter
process = update_processes.get(key)
if process: // this code is NOT thread safe, i just skip checks for clarity
return counter
process = launch_update_process(key)
update_processes.put(key, process)
process.then(() -> update_processes.delete(key)
return counter // or you may want to wish until update process is complete

I’m not including any partition key data in first table intentionally — it’s either PK (counter_key, id) that will load whole cluster (may be undesirable) or PK (counter_key) to load only one node per counter (which may be even more undesirable). I simply can’t tell you the best option, because both sucks.

This reminds of a stone age, but it works: no writes are lost, the undercounts are eventually resolved, one-second delay is usually tolerable. However, walking over a large table may be not the best idea since it may be the thing your database is not ready to handle; the minimum requirement is to set LIMIT to absurdly big value.

The second approach is as elegant as inefficient: this is standard optimistic locking. Store version with your counter so whenever you issue an update, database may happily reject it if you’re working with stale data:

CREATE TABLE counter (counter_key varchar PARTITION KEY, value integer, version integer)UPDATE counter SET value = :new_value, version = :expected_version + 1 WHERE counter_key = :counter_key ONLY IF version = :expected_versionwhile true:
counter = get_counter(key)
if update_counter(key, counter.value + delta, counter.version):
break

The first thing i want to say is that optimistic locking is always beautiful. But it doesn’t fit well in this particular case: we can’t use it without batching because if we issue a request for every update, we’ll end up with N concurrent request — while only one will complete successfully, leaving N-1 requests to be reissued. If application is thinked thoroughly, this may be as bad as (number of nodes) concurrent requests (whenever request fails, issue new one including updated delta, don’t simply retry the old one), but usually the hell will descend on you and you’ll fuck up your own brain, motivation and effort. This approach is hard, may easily force a node to wait tens of seconds in case of wide fleet (we tolerate seconds of data loss, not tens of seconds, right?), and generally should be avoided in highly concurrent operations.

However, databases usually tend to do have counter data types, and at this very moment you might think you saved and time i spent for previous paragraphs is uselessly lost. I will opponent you with a classic not so fast phrase.

You may already now that distributed systems can not be consistent, available and tolerant to network partitions at the same time. Since you want your database to work after partition has gone, you can’t sacrifice network partition and you go either with availability (the official terminology is hard, let’s say your database just keeps responding and acknowledging writes — and doing that on both ends of network partition) or consistency (let’s say database will reject writes and reads at least on the smaller side of partition). The latter one is not allowed for our case (we need to count as much as we do, and failing database is a no-no in out case), so we have to use available solution for our counters storage.

And here come the lost writes problems.

Imagine that your cluster has split in two partitions. Your application first updates counter X on side A (x.value = x.value + 2), then on side B (x.value = x.value + 3), that should be an increment of 5 in total. Then the partition goes away, and database tries to merge data that came during network partition. And in that very moment it has two options:

  1. Take any of values it sees. Quite common approach is Last Write Wins, so the latter of them is used. If it chooses between (x + 2) and (x + 3), it will never result in (x + 5) — hence, this is lost update and undercount. This is very simplified, but this approach is allowed to lose all of your increments — in case (x + 1) is chosen over (x + 10000000).
  2. Store both values and force client to merge them on next query. If database provides client with two integer values, client can’t repair anything as well. I haven’t dug in this approach yet, but if you have time, search for vector clocks — i’m not sure if this approach may return previous values to compute delta or not.

The network partition also renders optimistic locking approach useless, since in non-consistent database entity version can’t be consistent as well (you can, however, use event sourcing with custom-built rebase functionality, but this is certainly a topic for another talk and i’m even afraid to speak about it). The former, store-every-delta will easily bloat your database, but prevent data losses and will restore counter value as soon as it is recomputed.

Well, it may seem that whole task of distributed counter is doomed in either lots of disk space or data losses. This is the place for not so fast again: there are lots of clever people on our planet, and the way they outsmart problems is simply astonishing.

We were talking about updating single value in distributed system. And it is already so wrong in the very definition, all the problems we saw arise from updating counter concurrently on several nodes of distributed system. Instead of storing counter as a single value, we can store it in distributed way as well: every node may maintain it’s own counter and replicate other nodes counters, and the counter requested from client is simply sum of all of those counters. Imagine following table:

CREATE TABLE virtual_counter (counter_key varchar PARTITION KEY, node_id varchar CLUSTERING KEY, value integer)---counter_key   | node_id     | value
ad:1:views | 192.168.0.1 | 1
ad:1:views | 192.168.0.2 | 2

Now, whenever node 192.168.0.1 receives a request, it knows that nobody will ever update corresponding record except for that very node, so node may guarantee there will be no concurrent updates on a value on it’s own and safely proceed in case of network partition; in any case of conflict LWW (or event ‘take the max one’ assuming counter is not allowed to decrease) strategy will be perfectly safe. As soon as partition vanishes, node may replicate each other’s state without any overwrites, thus preserving counter values without any significant space overhead. Whenever node receives a read request for counter, it reads whole table — which shouldn’t be very big until you’re Netflix, you won’t usually go even with a dozen of nodes — and simply sums values for every counter (however, linear complexity is still in place and can’t let you scale indefinitely). Even if you take node out of cluster, it may still be present in this virtual table to keep track of values.

Bang!

Distributed counter problem, you’re now officially killed.

Rollbang

While all of this is ultimately cool, this leaves one big problem behind: such counter is tightly coupled to database. While you can implement such structure yourself, you have to identify node right inside your request, and that’s not always possible. In that case you can always go with bloat-your-database way.

Where’s the terminology and white papers, dude?

Actually, i believe that i’m describing things equal to G-Counter. However, i haven’t read white papers myself and saw all that stuff of wikipedia, so i may have left couple of mistakes here and there, and the whole thing is described more easily in any similar G-Counter post. Sorry for being too excited during writing.

In case i’ve interested you in this topic, you can google CRDT for more formal definitions and more distributedly-safe data structure implementations.

--

--

etki

I still bear a russian citizenship with no plans to prolong the contract, if that matters