Merge pull request #1187 from ryanworl/bindings/go-tuple-layer-versionstamps
Add Versionstamp support to the Go Tuple layer
This commit is contained in:
commit
1ac8ecf55e
|
@ -62,6 +62,6 @@ testers = {
|
|||
'ruby': Tester('ruby', _absolute_path('ruby/tests/tester.rb'), 2040, 23, MAX_API_VERSION),
|
||||
'java': Tester('java', _java_cmd + 'StackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),
|
||||
'java_async': Tester('java', _java_cmd + 'AsyncStackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),
|
||||
'go': Tester('go', _absolute_path('go/build/bin/_stacktester'), 2040, 200, MAX_API_VERSION),
|
||||
'go': Tester('go', _absolute_path('go/build/bin/_stacktester'), 2040, 200, MAX_API_VERSION, types=ALL_TYPES),
|
||||
'flow': Tester('flow', _absolute_path('flow/bin/fdb_flow_tester'), 63, 500, MAX_API_VERSION, directory_snapshot_ops_enabled=False),
|
||||
}
|
||||
|
|
|
@ -25,8 +25,6 @@ import (
|
|||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb"
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
|
||||
"log"
|
||||
"math/big"
|
||||
"os"
|
||||
|
@ -37,6 +35,9 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb"
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
|
||||
)
|
||||
|
||||
const verbose bool = false
|
||||
|
@ -104,7 +105,7 @@ func (sm *StackMachine) waitAndPop() (ret stackEntry) {
|
|||
switch el := ret.item.(type) {
|
||||
case []byte:
|
||||
ret.item = el
|
||||
case int64, uint64, *big.Int, string, bool, tuple.UUID, float32, float64, tuple.Tuple:
|
||||
case int64, uint64, *big.Int, string, bool, tuple.UUID, float32, float64, tuple.Tuple, tuple.Versionstamp:
|
||||
ret.item = el
|
||||
case fdb.Key:
|
||||
ret.item = []byte(el)
|
||||
|
@ -661,6 +662,24 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
|
|||
t = append(t, sm.waitAndPop().item)
|
||||
}
|
||||
sm.store(idx, []byte(t.Pack()))
|
||||
case op == "TUPLE_PACK_WITH_VERSIONSTAMP":
|
||||
var t tuple.Tuple
|
||||
|
||||
prefix := sm.waitAndPop().item.([]byte)
|
||||
c := sm.waitAndPop().item.(int64)
|
||||
for i := 0; i < int(c); i++ {
|
||||
t = append(t, sm.waitAndPop().item)
|
||||
}
|
||||
|
||||
packed, err := t.PackWithVersionstamp(prefix)
|
||||
if err != nil && strings.Contains(err.Error(), "No incomplete") {
|
||||
sm.store(idx, []byte("ERROR: NONE"))
|
||||
} else if err != nil {
|
||||
sm.store(idx, []byte("ERROR: MULTIPLE"))
|
||||
} else {
|
||||
sm.store(idx, []byte("OK"))
|
||||
sm.store(idx, packed)
|
||||
}
|
||||
case op == "TUPLE_UNPACK":
|
||||
t, e := tuple.Unpack(fdb.Key(sm.waitAndPop().item.([]byte)))
|
||||
if e != nil {
|
||||
|
|
|
@ -39,6 +39,7 @@ package tuple
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/big"
|
||||
|
@ -72,6 +73,39 @@ type Tuple []TupleElement
|
|||
// an instance of this type.
|
||||
type UUID [16]byte
|
||||
|
||||
// Versionstamp is struct for a FoundationDB verionstamp. Versionstamps are
|
||||
// 12 bytes long composed of a 10 byte transaction version and a 2 byte user
|
||||
// version. The transaction version is filled in at commit time and the user
|
||||
// version is provided by the application to order results within a transaction.
|
||||
type Versionstamp struct {
|
||||
TransactionVersion [10]byte
|
||||
UserVersion uint16
|
||||
}
|
||||
|
||||
var incompleteTransactionVersion = [10]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
|
||||
|
||||
const versionstampLength = 12
|
||||
|
||||
// IncompleteVersionstamp is the constructor you should use to make
|
||||
// an incomplete versionstamp to use in a tuple.
|
||||
func IncompleteVersionstamp(userVersion uint16) Versionstamp {
|
||||
return Versionstamp{
|
||||
TransactionVersion: incompleteTransactionVersion,
|
||||
UserVersion: userVersion,
|
||||
}
|
||||
}
|
||||
|
||||
// Bytes converts a Versionstamp struct to a byte slice for encoding in a tuple.
|
||||
func (v Versionstamp) Bytes() []byte {
|
||||
var scratch [versionstampLength]byte
|
||||
|
||||
copy(scratch[:], v.TransactionVersion[:])
|
||||
|
||||
binary.BigEndian.PutUint16(scratch[10:], v.UserVersion)
|
||||
|
||||
return scratch[:]
|
||||
}
|
||||
|
||||
// Type codes: These prefix the different elements in a packed Tuple
|
||||
// to indicate what type they are.
|
||||
const nilCode = 0x00
|
||||
|
@ -86,6 +120,7 @@ const doubleCode = 0x21
|
|||
const falseCode = 0x26
|
||||
const trueCode = 0x27
|
||||
const uuidCode = 0x30
|
||||
const versionstampCode = 0x33
|
||||
|
||||
var sizeLimits = []uint64{
|
||||
1<<(0*8) - 1,
|
||||
|
@ -122,7 +157,15 @@ func adjustFloatBytes(b []byte, encode bool) {
|
|||
}
|
||||
|
||||
type packer struct {
|
||||
buf []byte
|
||||
versionstampPos int32
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func newPacker() *packer {
|
||||
return &packer{
|
||||
versionstampPos: -1,
|
||||
buf: make([]byte, 0, 64),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *packer) putByte(b byte) {
|
||||
|
@ -249,7 +292,22 @@ func (p *packer) encodeUUID(u UUID) {
|
|||
p.putBytes(u[:])
|
||||
}
|
||||
|
||||
func (p *packer) encodeTuple(t Tuple, nested bool) {
|
||||
func (p *packer) encodeVersionstamp(v Versionstamp) {
|
||||
p.putByte(versionstampCode)
|
||||
|
||||
isIncomplete := v.TransactionVersion == incompleteTransactionVersion
|
||||
if isIncomplete {
|
||||
if p.versionstampPos != -1 {
|
||||
panic(fmt.Sprintf("Tuple can only contain one incomplete versionstamp"))
|
||||
}
|
||||
|
||||
p.versionstampPos = int32(len(p.buf))
|
||||
}
|
||||
|
||||
p.putBytes(v.Bytes())
|
||||
}
|
||||
|
||||
func (p *packer) encodeTuple(t Tuple, nested bool, versionstamps bool) {
|
||||
if nested {
|
||||
p.putByte(nestedCode)
|
||||
}
|
||||
|
@ -257,7 +315,7 @@ func (p *packer) encodeTuple(t Tuple, nested bool) {
|
|||
for i, e := range t {
|
||||
switch e := e.(type) {
|
||||
case Tuple:
|
||||
p.encodeTuple(e, true)
|
||||
p.encodeTuple(e, true, versionstamps)
|
||||
case nil:
|
||||
p.putByte(nilCode)
|
||||
if nested {
|
||||
|
@ -293,6 +351,12 @@ func (p *packer) encodeTuple(t Tuple, nested bool) {
|
|||
}
|
||||
case UUID:
|
||||
p.encodeUUID(e)
|
||||
case Versionstamp:
|
||||
if versionstamps == false && e.TransactionVersion == incompleteTransactionVersion {
|
||||
panic(fmt.Sprintf("Incomplete Versionstamp included in vanilla tuple pack"))
|
||||
}
|
||||
|
||||
p.encodeVersionstamp(e)
|
||||
default:
|
||||
panic(fmt.Sprintf("unencodable element at index %d (%v, type %T)", i, t[i], t[i]))
|
||||
}
|
||||
|
@ -306,19 +370,103 @@ func (p *packer) encodeTuple(t Tuple, nested bool) {
|
|||
// Pack returns a new byte slice encoding the provided tuple. Pack will panic if
|
||||
// the tuple contains an element of any type other than []byte,
|
||||
// fdb.KeyConvertible, string, int64, int, uint64, uint, *big.Int, big.Int, float32,
|
||||
// float64, bool, tuple.UUID, nil, or a Tuple with elements of valid types. It will
|
||||
// also panic if an integer is specified with a value outside the range
|
||||
// [-2**2040+1, 2**2040-1]
|
||||
// float64, bool, tuple.UUID, tuple.Versionstamp, nil, or a Tuple with elements of
|
||||
// valid types. It will also panic if an integer is specified with a value outside
|
||||
// the range [-2**2040+1, 2**2040-1]
|
||||
//
|
||||
// Tuple satisfies the fdb.KeyConvertible interface, so it is not necessary to
|
||||
// call Pack when using a Tuple with a FoundationDB API function that requires a
|
||||
// key.
|
||||
//
|
||||
// This method will panic if it contains an incomplete Versionstamp. Use
|
||||
// PackWithVersionstamp instead.
|
||||
//
|
||||
func (t Tuple) Pack() []byte {
|
||||
p := packer{buf: make([]byte, 0, 64)}
|
||||
p.encodeTuple(t, false)
|
||||
p := newPacker()
|
||||
p.encodeTuple(t, false, false)
|
||||
return p.buf
|
||||
}
|
||||
|
||||
// PackWithVersionstamp packs the specified tuple into a key for versionstamp
|
||||
// operations. See Pack for more information. This function will return an error
|
||||
// if you attempt to pack a tuple with more than one versionstamp. This function will
|
||||
// return an error if you attempt to pack a tuple with a versionstamp position larger
|
||||
// than an uint16 if the API version is less than 520.
|
||||
func (t Tuple) PackWithVersionstamp(prefix []byte) ([]byte, error) {
|
||||
hasVersionstamp, err := t.HasIncompleteVersionstamp()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
apiVersion, err := fdb.GetAPIVersion()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if hasVersionstamp == false {
|
||||
return nil, errors.New("No incomplete versionstamp included in tuple pack with versionstamp")
|
||||
}
|
||||
|
||||
p := newPacker()
|
||||
|
||||
if prefix != nil {
|
||||
p.putBytes(prefix)
|
||||
}
|
||||
|
||||
p.encodeTuple(t, false, true)
|
||||
|
||||
if hasVersionstamp {
|
||||
var scratch [4]byte
|
||||
var offsetIndex int
|
||||
if apiVersion < 520 {
|
||||
if p.versionstampPos > math.MaxUint16 {
|
||||
return nil, errors.New("Versionstamp position too large")
|
||||
}
|
||||
|
||||
offsetIndex = 2
|
||||
binary.LittleEndian.PutUint16(scratch[:], uint16(p.versionstampPos))
|
||||
} else {
|
||||
offsetIndex = 4
|
||||
binary.LittleEndian.PutUint32(scratch[:], uint32(p.versionstampPos))
|
||||
}
|
||||
|
||||
p.putBytes(scratch[0:offsetIndex])
|
||||
}
|
||||
|
||||
return p.buf, nil
|
||||
}
|
||||
|
||||
// HasIncompleteVersionstamp determines if there is at least one incomplete
|
||||
// versionstamp in a tuple. This function will return an error this tuple has
|
||||
// more than one versionstamp.
|
||||
func (t Tuple) HasIncompleteVersionstamp() (bool, error) {
|
||||
incompleteCount := t.countIncompleteVersionstamps()
|
||||
|
||||
var err error
|
||||
if incompleteCount > 1 {
|
||||
err = errors.New("Tuple can only contain one incomplete versionstamp")
|
||||
}
|
||||
|
||||
return incompleteCount >= 1, err
|
||||
}
|
||||
|
||||
func (t Tuple) countIncompleteVersionstamps() int {
|
||||
incompleteCount := 0
|
||||
|
||||
for _, el := range t {
|
||||
switch e := el.(type) {
|
||||
case Versionstamp:
|
||||
if e.TransactionVersion == incompleteTransactionVersion {
|
||||
incompleteCount++
|
||||
}
|
||||
case Tuple:
|
||||
incompleteCount += e.countIncompleteVersionstamps()
|
||||
}
|
||||
}
|
||||
|
||||
return incompleteCount
|
||||
}
|
||||
|
||||
func findTerminator(b []byte) int {
|
||||
bp := b
|
||||
var length int
|
||||
|
@ -438,6 +586,20 @@ func decodeUUID(b []byte) (UUID, int) {
|
|||
return u, 17
|
||||
}
|
||||
|
||||
func decodeVersionstamp(b []byte) (Versionstamp, int) {
|
||||
var transactionVersion [10]byte
|
||||
var userVersion uint16
|
||||
|
||||
copy(transactionVersion[:], b[1:11])
|
||||
|
||||
userVersion = binary.BigEndian.Uint16(b[11:])
|
||||
|
||||
return Versionstamp{
|
||||
TransactionVersion: transactionVersion,
|
||||
UserVersion: userVersion,
|
||||
}, versionstampLength + 1
|
||||
}
|
||||
|
||||
func decodeTuple(b []byte, nested bool) (Tuple, int, error) {
|
||||
var t Tuple
|
||||
|
||||
|
@ -489,6 +651,11 @@ func decodeTuple(b []byte, nested bool) (Tuple, int, error) {
|
|||
return nil, i, fmt.Errorf("insufficient bytes to decode UUID starting at position %d of byte array for tuple", i)
|
||||
}
|
||||
el, off = decodeUUID(b[i:])
|
||||
case b[i] == versionstampCode:
|
||||
if i+versionstampLength+1 > len(b) {
|
||||
return nil, i, fmt.Errorf("insufficient bytes to decode Versionstamp starting at position %d of byte array for tuple", i)
|
||||
}
|
||||
el, off = decodeVersionstamp(b[i:])
|
||||
case b[i] == nestedCode:
|
||||
var err error
|
||||
el, off, err = decodeTuple(b[i+1:], true)
|
||||
|
|
|
@ -142,10 +142,10 @@
|
|||
A transaction is not permitted to read any transformed key or value previously set within that transaction, and an attempt to do so will result in an error.
|
||||
|
||||
.. |atomic-versionstamps-tuple-warning-key| replace::
|
||||
At this time, versionstamped keys are not compatible with the Tuple layer except in Java and Python. Note that this implies versionstamped keys may not be used with the Subspace and Directory layers except in those languages.
|
||||
At this time, versionstamped keys are not compatible with the Tuple layer except in Java, Python, and Go. Note that this implies versionstamped keys may not be used with the Subspace and Directory layers except in those languages.
|
||||
|
||||
.. |atomic-versionstamps-tuple-warning-value| replace::
|
||||
At this time, versionstamped values are not compatible with the Tuple layer except in Java and Python. Note that this implies versionstamped values may not be used with the Subspace and Directory layers except in those languages.
|
||||
At this time, versionstamped values are not compatible with the Tuple layer except in Java, Python, and Go. Note that this implies versionstamped values may not be used with the Subspace and Directory layers except in those languages.
|
||||
|
||||
.. |api-version| replace:: 610
|
||||
|
||||
|
|
|
@ -251,10 +251,10 @@ description is not currently required but encouraged.
|
|||
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The smaller of the two values is then stored in the database."/>
|
||||
<Option name="set_versionstamped_key" code="14"
|
||||
paramType="Bytes" paramDescription="value to which to set the transformed key"
|
||||
description="Transforms ``key`` using a versionstamp for the transaction. Sets the transformed key in the database to ``param``. The key is transformed by removing the final four bytes from the key and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the key from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the key is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java and Python bindings. Also, note that prior to API version 520, the offset was computed from only the final two bytes rather than the final four bytes." />
|
||||
description="Transforms ``key`` using a versionstamp for the transaction. Sets the transformed key in the database to ``param``. The key is transformed by removing the final four bytes from the key and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the key from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the key is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java, Python, and Go bindings. Also, note that prior to API version 520, the offset was computed from only the final two bytes rather than the final four bytes." />
|
||||
<Option name="set_versionstamped_value" code="15"
|
||||
paramType="Bytes" paramDescription="value to versionstamp and set"
|
||||
description="Transforms ``param`` using a versionstamp for the transaction. Sets the ``key`` given to the transformed ``param``. The parameter is transformed by removing the final four bytes from ``param`` and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the parameter is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java and Python bindings. Also, note that prior to API version 520, the versionstamp was always placed at the beginning of the parameter rather than computing an offset." />
|
||||
description="Transforms ``param`` using a versionstamp for the transaction. Sets the ``key`` given to the transformed ``param``. The parameter is transformed by removing the final four bytes from ``param`` and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the parameter is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java, Python, and Go bindings. Also, note that prior to API version 520, the versionstamp was always placed at the beginning of the parameter rather than computing an offset." />
|
||||
<Option name="byte_min" code="16"
|
||||
paramType="Bytes" paramDescription="value to check against database value"
|
||||
description="Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the smaller of the two values is then stored in the database."/>
|
||||
|
|
Loading…
Reference in New Issue