Implementing a key-value database in Golang, Part 2 - The append-only log

/images/post/kvdb-2.jpg

This is the second of several blog posts about implementing a simple key-value store in Golang. All code described in this blog-post can be found here: https://github.com/olif/kvdb.

Previous posts:

Following posts:

Intro

In the last post I laid the foundation of the database and provided an in-memory implementation of a key-value store engine. In-memory storage engines can offer some nice properties such as near time complexity O(1) but comes with some severe limitations such as:

which means that they cannot be used for non-volatile data.

In this post I will try to solve these issues but as we will see, it will have a negative impact on performance.

Log structured vs page oriented

In order to achieve durability we need to write the data to non-volatile memory. This will also reduce the cost in case of large values or a large key-space.

So, lets implement a storage engine which writes data to disk. In Designing Data-Intensive Applications the author describes two different approaches for storage engines:

The difference between them can superficially be said that log-structured storage engines is like the bash script in the previous post which is using an append-only data file. When writing to the database it appends the record to the last position of a file and then when reading the record, the last matching is picked. The nice thing about this approach is that no random disk writes are made (which are more expensive than sequential writes).

Page-oriented storage engines (which are still the most common) breaks the database down into fixed-size pages which are part of larger tree (often a BTree). The engine then writes or reads data one page at a time. Page-oriented storage engines can sometimes have better read performance than log structured ones.

When reading upon these different types of engines I found the log-structured approach more interesting and it also seemed the most fun to implement which is why I throughout this blog-series focus on the log-structured approach.

A second attempt

In this second attempt at a storage engine I will introduce the append-only log (aol). It basically is a file to which the only mutating operation is the append-operation. Once a record has been written, it will not be modified later in time.

We still must have the ability to change the value for a specific key, this is handled by appending a new record for the key with the new value. The last entry in the database matching the key is considered to hold the correct state.

Storage format

The simplest approach (which I outlined in the previous post) for storing the key value pairs would be to just append them as a CSV record to a file

# db.txt
# key, value
key1, value1
key2, value2
key1, value3

$> db.Get("key1")
value3

. Nothing wrong with this approach except we need a way to delete records also.

Delete functionality can be implemented by adding an additional value alongside the (key, value) pair which tells the database engine if the records has been deleted. Such a value is called a tombstone.

# db.txt
# tombstone, key, value
, key1, value1
, key2, value2
x, key1,

$> db.Get("key1")
"not found'

The db.Get method searches the file from top to bottom and picks the last record found. If the record contains a tombstone we consider it deleted.

This format is simple and simple is always good but consider the following:

#db.txt
# tombstone, key, value
, key1, value1

$> db.Put("key2, value", "value")

#db.txt
# tombstone, key, value
, key1, value1
, key2, value, actualValue

$> db.Get("key2")
?

. Using key2, value as key should be perfectly fine since it's just a string but it will break the database engine since it (depending on implementation) probably would have returned value and not actualValue. The records needs to be escaped.

There is also another problem, what would happen if someone pulled the plug to the computer at the exact time data is written to disk? Can we assume that the write operation is atomic during such an event? Probably not, I will discuss this a bit more further down but in order to gurantee the integrity of our data, a checksum is also needed.

I will solve these issues by defining a binary format.

Binary storage format

By defining a binary format we gain greater control and flexiblity of what we are writing to disk.

A Record is the tuple of (kind, key, value), where kind tells us if the Record is holding a value or if it is a tombstone.

A Record will be serialized to a sequence of bytes in the following binary format.

[crc][type][kL][vL][key][value]
length name description
4 bytes crc crc checksum
1 byte type record type
4 bytes kL key length
4 bytes vL value length
<1..n> key key
<0..n> val value

. This means that the smallest possible record will be at least 14 bytes in size assuming empty value.

We can represent a record in Go with:

package record

const (
	valueKind = iota
	tombstoneKind
)

// Record is a database record
type Record struct {
	kind  byte
	key   string
	value []byte
}

// NewValue returns a new record of value kind
func NewValue(key string, value []byte) *Record {
	return &Record{
		kind:  valueKind,
		key:   key,
		value: value,
	}
}

