This repository has been archived by the owner on Dec 3, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 11
/
writeaheadlog.go
387 lines (337 loc) · 12.4 KB
/
writeaheadlog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
// Package writeaheadlog defines and implements a general purpose, high
// performance write-ahead-log for performing ACID transactions to disk without
// sacrificing speed or latency more than fundamentally required.
package writeaheadlog
import (
"bytes"
"encoding/binary"
"fmt"
"os"
"sort"
"sync"
"sync/atomic"
"unsafe"
"github.com/NebulousLabs/errors"
)
// WAL is a general purpose, high performance write-ahead-log for performing
// ACID transactions to disk without sacrificing speed or latency more than
// fundamentally required.
type WAL struct {
	// atomicNextTxnNum is used to give every transaction a unique transaction
	// number. The transaction will then wait until atomicTransactionCounter allows
	// the transaction to be committed. This ensures that transactions are committed
	// in the correct order.
	atomicNextTxnNum uint64

	// atomicUnfinishedTxns counts how many transactions were created but not
	// released yet. This counter needs to be 0 for the wal to exit cleanly.
	atomicUnfinishedTxns int64

	// Variables to coordinate batched syncs. See sync.go for more information.
	atomicSyncStatus uint32         // 0: no syncing thread, 1: syncing thread, empty queue, 2: syncing thread, non-empty queue.
	atomicSyncState  unsafe.Pointer // points to a struct containing a RWMutex and an error

	// availablePages lists the offset of file pages which currently have completed or
	// voided updates in them. The pages are in no particular order.
	availablePages []uint64

	// filePageCount indicates the number of pages total in the file. If the
	// number of availablePages ever drops below the number of pages required
	// for a new transaction, then the file is extended, new pages are added,
	// and the availablePages array is updated to include the extended pages.
	filePageCount uint64

	// wg is a WaitGroup that allows us to wait for the syncThread to finish to
	// ensure a clean shutdown
	wg sync.WaitGroup

	// dependencies are used to inject special behaviour into the wal by providing
	// custom dependencies when the wal is created and calling deps.disrupt(setting)
	// with the name of the behaviour to trigger.
	deps dependencies

	// logFile is the open handle of the WAL file on disk.
	logFile file

	// mu guards availablePages and filePageCount against concurrent
	// modification (see managedReservePages).
	mu sync.Mutex

	path string // path of the underlying logFile
}
// allocatePages creates new pages and adds them to the available pages of the wal
func (w *WAL) allocatePages(numPages uint64) {
	// Page index 0 is reserved for the metadata page, so the first data page
	// lives at index 1; new pages are appended after the current last page.
	firstNew := w.filePageCount + 1
	for idx := firstNew; idx < firstNew+numPages; idx++ {
		w.availablePages = append(w.availablePages, idx*pageSize)
	}
	w.filePageCount += numPages
}
// newWal initializes and returns a wal. If a WAL file already exists at
// path, it is recovered and any committed-but-unfinished transactions are
// returned so the caller can reapply them; otherwise a fresh WAL file is
// created with unclean-state metadata.
func newWal(path string, deps dependencies) (txns []*Transaction, w *WAL, err error) {
	// Create a new WAL.
	newWal := &WAL{
		deps: deps,
		path: path,
	}

	// sync.go expects the sync state to be initialized with a locked rwMu at
	// startup.
	ss := new(syncState)
	ss.rwMu.Lock()
	atomic.StorePointer(&newWal.atomicSyncState, unsafe.Pointer(ss))

	// Try opening the WAL file.
	data, err := deps.readFile(path)
	if err == nil {
		// Reuse the existing wal
		newWal.logFile, err = deps.openFile(path, os.O_RDWR, 0600)
		if err != nil {
			return nil, nil, errors.Extend(errors.New("unable to open wal logFile"), err)
		}

		// Recover WAL and return updates
		txns, err = newWal.recoverWAL(data)
		if err != nil {
			// Close the file before returning; the caller never gets a
			// handle on a WAL whose recovery failed.
			err = errors.Compose(err, newWal.logFile.Close())
			return nil, nil, errors.Extend(err, errors.New("unable to perform wal recovery"))
		}

		return txns, newWal, nil
	} else if !os.IsNotExist(err) {
		// Reading failed for a reason other than the file not existing.
		return nil, nil, errors.Extend(err, errors.New("walFile was not opened successfully"))
	}

	// Create new empty WAL
	newWal.logFile, err = deps.create(path)
	if err != nil {
		return nil, nil, errors.Extend(err, errors.New("walFile could not be created"))
	}
	// Write the metadata to the WAL
	if err = writeWALMetadata(newWal.logFile); err != nil {
		return nil, nil, errors.Extend(err, errors.New("Failed to write metadata to file"))
	}
	return nil, newWal, nil
}
// readWALMetadata reads WAL metadata from the input file, returning an error
// if the result is unexpected.
func readWALMetadata(data []byte) (uint16, error) {
	headerLen := len(metadataHeader)
	versionLen := len(metadataVersion)

	// Reject inputs too short to contain header, version, and status.
	if len(data) < headerLen+versionLen+metadataStatusSize {
		return 0, errors.New("unable to read wal metadata")
	}
	// Verify the file header.
	if !bytes.Equal(metadataHeader[:], data[:headerLen]) {
		return 0, errors.New("file header is incorrect")
	}
	// Verify the file version.
	if !bytes.Equal(metadataVersion[:], data[headerLen:headerLen+versionLen]) {
		return 0, errors.New("file version is unrecognized - maybe you need to upgrade")
	}

	// Extract the recovery status byte; out-of-range values are coerced to
	// the unclean state so that recovery will be attempted.
	state := uint16(data[headerLen+versionLen])
	if state == 0 || state > 3 {
		state = recoveryStateUnclean
	}
	return state, nil
}
// recoverWAL recovers a WAL and returns committed but not finished updates.
// It also rebuilds w.filePageCount and w.availablePages from the on-disk
// page layout, and resets the unfinished-transaction counter.
func (w *WAL) recoverWAL(data []byte) ([]*Transaction, error) {
	// Validate metadata
	recoveryState, err := readWALMetadata(data[0:])
	if err != nil {
		return nil, errors.Extend(err, errors.New("unable to read wal metadata"))
	}

	// A cleanly shut down WAL holds no unfinished transactions. Mark it
	// unclean (it is in use again) and return early.
	if recoveryState == recoveryStateClean {
		if err := w.writeRecoveryState(recoveryStateUnclean); err != nil {
			return nil, errors.Extend(err, errors.New("unable to write WAL recovery state"))
		}
		return nil, nil
	}

	// load all normal pages
	// diskPage augments a page with the on-disk offset of its successor so
	// the in-memory chain can be relinked after all pages have been read.
	type diskPage struct {
		page
		nextPageOffset uint64
	}
	pageSet := make(map[uint64]*diskPage) // keyed by offset
	for i := uint64(pageSize); i+pageSize <= uint64(len(data)); i += pageSize {
		nextOffset := binary.LittleEndian.Uint64(data[i:])
		if nextOffset < pageSize {
			// nextOffset is actually a transaction status, meaning this is a
			// first page; first pages are decoded in the loop below.
			continue
		}
		pageSet[i] = &diskPage{
			page: page{
				offset:  i,
				payload: data[i+pageMetaSize : i+pageSize],
			},
			nextPageOffset: nextOffset,
		}
	}

	// fill in each nextPage pointer
	for _, p := range pageSet {
		if nextDiskPage, ok := pageSet[p.nextPageOffset]; ok {
			p.nextPage = &nextDiskPage.page
		}
	}

	// reconstruct transactions
	var txns []*Transaction
nextTxn:
	for i := pageSize; i+pageSize <= len(data); i += pageSize {
		status := binary.LittleEndian.Uint64(data[i:])
		if status != txnStatusComitted {
			// Only committed transactions are recovered; anything else was
			// never fully persisted and is skipped.
			continue
		}
		// decode metadata and first page: 8 bytes status, 8 bytes sequence
		// number, checksum, then the offset of the next page.
		seq := binary.LittleEndian.Uint64(data[i+8:])
		var diskChecksum checksum
		n := copy(diskChecksum[:], data[i+16:])
		nextPageOffset := binary.LittleEndian.Uint64(data[i+16+n:])
		firstPage := &page{
			offset:  uint64(i),
			payload: data[i+firstPageMetaSize : i+pageSize],
		}
		if nextDiskPage, ok := pageSet[nextPageOffset]; ok {
			firstPage.nextPage = &nextDiskPage.page
		}

		// Check if the pages of the transaction form a loop; corrupted data
		// could otherwise cause the traversals below to never terminate.
		visited := make(map[uint64]struct{})
		for page := firstPage; page != nil; page = page.nextPage {
			if _, exists := visited[page.offset]; exists {
				// Loop detected
				continue nextTxn
			}
			visited[page.offset] = struct{}{}
		}

		txn := &Transaction{
			status:         status,
			setupComplete:  true,
			commitComplete: true,
			sequenceNumber: seq,
			firstPage:      firstPage,
			wal:            w,
		}

		// validate checksum; a mismatch indicates a torn or corrupt write,
		// so the transaction is silently dropped.
		if txn.checksum() != diskChecksum {
			continue
		}

		// decode updates by concatenating the payloads of the page chain.
		var updateBytes []byte
		for page := txn.firstPage; page != nil; page = page.nextPage {
			updateBytes = append(updateBytes, page.payload...)
		}
		updates, err := unmarshalUpdates(updateBytes)
		if err != nil {
			continue
		}
		txn.Updates = updates
		txns = append(txns, txn)
	}

	// sort txns by sequence number so callers reapply them in commit order.
	sort.Slice(txns, func(i, j int) bool {
		return txns[i].sequenceNumber < txns[j].sequenceNumber
	})

	// filePageCount is the number of pages minus 1 metadata page; a trailing
	// partial page is rounded up to a full page first.
	w.filePageCount = uint64(len(data)) / pageSize
	if len(data)%pageSize != 0 {
		w.filePageCount++
	}
	if w.filePageCount > 0 {
		w.filePageCount--
	}

	// find out which pages are used and add the unused ones to availablePages
	usedPages := make(map[uint64]struct{})
	for _, txn := range txns {
		for page := txn.firstPage; page != nil; page = page.nextPage {
			usedPages[page.offset] = struct{}{}
		}
	}
	// NOTE(review): the last data page lives at offset filePageCount*pageSize
	// (cf. allocatePages), but this loop stops before it — the final page is
	// never returned to availablePages. Looks like an off-by-one; confirm.
	for offset := uint64(pageSize); offset < w.filePageCount*pageSize; offset += pageSize {
		if _, exists := usedPages[offset]; !exists {
			w.availablePages = append(w.availablePages, offset)
		}
	}

	// make sure that the unfinished txn counter has the correct value
	w.atomicUnfinishedTxns = int64(len(txns))
	return txns, nil
}
// writeRecoveryState is a helper function that changes the recoveryState on disk
func (w *WAL) writeRecoveryState(state uint16) error {
	// The state byte sits directly after the header and version fields.
	stateOffset := int64(len(metadataHeader) + len(metadataVersion))
	if _, err := w.logFile.WriteAt([]byte{byte(state)}, stateOffset); err != nil {
		return err
	}
	return w.logFile.Sync()
}
// managedReservePages reserves pages for a given payload and links them
// together, allocating new pages if necessary. It returns the first page in
// the chain, or nil if the payload is empty.
func (w *WAL) managedReservePages(data []byte) *page {
	// Find out how many pages are needed for the payload, rounding up.
	numPages := uint64(len(data) / MaxPayloadSize)
	if len(data)%MaxPayloadSize != 0 {
		numPages++
	}
	// An empty payload needs no pages. Without this guard, `pages` below
	// would be empty and &pages[0] would panic with an index-out-of-range.
	if numPages == 0 {
		return nil
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	// allocate more pages if necessary
	if pagesNeeded := int64(numPages) - int64(len(w.availablePages)); pagesNeeded > 0 {
		w.allocatePages(uint64(pagesNeeded))

		// sanity check: the number of available pages should now equal the number of required ones
		if int64(len(w.availablePages)) != int64(numPages) {
			panic(fmt.Errorf("sanity check failed: num of available pages (%v) != num of required pages (%v)", len(w.availablePages), numPages))
		}
	}
	// Reserve some pages and remove them from the available ones
	reservedPages := w.availablePages[uint64(len(w.availablePages))-numPages:]
	w.availablePages = w.availablePages[:uint64(len(w.availablePages))-numPages]

	// Set the fields of each page
	buf := bytes.NewBuffer(data)
	pages := make([]page, numPages)
	for i := range pages {
		// Set nextPage if the current page isn't the last one
		if uint64(i+1) < numPages {
			pages[i].nextPage = &pages[i+1]
		}

		// Set offset according to the index in reservedPages
		pages[i].offset = reservedPages[i]

		// Copy part of the update into the payload
		pages[i].payload = buf.Next(MaxPayloadSize)
	}

	return &pages[0]
}
// writeWALMetadata writes WAL metadata to the input file.
func writeWALMetadata(f file) error {
	// Assemble header, version, recovery state, and a trailing newline.
	metadata := make([]byte, 0, len(metadataHeader)+len(metadataVersion)+metadataStatusSize)
	metadata = append(metadata, metadataHeader[:]...)
	metadata = append(metadata, metadataVersion[:]...)
	// A freshly created WAL always starts out in the unclean state; it is
	// only marked clean on an orderly shutdown.
	metadata = append(metadata, byte(recoveryStateUnclean), '\n')
	_, err := f.WriteAt(metadata, 0)
	return err
}
// Close closes the wal, frees used resources and checks for active
// transactions.
func (w *WAL) Close() error {
	// Unreleased transactions prevent a clean shutdown.
	var txnErr error
	if atomic.LoadInt64(&w.atomicUnfinishedTxns) != 0 {
		txnErr = errors.New("There are still non-released transactions left")
	}

	// Record a clean shutdown on disk, unless an error occurred or the
	// "UncleanShutdown" dependency was injected.
	if txnErr == nil && !w.deps.disrupt("UncleanShutdown") {
		txnErr = w.writeRecoveryState(recoveryStateClean)
	}

	// Wait for any running sync thread before releasing the file handle.
	w.wg.Wait()
	closeErr := w.logFile.Close()

	return errors.Compose(txnErr, closeErr)
}
// CloseIncomplete closes the WAL and reports the number of transactions that
// are still uncommitted.
func (w *WAL) CloseIncomplete() (int64, error) {
	// Let the sync thread finish before tearing down the file handle.
	w.wg.Wait()
	unfinished := atomic.LoadInt64(&w.atomicUnfinishedTxns)
	return unfinished, w.logFile.Close()
}
// New will open a WAL. If the previous run did not shut down cleanly, a set of
// updates will be returned which got committed successfully to the WAL, but
// were never signaled as fully completed.
//
// If no WAL exists, a new one will be created.
//
// If in debugging mode, the WAL may return a series of updates multiple times,
// simulating multiple consecutive unclean shutdowns. If the updates are
// properly idempotent, there should be no functional difference between the
// multiple appearances and them just being loaded a single time correctly.
func New(path string) ([]*Transaction, *WAL, error) {
	// Production code always uses the real on-disk dependency set.
	deps := &dependencyProduction{}
	return newWal(path, deps)
}