Implementing a key-value database in Golang, Part 4 - Merging and Compaction
This is the fourth of several blog posts about implementing a simple key-value store in Golang. All code described in this blog post can be found here: https://github.com/olif/kvdb.
Previous posts:
Intro
By now we have a durable key-value database with pretty good performance, thanks to the index we implemented in the last blog post. It is still not fit for any real usage, since the database grows with every write and delete request. In this post we will try to fix this by splitting our data into segments and then running a compaction process which rotates and prunes the segments.
Before we start the implementation, we need to discuss the concept of segments a bit further, along with compaction and merging and how they mitigate the problem of unbounded growth of our database.
Segments
A segment is a slice of the overall database storage. The idea is to only let the database write records to one segment at a time. When that segment has reached a certain size we close it and from then on that segment is immutable. The database then starts to write to a new segment.
Segments are never modified after they have been closed, but they are still used for read requests. When looking up a key, we first check whether it exists in the open segment. If it is not there, we look in the closed segments from newest to oldest until we find the record. If no segment contains it, the key does not exist.
Each segment keeps its own index so that we can perform lookups fast.
Compaction
Compaction refers to the process of removing duplicates from a segment, so that only the most recent record for each key is kept. Running compaction on the closed segment above would result in:
As we can see, duplicates were removed and the compacted segment is now smaller than the original. The effect of compaction depends on the nature of the data. The larger the key space is relative to the number of writes, the fewer duplicates there are to remove, and the smaller the effect on the overall size.
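Here is a minimal sketch of the idea. It is not the kvdb implementation. It assumes a hypothetical, simplified `Record` type and represents a segment as an in-memory slice of records in the order they were written. Compaction keeps only the last record for each key:

```go
package main

import "fmt"

// Record is a hypothetical, simplified stand-in for a log record.
type Record struct {
	Key   string
	Value string
}

// compact returns a new segment that keeps only the most recent record for
// each key. Surviving records stay at the position of their last write, so
// the result is still ordered like an append-log.
func compact(seg []Record) []Record {
	// Remember the position of the last record written for every key.
	last := make(map[string]int, len(seg))
	for i, r := range seg {
		last[r.Key] = i
	}
	out := make([]Record, 0, len(last))
	for i, r := range seg {
		if last[r.Key] == i {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	seg := []Record{
		{Key: "a", Value: "1"},
		{Key: "b", Value: "1"},
		{Key: "a", Value: "2"},
		{Key: "c", Value: "1"},
		{Key: "b", Value: "2"},
	}
	for _, r := range compact(seg) {
		fmt.Printf("%s=%s\n", r.Key, r.Value) // a=2, c=1, b=2
	}
}
```

Five records are reduced to three. Tombstones would be treated like any other record here: the last one written for a key is kept.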
Merging
Merging refers to the process of merging several segments into one large segment. If several segments contain the same key, those records must be merged in the correct order so that the most recent record takes precedence.
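Merging can be sketched in the same hypothetical, in-memory setting. The sketch assumes the segments are passed from oldest to newest. Writing every record into a map then lets newer records overwrite older ones, which mirrors the map-based approach `doCompaction` uses further down. Because a map is used, merging also compacts:

```go
package main

import (
	"fmt"
	"sort"
)

// Record is a hypothetical, simplified stand-in for a log record.
type Record struct {
	Key   string
	Value string
}

// merge combines segments into a single compacted segment. The segments must
// be ordered from oldest to newest: records from newer segments (and later
// records within a segment) overwrite older ones, so the most recent write
// for each key takes precedence.
func merge(segments ...[]Record) []Record {
	latest := map[string]Record{}
	for _, seg := range segments {
		for _, r := range seg {
			latest[r.Key] = r
		}
	}
	out := make([]Record, 0, len(latest))
	for _, r := range latest {
		out = append(out, r)
	}
	// Map iteration order is random; sort by key for deterministic output.
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out
}

func main() {
	older := []Record{{"a", "1"}, {"b", "1"}}
	newer := []Record{{"b", "2"}, {"c", "1"}}
	fmt.Println(merge(older, newer)) // [{a 1} {b 2} {c 1}]
}
```

Swapping the order of the arguments would make the older value of `b` win. This is why the order in which segments are merged matters.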
Merging and compaction
We can merge and compact multiple segments at the same time.
Once the merged and compacted segment has been created, we can replace the old segments with it and then delete the old files.
This process removes redundant data and keeps the number of segments in check. Since every segment keeps its own in-memory index, neither disk usage nor memory usage grows without bound.
Since the process can run in the background, it won't directly affect database performance, except for the synchronization of segments between the compaction process and the database. We expect the compaction and merge process to have a small performance impact.
Implementation
Instead of storing our data in one large file as we did in the previous post, we will split the data into segments. Our database engine will write to the segment we define as "open". It will read from all segments, from newest to oldest, until it finds the requested record.
In more detail, a database read will (in pseudocode):
read(key) {
record = open_segment.find(key)
if record {
return record
} else {
foreach segment in closed_segments {
record = segment.find(key)
if record {
return record
}
}
}
return not_found
}
and a write will:
write(record) {
if open_segment.size >= SEGMENT_MAX_SIZE {
open_segment.close()
closed_segments.add(open_segment)
open_segment = create_segment()
}
open_segment.write(record)
}
Both read() and write() will be used concurrently. We have dealt with concurrency issues before, but now it becomes more complex since there are more moving parts. Concurrency is really hard to get right, but Go has great tooling around concurrency which we are going to use to avoid data races.
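To make the pseudocode concrete, here is a small single-threaded model of it in Go. It is only an illustration under simplifying assumptions. `memSegment` and `memStore` are hypothetical types, a segment is a map instead of a file, and a record's size is approximated as `len(key) + len(value)`. The rotation check mirrors the one used by `Store.append` later in this post: rotate if the next record would not fit.

```go
package main

import "fmt"

// memSegment is a hypothetical in-memory stand-in for a segment: a map from
// key to value plus the number of bytes "written" so far.
type memSegment struct {
	data map[string]string
	size int
}

func newMemSegment() *memSegment { return &memSegment{data: map[string]string{}} }

// memStore models the read/write pseudocode: one open segment and a list of
// closed segments ordered from newest to oldest.
type memStore struct {
	maxSegmentSize int
	open           *memSegment
	closed         []*memSegment
}

// write rotates the open segment when the next record would not fit, then
// writes the record exactly once to the (possibly new) open segment.
func (s *memStore) write(key, value string) {
	n := len(key) + len(value)
	if s.open.size+n > s.maxSegmentSize {
		s.closed = append([]*memSegment{s.open}, s.closed...) // newest first
		s.open = newMemSegment()
	}
	s.open.data[key] = value
	s.open.size += n
}

// read checks the open segment first and then the closed segments from
// newest to oldest, returning the first match.
func (s *memStore) read(key string) (string, bool) {
	if v, ok := s.open.data[key]; ok {
		return v, true
	}
	for _, seg := range s.closed {
		if v, ok := seg.data[key]; ok {
			return v, true
		}
	}
	return "", false
}

func main() {
	s := &memStore{maxSegmentSize: 4, open: newMemSegment()}
	s.write("a", "1") // 2 bytes in the open segment
	s.write("b", "1") // 4 bytes, still fits
	s.write("a", "2") // would be 6 bytes: rotate first
	v, _ := s.read("a")
	fmt.Println(len(s.closed), v) // 1 2
}
```

With `maxSegmentSize` set to 4, the third write triggers a rotation. The read of `a` is answered by the new open segment even though the closed segment still holds the older value.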
We can now start our implementation. We will do it naively at first, not taking concurrency into account and then when we have the basic functionality in place, find and fix any race conditions.
Segments
We define a segment as:
type index struct {
table map[string]int64
cursor int64
}
func (i *index) get(key string) (int64, bool) {
val, ok := i.table[key]
return val, ok
}
func (i *index) put(key string, written int64) {
i.table[key] = i.cursor
i.cursor += written
}
type segment struct {
storagePath string
maxRecordSize int
logger *log.Logger
async bool
suffix string
index *index
}
func newSegment(baseDir string, maxRecordSize int, async bool, logger *log.Logger) *segment {
filename := genFileName(openSegmentSuffix)
filePath := path.Join(baseDir, filename)
return &segment{
storagePath: filePath,
maxRecordSize: maxRecordSize,
logger: logger,
async: async,
index: &index{
cursor: 0,
table: map[string]int64{},
},
}
}
func fromFile(filePath string, maxRecordSize int, async bool, logger *log.Logger) (*segment, error) {
idx := index{
cursor: 0,
table: map[string]int64{},
}
f, err := os.OpenFile(filePath, os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
defer f.Close()
scanner, err := record.NewScanner(f, maxRecordSize)
if err != nil {
return nil, err
}
// Rebuild the index by scanning every record in the file
for scanner.Scan() {
rec := scanner.Record()
idx.put(rec.Key(), int64(rec.Size()))
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("could not scan entry, %w", err)
}
return &segment{
storagePath: filePath,
maxRecordSize: maxRecordSize,
logger: logger,
async: async,
index: &idx,
}, nil
}
func (s *segment) get(key string) (*record.Record, error) {
offset, ok := s.index.get(key)
if !ok {
return nil, kvdb.NewNotFoundError(key)
}
f, err := s.getFile(os.O_RDONLY)
if err != nil {
return nil, err
}
defer f.Close()
_, err = f.Seek(offset, io.SeekStart)
if err != nil {
return nil, err
}
scanner, err := record.NewScanner(f, s.maxRecordSize)
if err != nil {
return nil, err
}
if scanner.Scan() {
return scanner.Record(), nil
}
return nil, kvdb.NewNotFoundError(key)
}
func (s *segment) append(record *record.Record) error {
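file, err := s.getFile(os.O_CREATE | os.O_WRONLY | os.O_APPEND)
if err != nil {
return fmt.Errorf("could not open file: %s for write, %w", s.storagePath, err)
}
// The deferred Close only matters on early returns; the explicit Close
// below reports any error from flushing the file
defer file.Close()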
n, err := record.Write(file)
if err != nil {
return fmt.Errorf("could not write record to file: %s, %w", s.storagePath, err)
}
if !s.async {
if err := file.Sync(); err != nil {
return err
}
}
if err := file.Close(); err != nil {
return err
}
s.index.put(record.Key(), int64(n))
return nil
}
func (s *segment) getFile(mode int) (*os.File, error) {
return os.OpenFile(s.storagePath, mode, 0600)
}
func (s *segment) changeSuffix(oldSuffix, newSuffix string) error {
newFilePath := strings.Replace(s.storagePath, oldSuffix, newSuffix, 1)
if err := os.Rename(s.storagePath, newFilePath); err != nil {
return err
}
s.storagePath = newFilePath
return nil
}
func (s *segment) size() int64 {
return s.index.cursor
}
func (s *segment) clearFile() error {
s.index.table = map[string]int64{}
s.index.cursor = 0
return os.Remove(s.storagePath)
}
The implementation of a segment is almost identical to the indexed database engine from the previous post. The difference is that we now need some additional functionality for "closing" a segment (changeSuffix) and for removing a segment once it has been merged (clearFile).
Database engine
The engine will make use of the segments. The files representing segments will have a suffix according to their open/closed status.
| Type | File suffix |
|---|---|
| open segment | .oseg |
| closed segment | .cseg |
We also keep track of when each segment was first created by naming its file after the Unix timestamp (in nanoseconds) at creation time. A closed segment file will then look like this:
1600260653373254000.cseg
This makes it possible to sort the segments by when they were written.
The database engine will keep a stack of all closed segments and a reference to the current open segment. When the open segment is full, we change its file suffix, push the segment onto the closed segment stack, and then create a new segment, which from then on is used as the "open" segment.
Segment stack
To keep track of our closed segments, we create a data structure to hold them and iterate over them.
type segmentStack struct {
segments []*segment
}
func newSegmentStack() *segmentStack {
segments := make([]*segment, 0)
return &segmentStack{
segments: segments,
}
}
func (s *segmentStack) iter() *segmentStackIter {
return &segmentStackIter{
segments: s.segments,
pos: -1,
}
}
func (s *segmentStack) push(seg *segment) {
s.segments = append([]*segment{seg}, s.segments...)
}
type segmentStackIter struct {
segments []*segment
pos int
}
func (i *segmentStackIter) hasNext() bool {
return i.pos < len(i.segments)-1
}
func (i *segmentStackIter) next() *segment {
i.pos = i.pos + 1
return i.segments[i.pos]
}
The engine implementation
We can now use the segmentStack in our engine implementation.
const (
defaultAsync = false
defaultMaxRecordSize = 1024 * 1024 // 1 MB
defaultSegmentMaxSize = 4096 * 1024 // 4 MB
closedSegmentSuffix = ".cseg"
openSegmentSuffix = ".oseg"
)
var voidLogger = log.New(ioutil.Discard, "", log.LstdFlags)
// Store implements the kvdb store interface providing a simple key-value
// database engine based on an append-log.
type Store struct {
storagePath string
maxRecordSize int
maxSegmentSize int64
logger *log.Logger
async bool
openSegment *segment
// closedSegments contains all immutable segments. The newest segment
// has position 0 and the oldest the last position
closedSegments *segmentStack
}
// Config contains the configuration properties for the compacted aol store
type Config struct {
// Storage path
BasePath string
// Sets a limit on the size of the records
MaxRecordSize *int
// If false, fsync will be called on every write
Async *bool
// MaxSegmentSize defines the maximum size of the segments, must be >=
// MaxRecordSize
MaxSegmentSize *int
Logger *log.Logger
}
// NewStore returns a new Store
func NewStore(config Config) (*Store, error) {
var (
maxRecordSize = defaultMaxRecordSize
maxSegmentSize = defaultSegmentMaxSize
storagePath = config.BasePath
async = defaultAsync
logger = voidLogger
)
// Make sure that the storage directory exists
if err := os.MkdirAll(storagePath, 0700); err != nil {
return nil, err
}
if config.MaxRecordSize != nil {
maxRecordSize = *config.MaxRecordSize
}
if config.MaxSegmentSize != nil {
maxSegmentSize = *config.MaxSegmentSize
}
if config.Async != nil {
async = *config.Async
}
if config.Logger != nil {
logger = config.Logger
}
openSegment, err := loadOpenSegment(storagePath, maxRecordSize, async, logger)
if err != nil {
return nil, err
}
closedSegments, err := loadClosedSegments(storagePath, maxRecordSize, async, logger)
if err != nil {
return nil, err
}
store := &Store{
storagePath: storagePath,
maxRecordSize: maxRecordSize,
maxSegmentSize: int64(maxSegmentSize),
async: async,
logger: logger,
openSegment: openSegment,
closedSegments: closedSegments,
}
return store, nil
}
func loadOpenSegment(storagePath string, maxRecordSize int, async bool, log *log.Logger) (*segment, error) {
fi, err := listFilesWithSuffix(storagePath, openSegmentSuffix, true)
if err != nil {
return nil, fmt.Errorf("could not load open segment: %w", err)
}
switch len(fi) {
case 0:
return newSegment(storagePath, maxRecordSize, async, log), nil
case 1:
return fromFile(fi[0].filepath, maxRecordSize, async, log)
default:
return nil, fmt.Errorf("more than one open segment found")
}
}
func loadClosedSegments(storagePath string, maxRecordSize int, async bool, log *log.Logger) (*segmentStack, error) {
fis, err := listFilesWithSuffix(storagePath, closedSegmentSuffix, true)
if err != nil {
return nil, fmt.Errorf("could not list closed segment files: %w", err)
}
closedSegments := newSegmentStack()
for _, fi := range fis {
s, err := fromFile(fi.filepath, maxRecordSize, async, log)
if err != nil {
return nil, fmt.Errorf("could not load closed segment: %s, %w", fi.filepath, err)
}
closedSegments.push(s)
}
return closedSegments, nil
}
// Close closes the store
func (s *Store) Close() error {
s.logger.Println("Closing database")
return nil
}
// IsNotFoundError returns true if the error signals that a non-existing key
// was requested
func (s *Store) IsNotFoundError(err error) bool {
return kvdb.IsNotFoundError(err)
}
// IsBadRequestError returns true if the error, or any of the wrapped errors
// is of type BadRequestError
func (s *Store) IsBadRequestError(err error) bool {
return kvdb.IsBadRequestError(err)
}
When the engine is instantiated, we check the storagePath and rebuild the in-memory indexes of any existing segment files.
Reading
Since a record can now exist in any segment, we may have to check all of them. Newer values take precedence, which means that we first look in the open segment. If the record is not found there, we go through the closed segments from newest to oldest.
// Get returns the value associated with the key or a kvdb.NotFoundError if the
// key was not found, or any other error encountered
func (s *Store) Get(key string) ([]byte, error) {
record, err := s.openSegment.get(key)
if err == nil {
return resolveRecord(record)
} else if !kvdb.IsNotFoundError(err) {
return nil, err
}
iter := s.closedSegments.iter()
for iter.hasNext() {
record, err := iter.next().get(key)
if err == nil {
return resolveRecord(record)
} else if !kvdb.IsNotFoundError(err) {
return nil, err
}
}
return nil, kvdb.NewNotFoundError(key)
}
func resolveRecord(record *record.Record) ([]byte, error) {
if record.IsTombstone() {
return nil, kvdb.NewNotFoundError(record.Key())
}
return record.Value(), nil
}
Writing
We always write to the open segment, but first need to check whether the record still fits in it. If it does not, we close the segment and create a new open segment to which we write the record.
// Put saves the value to the database and returns any error encountered
func (s *Store) Put(key string, value []byte) error {
record := record.NewValue(key, value)
if record.Size() > s.maxRecordSize {
msg := fmt.Sprintf("key-value too big, max size: %d", s.maxRecordSize)
return kvdb.NewBadRequestError(msg)
}
return s.append(record)
}
// Delete removes the value from the store and returns any error encountered
func (s *Store) Delete(key string) error {
record := record.NewTombstone(key)
return s.append(record)
}
func (s *Store) append(record *record.Record) error {
if s.openSegment.size()+int64(record.Size()) > s.maxSegmentSize {
if err := s.rotateOpenSegment(); err != nil {
return err
}
}
return s.openSegment.append(record)
}
func (s *Store) rotateOpenSegment() error {
newOpenSegment := newSegment(s.storagePath, s.maxRecordSize, s.async, s.logger)
if err := s.openSegment.changeSuffix(openSegmentSuffix, closedSegmentSuffix); err != nil {
return err
}
s.closedSegments.push(s.openSegment)
s.openSegment = newOpenSegment
return nil
}
A first round of race detection
Now that we have a first walking skeleton of our database engine, we can start looking for race conditions. Go has a built-in race detector which can be activated by setting the -race flag when building the application or running tests.
More information about it can be found here: https://golang.org/doc/articles/race_detector.html.
For the race detector to find possible race conditions, we want to execute as much of our code as possible. We can do this by writing a load test that simulates real-world usage of our database.
const (
readers = 20
writers = 40
removers = 2
nWrites = 1000
nReads = 1000
nRemoves = 1000
cardinality = 1000
)
var data string = `{
"extends": [
"tslint:latest",
"tslint-config-standard",
"tslint-react"
],
"rules": {
"semicolon": [true, "never"],
"object-literal-sort-keys": false,
"trailing-comma": [true, {"multiline": "always", "singleline": "never"}],
"jsx-no-lambda": false,
"jsx-no-multiline-js": false,
"quotemark": [true, "single", "jsx-double"],
"no-implicit-dependencies": [true, "dev"],
"no-console": [false],
"max-line-length": [true, 200]
}
}`
func TestLoad(t *testing.T) {
dbPath, err := ioutil.TempDir("./", dbPathPattern)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dbPath)
logger := log.New(os.Stdout, "", log.LstdFlags)
maxRecordSize := 5 * 1024
maxSegmentSize := 20 * 1024
async := true
store, err := compactedaol.NewStore(compactedaol.Config{
Async: &async,
Logger: logger,
BasePath: dbPath,
MaxRecordSize: &maxRecordSize,
MaxSegmentSize: &maxSegmentSize,
})
if err != nil {
t.Fatal(err)
}
run(t, store)
}
func run(t *testing.T, store kvdb.Store) {
wg := sync.WaitGroup{}
for i := 0; i < writers; i++ {
wg.Add(1)
go func(wi int, gr *sync.WaitGroup) {
writer(t, store)
gr.Done()
}(i, &wg)
}
for i := 0; i < readers; i++ {
wg.Add(1)
go func(ri int, gr *sync.WaitGroup) {
reader(t, store)
gr.Done()
}(i, &wg)
}
for i := 0; i < removers; i++ {
wg.Add(1)
go func(ri int, gr *sync.WaitGroup) {
remover(t, store)
gr.Done()
}(i, &wg)
}
wg.Wait()
store.Close()
}
// reader, writer and remover run in their own goroutines. t.Fatal must only
// be called from the goroutine running the test, so they use t.Error and
// return instead.
func reader(t *testing.T, store kvdb.Store) {
for i := 0; i < nReads; i++ {
rnd := rand.Intn(cardinality)
_, err := store.Get(fmt.Sprintf("key-%d", rnd))
if err != nil && !kvdb.IsNotFoundError(err) {
t.Error(err)
return
}
}
}
func writer(t *testing.T, store kvdb.Store) {
for i := 0; i < nWrites; i++ {
rnd := rand.Intn(cardinality)
err := store.Put(fmt.Sprintf("key-%d", rnd), []byte(data))
if err != nil {
t.Error(err)
return
}
}
}
func remover(t *testing.T, store kvdb.Store) {
for i := 0; i < nRemoves; i++ {
rnd := rand.Intn(cardinality)
err := store.Delete(fmt.Sprintf("key-%d", rnd))
if err != nil {
t.Error(err)
return
}
}
}
Executing this with:
$> go test -race -run='TestLoad' ./test
reports, among many other data races:
=== RUN TestLoad
==================
WARNING: DATA RACE
Read at 0x00c00009a538 by goroutine 8:
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*index).put()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:28 +0x488
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*segment).append()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:140 +0x40d
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).append()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:201 +0x12a
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).Put()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:187 +0x24c
github.com/olif/kvdb/test.writer()
/Users/niklas/workspace/kvdb/test/load_test.go:116 +0x151
github.com/olif/kvdb/test.run.func1()
/Users/niklas/workspace/kvdb/test/load_test.go:78 +0x4c
Previous write at 0x00c00009a538 by goroutine 9:
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*index).put()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:29 +0x539
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*segment).append()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/segment.go:140 +0x40d
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).append()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:201 +0x12a
github.com/olif/kvdb/pkg/kvdb/compactedaol.(*Store).Put()
/Users/niklas/workspace/kvdb/pkg/kvdb/compactedaol/store.go:187 +0x24c
github.com/olif/kvdb/test.writer()
/Users/niklas/workspace/kvdb/test/load_test.go:116 +0x151
github.com/olif/kvdb/test.run.func1()
/Users/niklas/workspace/kvdb/test/load_test.go:78 +0x4c
Goroutine 8 (running) created at:
github.com/olif/kvdb/test.run()
/Users/niklas/workspace/kvdb/test/load_test.go:77 +0xd7
github.com/olif/kvdb/test.TestLoad()
/Users/niklas/workspace/kvdb/test/load_test.go:70 +0x41e
testing.tRunner()
/usr/local/go/src/testing/testing.go:991 +0x1eb
Goroutine 9 (running) created at:
github.com/olif/kvdb/test.run()
/Users/niklas/workspace/kvdb/test/load_test.go:77 +0xd7
github.com/olif/kvdb/test.TestLoad()
/Users/niklas/workspace/kvdb/test/load_test.go:70 +0x41e
testing.tRunner()
/usr/local/go/src/testing/testing.go:991 +0x1eb
==================
Seems like we have some work to do. From the stack trace above we can directly see that there is a data race in the segment index. This is expected, since several writers call put concurrently. We can solve this problem by adding a simple RW-mutex (sync.RWMutex):
type index struct {
table map[string]int64
mutex sync.RWMutex
cursor int64
}
func (i *index) get(key string) (int64, bool) {
i.mutex.RLock()
defer i.mutex.RUnlock()
val, ok := i.table[key]
return val, ok
}
func (i *index) put(key string, written int64) {
i.mutex.Lock()
defer i.mutex.Unlock()
i.table[key] = i.cursor
i.cursor += written
}
The other data races were handled in this way:
- Run the load_test with the -race flag.
- Try to fix the issue by adding a mutex.
- Run the race detector again and see if the mutex solved the issue.
- Repeat.
This resulted in these changes.
The race detector is not guaranteed to find all race conditions, but it should not report any false positives. This means that if the race detector does find a data race, there is a bug in the code which should be fixed.
It should be noted that the race detector is not a panacea. It is much better to design the code so that data races are avoided altogether than to resort to mutexes. Lots of mutexes sprinkled throughout the code make it hard to maintain.
Compaction and merging process
Since compaction and merging only operate on the closed segment files, we can run the process in the background. It only affects the engine itself once a new segment has been created.
The compaction and merging can be divided into three steps:
files = select_files_for_compaction()
new_file = merge_and_compact(files)
engine.replace_segments(files, new_file)
Only the last step affects the database engine, so we let the engine implement that step as a callback.
type onCompactionDoneFunc func(targetFile string, compactedFiles []string) error
type compacter struct {
onCompactionDone onCompactionDoneFunc
compactionThreshold int
storagePath string
maxRecordSize int
semaphore *semaphore.Weighted
ticker *time.Ticker
done chan struct{}
logger *log.Logger
}
func newCompacter(
onCompactionDone onCompactionDoneFunc,
compactionThreshold int,
storagePath string,
maxRecordSize int,
logger *log.Logger) *compacter {
return &compacter{
onCompactionDone: onCompactionDone,
compactionThreshold: compactionThreshold,
storagePath: storagePath,
maxRecordSize: maxRecordSize,
semaphore: semaphore.NewWeighted(1),
done: make(chan struct{}),
logger: logger,
}
}
// run starts a background loop which tries to run a compaction on every tick.
// Unlike time.Tick, a time.Ticker can be stopped, which stop() relies on.
func (c *compacter) run(interval time.Duration) {
c.ticker = time.NewTicker(interval)
go func() {
for {
select {
case <-c.done:
return
case <-c.ticker.C:
go c.compact()
}
}
}()
}
// compact runs a single compaction. The semaphore ensures that at most one
// compaction runs at a time; if one is already running, this tick is skipped.
func (c *compacter) compact() {
if !c.semaphore.TryAcquire(1) {
return
}
defer c.semaphore.Release(1)
c.logger.Println("compaction: running compaction")
filesToCompact := c.selectFilesForCompaction()
if len(filesToCompact) == 0 {
c.logger.Println("compaction: no files to compact")
return
}
targetFile := strings.ReplaceAll(filesToCompact[0], closedSegmentSuffix, compactionSuffix)
if err := c.doCompaction(targetFile, filesToCompact); err != nil {
c.logger.Printf("compaction: could not run compaction: %s", err)
return
}
if err := c.onCompactionDone(targetFile, filesToCompact); err != nil {
c.logger.Printf("compaction: could not run onCompactionDone: %s", err)
}
}
// stop waits (at most 100 seconds) for a running compaction to finish and
// then stops the background loop.
func (c *compacter) stop() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()
acquired := c.semaphore.Acquire(ctx, 1) == nil
c.ticker.Stop()
close(c.done)
if acquired {
c.semaphore.Release(1)
}
}
The compaction process runs once every compactionInterval. We make life easy for ourselves by only allowing one compaction process to run at a time, which is enforced by a semaphore.
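The "skip this tick if a compaction is already running" behaviour comes from TryAcquire. The sketch below shows the idea without the golang.org/x/sync dependency by substituting a buffered channel with capacity one as a non-blocking lock. `tryLock` and `maybeCompact` are hypothetical names and are not part of kvdb.

```go
package main

import "fmt"

// tryLock is a hypothetical non-blocking lock built on a buffered channel
// with capacity one: holding the single slot means a compaction is running.
type tryLock chan struct{}

func newTryLock() tryLock { return make(tryLock, 1) }

// tryAcquire takes the slot if it is free and reports whether it succeeded,
// without blocking.
func (l tryLock) tryAcquire() bool {
	select {
	case l <- struct{}{}:
		return true
	default:
		return false
	}
}

func (l tryLock) release() { <-l }

// maybeCompact runs compact only if no other compaction holds the lock.
func maybeCompact(l tryLock, compact func()) bool {
	if !l.tryAcquire() {
		return false // a compaction is already in progress; skip this tick
	}
	defer l.release()
	compact()
	return true
}

func main() {
	l := newTryLock()
	ran := maybeCompact(l, func() {
		// A tick that fires while this compaction runs is skipped.
		fmt.Println("nested tick ran:", maybeCompact(l, func() {}))
	})
	fmt.Println("outer ran:", ran)
}
```

Skipping a tick instead of queueing it means compactions never pile up. If one compaction takes longer than the interval, the next attempt simply happens on a later tick.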
The only thing left now is to implement the three steps:
Selecting files
// selectFilesForCompaction tries to select as many consecutive files as
// possible without exceeding the CompactionThreshold.
func (c *compacter) selectFilesForCompaction() []string {
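files, err := listFilesWithSuffix(c.storagePath, closedSegmentSuffix, false)
if err != nil {
c.logger.Printf("compaction: could not list segment files: %s", err)
return nil
}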
fileGroup := []string{}
var groupSize int64 = 0
for i := range files {
fs := files[i].info
if groupSize+fs.Size() < int64(c.compactionThreshold) {
groupSize += fs.Size()
fileGroup = append(fileGroup, files[i].filepath)
} else {
if len(fileGroup) > 1 {
break
} else {
fileGroup = []string{files[i].filepath}
groupSize = fs.Size()
}
}
}
if len(fileGroup) >= 2 {
sort.Sort(sort.Reverse(sort.StringSlice(fileGroup)))
return fileGroup
}
return nil
}
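To see which files get picked, the selection loop can be replayed on plain file sizes. `selectGroup` below is a hypothetical re-implementation of the loop in selectFilesForCompaction, written for illustration. It returns indices in listing order, whereas the real function returns paths sorted newest first.

```go
package main

import "fmt"

// selectGroup replays the selection loop of selectFilesForCompaction on plain
// file sizes. It returns the indices of the first run of at least two
// consecutive files whose combined size stays below threshold, or nil.
func selectGroup(sizes []int64, threshold int64) []int {
	group := []int{}
	var groupSize int64
	for i, size := range sizes {
		if groupSize+size < threshold {
			groupSize += size
			group = append(group, i)
		} else if len(group) > 1 {
			break
		} else {
			// Start a new candidate group with the current file.
			group = []int{i}
			groupSize = size
		}
	}
	if len(group) >= 2 {
		return group
	}
	return nil
}

func main() {
	fmt.Println(selectGroup([]int64{5, 5, 5, 20, 3}, 12)) // [0 1]
	fmt.Println(selectGroup([]int64{20, 5, 5, 5}, 12))    // [1 2]
	fmt.Println(selectGroup([]int64{20, 20}, 12))         // []
}
```

With a threshold of 12, the first example stops at the third file because 5 + 5 + 5 is not below 12. The second example skips the oversized first file before it finds a group. A single file is never returned, since there would be nothing to merge.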
Doing the compaction
func (c *compacter) doCompaction(target string, sources []string) error {
var sourceScanners = []*record.Scanner{}
for _, source := range sources {
file, err := os.Open(source)
if err != nil {
return fmt.Errorf("could not open file for compaction: %w", err)
}
defer file.Close()
scanner, err := record.NewScanner(file, c.maxRecordSize)
if err != nil {
return err
}
sourceScanners = append([]*record.Scanner{scanner}, sourceScanners...)
}
// sourceScanners is ordered from oldest to newest, so records from newer
// segments overwrite older ones for the same key
data := map[string]*record.Record{}
for _, source := range sourceScanners {
for source.Scan() {
rec := source.Record()
data[rec.Key()] = rec
}
// Abort instead of writing a partial segment if a source could not be read
if err := source.Err(); err != nil {
return fmt.Errorf("compacter: could not scan source segment: %w", err)
}
}
c.logger.Printf("compacter: creating new segment: %s\n", target)
targetFile, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return fmt.Errorf("could not create compaction target: %w", err)
}
defer targetFile.Close()
for _, record := range data {
_, err := record.Write(targetFile)
if err != nil {
return fmt.Errorf("compacter: could not write to target: %w", err)
}
}
return nil
}
Pruning the existing segments and avoiding race conditions (onCompactionDone)
When we have created a new segment during the compaction process, we need to add it at the right position in the segmentStack. All segments that have been merged should be removed, except for the most recent one, which we instead replace with the newly created segment. This keeps the stack ordered, since the compacted segment contains data up to and including the newest merged segment.
This means that we need to modify the segment stack, which will cause race conditions. Again we start out with a naive implementation and then run the race detector.
func (s *Store) onCompactionDone(targetFile string, compactedFiles []string) error {
newSegment, err := fromFile(targetFile, s.maxRecordSize, s.async, s.logger)
if err != nil {
return fmt.Errorf("could not create segment of compaction target: %w", err)
}
err = s.closedSegments.replace(func(segment *segment) bool {
return segment.storagePath == compactedFiles[0]
}, newSegment)
if err != nil {
return fmt.Errorf("could not replace with compacted segment: %w", err)
}
filesToRemove := compactedFiles[1:]
for i := range filesToRemove {
err = s.closedSegments.remove(func(segment *segment) bool {
return filesToRemove[i] == segment.storagePath
})
if err != nil {
return fmt.Errorf("could not remove compacted segment: %w", err)
}
}
if err = newSegment.changeSuffix(compactionSuffix, closedSegmentSuffix); err != nil {
return fmt.Errorf("could not rename compacted segment: %w", err)
}
return nil
}
The onCompactionDone operation requires two new operations on the segmentStack, replace and remove, which we also implement naively.
func (s *segmentStack) remove(predicate func(segment *segment) bool) error {
rem := []*segment{}
for i := range s.segments {
if predicate(s.segments[i]) {
if err := s.segments[i].clearFile(); err != nil {
return fmt.Errorf("could not remove segment file: %s, due to: %w",
s.segments[i].storagePath, err)
}
} else {
rem = append(rem, s.segments[i])
}
}
s.segments = rem
return nil
}
func (s *segmentStack) replace(predicate func(segment *segment) bool, seg *segment) error {
rem := []*segment{}
for i := range s.segments {
if predicate(s.segments[i]) {
if err := s.segments[i].clearFile(); err != nil {
return fmt.Errorf("could not remove segment file: %s, due to: %w",
s.segments[i].storagePath, err)
}
rem = append(rem, seg)
} else {
rem = append(rem, s.segments[i])
}
}
s.segments = rem
return nil
}
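The sketch below shows one possible approach to making the stack safe for concurrent use. It is hypothetical and not necessarily what the kvdb repository does. The stack is guarded by a sync.RWMutex. Readers iterate over a snapshot of the slice, and writers always build a new slice, so the replace and remove steps of onCompactionDone happen in one critical section. `lockedStack`, `snapshot` and `swap` are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// segment is a hypothetical stand-in holding only what this sketch needs.
type segment struct{ storagePath string }

// lockedStack guards the closed segments with a RWMutex. Writers never modify
// a slice in place, so a snapshot handed to a reader stays valid.
type lockedStack struct {
	mu       sync.RWMutex
	segments []*segment // newest first
}

// snapshot returns the current slice; callers iterate without holding a lock.
func (s *lockedStack) snapshot() []*segment {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.segments
}

func (s *lockedStack) push(seg *segment) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.segments = append([]*segment{seg}, s.segments...)
}

// swap replaces the segment stored at replacePath with seg and drops every
// segment whose path is in removePaths. Doing both under one lock means
// readers never see a stack where the compaction is only half applied.
func (s *lockedStack) swap(replacePath string, removePaths []string, seg *segment) {
	remove := make(map[string]bool, len(removePaths))
	for _, p := range removePaths {
		remove[p] = true
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	next := make([]*segment, 0, len(s.segments))
	for _, cur := range s.segments {
		switch {
		case cur.storagePath == replacePath:
			next = append(next, seg)
		case !remove[cur.storagePath]:
			next = append(next, cur)
		}
	}
	s.segments = next
}

func main() {
	st := &lockedStack{}
	for _, p := range []string{"1.cseg", "2.cseg", "3.cseg"} {
		st.push(&segment{storagePath: p})
	}
	before := st.snapshot()
	// 3.cseg and 2.cseg were compacted; the result takes the place of 3.cseg.
	st.swap("3.cseg", []string{"2.cseg"}, &segment{storagePath: "3.compact"})
	for _, seg := range st.snapshot() {
		fmt.Print(seg.storagePath, " ")
	}
	fmt.Println("| old snapshot still has", len(before), "segments")
}
```

Deleting the old segment files is deliberately left out. A reader holding an old snapshot may still open those files, so they should only be removed once no reader can reference them.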
Again we run the race detector and fix any data races it finds. The resulting changes can be found here.
Finally our compactedaol database is finished, and we can put it through some performance tests.
Performance
Since compaction runs in the background, we do not expect it to affect the performance of the engine in any significant way. `Read` performance will be somewhat slower, since we may now have to go through multiple segment indexes to find a key. Also, compaction writes a lot of data to disk, which affects overall I/O performance.
Let's run the same tests as we did last time.
❯ go test -run=^$ -bench='Benchmark(Aol|IndexedAol|CompactedAol)(Write)[0-9]*' ./test | prettybench
goos: darwin
goarch: amd64
pkg: github.com/olif/kvdb/test
PASS
benchmark iter time/iter throughput
--------- ---- --------- ----------
BenchmarkAolWrite100Db100-4 10000 111.53 μs/op 1.06 MB/s
BenchmarkAolWrite1000Db100-4 10000 115.00 μs/op 8.85 MB/s
BenchmarkIndexedAolWrite100Db100-4 12068 101.82 μs/op 1.16 MB/s
BenchmarkIndexedAolWrite1000Db100-4 10000 121.10 μs/op 8.41 MB/s
BenchmarkCompactedAolWrite100Db100-4 12124 103.47 μs/op 1.14 MB/s
BenchmarkCompactedAolWrite1000Db100-4 8928 128.21 μs/op 7.94 MB/s
ok github.com/olif/kvdb/test 9.116s
We measure the time and throughput for writing a single record with a value of 100 and 1000 bytes respectively, without syncing to disk. As we can see, the differences between the engines are small and not significant.
Let's check out the difference in Read performance:
❯ go test -run=^$ -bench='Benchmark(Aol|IndexedAol|CompactedAol)(Read)[0-9]*' ./test | prettybench
goos: darwin
goarch: amd64
pkg: github.com/olif/kvdb/test
PASS
benchmark iter time/iter
--------- ---- ---------
BenchmarkAolReadFrom10000Db-4 252 4963.48 μs/op
BenchmarkIndexedAolReadFrom10000Db-4 56095 19.08 μs/op
BenchmarkCompactedAolReadFrom10000Db-4 50811 28.08 μs/op
ok github.com/olif/kvdb/test 39.140s
Again we read a random value from a database with a dataset of 10,000 records. The compactedaol engine is configured with a maxSegmentSize of only 20 kB, so the data is spread over around 50 segments. As expected, read performance decreases: 28.08 μs/op compared to 19.08 μs/op for the indexed engine. This is still far faster than the unindexed engine at 4963.48 μs/op.
Summary
In this post we implemented compaction and merging, a form of garbage collection that lets us reclaim disk space. This comes at the cost of more complex code and a slight decrease in read performance.
There are still some issues with this approach. For instance, when we run the merge and compaction process, the entire resulting segment has to be held in memory. Also, each segment needs a dense index, which means that the entire key space has to reside in memory.
In the next blog post we will try to resolve these issues by implementing SSTables, which are used by Log-Structured Merge-Tree (LSMT) engines.