Retrospective time-based UUID generation (with Spark)


Update 26.03.2023: 8 years ago 50 Gb sounded more serious than it does now, but even then we could and should have done this easily with one beefy machine, without Spark or any other then-fancy tool.

I faced a problem: given about 50 Gb of data in one database, export the records to another database with slight modifications, which included generating a UUID for each record based on its timestamp (collisions were intolerable). We had a Spark cluster, and with it the problem did not seem even a little tricky: create an RDD, map it and send it to the target DB.

(In this post I talk about Spark. I have not written about it in this blog so far, but I will. In short, it is a framework for large-scale data processing, similar to Hadoop. It operates on abstract distributed data collections called Resilient Distributed Datasets (RDDs). Their interface is quite similar to that of functional collections (map, flatMap, etc.), with some additional operations specific to Spark's distributed, large-scale nature.)
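
To give a taste of the API, here is a minimal, purely illustrative sketch (a local toy job, not code from this project):

import org.apache.spark.{SparkConf, SparkContext}

// A minimal, illustrative RDD pipeline: distribute a small local
// collection, transform it in parallel, collect the result.
val sc = new SparkContext(
  new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))
val lengths = sc
  .parallelize(Seq("foo", "bar", "bazz"))  // create an RDD from a local collection
  .map(_.length)                           // transform every element in parallel
  .collect()                               // bring the results back to the driver
// lengths is Array(3, 3, 4)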

However, I was too optimistic - there were difficulties.

UUID

The idea of a UUID, or universally unique identifier, is quite simple: it is a 128-bit value that can be generated locally and is (practically) guaranteed to be unique across the human civilization. It has so-called versions - different ways to generate it. The simplest is version 4 (random generation), but there are more sophisticated ones. I needed version 1, which is generated from a MAC address and a timestamp.
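
For illustration, the standard java.util.UUID class gives version 4 out of the box and can report the version of any UUID:

import java.util.UUID

// A random (version 4) UUID from the standard library.
val random = UUID.randomUUID()
println(random)           // a random value, different on every run
println(random.version()) // 4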

This version supports up to ten thousand unique IDs on one host (one MAC address) at one moment in time (one timestamp); this is achieved by using a counter. In the normal situation the generation of UUIDs of this version is pretty simple: store the previous timestamp; if the next timestamp is equal to it, increment the counter; if the next one is newer, set the counter to 0 and save the new timestamp as the previous one. (For instance, see the code from the Cassandra driver.) But this relies on an invariant: the next timestamp is equal to or greater than the previous one.
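
To make the invariant concrete, here is a rough, single-threaded sketch of that standard scheme (my own illustration, not the Cassandra driver's actual code):

// A rough, single-threaded sketch of the usual version 1 scheme:
// one remembered timestamp, one counter, and the assumption
// that timestamps never go backwards.
class MonotonicTimeSource {
  private var lastMillis = Long.MinValue
  private var counter = 0L

  def next100NanoIntervals(millis: Long): Long = {
    if (millis == lastMillis) {
      counter += 1            // same millisecond: bump the counter
    } else {                  // assumes millis > lastMillis
      lastMillis = millis
      counter = 0
    }
    millis * 10000 + counter  // 100-ns intervals within the millisecond
  }
}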

The problem

The problem starts when the invariant is violated, i.e. when we generate UUIDs from timestamps that do not arrive in order. This happens when we generate them retrospectively, for a set of records that already exists and is not ordered by timestamp. This was my case.

Sorting the records with the DBMS or Spark was theoretically possible but practically quite hard due to the amount of data.

I had an idea to store a counter for each timestamp instead of only the current one (using e.g. a ConcurrentHashMap), but I estimated that it would take approximately 6 Gb of additional memory per Spark worker. In our setup that was feasible, but I preferred to avoid it if possible.
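
For scale, a back-of-envelope sketch of where a figure like that comes from (the record size and per-entry overhead below are my assumptions, not measurements from that setup):

// Rough estimate only; record size and per-entry overhead are assumptions.
val totalBytes    = 50L * 1024 * 1024 * 1024   // ~50 Gb of input data
val recordSize    = 500L                       // assumed average record size
val timestamps    = totalBytes / recordSize    // ~10^8 distinct timestamps at worst
val bytesPerEntry = 64L                        // boxed Long key + value + map node
val mapBytes      = timestamps * bytesPerEntry // ~6-7 Gb for the counter map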

My solution

In fact, it is not necessary to store the counters for all timestamps at once. We can divide the records into groups by timestamp and use an individual UUID generator for each group, so instead of gigabytes of memory for all the timestamps we only need enough for the timestamps of one group. The crucial requirement: all records with the same timestamp must land in the same group (otherwise the counters cannot prevent collisions).

The UUID generator

Let us start with the generator:

import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.eaio.uuid.UUIDGen;

public final class TimestampToUUIDConverter {
  /**
   * Map between timestamps in millis and UUID time value counters.
   */
  private final ConcurrentHashMap<Long, Long> lastTimestamps =
      new ConcurrentHashMap<>();

  public final UUID convert(final Date timestamp) {
    final long mostSignificantBits = createTime(timestamp.getTime());
    final long leastSignificantBits = UUIDGen.getClockSeqAndNode();
    return new UUID(mostSignificantBits, leastSignificantBits);
  }
      
  private long createTime(final long millis) {
    long time;

    // Convert to 100-ns intervals since the UUID epoch
    // (the start of the Gregorian calendar, 1582-10-15 UTC).
    long timeMillis = (millis * 10000) + 0x01B21DD213814000L;

    lastTimestamps.putIfAbsent(millis, 0L);

    // Concurrently increment the counter.
    Long r;
    do {
      r = lastTimestamps.get(millis);
    } while (!lastTimestamps.replace(millis, r, r + 1));

    timeMillis += r;

    // UUID format byte magic:
    // Time low.
    time = timeMillis << 32;
    // Time mid.
    time |= (timeMillis & 0xFFFF00000000L) >> 16;
    // Time high and version.
    time |= 0x1000 | ((timeMillis >> 48) & 0x0FFF); // version 1

    return time;
  }
}

The generator is basically the same as I would write for the solution where all the counters are stored. As you can see, the counters are kept in a ConcurrentHashMap with timestamps as keys. A counter starts from 0 and is incremented each time the same timestamp arrives again. The do-while construction is a compare-and-swap loop over the ConcurrentHashMap.
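
A quick usage sketch (illustrative; the sample timestamp is arbitrary): two records with the same timestamp still get distinct version 1 UUIDs thanks to the counter.

import java.util.Date

// Illustrative usage of the converter above; the timestamp is arbitrary.
val converter = new TimestampToUUIDConverter()
val ts = new Date(1430000000000L)
val u1 = converter.convert(ts)
val u2 = converter.convert(ts)
assert(u1 != u2)            // the counter separates equal timestamps
assert(u1.version() == 1)   // both are time-based (version 1) UUIDs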

Record grouping

As I mentioned earlier, all records with the same timestamp must go into the same group, so grouping by the timestamp itself is the obvious solution. There is a small inconvenience: the records are distributed non-uniformly along the timeline - their density grows roughly linearly with time (a good enough approximation for this task).

Spark supports custom partitioners, so I wrote one.

import java.util.GregorianCalendar

import org.apache.spark.Partitioner

class TimestampPartitioner extends Partitioner {
  import TimestampPartitioner._

  override def numPartitions: Int = numPartitionsInternal

  override def getPartition(key: Any): Int = {
    val timestamp = key.asInstanceOf[Long]

    // Which region does the timestamp fall into?
    val result =
      if (timestamp < a2) {
        (timestamp - a1) / partLength1
      } else if (timestamp < a3) {
        (timestamp - a2) / partLength2 + numPartitions1
      } else {
        // Offset by all partitions of the earlier regions.
        (timestamp - a3) / partLength3 + numPartitions1 + numPartitions2
      }

    // Insurance against accidental off-by-one errors.
    result.toInt.max(0).min(numPartitionsInternal - 1)
  }
}

object TimestampPartitioner {
  // Region borders.
  private val a1 = new GregorianCalendar(2013, 8, 1).getTime.getTime
  private val a2 = new GregorianCalendar(2014, 10, 1).getTime.getTime
  private val a3 = new GregorianCalendar(2015, 5, 1).getTime.getTime
  private val a4 = new GregorianCalendar(2016, 1, 1).getTime.getTime
  //                                                        ↑
  //                                                Consistent API :)

  // [a1; a2) -- 1500 partitions
  // [a2; a3) -- 3000 partitions
  // [a3; a4) -- 6000 partitions
  private val numPartitions1 = 1500
  private val numPartitions2 = 3000
  private val numPartitions3 = 6000
  private val numPartitionsInternal = numPartitions1 + numPartitions2 + numPartitions3

  private val partLength1 = (a2 - a1) / numPartitions1
  private val partLength2 = (a3 - a2) / numPartitions2
  private val partLength3 = (a4 - a3) / numPartitions3
}

As you can see, I divided the timeline into unequal regions with 1500, 3000 and 6000 partitions respectively. In Spark's terms, a partition is a chunk of data that is processed entirely on a single worker. We will use an individual UUID generator per partition.
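
To see the partitioner in action, a small illustrative check (the sample dates are arbitrary): a timestamp from the first region maps into the first 1500 indices, one from the last region into the last 6000.

import java.util.GregorianCalendar

// Illustrative check; the sample dates are arbitrary (months are 0-based).
val partitioner = new TimestampPartitioner
val early = new GregorianCalendar(2013, 9, 1).getTime.getTime  // inside [a1; a2)
val late  = new GregorianCalendar(2015, 9, 1).getTime.getTime  // inside [a3; a4)
println(partitioner.getPartition(early))  // an index in [0; 1500)
println(partitioner.getPartition(late))   // an index in [4500; 10500)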

Putting it all together

The essential Spark code is fairly simple:

recordRDD
  // Transform into an RDD of key-value pairs.
  .map(rec => (rec.timestamp, rec))
  // Group by timestamp using the custom partitioner, so that all
  // records sharing a timestamp land in the same partition.
  .groupByKey(new TimestampPartitioner)
  // Process each partition with its own UUID generator.
  .mapPartitions { groups =>
    val converter = new TimestampToUUIDConverter()
    groups.flatMap { case (_, records) =>
      records.map { rec =>
        val uuid = converter.convert(rec.timestamp)
        NewObject(rec, uuid)
      }
    }
  }
  // Save the result to the target database.
  .saveTo...(...)

I believe the piece of code is concise enough.

That is it: correct UUIDs are generated for all records. No big deal :)