Implementing a key-value database in Golang, Part 4 - Merging and Compaction

/images/kvdb-4/kvdb-4.jpg

This is the fourth of several blog posts about implementing a simple key-value store in Golang. All code described in this blog-post can be found here: https://github.com/olif/kvdb.

Previous posts:

Intro

By now we have a durable key-value database with pretty good performance, thanks to the index we implemented in the last blog-post. It is still not fit for any real usage since the database grows with every modifying request. In this post we will try to fix this by segmenting our data and then running a compaction process which rotates and prunes the segments.

Before we start the implementation we need to discuss the concept of segments a bit further, together with compaction and merging and how they mitigate the problem of uncontrollable database growth.

Segments

A segment is a slice of the overall database storage. The idea is to only let the database write records to one segment at a time. When that segment has reached a certain size we close it and from then on that segment is immutable. The database then starts to write to a new segment.

/images/kvdb-4/segments.svg
Two segments, one closed and immutable, and one open to which new data is written

Segments are never modified after they have been closed but are still used for read requests. When doing a lookup of a key we first check if the value exists in the open segment; if it is not there we look in the closed segments in descending chronological order until we find the record we are looking for, or conclude that it does not exist.

Each segment keeps its own index so that we can perform lookups fast.

Compaction

Compaction refers to the process of removing duplicates from a segment. Running compaction of the closed segment above would result in:

/images/kvdb-4/compaction.svg
Illustration of the compaction process

As we can see, duplicates were removed and the compacted segment is now smaller than the original. The effect of compaction depends on the nature of the data: the larger the key-space, the smaller the effect on the overall size.

Merging

Merging refers to the process of combining several segments into one large segment. If several segments contain the same key, those records must be merged in the correct order so that the most recent record takes precedence.

Merging and compaction

We can merge and compact multiple segments at the same time.

/images/kvdb-4/compactionandmerge.svg
Illustration of the merge and compaction process

When a new segment has been created, we can replace the old segments with the newly created one and then delete the old files.

This process removes redundant data and keeps the number of segments in check so that we don't run out of memory.

Since this work can be done in a background process it won't affect the overall database performance directly, except for the synchronization of segments between the compaction process and the database. We therefore expect the compaction and merge process to have only a small performance impact.
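To make the idea concrete, here is a minimal in-memory sketch of merging and compaction. This is not the engine code (that comes further down), and the kvRecord type is invented purely for illustration. The key point is that segments are processed from oldest to newest, so for every key the most recent record wins and all duplicates disappear.

// kvRecord is a simplified stand-in for the database records, used only in
// this illustration.
type kvRecord struct {
	key   string
	value string
}

// mergeAndCompact takes segments ordered from oldest to newest and returns a
// single compacted list where only the latest record per key is kept. Newer
// records overwrite older ones since they are applied last.
func mergeAndCompact(oldestToNewest [][]kvRecord) []kvRecord {
	latest := map[string]kvRecord{}
	for _, segment := range oldestToNewest {
		for _, rec := range segment {
			latest[rec.key] = rec
		}
	}

	merged := make([]kvRecord, 0, len(latest))
	for _, rec := range latest {
		merged = append(merged, rec)
	}
	return merged
}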

Implementation

Instead of having one large file for storing our data as we did in the previous post, we will split the data into segments. Our database engine will write to the segment we define as "open" and will read from all segments in descending chronological order until it finds the requested record.

In more detail a database read will (in pseudocode):

read(key) {
    record = open_segment.find(key)
    if record {
        return record
    }

    foreach segment in closed_segments {
        record = segment.find(key)
        if record {
            return record
        }
    }

    return not_found
}

and a write will:

write(record) {
    if open_segment.size >= SEGMENT_MAX_SIZE {
        open_segment.close()
        closed_segments.add(open_segment)
        open_segment = create_segment()
    }
    open_segment.write(record)
}

. Both read() and write() will be used concurrently. We have dealt with concurrency issues before, but now it becomes more complex since we have more moving parts. Concurrency is really hard to get right, but Go has great tooling around it which we will use to avoid data races.

We can now start our implementation. We will do it naively at first, not taking concurrency into account and then when we have the basic functionality in place, find and fix any race conditions.

Segments

We define a segment as:

type index struct {
	table  map[string]int64
	cursor int64
}

func (i *index) get(key string) (int64, bool) {
	val, ok := i.table[key]
	return val, ok
}

func (i *index) put(key string, written int64) {
	i.table[key] = i.cursor
	i.cursor += written
}

type segment struct {
	storagePath   string
	maxRecordSize int
	logger        *log.Logger
	async         bool
	suffix        string

	index *index
}

func newSegment(baseDir string, maxRecordSize int, async bool, logger *log.Logger) *segment {
	filename := genFileName(openSegmentSuffix)
	filePath := path.Join(baseDir, filename)

	return &segment{
		storagePath:   filePath,
		maxRecordSize: maxRecordSize,
		logger:        logger,
		async:         async,
		index: &index{
			cursor: 0,
			table:  map[string]int64{},
		},
	}
}

func fromFile(filePath string, maxRecordSize int, async bool, logger *log.Logger) (*segment, error) {
	idx := index{
		cursor: 0,
		table:  map[string]int64{},
	}

	f, err := os.OpenFile(filePath, os.O_RDONLY|os.O_CREATE, 0600)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	scanner, err := record.NewScanner(f, maxRecordSize)
	if err != nil {
		return nil, err
	}

	for scanner.Scan() {
		record := scanner.Record()
		idx.put(record.Key(), int64(record.Size()))
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("could not scan entry, %w", err)
	}

	return &segment{
		storagePath:   filePath,
		maxRecordSize: maxRecordSize,
		async:         async,
		logger:        logger,
		index:         &idx,
	}, nil
}

func (s *segment) get(key string) (*record.Record, error) {
	offset, ok := s.index.get(key)
	if !ok {
		return nil, kvdb.NewNotFoundError(key)
	}

	f, err := s.getFile(os.O_RDONLY)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	_, err = f.Seek(offset, io.SeekStart)
	if err != nil {
		return nil, err
	}

	scanner, err := record.NewScanner(f, s.maxRecordSize)
	if err != nil {
		return nil, err
	}

	if scanner.Scan() {
		return scanner.Record(), nil
	}

	return nil, kvdb.NewNotFoundError(key)
}

func (s *segment) append(record *record.Record) error {
	file, err := s.getFile(os.O_CREATE | os.O_WRONLY | os.O_APPEND)
	if err != nil {
		return fmt.Errorf("could not open file: %s for write, %w", s.storagePath, err)
	}
	defer file.Close()

	n, err := record.Write(file)
	if err != nil {
		return fmt.Errorf("could not write record to file: %s, %w", s.storagePath, err)
	}

	if !s.async {
		if err := file.Sync(); err != nil {
			return err
		}
	}

	if err := file.Close(); err != nil {
		return err
	}

	s.index.put(record.Key(), int64(n))
	return nil
}

func (s *segment) getFile(mode int) (*os.File, error) {
	return os.OpenFile(s.storagePath, mode, 0600)
}

func (s *segment) changeSuffix(oldSuffix, newSuffix string) error {
	newFilePath := strings.Replace(s.storagePath, oldSuffix, newSuffix, 1)

	if err := os.Rename(s.storagePath, newFilePath); err != nil {
		return err
	}

	s.storagePath = newFilePath
	return nil
}

func (s *segment) size() int64 {
	return s.index.cursor
}

func (s *segment) clearFile() error {
	s.index.table = map[string]int64{}
	s.index.cursor = 0
	return os.Remove(s.storagePath)
}

The implementation of a segment is almost exactly that of the indexed database engine from the previous post, except that we now need some additional functionality for "closing" a segment and for removing a segment once it has been merged.

Database engine

The engine will make use of the segments. The files representing segments will have a suffix according to their open/closed status.

type              file suffix
open segment      .oseg
closed segment    .cseg

We will also keep track of when a segment was first created by using the unix timestamp as the file name. A closed segment will then have the following appearance:

$> 1600260653373254000.cseg

This makes it possible to sort the segments based on when they were created.
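The genFileName helper used when creating a new segment is not shown in this post. A minimal sketch, assuming it simply prefixes the given suffix with the current unix timestamp in nanoseconds (the helper in the repository may differ in detail):

// genFileName returns a segment file name based on the current unix
// timestamp, e.g. "1600260653373254000.oseg", so that segment files sort
// chronologically by name. This is a sketch, not the repository's helper.
func genFileName(suffix string) string {
	return fmt.Sprintf("%d%s", time.Now().UnixNano(), suffix)
}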

The database engine will keep a stack of all closed segments and a reference to the current open segment. When the open segment is full, we will change its file suffix, push the segment onto the closed segment stack and then create a new segment which becomes the new "open" segment.

Segment stack

In order to keep track of our closed segments we will create a data structure to hold and iterate over them.

type segmentStack struct {
	segments []*segment
}

func newSegmentStack() *segmentStack {
	segments := make([]*segment, 0)
	return &segmentStack{
		segments: segments,
	}
}

func (s *segmentStack) iter() *segmentStackIter {
	return &segmentStackIter{
		segments: s.segments,
		pos:      -1,
	}
}

func (s *segmentStack) push(seg *segment) {
	s.segments = append([]*segment{seg}, s.segments...)
}

type segmentStackIter struct {
	segments []*segment
	pos      int
}

func (i *segmentStackIter) hasNext() bool {
	return i.pos < len(i.segments)-1
}

func (i *segmentStackIter) next() *segment {
	i.pos = i.pos + 1
	return i.segments[i.pos]
}

The engine implementation

We can now use the segmentStack in our engine implementation.

const (
	defaultAsync          = false
	defaultMaxRecordSize  = 1024 * 1024 //1Mb
	defaultSegmentMaxSize = 4096 * 1024 //4Mb
	closedSegmentSuffix   = ".cseg"
	openSegmentSuffix     = ".oseg"
)

var voidLogger = log.New(ioutil.Discard, "", log.LstdFlags)

// Store implements the kvdb store interface providing a simple key-value
// database engine based on an append-log.
type Store struct {
	storagePath    string
	maxRecordSize  int
	maxSegmentSize int64
	logger         *log.Logger
	async          bool

	openSegment *segment

	// closedSegmentStack contains all immutable segments. The newest segment
	// has position 0 and the oldest at the last position
	closedSegments *segmentStack
}

// Config contains the configuration properties for the compacted aol store
type Config struct {
	// Storage path
	BasePath string
	// Sets a limit on the size of the records
	MaxRecordSize *int
	// If false, fsync will be called on every write
	Async *bool
	// MaxSegmentSize defines the maximum size of the segments, must be >=
	// MaxRecordSize
	MaxSegmentSize *int

	Logger *log.Logger
}

// NewStore returns a new Store
func NewStore(config Config) (*Store, error) {
	var (
		maxRecordSize  = defaultMaxRecordSize
		maxSegmentSize = defaultSegmentMaxSize
		storagePath    = config.BasePath
		async          = defaultAsync
		logger         = voidLogger
	)

	if _, err := os.OpenFile(storagePath, os.O_CREATE, 0600); err != nil {
		return nil, err
	}

	if config.MaxRecordSize != nil {
		maxRecordSize = *config.MaxRecordSize
	}

	if config.MaxSegmentSize != nil {
		maxSegmentSize = *config.MaxSegmentSize
	}

	if config.Async != nil {
		async = *config.Async
	}

	if config.Logger != nil {
		logger = config.Logger
	}

	openSegment, err := loadOpenSegment(storagePath, maxRecordSize, async, logger)
	if err != nil {
		return nil, err
	}

	closedSegments, err := loadClosedSegments(storagePath, maxRecordSize, async, logger)
	if err != nil {
		return nil, err
	}

	store := &Store{
		storagePath:    storagePath,
		maxRecordSize:  maxRecordSize,
		maxSegmentSize: int64(maxSegmentSize),
		async:          async,
		logger:         logger,
		openSegment:    openSegment,
		closedSegments: closedSegments,
	}

	return store, nil
}

func loadOpenSegment(storagePath string, maxRecordSize int, async bool, log *log.Logger) (*segment, error) {
	fi, err := listFilesWithSuffix(storagePath, openSegmentSuffix, true)
	if err != nil {
		return nil, fmt.Errorf("could not load open segment: %w", err)
	}

	switch len(fi) {
	case 0:
		return newSegment(storagePath, maxRecordSize, async, log), nil
	case 1:
		return fromFile(fi[0].filepath, maxRecordSize, async, log)
	default:
		return nil, fmt.Errorf("more than one open segment found")
	}
}

func loadClosedSegments(storagePath string, maxRecordSize int, async bool, log *log.Logger) (*segmentStack, error) {
	fis, err := listFilesWithSuffix(storagePath, closedSegmentSuffix, true)
	if err != nil {
		return nil, fmt.Errorf("could not list closed segment files: %w", err)
	}

	closedSegments := newSegmentStack()
	for _, fi := range fis {
		s, err := fromFile(fi.filepath, maxRecordSize, async, log)
		if err != nil {
			return nil, fmt.Errorf("could not load closed segment: %s, %w", fi.filepath, err)
		}
		closedSegments.push(s)
	}

	return closedSegments, nil
}

// Close closes the store
func (s *Store) Close() error {
	s.logger.Println("Closing database")
	return nil
}

// IsNotFoundError returns true if the error signals that a non-existing key
// was requested
func (s *Store) IsNotFoundError(err error) bool {
	return kvdb.IsNotFoundError(err)
}

// IsBadRequestError returns true if the error, or any of the wrapped errors
// is of type BadRequestError
func (s *Store) IsBadRequestError(err error) bool {
	return kvdb.IsBadRequestError(err)
}

When the engine is instantiated we check the storagePath and pre-load the indexes of any existing segment files into memory.

Reading

Since a record can now exist in any segment we may have to check them all. Newer values take precedence, which means that we first look in the open segment. If the record is not found there we go through all the closed segments, newest first.

// Get returns the value associated with the key or a kvdb.NotFoundError if the
// key was not found, or any other error encountered
func (s *Store) Get(key string) ([]byte, error) {
	record, err := s.openSegment.get(key)

	if err == nil {
		return resolveRecord(record)
	} else if !kvdb.IsNotFoundError(err) {
		return nil, err
	}

	iter := s.closedSegments.iter()

	for iter.hasNext() {
		record, err := iter.next().get(key)
		if err == nil {
			return resolveRecord(record)
		} else if !kvdb.IsNotFoundError(err) {
			return nil, err
		}
	}

	return nil, kvdb.NewNotFoundError(key)
}

func resolveRecord(record *record.Record) ([]byte, error) {
	if record.IsTombstone() {
		return nil, kvdb.NewNotFoundError(record.Key())
	}

	return record.Value(), nil
}

Writing

We will always write to the open segment but will need to check if the segment is full or not before writing. If the segment is full we need to close it and create a new open segment to which we write the record.

// Put saves the value to the database and returns any error encountered
func (s *Store) Put(key string, value []byte) error {
	record := record.NewValue(key, value)

	if record.Size() > s.maxRecordSize {
		msg := fmt.Sprintf("key-value too big, max size: %d", s.maxRecordSize)
		return kvdb.NewBadRequestError(msg)
	}

	return s.append(record)
}

// Delete removes the value from the store and returns any error encountered
func (s *Store) Delete(key string) error {
	record := record.NewTombstone(key)
	return s.append(record)
}

func (s *Store) append(record *record.Record) error {
	if s.openSegment.size()+int64(record.Size()) > s.maxSegmentSize {
		if err := s.rotateOpenSegment(); err != nil {
			return err
		}
	}

	return s.openSegment.append(record)
}

func (s *Store) rotateOpenSegment() error {
	newOpenSegment := newSegment(s.storagePath, s.maxRecordSize, s.async, s.logger)
	if err := s.openSegment.changeSuffix(openSegmentSuffix, closedSegmentSuffix); err != nil {
		return err
	}

	s.closedSegments.push(s.openSegment)
	s.openSegment = newOpenSegment
	return nil
}

A first round of race detection

Now that we have a first walking skeleton of our database engine we can start to look for race conditions. Go has a built-in race detector which can be activated by passing the -race flag when either building the application or running tests.

More information about it can be found here: https://golang.org/doc/articles/race_detector.html.

In order for the race-detector to find possible race conditions we want to execute as much of our code as possible. We can do this by writing a test simulating real-world usage of our database.

const (
	readers     = 20
	writers     = 40
	removers    = 2
	nWrites     = 1000
	nReads      = 1000
	nRemoves    = 1000
	cardinality = 1000
)

var data string = `{
  "extends": [
    "tslint:latest",
    "tslint-config-standard",
    "tslint-react"
  ],
  "rules": {
    "semicolon": [true, "never"],
    "object-literal-sort-keys": false,
    "trailing-comma": [true, {"multiline": "always", "singleline": "never"}],
    "jsx-no-lambda": false,
    "jsx-no-multiline-js": false,
    "quotemark": [true, "single", "jsx-double"],
    "no-implicit-dependencies": [true, "dev"],
    "no-console": [false],
    "max-line-length": [true, 200]
  }
}`

func TestLoad(t *testing.T) {
	dbPath, err := ioutil.TempDir("./", dbPathPattern)
	if err != nil {
		t.Fatal(err)
	}

	defer os.RemoveAll(dbPath)

	logger := log.New(os.Stdout, "", log.LstdFlags)
	maxRecordSize := 5 * 1024
	maxSegmentSize := 20 * 1024
	async := true

	store, err := compactedaol.NewStore(compactedaol.Config{
		Async:          &async,
		Logger:         logger,
		BasePath:       dbPath,
		MaxRecordSize:  &maxRecordSize,
		MaxSegmentSize: &maxSegmentSize,
	})

	if err != nil {
		t.Fatal(err)
	}

	run(t, store)
}

func run(t *testing.T, store kvdb.Store) {
	wg := sync.WaitGroup{}
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func(wi int, gr *sync.WaitGroup) {
			writer(t, store)
			gr.Done()
		}(i, &wg)
	}

	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func(ri int, gr *sync.WaitGroup) {
			reader(t, store)
			gr.Done()
		}(i, &wg)
	}

	for i := 0; i < removers; i++ {
		wg.Add(1)
		go func(ri int, gr *sync.WaitGroup) {
			remover(t, store)
			gr.Done()
		}(i, &wg)
	}

	wg.Wait()
	store.Close()
}

func reader(t *testing.T, store kvdb.Store) {
	for i := 0; i < nReads; i++ {
		rnd := rand.Intn(cardinality)
		_, err := store.Get(fmt.Sprintf("key-%d", rnd))
		if err != nil && !kvdb.IsNotFoundError(err) {
			t.Fatal(err)
		}
	}
}

func writer(t *testing.T, store kvdb.Store) {
	for i := 0; i < nWrites; i++ {
		rnd := rand.Intn(cardinality)
		err := store.Put(fmt.Sprintf("key-%d", rnd), []byte(data))
		if err != nil {
			t.Fatal(err)
		}
	}
}

func remover(t *testing.T, store kvdb.Store) {
	for i := 0; i < nRemoves; i++ {
		rnd := rand.Intn(cardinality)
		err := store.Delete(fmt.Sprintf("key-%d", rnd))
		if err != nil {
			t.Fatal(err)
		}
	}
}

Executing this with:

$> go test -race -run='TestLoad' ./test

gives (among many other data races)

$> go test -race -run='TestLoad' ./test
=== RUN   TestLoad
==================
WARNING: DATA RACE
Read at 0x00c00009a538 by goroutine 8:
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*index).put()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:28 +0x488
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*segment).append()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:140 +0x40d
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).append()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:201 +0x12a
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).Put()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:187 +0x24c
  github.com/olif/kvdb/test.writer()
      /Users/niklas/workspace/kvdb/test/load_test.go:116 +0x151
  github.com/olif/kvdb/test.run.func1()
      /Users/niklas/workspace/kvdb/test/load_test.go:78 +0x4c

Previous write at 0x00c00009a538 by goroutine 9:
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*index).put()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:29 +0x539
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*segment).append()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:140 +0x40d
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).append()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:201 +0x12a
  github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).Put()
      /Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:187 +0x24c
  github.com/olif/kvdb/test.writer()
      /Users/niklas/workspace/kvdb/test/load_test.go:116 +0x151
  github.com/olif/kvdb/test.run.func1()
      /Users/niklas/workspace/kvdb/test/load_test.go:78 +0x4c

Goroutine 8 (running) created at:
  github.com/olif/kvdb/test.run()
      /Users/niklas/workspace/kvdb/test/load_test.go:77 +0xd7
  github.com/olif/kvdb/test.TestLoad()
      /Users/niklas/workspace/kvdb/test/load_test.go:70 +0x41e
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:991 +0x1eb

Goroutine 9 (running) created at:
  github.com/olif/kvdb/test.run()
      /Users/niklas/workspace/kvdb/test/load_test.go:77 +0xd7
  github.com/olif/kvdb/test.TestLoad()
      /Users/niklas/workspace/kvdb/test/load_test.go:70 +0x41e
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:991 +0x1eb
==================

. It seems we have some work to do. From the stacktrace above we can see directly that we have a data race in the segment index, which is expected. We can solve this problem by adding a simple RW-mutex.

type index struct {
	table  map[string]int64
	mutex  sync.RWMutex
	cursor int64
}

func (i *index) get(key string) (int64, bool) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()
	val, ok := i.table[key]
	return val, ok
}

func (i *index) put(key string, written int64) {
	i.mutex.Lock()
	defer i.mutex.Unlock()
	i.table[key] = i.cursor
	i.cursor += written
}

The other data races were handled in the same way:

  1. Run the load_test with the -race flag.

  2. Try to fix the issue by adding a mutex.

  3. Run the race-detector again and see if the mutex solved the issue.

  4. Repeat.

and resulted in these changes.

The race-detector does not guarantee to find all race conditions but it should not give any false positives. This means that if the race-detector does find a data race then there is a bug in the code which should be fixed.

It should be noted that the race-detector is not a panacea. It is much better to avoid data races altogether by design than to have to resort to mutexes. Lots of mutexes sprinkled throughout the code will make it hard to maintain.

Compaction and merging process

Since compaction and merging only have to care about the segment files, we can run the process in the background and only touch the engine itself when a new segment has been created.

The compaction and merging can be divided into three steps:

files = select_files_for_compaction()
new_file = merge_and_compact(files)
engine.replace_segments(files, new_file)

and it is only the last step that affects the database engine, so we will let the engine implement that step as a callback.

type onCompactionDoneFunc func(targetFile string, compactedFiles []string) error

type compacter struct {
	onCompactionDone    onCompactionDoneFunc
	compactionThreshold int
	storagePath         string
	maxRecordSize       int
	semaphore           *semaphore.Weighted
	ticker              *time.Ticker
	logger              *log.Logger
}

func newCompacter(
	onCompactionDone onCompactionDoneFunc,
	compactionThreshold int,
	storagePath string,
	maxRecordSize int,
	logger *log.Logger) *compacter {

	return &compacter{
		onCompactionDone:    onCompactionDone,
		compactionThreshold: compactionThreshold,
		storagePath:         storagePath,
		maxRecordSize:       maxRecordSize,
		semaphore:           semaphore.NewWeighted(1),
		logger:              logger,
	}
}

func (c *compacter) run(interval time.Duration) {
	c.ticker = time.NewTicker(interval)
	go (func() {
		for range c.ticker.C {
			go func() {
				if c.semaphore.TryAcquire(1) {
					defer c.semaphore.Release(1)
					c.logger.Println("compaction: running compaction")

					filesToCompact := c.selectFilesForCompaction()
					if len(filesToCompact) == 0 {
						c.logger.Println("compaction: no files to compact")
						return
					}

					targetFile := strings.ReplaceAll(filesToCompact[0], closedSegmentSuffix, compactionSuffix)
					if err := c.doCompaction(targetFile, filesToCompact); err != nil {
						c.logger.Printf("compaction: could not run compaction: %s", err)
						return
					}

					if err := c.onCompactionDone(targetFile, filesToCompact); err != nil {
						c.logger.Printf("compaction: could not run onCompactionDone: %s", err)
					}
				}
			}()
		}
	})()
}

func (c *compacter) stop() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
	defer cancel()

	// Wait for any in-flight compaction to finish before stopping the ticker
	if err := c.semaphore.Acquire(ctx, 1); err != nil {
		c.logger.Printf("compaction: could not stop cleanly: %s", err)
		return
	}
	c.ticker.Stop()
	c.semaphore.Release(1)
}

The compaction process runs once every compactionInterval. We make life easy for ourselves by using a semaphore to allow only one compaction run at a time.
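The post does not show how the engine starts the compacter, but roughly it can be constructed with the engine's onCompactionDone callback (implemented further down) and started with the configured interval. A sketch, where the interval and threshold parameters are assumed to come from configuration and are not part of the Config shown above:

// startCompaction is a sketch of how the store could wire up and start the
// compacter. The interval and threshold values are assumed to come from
// configuration; the exact config fields are not part of this post.
func (s *Store) startCompaction(interval time.Duration, threshold int) {
	c := newCompacter(
		s.onCompactionDone, // called when a new, compacted segment is ready
		threshold,          // max total size of the segments merged in one run
		s.storagePath,
		s.maxRecordSize,
		s.logger,
	)
	c.run(interval)
}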

The only thing left now is to implement the three steps:

Selecting files

// selectFilesForCompaction tries to select as many consecutive files as
// possible without exceeding the CompactionThreshold.
func (c *compacter) selectFilesForCompaction() []string {
	files, _ := listFilesWithSuffix(c.storagePath, closedSegmentSuffix, false)

	fileGroup := []string{}
	var groupSize int64 = 0
	for i := range files {
		fs := files[i].info
		if groupSize+fs.Size() < int64(c.compactionThreshold) {
			groupSize += fs.Size()
			fileGroup = append(fileGroup, files[i].filepath)
		} else {
			if len(fileGroup) > 1 {
				break
			} else {
				fileGroup = []string{files[i].filepath}
				groupSize = fs.Size()
			}
		}
	}

	if len(fileGroup) >= 2 {
		sort.Sort(sort.Reverse(sort.StringSlice(fileGroup)))
		return fileGroup
	}

	return nil
}

Doing the compaction

func (c *compacter) doCompaction(target string, sources []string) error {
	var sourceScanners = []*record.Scanner{}

	for _, source := range sources {
		file, err := os.Open(source)
		if err != nil {
			return fmt.Errorf("could not open file for compaction: %w", err)
		}
		defer file.Close()

		scanner, err := record.NewScanner(file, c.maxRecordSize)
		if err != nil {
			return err
		}

		sourceScanners = append([]*record.Scanner{scanner}, sourceScanners...)
	}

	data := map[string]*record.Record{}
	for _, source := range sourceScanners {
		for source.Scan() {
			if source.Err() != nil {
				c.logger.Fatalf("compacter: skipping corrupted record for key: %s\n", source.Record().Key())
			} else {
				record := source.Record()
				data[record.Key()] = record
			}
		}
	}

	c.logger.Printf("compacter: creating new segment: %s\n", target)
	targetFile, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	if err != nil {
		return fmt.Errorf("could not create compaction target: %w", err)
	}
	defer targetFile.Close()

	for _, record := range data {
		_, err := record.Write(targetFile)
		if err != nil {
			return fmt.Errorf("compacter: could not write to target: %w", err)
		}
	}

	return nil
}

Pruning the existing segments and avoiding race conditions (onCompactionDone)

When we have created a new segment during the compaction process we need to put it in the right position in the segmentStack. All segments that have been merged should be removed, except for the most recent one which we instead replace with the newly created segment.

This means that we need to modify the segment stack, which can cause race conditions. Again we start out with a naive implementation and then run the race-detector.

func (s *Store) onCompactionDone(targetFile string, compactedFiles []string) error {
	newSegment, err := fromFile(targetFile, s.maxRecordSize, s.async, s.logger)
	if err != nil {
		return fmt.Errorf("could not create segment of compaction target: %w", err)
	}

	err = s.closedSegments.replace(func(segment *segment) bool {
		return segment.storagePath == compactedFiles[0]
	}, newSegment)

	if err != nil {
		return fmt.Errorf("could not replace with compacted segment: %w", err)
	}

	filesToRemove := compactedFiles[1:]
	for i := range filesToRemove {
		err = s.closedSegments.remove(func(segment *segment) bool {
			return filesToRemove[i] == segment.storagePath
		})

		if err != nil {
			return fmt.Errorf("could not remove compacted segment: %w", err)
		}
	}

	if err = newSegment.changeSuffix(compactionSuffix, closedSegmentSuffix); err != nil {
		return fmt.Errorf("could not rename compacted segment: %w", err)
	}

	return nil
}

The onCompactionDone callback requires two new operations on the segmentStack, replace and remove, which we also implement naively.

func (s *segmentStack) remove(predicate func(segment *segment) bool) error {
	rem := []*segment{}

	for i := range s.segments {
		if predicate(s.segments[i]) {
			if err := s.segments[i].clearFile(); err != nil {
				return fmt.Errorf("could not remove segment file: %s, due to: %w",
					s.segments[i].storagePath, err)
			}
		} else {
			rem = append(rem, s.segments[i])
		}
	}
	s.segments = rem

	return nil
}

func (s *segmentStack) replace(predicate func(segment *segment) bool, seg *segment) error {
	rem := []*segment{}

	for i := range s.segments {
		if predicate(s.segments[i]) {
			if err := s.segments[i].clearFile(); err != nil {
				return fmt.Errorf("could not remove segment file: %s, due to: %w",
					s.segments[i].storagePath, err)
			}
			rem = append(rem, seg)
		} else {
			rem = append(rem, s.segments[i])
		}
	}
	s.segments = rem

	return nil
}

Again we run the race-detector and fix any data races found. The resulting changes can be found here.

Finally our compactedaol database engine is finished and we can run some performance tests on it.

Performance

Since the compaction is done in the background we do not expect it to affect the performance of the engine in any significant way. `Read` performance will be a bit slower since we now possibly have to go through multiple segment indexes to find a key. Also, when doing compaction we write lots of data to disk, which will affect the overall I/O performance.

Let's run the same tests as we did last time.

❯ go test -run=^$ -bench='Benchmark(Aol|IndexedAol|CompactedAol)(Write)[0-9]*' ./test | prettybench

goos: darwin
goarch: amd64
pkg: github.com/olif/kvdb/test
PASS
benchmark                                iter       time/iter   throughput
---------                                ----       ---------   ----------
BenchmarkAolWrite100Db100-4             10000    111.53 μs/op    1.06 MB/s
BenchmarkAolWrite1000Db100-4            10000    115.00 μs/op    8.85 MB/s
BenchmarkIndexedAolWrite100Db100-4      12068    101.82 μs/op    1.16 MB/s
BenchmarkIndexedAolWrite1000Db100-4     10000    121.10 μs/op    8.41 MB/s
BenchmarkCompactedAolWrite100Db100-4    12124    103.47 μs/op    1.14 MB/s
BenchmarkCompactedAolWrite1000Db100-4    8928    128.21 μs/op    7.94 MB/s
ok  	github.com/olif/kvdb/test	9.116s

We are measuring the time and throughput for writing a single record with a value of 100 and 1000 bytes respectively, without syncing to disk. As we can see, there is a small but not significant difference in performance.

Let's check out the difference in Read performance:

❯ go test -run=^$ -bench='Benchmark(Aol|IndexedAol|CompactedAol)(Read)[0-9]*' ./test | prettybench
goos: darwin
goarch: amd64
pkg: github.com/olif/kvdb/test
PASS
benchmark                                 iter        time/iter
---------                                 ----        ---------
BenchmarkAolReadFrom10000Db-4              252    4963.48 μs/op
BenchmarkIndexedAolReadFrom10000Db-4     56095      19.08 μs/op
BenchmarkCompactedAolReadFrom10000Db-4   50811      28.08 μs/op
ok  	github.com/olif/kvdb/test	39.140s

Again we are reading a random value from a database containing a dataset of 10,000 records. The compactedaol engine has a maxSegmentSize of only 20 kB so it consists of around 50 segments. As expected, we see a slight decrease in Read performance.

Summary

In this post we implemented compaction and merging, which is a form of garbage collection that lets us reclaim disk space. This comes at the cost of more complex code and a slight decrease in read performance.

There are still some issues with this approach. For instance, when we run the merge and compaction process we need to keep the entire resulting segment in memory. Also, each segment needs a dense index, which means that the entire key-space needs to reside in volatile memory.

In the next blog post we will try to resolve these issues by implementing SSTables, as used by a Log-Structured Merge-Tree (LSMT) engine.