// NewTombstone returns a new record of tombstone kind
func NewTombstone(key string) *Record {
	return &Record{
		kind:  tombstoneKind,
		key:   key,
		value: []byte{},
	}
}

// Key returns the record key
func (r *Record) Key() string {
	return r.key
}

// Value returns the record value
func (r *Record) Value() []byte {
	return r.value
}

// IsTombstone returns true if the record is of tombstone kind
func (r *Record) IsTombstone() bool {
	return r.kind == tombstoneKind
}

Then the methods for serializing/deserializing the records to and from the binary format can be implemented.

const (
	kindByteSize   = 1
	crcLen         = 4
	keyLenByteSize = 4
	valLenByteSize = 4
	metaLength     = kindByteSize + crcLen + keyLenByteSize + valLenByteSize
)

// ToBytes serializes the record into a sequence of bytes
func (r *Record) ToBytes() []byte {
	keyBytes := []byte(r.key)
	keyLen := make([]byte, keyLenByteSize)
	binary.BigEndian.PutUint32(keyLen, uint32(len(keyBytes)))

	valLen := make([]byte, valLenByteSize)
	binary.BigEndian.PutUint32(valLen, uint32(len(r.value)))

	data := []byte{}
	crc := crc32.NewIEEE()
	for _, v := range [][]byte{{r.kind}, keyLen, valLen, []byte(r.key), r.value} {
		data = append(data, v...)
		crc.Write(v)
	}

	crcData := make([]byte, crcLen)
	binary.BigEndian.PutUint32(crcData, crc.Sum32())
	return append(crcData, data...)
}

// FromBytes deserialize []byte into a record. If the data cannot be
// deserialized a wrapped ErrParse error will be returned.
func FromBytes(data []byte) (*Record, error) {
	if len(data) < metaLength {
		return nil, ErrInsufficientData
	}

	keyLenStart := crcLen + kindByteSize
	klb := data[keyLenStart : keyLenStart+keyLenByteSize]
	vlb := data[keyLenStart+keyLenByteSize : keyLenStart+keyLenByteSize+valLenByteSize]

	crc := uint32(binary.BigEndian.Uint32(data[:4]))
	keyLen := int(binary.BigEndian.Uint32(klb))
	valLen := int(binary.BigEndian.Uint32(vlb))

	if len(data) < metaLength+keyLen+valLen {
		return nil, ErrInsufficientData
	}

	keyStartIdx := metaLength
	valStartIdx := keyStartIdx + keyLen

	kind := data[crcLen]
	key := make([]byte, keyLen)
	val := make([]byte, valLen)
	copy(key, data[keyStartIdx:valStartIdx])
	copy(val, data[valStartIdx:valStartIdx+valLen])

	check := crc32.NewIEEE()
	check.Write(data[4 : metaLength+keyLen+valLen])
	if check.Sum32() != crc {
		return nil, ErrCorruptData
	}

	return &Record{kind: kind, key: string(key), value: val}, nil
}

// Write writes the record to the writer in binary format
func (r *Record) Write(w io.Writer) (int, error) {
	data := r.ToBytes()
	return w.Write(data)
}

Nothing fishy going on here, I added a small Write method which accepts the io.Writer interface also.

Note that I explicitly copy the key and value data from the byte buffer instead of just doing:

val := data[keyStartIdx:valStartIdx+valLen]

which I think is a real footcannon. Slicing a slice returns a new slice but the new slice will still point to the underlying array. Consider the following:

func footcannon() {
	data := []string{"a", "b", "c"}
	d1 := data[0:1]
	data[0] = "o"
	fmt.Println(d1[0]) // o
}

. Both data and d1 points to the same array which at least for me is very easy to forget.

Scanner/Tokenizer

The file to which all records are going to be written will consist of a long byte sequence. In order to make sense of all data in the file, a tokenizer is needed which Go fortunately provides via the bufio/Scanner type:

Scanner provides a convenient interface for reading data such as a file of newline-delimited lines of text. Successive calls to the Scan method will step through the 'tokens' of a file, skipping the bytes between the tokens. The specification of a token is defined by a split function of type SplitFunc; the default split function breaks the input into lines with line termination stripped. Split functions are defined in this package for scanning a file into lines, bytes, UTF-8-encoded runes, and space-delimited words. The client may instead provide a custom split function.

