Add Versionstamp support to the Go Tuple layer
This commit is contained in:
parent
ad8bfc1b8f
commit
7ac098dc0d
|
@ -62,6 +62,6 @@ testers = {
|
|||
'ruby': Tester('ruby', _absolute_path('ruby/tests/tester.rb'), 2040, 23, MAX_API_VERSION),
|
||||
'java': Tester('java', _java_cmd + 'StackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),
|
||||
'java_async': Tester('java', _java_cmd + 'AsyncStackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),
|
||||
'go': Tester('go', _absolute_path('go/build/bin/_stacktester'), 2040, 200, MAX_API_VERSION),
|
||||
'go': Tester('go', _absolute_path('go/build/bin/_stacktester'), 2040, 200, MAX_API_VERSION, types=ALL_TYPES),
|
||||
'flow': Tester('flow', _absolute_path('flow/bin/fdb_flow_tester'), 63, 500, MAX_API_VERSION, directory_snapshot_ops_enabled=False),
|
||||
}
|
||||
|
|
|
@ -25,8 +25,6 @@ import (
|
|||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb"
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
|
||||
"log"
|
||||
"math/big"
|
||||
"os"
|
||||
|
@ -37,6 +35,9 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb"
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
|
||||
)
|
||||
|
||||
const verbose bool = false
|
||||
|
@ -661,6 +662,25 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
|
|||
t = append(t, sm.waitAndPop().item)
|
||||
}
|
||||
sm.store(idx, []byte(t.Pack()))
|
||||
case op == "TUPLE_PACK_VERSIONSTAMP":
|
||||
var t tuple.Tuple
|
||||
count := sm.waitAndPop().item.(int64)
|
||||
for i := 0; i < int(count); i++ {
|
||||
t = append(t, sm.waitAndPop().item)
|
||||
}
|
||||
|
||||
incomplete, err := t.HasIncompleteVersionstamp()
|
||||
if incomplete == false {
|
||||
sm.store(idx, []byte("ERROR: NONE"))
|
||||
} else {
|
||||
if err != nil {
|
||||
sm.store(idx, []byte("ERROR: MULTIPLE"))
|
||||
} else {
|
||||
packed := t.Pack()
|
||||
sm.store(idx, "OK")
|
||||
sm.store(idx, packed)
|
||||
}
|
||||
}
|
||||
case op == "TUPLE_UNPACK":
|
||||
t, e := tuple.Unpack(fdb.Key(sm.waitAndPop().item.([]byte)))
|
||||
if e != nil {
|
||||
|
@ -893,7 +913,7 @@ func main() {
|
|||
log.Fatal("API version not equal to value selected")
|
||||
}
|
||||
|
||||
db, e = fdb.OpenDatabase(clusterFile)
|
||||
db, e = fdb.Open(clusterFile, []byte("DB"))
|
||||
if e != nil {
|
||||
log.Fatal(e)
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ package tuple
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/big"
|
||||
|
@ -72,6 +73,37 @@ type Tuple []TupleElement
|
|||
// an instance of this type.
|
||||
type UUID [16]byte
|
||||
|
||||
// Versionstamp .
|
||||
type Versionstamp struct {
|
||||
TransactionVersion [10]byte
|
||||
UserVersion uint16
|
||||
}
|
||||
|
||||
var incompleteTransactionVersion = [10]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
|
||||
|
||||
const versionstampLength = 13
|
||||
|
||||
// IncompleteVersionstamp .
|
||||
func IncompleteVersionstamp(userVersion uint16) Versionstamp {
|
||||
return Versionstamp{
|
||||
TransactionVersion: incompleteTransactionVersion,
|
||||
UserVersion: userVersion,
|
||||
}
|
||||
}
|
||||
|
||||
// Bytes .
|
||||
func (v Versionstamp) Bytes() []byte {
|
||||
var scratch [12]byte
|
||||
|
||||
copy(scratch[:], v.TransactionVersion[:])
|
||||
|
||||
binary.BigEndian.PutUint16(scratch[10:], v.UserVersion)
|
||||
|
||||
fmt.Println(scratch)
|
||||
|
||||
return scratch[:]
|
||||
}
|
||||
|
||||
// Type codes: These prefix the different elements in a packed Tuple
|
||||
// to indicate what type they are.
|
||||
const nilCode = 0x00
|
||||
|
@ -86,6 +118,7 @@ const doubleCode = 0x21
|
|||
const falseCode = 0x26
|
||||
const trueCode = 0x27
|
||||
const uuidCode = 0x30
|
||||
const versionstampCode = 0x33
|
||||
|
||||
var sizeLimits = []uint64{
|
||||
1<<(0*8) - 1,
|
||||
|
@ -122,7 +155,15 @@ func adjustFloatBytes(b []byte, encode bool) {
|
|||
}
|
||||
|
||||
type packer struct {
|
||||
buf []byte
|
||||
versionstampPos int32
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func newPacker() *packer {
|
||||
return &packer{
|
||||
versionstampPos: -1,
|
||||
buf: make([]byte, 0, 64),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *packer) putByte(b byte) {
|
||||
|
@ -249,6 +290,18 @@ func (p *packer) encodeUUID(u UUID) {
|
|||
p.putBytes(u[:])
|
||||
}
|
||||
|
||||
func (p *packer) encodeVersionstamp(v Versionstamp) {
|
||||
p.putByte(versionstampCode)
|
||||
|
||||
if p.versionstampPos != 0 && v.TransactionVersion == incompleteTransactionVersion {
|
||||
panic(fmt.Sprintf("Tuple can only contain one unbound versionstamp"))
|
||||
} else {
|
||||
p.versionstampPos = int32(len(p.buf))
|
||||
}
|
||||
|
||||
p.putBytes(v.Bytes())
|
||||
}
|
||||
|
||||
func (p *packer) encodeTuple(t Tuple, nested bool) {
|
||||
if nested {
|
||||
p.putByte(nestedCode)
|
||||
|
@ -293,6 +346,8 @@ func (p *packer) encodeTuple(t Tuple, nested bool) {
|
|||
}
|
||||
case UUID:
|
||||
p.encodeUUID(e)
|
||||
case Versionstamp:
|
||||
p.encodeVersionstamp(e)
|
||||
default:
|
||||
panic(fmt.Sprintf("unencodable element at index %d (%v, type %T)", i, t[i], t[i]))
|
||||
}
|
||||
|
@ -314,11 +369,50 @@ func (p *packer) encodeTuple(t Tuple, nested bool) {
|
|||
// call Pack when using a Tuple with a FoundationDB API function that requires a
|
||||
// key.
|
||||
func (t Tuple) Pack() []byte {
|
||||
p := packer{buf: make([]byte, 0, 64)}
|
||||
p := newPacker()
|
||||
p.encodeTuple(t, false)
|
||||
return p.buf
|
||||
}
|
||||
|
||||
// PackWithVersionstamp packs the specified tuple into a key for versionstamp operations
|
||||
func (t Tuple) PackWithVersionstamp() ([]byte, error) {
|
||||
hasVersionstamp, err := t.HasIncompleteVersionstamp()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := newPacker()
|
||||
p.encodeTuple(t, false)
|
||||
|
||||
if hasVersionstamp {
|
||||
var scratch [4]byte
|
||||
binary.LittleEndian.PutUint32(scratch[:], uint32(p.versionstampPos))
|
||||
p.putBytes(scratch[:])
|
||||
}
|
||||
|
||||
return p.buf, nil
|
||||
}
|
||||
|
||||
// HasIncompleteVersionstamp determines if there is at least one incomplete versionstamp in a tuple
|
||||
func (t Tuple) HasIncompleteVersionstamp() (bool, error) {
|
||||
incompleteCount := 0
|
||||
for _, el := range t {
|
||||
switch e := el.(type) {
|
||||
case Versionstamp:
|
||||
if e.TransactionVersion == incompleteTransactionVersion {
|
||||
incompleteCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
if incompleteCount > 1 {
|
||||
err = errors.New("Tuple can only contain one unbound versionstamp")
|
||||
}
|
||||
|
||||
return incompleteCount == 1, err
|
||||
}
|
||||
|
||||
func findTerminator(b []byte) int {
|
||||
bp := b
|
||||
var length int
|
||||
|
@ -438,6 +532,20 @@ func decodeUUID(b []byte) (UUID, int) {
|
|||
return u, 17
|
||||
}
|
||||
|
||||
func decodeVersionstamp(b []byte) (Versionstamp, int) {
|
||||
var transactionVersion [10]byte
|
||||
var userVersion uint16
|
||||
|
||||
copy(transactionVersion[:], b[1:11])
|
||||
|
||||
userVersion = binary.BigEndian.Uint16(b[11:])
|
||||
|
||||
return Versionstamp{
|
||||
TransactionVersion: transactionVersion,
|
||||
UserVersion: userVersion,
|
||||
}, versionstampLength
|
||||
}
|
||||
|
||||
func decodeTuple(b []byte, nested bool) (Tuple, int, error) {
|
||||
var t Tuple
|
||||
|
||||
|
@ -489,6 +597,11 @@ func decodeTuple(b []byte, nested bool) (Tuple, int, error) {
|
|||
return nil, i, fmt.Errorf("insufficient bytes to decode UUID starting at position %d of byte array for tuple", i)
|
||||
}
|
||||
el, off = decodeUUID(b[i:])
|
||||
case b[i] == versionstampCode:
|
||||
if i+versionstampLength > len(b) {
|
||||
return nil, i, fmt.Errorf("insufficient bytes to decode Versionstamp starting at position %d of byte array for tuple", i)
|
||||
}
|
||||
el, off = decodeVersionstamp(b[i:])
|
||||
case b[i] == nestedCode:
|
||||
var err error
|
||||
el, off, err = decodeTuple(b[i+1:], true)
|
||||
|
|
Loading…
Reference in New Issue