So the only thing needed is a split function which can convert a byte sequence into a Record type. The split function has the following signature:

type SplitFunc func(data []byte, atEOF bool) (advance int, token []byte, err error)

. The documentation about SplitFunc is pretty extensive and can be found here. The important parts for us are that we should try to deserialize the data into a record, if we receive an ErrInsufficientData error we want to tell the scanner to read more data into it's buffer. This is done by returning (0, nil, nil). If we have enough data to parse into a Record we return the rest of the data together with a number telling the scanner how much to advance.

I can now create a custom Record-Scanner.

// NewScanner returns a new Record-Scanner for the reader. maxScanTokenSize is
// the largest possible size that the scanner will buffer and should be set to
// at least the byte size of the key and value combined.
func NewScanner(r io.Reader, maxScanTokenSize int) (*Scanner, error) {
	scanner := bufio.NewScanner(r)
	buf := make([]byte, 4096)
	scanner.Buffer(buf, maxScanTokenSize+metaLength)
	scanner.Split(split)
	return &Scanner{scanner}, nil
}

func split(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}

	r, err := FromBytes(data)
	if errors.Is(err, ErrInsufficientData) {
		return 0, nil, nil
	}

	if err != nil {
		return 0, nil, err
	}

	adv := r.Size()

	return adv, data[:adv], nil
}

// Record returns the most recent record generated by a call to scan, since
// Scanner keeps track of any error encountered we can ignore them here.
func (r *Scanner) Record() *Record {
	data := r.Bytes()
	record, _ := FromBytes(data)
	return record
}

Using the Scanner will look something like this:

scanner, err := record.NewScanner(file, s.maxRecordSize)
if err != nil {
    return nil, fmt.Errorf("could not create scanner for file: %s, %w", s.storagePath, err)
}

for scanner.Scan() {
    record := scanner.Record()
	fmt.Println(fmt.Sprintf("Key: %s", record.Key()))
}

Now all plumbing is done and we can start to write some data to disk.

The aol implementation

This implementation aims at solving the lack of durability the the in-memory implementation had.

Writing data

I will start out with the implementation of the Put and Delete methods since they can be implemented in the same way. A first, naive approach could be:

// Put saves the value to the database and returns any error encountered
func (s *Store) Put(key string, value []byte) error {
	record := record.NewValue(key, value)
	return s.append(record)
}

// Delete removes the value from the store
func (s *Store) Delete(key string) error {
	record := record.NewTombstone(key)
	return s.append(record)
}

func (s *Store) append(record *record.Record) error {
	file, err := os.OpenFile(s.storagePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	defer file.Close()
	if err != nil {
		return fmt.Errorf("could not open file: %s for write, %w", s.storagePath, err)
	}
	_, err = record.Write(file)
	if err != nil {
		return fmt.Errorf("could not write record to file: %s, %w", s.storagePath, err)
	}

	return nil
}

. Note that I used the os.OpenFile with the os.O_APPEND flag. Assuming a linux system, Go will translate the OpenFile into the OPEN(2) sycall which tells the OS to put the file offset at the end of the file when writing (as an atomic step).

There are some things we should consider now before proceeding. The engine could and probably will be used in an concurrent environment and what really happens when we call record.Write?

More specifically:

  1. Is the Put and Delete operations safe to call concurrently without risking corrupt data?

  2. When data is written to the file via record.Write, is it really on disk? That is, when the Put or Delete functions returns, can we leave any guarantees about the durability of the data?

and finding answers to them was not so straightforward as at least I thought it should be.

You get a writer and you get a writer everybody gets a writer!

On a linux system, go:s file.Write function calls the write(2) syscall under the hood. Almost all but a few linux syscalls are thread safe1 but Go does not state any information about how file.Write acts under concurrent use. If not explicitly stated that the function is safe we must assume that it is not.

We can also check how the creators of Go uses the function by peeking into the log library and see how it appends log entries and lo and behold, a mutex is taken before writing data.

One could also argue that if data can be written to disk without any interleaving, somewhere between the application and the disk there must be a lock involved. From this we can draw the conclusion that allowing concurrent writes to disk will only have a limited gain in performance.

This is confirmed by this article which measures performance gains,for among other things, concurrent sequential writes.

Considering this, we will restrict the database engine to have a single writer which is to be realized by the use of a mutex. It is not expected that this should have any significant impact on the write performance.

One does not simply write data to disk

At least I have thought that when

file.Write([]byte("I really hope to reach the physical disk someday"))

returns, the data is written to disk but that is typically not the case. At least on linux systems, file.Write calls the write(2) syscall which in turn writes the data to the os. Just because the data is handed over to the os, it does not mean that the data is on disk, typically the os batches the data into buffers as a way to increase write throughput.

This means that the data would survive a hard kill of our application since the data has been handed over to the os, but if the os crashes or the complete system goes down due to a power outage the data will be lost.

So how then can databases guarantee durability?

This is where the fsync(2) system call comes in. It tells the operating system to flush the os-buffers to disk and returns when done. In Golang it is realized by file.Sync.

file.Write([]byte("Yay, I will soon be flushed to disk!"))
file.Sync() // Tell the os to immediately write the data to disk and return when it is done

There is only one little caveat with fsync() and that is that it is really slow. I mean, really, really slow. I did a quick benchmark:

func BenchmarkWriteAsync(b *testing.B) { write1Kb(b, false) }
func BenchmarkWriteSync(b *testing.B)  { write1Kb(b, true) }

func write1Kb(b *testing.B, sync bool) {
	data := []byte(strings.Repeat("a", 1024))
	dbPath, err := ioutil.TempDir("./", "test")
	defer os.RemoveAll(dbPath)
	if err != nil {
		b.Fatal(err)
	}

	filePath := fmt.Sprintf("%s/%s", dbPath, "test.db")
	file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	defer file.Close()
	if err != nil {
		b.Fatal(err)
	}

	b.SetBytes(int64(len(data)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, err := file.Write(data)
		if err != nil {
			b.Fatal(err)
		}
		if sync {
			file.Sync()
		}
	}
}

and the results where:

❯ go test -benchmem github.com/olif/kvdb/test -bench 'BenchmarkWriteSync|BenchmarkWriteAsync' | prettybench
goos: darwin
goarch: amd64
pkg: github.com/olif/kvdb/test
PASS
benchmark                 iter          time/iter    throughput   bytes alloc        allocs
---------                 ----          ---------    ----------   -----------        ------
BenchmarkWriteAsync-4   142908      7163.00 ns/op   142.97 MB/s        0 B/op   0 allocs/op
BenchmarkWriteSync-4       237   5143289.00 ns/op     0.20 MB/s       76 B/op   0 allocs/op
ok  	github.com/olif/kvdb/test	3.723s

. Skipping fsync increases the write throughput (on my machine) around 700 times.

So it seems that we have to choose between having high write throughput or durability.

I got a bit curios and did a quick investigation on how this is handled in existing databases and this is what I found:

. To sum up: all databases offered different ways to avoid syncing in order to increase performance. Most of them did not sync per default (postgres being the exception). In general, it is very important to read the database documentation if full durability is a requirement.

So after this odyssey I have decided that our database engine also will have the cake and eat it too by providing a configuration setting telling the engine if it should sync the data or not after write.

If you are more interested in this topic I found this excellent post by Antirez (creator of Redis): Redis persistence demystified.

A new attempt to write data

In our new, a bit more mature attempt I will make the following modifications to the append method:

and the corresponding implementation:

// Put saves the value to the database and returns any error encountered
func (s *Store) Put(key string, value []byte) error {
	record := record.NewValue(key, value)
	return s.append(record)
}

// Delete removes the value from the store and returns any error encountered
func (s *Store) Delete(key string) error {
	record := record.NewTombstone(key)
	return s.append(record)
}

func (s *Store) append(record *record.Record) error {
	if record.Size() > s.maxRecordSize {
		msg := fmt.Sprintf("key-value too big, max size: %d", s.maxRecordSize)
		return kvdb.NewBadRequestError(msg)
	}

	s.writeMutex.Lock()
	defer s.writeMutex.Unlock()

	file, err := os.OpenFile(s.storagePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	defer file.Close()
	if err != nil {
		return fmt.Errorf("could not open file: %s for write, %w", s.storagePath, err)
	}

	_, err = record.Write(file)
	if err != nil {
		return fmt.Errorf("could not write record to file: %s, %w", s.storagePath, err)
	}

	if !s.async {
		return file.Sync()
	}

	return file.Close()
}

. I have introduced a mutex so that only one go-routine at a time can write data. Also, I have introduced a property on the aol.Store struct which tells the append method whether to sync the data or not.

A friend of order may spot that file.Close() will be called twice if the async flag is true. This is because file.Close() can return an error from the os telling us that the write cannot be done. More about this can be found here here. In order to tell the client that an error has occured we need to return possible errors from the file.Close() method.

Reading data

Reading data from the aol means that we have to go through the whole file from the beginning to the end and return the value of the last matching record.

// Get returns the value associated with the key or a kvdb.NotFoundError if the
// key was not found, or any other error encountered
func (s *Store) Get(key string) ([]byte, error) {
	file, err := os.Open(s.storagePath)
	defer file.Close()
	if err != nil {
		return nil, fmt.Errorf("could not open file: %s, %w", s.storagePath, err)
	}

	scanner, err := record.NewScanner(file, s.maxRecordSize)
	if err != nil {
		return nil, fmt.Errorf("could not create scanner for file: %s, %w", s.storagePath, err)
	}

	var found *record.Record
	for scanner.Scan() {
		record := scanner.Record()
		if record.Key() == key {
			found = record
		}
	}

	if scanner.Err() != nil {
		s.logger.Printf("error encountered: %s", scanner.Err())
		return nil, scanner.Err()
	}

	if found == nil || found.IsTombstone() {
		return nil, kvdb.NewNotFoundError(key)
	}

	return found.Value(), nil
}

Here we are using our custom Scanner to iterate over all the records in the file and put away the record if it matches the key. If the last matching key is of tombstone kind we return a kvdb.NotFoundError.

Performance

We expect the write performance of the aol engine to be invariant of the number of records that exists in the database since we are only appending to file.

The performance of the Get method should scale lineary with the number of records that already exists in the file, i.e, it has O(n) time complexity which is very slow for a database.

I did some benachmark runs, varying the size of the existing dataest to confirm this:

❯ go test -benchmem github.com/olif/kvdb/test -bench 'Benchmark(Write|Read)[0-9]+Db*' | prettybench
goos: darwin
goarch: amd64
pkg: github.com/olif/kvdb/test
PASS
benchmark                     iter       time/iter   throughput       bytes alloc                allocs
---------                     ----       ---------   ----------       -----------                ------
BenchmarkRead100Db100-4        304      3.90 ms/op                   4252067 B/op      100400 allocs/op
BenchmarkRead1000Db100-4        36     36.17 ms/op                  38813344 B/op     1000404 allocs/op
BenchmarkRead10000Db100-4        1   1330.51 ms/op                 384414664 B/op    10000423 allocs/op
BenchmarkRead100000Db100-4       1   9514.87 ms/op                4128413568 B/op   100000412 allocs/op
BenchmarkWrite100Db100-4       126      9.12 ms/op    0.01 MB/s        46619 B/op        1300 allocs/op
BenchmarkWrite1000Db100-4      100     10.31 ms/op    0.01 MB/s        46637 B/op        1300 allocs/op
BenchmarkWrite10000Db100-4     145      9.57 ms/op    0.01 MB/s        46555 B/op        1300 allocs/op
BenchmarkWrite100000Db100-4    148      8.79 ms/op    0.01 MB/s        46579 B/op        1300 allocs/op

and as we expected, the write performance is unaffected by the size of the initial database whereas the read performance degrades.

Summary

Before starting the implementation of the aol engine I had not really grasped the complexity of durability or the fact that durability guarantees often comes with a big disclaimer somewhere in the documentation of the database.

Our aol implementation also offers different kinds of durability guarantees which depends on the sync flag.

In contrast to the in-memory implementation from the last post, we now offer durability, albeit the claim depends on the async configuration flag. The durability comes at the expense of read performance which is O(n).

In the next blog post, I'm gonna do a new implementation of a database engine which increases the read performance while at the same time also offer durability.

The full code of the aol implementation including the benchmark tests can be found at: https://github.com/olif/kvdb.

comments powered by Disqus