Merge branch 'master' of github.com:apple/foundationdb

This commit is contained in:
Ben Collins 2017-07-26 11:02:13 -07:00
commit 6f0062330b
61 changed files with 797 additions and 367 deletions

View File

@ -47,7 +47,7 @@ else ifeq ($(PLATFORM),Darwin)
CXX := /usr/bin/clang
CFLAGS += -mmacosx-version-min=10.7 -stdlib=libc++
CXXFLAGS += -std=c++11 -stdlib=libc++ -msse4.2 -Wno-undefined-var-template -Wno-unknown-warning-option
CXXFLAGS += -mmacosx-version-min=10.7 -std=c++11 -stdlib=libc++ -msse4.2 -Wno-undefined-var-template -Wno-unknown-warning-option
.LIBPATTERNS := lib%.dylib lib%.a

View File

@ -63,6 +63,6 @@ testers = {
'java_async' : Tester('java', _java_cmd + 'AsyncStackTester', 2040, 500, MAX_API_VERSION),
'java_completable' : Tester('java', _java_completable_cmd + 'StackTester', 2040, 500, MAX_API_VERSION),
'java_completable_async' : Tester('java', _java_completable_cmd + 'AsyncStackTester', 2040, 500, MAX_API_VERSION),
'go' : Tester('go', _absolute_path('go/bin/_stacktester'), 63, 200, MAX_API_VERSION),
'go' : Tester('go', _absolute_path('go/build/bin/_stacktester'), 63, 200, MAX_API_VERSION),
'flow' : Tester('flow', _absolute_path('flow/bin/fdb_flow_tester'), 63, 500, MAX_API_VERSION),
}

View File

@ -22,5 +22,23 @@
fdb_flow_CFLAGS := -Ibindings/c $(fdbrpc_CFLAGS)
fdb_flow_LDFLAGS := -Llib -lfdb_c $(fdbrpc_LDFLAGS)
fdb_flow_LIBS := lib/libfdbrpc.a
fdb_flow_LIBS :=
packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH).tar.gz: fdb_flow
@echo "Packaging fdb_flow"
@rm -rf packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)
@mkdir -p packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/lib packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/include/bindings/flow packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/include/bindings/c/foundationdb
@cp lib/libfdb_flow.a packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/lib
@find bindings/flow -name '*.h' -not -name 'bindings/flow/tester/*' -exec cp {} packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/include/bindings/flow \;
@find bindings/c/foundationdb -name '*.h' -exec cp {} packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/include/bindings/c/foundationdb \;
@tar czf packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH).tar.gz -C packages fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)
@rm -rf packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)
FDB_FLOW: packages/fdb-flow-$(FLOWVER)-$(PLATFORM)-$(ARCH).tar.gz
FDB_FLOW_clean:
@echo "Cleaning fdb_flow package"
@rm -rf packages/fdb-flow-*.tar.gz
packages: FDB_FLOW
packages_clean: FDB_FLOW_clean

9
bindings/go/Gopkg.lock generated Normal file
View File

@ -0,0 +1,9 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "ab4fef131ee828e96ba67d31a7d690bd5f2f42040c6766b1b12fe856f87e0ff7"
solver-name = "gps-cdcl"
solver-version = 1

2
bindings/go/Gopkg.toml Normal file
View File

@ -0,0 +1,2 @@
# The FoundationDB go bindings currently have no external golang dependencies outside of
# the go standard library.

View File

@ -10,10 +10,20 @@ This package requires:
Use of this package requires the selection of a FoundationDB API version at runtime. This package currently supports FoundationDB API versions 200-500.
To install this package, in the top level of this repository run:
To build this package, in the top level of this repository run:
make fdb_go
This will create binary packages for the appropriate platform within the "build" subdirectory of this folder.
To install this package, you can run the "fdb-go-install.sh" script:
./fdb-go-install.sh install
The "install" command of this script does not depend on the presence of the repo in general and will download the repository into
your local go path. Running "localinstall" instead of "install" will use the local copy here (with a symlink) instead
of downloading from the remote repository.
Documentation
-------------

304
bindings/go/fdb-go-install.sh Executable file
View File

@ -0,0 +1,304 @@
#!/bin/bash -eu
#
# fdb-go-install.sh
#
# Installs the FoundationDB Go bindings for a client. This will download
# the repository from the remote repo either into the go directory
# with the appropriate semantic version. It will then build a few
# generated files that need to be present for the go build to work.
# At the end, it has some advice for flags to modify within your
# go environment so that other packages may successfully use this
# library.
#
DESTDIR="${DESTDIR:-}"
FDBVER="${FDBVER:-5.0.1}"
REMOTE="${REMOTE:-github.com}"
FDBREPO="${FDBREPO:-apple/foundationdb}"
status=0
platform=$(uname)
if [[ "${platform}" == "Darwin" ]] ; then
FDBLIBDIR="${FDBLIBDIR:-/usr/local/lib}"
libfdbc="libfdb_c.dylib"
elif [[ "${platform}" == "Linux" ]] ; then
FDBLIBDIR="${FDBLIBDIR:-/usr/lib}"
libfdbc="libfdb_c.so"
else
echo "Unsupported platform ${platform}".
echo "At the moment, only macOS and Linux are supported by this script."
let status="${status} + 1"
fi
filedir=$(cd `dirname "${BASH_SOURCE[0]}"` && pwd)
destdir=""
function printUsage() {
echo "Usage: fdb-go-install.sh <cmd>"
echo
echo "cmd: One of the commands to run. The options are:"
echo " install Download the FDB go bindings and install them"
echo " localinstall Install a into the go path a local copy of the repo"
echo " download Download but do not prepare the FoundationDB bindings"
echo " help Print this help message and then quit"
echo
echo "Command Line Options:"
echo " --fdbver <version> FoundationDB semantic version (default is ${FDBVER})"
echo " -d/--dest-dir <dest> Local location for the repo (default is to place in go path)"
echo
echo "Environment Variable Options:"
echo " REMOTE Remote repository to download from (currently ${REMOTE})"
echo " FDBREPO Repository of FoundationDB library to download (currently ${FDBREPO})"
echo " FDBLIBDIR Directory within which should be the FoundationDB c library (currently ${FDBLIBDIR})"
}
function parseArgs() {
local status=0
if [[ "${#}" -lt 0 ]] ; then
printUsage
let status="${status} + 1"
else
operation="${1}"
shift
if [[ "${operation}" != "install" ]] && [[ "${operation}" != "localinstall" ]] && [[ "${operation}" != "download" ]] && [[ "${operation}" != "help" ]] ; then
echo "Unknown command: ${operation}"
printUsage
let status="${status} + 1"
fi
fi
while [[ "${#}" -gt 0 ]] && [[ "${status}" -eq 0 ]] ; do
local key="${1}"
case "${key}" in
--fdbver)
if [[ "${#}" -lt 2 ]] ; then
echo "No version specified with --fdbver flag"
printUsage
let status="${status} + 1"
else
FDBVER="${2}"
fi
shift
;;
-d|--dest-dir)
if [[ "${#}" -lt 2 ]] ; then
echo "No destination specified with ${key} flag"
printUsage
let status="${status} + 1"
else
destdir="${2}"
fi
shift
;;
*)
echo "Unrecognized argument ${key}"
printUsage
let status="${status} + 1"
esac
shift
done
return "${status}"
}
function checkBin() {
if [[ "${#}" -lt 1 ]] ; then
echo "Usage: checkBin <binary>"
return 1
else
if [[ -n $(which "${1}") ]] ; then
return 0
else
return 1
fi
fi
}
if [[ "${status}" -gt 0 ]] ; then
# We have already failed.
:
elif [[ "${#}" -lt 1 ]] ; then
printUsage
else
required_bins=( 'go' 'git' 'make' 'mono' )
missing_bins=()
for bin in "${required_bins[@]}" ; do
if ! checkBin "${bin}" ; then
missing_bins+=("${bin}")
let status="${status} + 1"
fi
done
if [[ "${status}" -gt 0 ]] ; then
echo "Missing binaries: ${missing_bins[*]}"
elif ! parseArgs ${@} ; then
let status="${status} + 1"
elif [[ "${operation}" == "help" ]] ; then
printUsage
else
# Add go-specific environment variables.
eval $(go env)
golibdir=$(dirname "${GOPATH}/src/${REMOTE}/${FDBREPO}")
if [[ -z "${destdir}" ]] ; then
if [[ "${operation}" == "localinstall" ]] ; then
# Assume its the local directory.
destdir=$(cd "${filedir}/../../.." && pwd)
else
destdir="${golibdir}"
fi
fi
if [[ ! -d "${destdir}" ]] ; then
cmd=( 'mkdir' '-p' "${destdir}" )
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
let status="${status} + 1"
echo "Could not create destination directory ${destdir}."
fi
fi
# Step 1: Make sure repository is present.
if [[ "${status}" -eq 0 ]] ; then
destdir=$( cd "${destdir}" && pwd ) # Get absolute path of destination dir.
fdbdir="${destdir}/foundation"
if [[ ! -d "${destdir}" ]] ; then
cmd=("mkdir" "-p" "${destdir}")
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
echo "Could not create destination directory ${destdir}."
let status="${status} + 1"
fi
fi
fi
if [[ "${operation}" == "localinstall" ]] ; then
# No download occurs in this case.
:
else
if [[ -d "${fdbdir}" ]] ; then
echo "Directory ${fdbdir} already exists ; checking out appropriate tag"
cmd1=( 'git' '-C' "${fdbdir}" 'fetch' 'origin' )
cmd2=( 'git' '-C' "${fdbdir}" 'checkout' "release-${FDBVER}" )
if ! echo "${cmd1[*]}" || ! "${cmd1[@]}" ; then
let status="${status} + 1"
echo "Could not pull latest changes from origin"
elif ! echo "${cmd2[*]}" || ! "${cmd2[@]}" ; then
let status="${status} + 1"
echo "Could not checkout tag release-${FDBVER}."
fi
else
echo "Downloading foundation repository into ${destdir}:"
cmd=( 'git' '-C' "${destdir}" 'clone' '--branch' "release-${FDBVER}" "git@${REMOTE}:${FDBREPO}.git" )
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
let status="${status} + 1"
echo "Could not download repository."
fi
fi
fi
# Step 2: Build generated things.
if [[ "${operation}" == "download" ]] ; then
# The generated files are not created under a strict download.
:
elif [[ "${status}" -eq 0 ]] ; then
echo "Building generated files."
cmd=( 'make' '-C' "${fdbdir}" 'bindings/c/foundationdb/fdb_c_options.g.h' )
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
let status="${status} + 1"
echo "Could not generate required c header"
else
infile="${fdbdir}/fdbclient/vexillographer/fdb.options"
outfile="${fdbdir}/bindings/go/src/fdb/generated.go"
cmd=( 'go' 'run' "${fdbdir}/bindings/go/src/_util/translate_fdb_options.go" )
echo "${cmd[*]} < ${infile} > ${outfile}"
if ! "${cmd[@]}" < "${infile}" > "${outfile}" ; then
let status="${status} + 1"
echo "Could not generate generated go file."
fi
fi
fi
# Step 3: Add to go path.
if [[ "${operation}" == "download" ]] ; then
# The files are not moved under a strict download.
:
elif [[ "${status}" -eq 0 ]] ; then
linkpath="${GOPATH}/src/${REMOTE}/${FDBREPO}"
if [[ "${linkpath}" == "${fdbdir}" ]] ; then
# Downloaded directly into go path. Skip making the link.
:
elif [[ -e "${linkpath}" ]] ; then
echo "Warning: link path (${linkpath}) already exists. Leaving in place."
else
dirpath=$(dirname "${linkpath}")
if [[ ! -d "${dirpath}" ]] ; then
cmd=( 'mkdir' '-p' "${dirpath}" )
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
let status="${status} + 1"
echo "Could not create directory for link."
fi
fi
if [[ "${status}" -eq 0 ]] ; then
cmd=( 'ln' '-s' "${fdbdir}" "${linkpath}" )
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
let status="${status} + 1"
echo "Could not create link within go path."
fi
fi
fi
fi
# Step 4: Build the binaries.
if [[ "${operation}" == "download" ]] ; then
# Do not install if only downloading
:
elif [[ "${status}" -eq 0 ]] ; then
cgo_cflags="-g -O2 -I${linkpath}/bindings/c"
cgo_ldflags="-g -O2 -L${FDBLIBDIR}"
fdb_go_path="${REMOTE}/${FDBREPO}/bindings/go/src"
if [[ ! -e "${FDBLIBDIR}/${libfdbc}" ]] ; then
# Just a warning. Don't fail script.
echo
echo "WARNING: The FoundationDB C library was not found within ${FDBLIBDIR}."
echo "Your installation may be incomplete."
echo
elif ! CGO_CFLAGS="${cgo_cflags}" CGO_LDFLAGS="${cgo_ldflags}" go install "${fdb_go_path}/fdb" "${fdb_go_path}/fdb/tuple" "${fdb_go_path}/fdb/subspace" "${fdb_go_path}/fdb/directory" ; then
let status="${status} + 1"
echo "Could not build FoundationDB go libraries."
fi
fi
# Step 5: Explain CGO flags.
if [[ "${status}" -eq 0 && ("${operation}" == "localinstall" || "${operation}" == "install" ) ]] ; then
echo
echo "The FoundationDB go bindings were successfully installed."
echo "To build packages which use the go bindings, you will need to"
echo "set the following environment variables:"
echo " CGO_CFLAGS=\"${cgo_cflags}\""
echo " CGO_LDFLAGS=\"${cgo_ldflags}\""
fi
fi
fi
exit "${status}"

View File

@ -21,7 +21,11 @@
TARGETS += fdb_go fdb_go_tester
CLEAN_TARGETS += fdb_go_clean fdb_go_tester_clean
GOPATH := $(CURDIR)/bindings/go
GOPATH := $(CURDIR)/bindings/go/build
GO_IMPORT_PATH := github.com/apple/foundationdb/bindings/go/src
GO_DEST := $(GOPATH)/src/$(GO_IMPORT_PATH)
.PHONY: fdb_go fdb_go_path fdb_go_tester fdb_go_tester_clean godoc godoc_clean
# We only override if the environment didn't set it (this is used by
# the fdbwebsite documentation build process)
@ -38,18 +42,23 @@ else
$(error Not prepared to compile on platform $(PLATFORM))
endif
GO_PACKAGE_OUTDIR := $(GOPATH)/pkg/$(GOPLATFORM)
GO_PACKAGE_OUTDIR := $(GOPATH)/pkg/$(GOPLATFORM)/$(GO_IMPORT_PATH)
GO_PACKAGES := fdb fdb/tuple fdb/subspace fdb/directory
GO_PACKAGE_OBJECTS := $(addprefix $(GO_PACKAGE_OUTDIR)/,$(GO_PACKAGES:=.a))
GO_SRC := $(shell find $(GOPATH)/src -name '*.go')
GO_SRC := $(shell find $(CURDIR)/bindings/go/src -name '*.go')
fdb_go: $(GO_PACKAGE_OBJECTS) $(GO_SRC)
fdb_go_path: $(GO_SRC)
@echo "Creating fdb_go_path"
@mkdir -p $(GO_DEST)
@cp -r bindings/go/src/* $(GO_DEST)
fdb_go_clean:
@echo "Cleaning fdb_go"
@rm -rf $(GO_PACKAGE_OUTDIR)
@rm -rf $(GOPATH)
fdb_go_tester: $(GOPATH)/bin/_stacktester
@ -57,40 +66,40 @@ fdb_go_tester_clean:
@echo "Cleaning fdb_go_tester"
@rm -rf $(GOPATH)/bin
$(GOPATH)/bin/_stacktester: $(GO_SRC) $(GO_PACKAGE_OBJECTS) bindings/go/src/fdb/generated.go
$(GOPATH)/bin/_stacktester: fdb_go_path $(GO_SRC) $(GO_PACKAGE_OBJECTS) $(GO_DEST)/fdb/generated.go
@echo "Compiling $(basename $(notdir $@))"
@go install _stacktester
@go install $(GO_IMPORT_PATH)/_stacktester
$(GO_PACKAGE_OUTDIR)/fdb/tuple.a: $(GO_SRC) $(GO_PACKAGE_OUTDIR)/fdb.a bindings/go/src/fdb/generated.go
$(GO_PACKAGE_OUTDIR)/fdb/tuple.a: fdb_go_path $(GO_SRC) $(GO_PACKAGE_OUTDIR)/fdb.a $(GO_DEST)/fdb/generated.go
@echo "Compiling fdb/tuple"
@go install fdb/tuple
@go install $(GO_IMPORT_PATH)/fdb/tuple
$(GO_PACKAGE_OUTDIR)/fdb/subspace.a: $(GO_SRC) $(GO_PACKAGE_OUTDIR)/fdb.a $(GO_PACKAGE_OUTDIR)/fdb/tuple.a bindings/go/src/fdb/generated.go
$(GO_PACKAGE_OUTDIR)/fdb/subspace.a: fdb_go_path $(GO_SRC) $(GO_PACKAGE_OUTDIR)/fdb.a $(GO_PACKAGE_OUTDIR)/fdb/tuple.a $(GO_DEST)/fdb/generated.go
@echo "Compiling fdb/subspace"
@go install fdb/subspace
@go install $(GO_IMPORT_PATH)/fdb/subspace
$(GO_PACKAGE_OUTDIR)/fdb/directory.a: $(GO_SRC) $(GO_PACKAGE_OUTDIR)/fdb.a $(GO_PACKAGE_OUTDIR)/fdb/tuple.a $(GO_PACKAGE_OUTDIR)/fdb/subspace.a bindings/go/src/fdb/generated.go
$(GO_PACKAGE_OUTDIR)/fdb/directory.a: fdb_go_path $(GO_SRC) $(GO_PACKAGE_OUTDIR)/fdb.a $(GO_PACKAGE_OUTDIR)/fdb/tuple.a $(GO_PACKAGE_OUTDIR)/fdb/subspace.a $(GO_DEST)/fdb/generated.go
@echo "Compiling fdb/directory"
@go install fdb/directory
@go install $(GO_IMPORT_PATH)/fdb/directory
$(GO_PACKAGE_OUTDIR)/fdb.a: $(GO_SRC) bindings/go/src/fdb/generated.go
$(GO_PACKAGE_OUTDIR)/fdb.a: fdb_go_path $(GO_SRC) $(GO_DEST)/fdb/generated.go
@echo "Compiling fdb"
@go install fdb
@go install $(GO_IMPORT_PATH)/fdb
bindings/go/src/fdb/generated.go: lib/libfdb_c.$(DLEXT) bindings/go/src/_util/translate_fdb_options.go fdbclient/vexillographer/fdb.options
$(GO_DEST)/fdb/generated.go: fdb_go_path lib/libfdb_c.$(DLEXT) bindings/go/src/_util/translate_fdb_options.go fdbclient/vexillographer/fdb.options
@echo "Building $@"
@go run bindings/go/src/_util/translate_fdb_options.go < fdbclient/vexillographer/fdb.options > $@
godoc: $(GO_SRC)
godoc: fdb_go_path $(GO_SRC)
@echo "Generating Go Documentation"
@rm -rf $(GODOC_DIR)/godoc
@mkdir -p $(GODOC_DIR)/godoc
@mkdir -p $(GODOC_DIR)/godoc/lib/godoc
@godoc -url "http://localhost:6060/pkg/fdb" > $(GODOC_DIR)/godoc/fdb.html
@godoc -url "http://localhost:6060/pkg/fdb/tuple" > $(GODOC_DIR)/godoc/fdb.tuple.html
@godoc -url "http://localhost:6060/pkg/fdb/subspace" > $(GODOC_DIR)/godoc/fdb.subspace.html
@godoc -url "http://localhost:6060/pkg/fdb/directory" > $(GODOC_DIR)/godoc/fdb.directory.html
@cp $(GOPATH)/godoc-resources/* $(GODOC_DIR)/godoc/lib/godoc
@godoc -url "pkg/$(GO_IMPORT_PATH)/fdb" > $(GODOC_DIR)/godoc/fdb.html
@godoc -url "pkg/$(GO_IMPORT_PATH)/fdb/tuple" > $(GODOC_DIR)/godoc/fdb.tuple.html
@godoc -url "pkg/$(GO_IMPORT_PATH)/fdb/subspace" > $(GODOC_DIR)/godoc/fdb.subspace.html
@godoc -url "pkg/$(GO_IMPORT_PATH)/fdb/directory" > $(GODOC_DIR)/godoc/fdb.directory.html
@cp $(CURDIR)/bindings/go/godoc-resources/* $(GODOC_DIR)/godoc/lib/godoc
@echo "Mangling paths in Go Documentation"
@(find $(GODOC_DIR)/godoc/ -name *.html -exec sed -i '' -e 's_/lib_lib_' {} \;)
@(sed -i -e 's_a href="tuple/"_a href="fdb.tuple.html"_' $(GODOC_DIR)/godoc/fdb.html)

View File

@ -21,10 +21,10 @@
package main
import (
"fdb"
"fdb/tuple"
"fdb/subspace"
"fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"strings"
"bytes"
)
@ -94,14 +94,14 @@ func (sm *StackMachine) maybePath() []string {
}
var createOps = map[string]bool {
"CREATE_SUBSPACE": true,
"CREATE_LAYER": true,
"CREATE_OR_OPEN": true,
"CREATE": true,
"OPEN": true,
"MOVE": true,
"MOVE_TO": true,
"OPEN_SUBSPACE": true,
"CREATE_SUBSPACE": true,
"CREATE_LAYER": true,
"CREATE_OR_OPEN": true,
"CREATE": true,
"OPEN": true,
"MOVE": true,
"MOVE_TO": true,
"OPEN_SUBSPACE": true,
}
func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, idx int, t fdb.Transactor, rt fdb.ReadTransactor) {

View File

@ -24,8 +24,8 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"fdb"
"fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
"log"
"fmt"
"os"

View File

@ -23,8 +23,8 @@
package directory
import (
"fdb"
"fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"encoding/binary"
"bytes"
"math/rand"

View File

@ -40,8 +40,8 @@
package directory
import (
"fdb"
"fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"errors"
)
@ -140,15 +140,15 @@ type Directory interface {
}
func stringsEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
func moveTo(t fdb.Transactor, dl directoryLayer, path, newAbsolutePath []string) (DirectorySubspace, error) {

View File

@ -23,9 +23,9 @@
package directory
import (
"fdb"
"fdb/subspace"
"fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
"encoding/binary"
"bytes"
"fmt"

View File

@ -23,9 +23,9 @@
package directory
import (
"fdb"
"fdb/subspace"
"fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)
type directoryPartition struct {

View File

@ -23,8 +23,8 @@
package directory
import (
"fdb"
"fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
)
// DirectorySubspace represents a Directory that may also be used as a Subspace

View File

@ -23,8 +23,8 @@
package directory
import (
"fdb"
"fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"bytes"
)

View File

@ -39,7 +39,7 @@ A basic interaction with the FoundationDB API is demonstrated below:
package main
import (
"github.com/apple/foundationdb/bindings/go/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"log"
"fmt"
)

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 200
#define FDB_API_VERSION 500
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb_test
import (
"fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"fmt"
"testing"
)

View File

@ -33,8 +33,8 @@
package subspace
import (
"fdb"
"fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
"bytes"
"errors"
)

View File

@ -38,7 +38,7 @@ import (
"fmt"
"encoding/binary"
"bytes"
"fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb"
)
// A TupleElement is one of the types that may be encoded in FoundationDB

View File

@ -44,8 +44,8 @@ public interface Database extends Disposable, TransactionContext {
* Creates a {@link Transaction} that operates on this {@code Database}.<br>
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after either
* {@link Transaction#reset} or {@link Transaction#onError} is called.
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Database}.
*/

View File

@ -61,7 +61,7 @@ public interface AsyncIterator<T> extends Iterator<T>, Disposable {
/**
* Returns the next element in the sequence. This will not block if, since the
* last call to {@code next()}, {@link #onHasNext()} was called and the resulting
* <h1>FIXME!!!!</h1> has completed or the blocking call {@link #hasNext()} was called
* {@link CompletableFuture} has completed or the blocking call {@link #hasNext()} was called
* and has returned. It is legal, therefore, to make a call to {@code next()} without a
* preceding call to
* {@link #hasNext()} or {@link #onHasNext()}, but that invocation of {@code next()}

View File

@ -19,7 +19,7 @@
*/
/**
* Provides additional constructs for asynchronous programming against Java's CompletableFutures.
* Provides additional constructs for asynchronous programming against Java's {@link java.util.concurrent.CompletableFuture CompletableFuture}s.
*
*/
package com.apple.cie.foundationdb.async;

View File

@ -24,10 +24,11 @@ and add it to your classpath.<br>
<br>
<h3>Getting started</h3>
To start using FoundationDB from Java, create an instance of the
{@link FDB FoundationDB API interface} with the version of the
{@link com.apple.cie.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports only version {@code 500}).
With this API object you can then open {@link Cluster}s and
{@link Database}s and start using {@link Transaction}s.
With this API object you can then open {@link com.apple.cie.foundationdb.Cluster Cluster}s and
{@link com.apple.cie.foundationdb.Database Database}s and start using
{@link com.apple.cie.foundationdb.Transaction Transactions}s.
Here we give an example. The example relies on a cluster file at the
<a href="/documentation/api-general.html#default-cluster-file">default location</a>
for your platform and a running server.<br>
@ -77,7 +78,7 @@ for information about how Tuples sort and can be used to efficiently model data.
The {@link com.apple.cie.foundationdb.directory Directory API} is provided with the core
Java API for FoundationDB. This layer is provided in some form in all official
language bindings. The FoundationDB API provides directories as a tool for
managing related {@link Subspace}s. Directories are a
managing related {@link com.apple.cie.foundationdb.subspace.Subspace Subspace}s. Directories are a
recommended approach for administering applications. Each application should
create or open at least one directory to manage its subspaces. Directories are
identified by hierarchical paths analogous to the paths in a Unix-like file system.

View File

@ -114,4 +114,4 @@ public abstract class AbstractTester {
t.printStackTrace();
return new RuntimeException(errorMessage, t);
}
}
}

View File

@ -46,8 +46,8 @@ public interface Database extends Disposable, TransactionContext {
* Creates a {@link Transaction} that operates on this {@code Database}.<br>
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after either
* {@link Transaction#reset} or {@link Transaction#onError} is called.
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Database}.
*/

View File

@ -24,11 +24,12 @@ and add it to your classpath.<br>
<br>
<h3>Getting started</h3>
To start using FoundationDB from Java, create an instance of the
{@link FDB FoundationDB API interface} with the version of the
{@link com.apple.cie.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports only version {@code 500}).
With this API object you can then open {@link Cluster}s and
{@link Database}s and start using {@link Transaction}s.
Here we give an example. The example relies on a cluster file at the
With this API object you can then open {@link com.apple.cie.foundationdb.Cluster}s and
{@link com.apple.cie.foundationdb.Database}s and start using
{@link com.apple.cie.foundationdb.Transaction}s. Here we give an example. The example relies on a
cluster file at the
<a href="/documentation/api-general.html#default-cluster-file">default location</a>
for your platform and a running server.<br>
<br>
@ -77,7 +78,7 @@ for information about how Tuples sort and can be used to efficiently model data.
The {@link com.apple.cie.foundationdb.directory Directory API} is provided with the core
Java API for FoundationDB. This layer is provided in some form in all official
language bindings. The FoundationDB API provides directories as a tool for
managing related {@link Subspace}s. Directories are a
managing related {@link com.apple.cie.foundationdb.subspace.Subspace Subspace}s. Directories are a
recommended approach for administering applications. Each application should
create or open at least one directory to manage its subspaces. Directories are
identified by hierarchical paths analogous to the paths in a Unix-like file system.
@ -87,12 +88,12 @@ for the corresponding subspace. In effect, directories provide a level of indire
for access to subspaces.
<br>
<h3>{@link com.apple.cie.foundationdb.async.Future Future}s and asynchronous operation</h3>
Asynchronous FoundationDB operations return {@link Future}s.
A {@link Future} can be used in a blocking way using the
{@link Future#get() get()} method or in a
Asynchronous FoundationDB operations return {@link com.apple.cie.foundationdb.async.Future Future}s.
A {@link com.apple.cie.foundationdb.async.Future Future} can be used in a blocking way using the
{@link com.apple.cie.foundationdb.async.Future#get() get()} method or in a
fully-asynchronous way using the
{@link Future#map(Function) map()} and
{@link Future#flatMap(Function) flatMap()}
{@link com.apple.cie.foundationdb.async.Future#map(Function) map()} and
{@link com.apple.cie.foundationdb.async.Future#flatMap(Function) flatMap()}
methods. Generally, the blocking style is more straightforward and the asynchronous style
is more efficient. Mixing the two styles correctly can be tricky, so consider choosing
one or the other. See the {@linkplain com.apple.cie.foundationdb.async async Package documentation}

View File

@ -25,7 +25,7 @@
#include "flow/flow.h"
#include "NativeAPI.h"
#include "TaskBucket.h"
#include "flow/Notified.h"
#include "Notified.h"
#include <fdbrpc/IAsyncFile.h>
#include "KeyBackedTypes.h"
#include <ctime>

View File

@ -22,12 +22,20 @@
#include "fdbrpc/FailureMonitor.h"
#include "ClusterInterface.h"
struct FailureMonitorClientState : ReferenceCounted<FailureMonitorClientState> {
std::set<NetworkAddress> knownAddrs;
double serverFailedTimeout;
FailureMonitorClientState() {
serverFailedTimeout = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY;
}
};
ACTOR Future<Void> failureMonitorClientLoop(
SimpleFailureMonitor* monitor,
ClusterInterface controller,
double* pServerFailedTimeout,
bool trackMyStatus,
std::set<NetworkAddress>* knownAddrs)
Reference<FailureMonitorClientState> fmState,
bool trackMyStatus)
{
state Version version = 0;
state Future<FailureMonitoringReply> request = Never();
@ -37,7 +45,7 @@ ACTOR Future<Void> failureMonitorClientLoop(
state double waitfor = 0;
monitor->setStatus(controller.failureMonitoring.getEndpoint().address, FailureStatus(false));
knownAddrs->insert( controller.failureMonitoring.getEndpoint().address );
fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().address );
//The cluster controller's address (controller.failureMonitoring.getEndpoint().address) is treated specially because we can declare that it is down independently
//of the response from the cluster controller. It still needs to be in knownAddrs in case the cluster controller changes, so the next cluster controller resets its state
@ -51,14 +59,14 @@ ACTOR Future<Void> failureMonitorClientLoop(
requestTimeout = Never();
if (reply.allOthersFailed) {
// Reset all systems *not* mentioned in the reply to the default (failed) state
knownAddrs->erase( controller.failureMonitoring.getEndpoint().address );
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().address );
std::set<NetworkAddress> changedAddresses;
for(int c=0; c<reply.changes.size(); c++)
changedAddresses.insert( reply.changes[c].address );
for(auto it : *knownAddrs)
for(auto it : fmState->knownAddrs)
if (!changedAddresses.count( it ))
monitor->setStatus( it, FailureStatus() );
knownAddrs->clear();
fmState->knownAddrs.clear();
} else {
ASSERT( version != 0 );
}
@ -66,20 +74,20 @@ ACTOR Future<Void> failureMonitorClientLoop(
if( monitor->getState( controller.failureMonitoring.getEndpoint() ).isFailed() )
TraceEvent("FailureMonitoringServerUp").detail("OldServer",controller.id());
monitor->setStatus( controller.failureMonitoring.getEndpoint().address, FailureStatus(false) );
knownAddrs->insert( controller.failureMonitoring.getEndpoint().address );
fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().address );
//if (version != reply.failureInformationVersion)
// printf("Client '%s': update from %lld to %lld (%d changes, aof=%d)\n", g_network->getLocalAddress().toString().c_str(), version, reply.failureInformationVersion, reply.changes.size(), reply.allOthersFailed);
version = reply.failureInformationVersion;
*pServerFailedTimeout = reply.considerServerFailedTimeoutMS * .001;
fmState->serverFailedTimeout = reply.considerServerFailedTimeoutMS * .001;
for(int c=0; c<reply.changes.size(); c++) {
//printf("Client '%s': status of '%s' is now '%s'\n", g_network->getLocalAddress().toString().c_str(), reply.changes[c].address.toString().c_str(), reply.changes[c].status.failed ? "Failed" : "OK");
monitor->setStatus( reply.changes[c].address, reply.changes[c].status );
if (reply.changes[c].status != FailureStatus())
knownAddrs->insert( reply.changes[c].address );
fmState->knownAddrs.insert( reply.changes[c].address );
else
knownAddrs->erase( reply.changes[c].address );
fmState->knownAddrs.erase( reply.changes[c].address );
ASSERT( reply.changes[c].address != controller.failureMonitoring.getEndpoint().address || !reply.changes[c].status.failed );
}
before = now();
@ -91,7 +99,7 @@ ACTOR Future<Void> failureMonitorClientLoop(
requestTimeout = Never();
TraceEvent(SevWarn, "FailureMonitoringServerDown").detail("OldServerID",controller.id());
monitor->setStatus( controller.failureMonitoring.getEndpoint().address, FailureStatus(true) );
knownAddrs->erase( controller.failureMonitoring.getEndpoint().address );
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().address );
}
when( Void _ = wait( nextRequest ) ) {
g_network->setCurrentTask(TaskDefaultDelay);
@ -111,7 +119,7 @@ ACTOR Future<Void> failureMonitorClientLoop(
req.senderStatus = FailureStatus(false);
request = controller.failureMonitoring.getReply( req, TaskFailureMonitor );
if(!controller.failureMonitoring.getEndpoint().isLocal())
requestTimeout = delay( *pServerFailedTimeout, TaskFailureMonitor );
requestTimeout = delay( fmState->serverFailedTimeout, TaskFailureMonitor );
}
}
}
@ -125,11 +133,10 @@ ACTOR Future<Void> failureMonitorClientLoop(
ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, bool trackMyStatus ) {
state SimpleFailureMonitor* monitor = static_cast<SimpleFailureMonitor*>( &IFailureMonitor::failureMonitor() );
state std::set<NetworkAddress> knownAddrs;
state double serverFailedTimeout = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY;
state Reference<FailureMonitorClientState> fmState = Reference<FailureMonitorClientState>(new FailureMonitorClientState());
loop {
state Future<Void> client = ci->get().present() ? failureMonitorClientLoop(monitor, ci->get().get(), &serverFailedTimeout, trackMyStatus, &knownAddrs) : Void();
state Future<Void> client = ci->get().present() ? failureMonitorClientLoop(monitor, ci->get().get(), fmState, trackMyStatus) : Void();
Void _ = wait( ci->onChange() );
}
}

25
fdbclient/MetricLogger.h Executable file
View File

@ -0,0 +1,25 @@
/*
* MetricLogger.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "NativeAPI.h"
Future<Void> runMetrics( Future<Database> const& fcx, Key const& metricsPrefix );

View File

@ -18,11 +18,11 @@
* limitations under the License.
*/
#ifndef FLOW_NOTIFIED_H
#define FLOW_NOTIFIED_H
#ifndef FDBCLIENT_NOTIFIED_H
#define FDBCLIENT_NOTIFIED_H
#pragma once
#include "fdbclient/FDBTypes.h"
#include "FDBTypes.h"
#include "flow/TDMetric.actor.h"
struct NotifiedVersion {
@ -78,4 +78,4 @@ private:
VersionMetricHandle val;
};
#endif
#endif

5
fdbclient/fdbclient.vcxproj Normal file → Executable file
View File

@ -38,6 +38,8 @@
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="MetricLogger.h" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ClInclude Include="FailureMonitorClient.h" />
<ClInclude Include="FDBOptions.g.h" />
<ClInclude Include="FDBOptions.h" />
@ -57,6 +59,7 @@
<ClInclude Include="MultiVersionTransaction.h" />
<ClInclude Include="MutationList.h" />
<ClInclude Include="NativeAPI.h" />
<ClInclude Include="Notified.h" />
<ClInclude Include="ReadYourWrites.h" />
<ActorCompiler Include="RunTransaction.actor.h" />
<ClInclude Include="RYWIterator.h" />
@ -201,4 +204,4 @@
<Target Name="MyPreCompileSteps" AfterTargets="CLCompile">
<Exec Command="&quot;$(SolutionDir)bin\$(Configuration)\coveragetool.exe&quot; &quot;$(OutDir)coverage.$(TargetName).xml&quot; @(ActorCompiler -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLInclude -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLCompile -> '%(RelativeDir)%(Filename)%(Extension)', ' ')" />
</Target>
</Project>
</Project>

View File

@ -504,7 +504,7 @@ private:
}
void setIOTimeout(double timeout) {
ioTimeout = timeout;
ioTimeout = fabs(timeout);
timeoutWarnOnly = timeout < 0;
}

View File

@ -20,6 +20,10 @@
#include "IAsyncFile.h"
#if VALGRIND
#include <memcheck.h>
#endif
class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFileWriteChecker> {
public:
void addref() { ReferenceCounted<AsyncFileWriteChecker>::addref(); }
@ -112,9 +116,14 @@ private:
}
while(page < pageEnd) {
//printf("%d %d %u %u\n", write, page, checksum, historySum);
uint32_t checksum = hashlittle(start, checksumHistoryPageSize, 0xab12fd93);
WriteInfo &history = checksumHistory[page];
//printf("%d %d %u %u\n", write, page, checksum, history.checksum);
#if VALGRIND
// It's possible we'll read or write a page where not all of the data is defined, but the checksum of the page is still valid
VALGRIND_MAKE_MEM_DEFINED_IF_ADDRESSABLE(&checksum, sizeof(uint32_t));
#endif
// For writes, just update the stored sum
if(write) {

View File

@ -137,7 +137,7 @@ class TransportData {
public:
TransportData(uint64_t transportId)
: endpointNotFoundReceiver(endpoints),
pingReceiver(endpoints),
pingReceiver(endpoints),
warnAlwaysForLargePacket(true),
lastIncompatibleMessage(0),
transportId(transportId)
@ -204,7 +204,9 @@ struct ConnectPacket {
static_assert( sizeof(ConnectPacket) == CONNECT_PACKET_V2_SIZE, "ConnectPacket packed incorrectly" );
#pragma pack( pop )
static Future<Void> connectionReader( TransportData* const& transport, Reference<IConnection> const& conn, bool const& isOutgoing, Promise<NetworkAddress> const& onPeerAddress );
static Future<Void> connectionReader( TransportData* const& transport, Reference<IConnection> const& conn, Peer* const& peer, Promise<Peer*> const& onConnected );
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable );
struct Peer : NonCopyable {
// FIXME: Peers don't die!
@ -215,12 +217,14 @@ struct Peer : NonCopyable {
ReliablePacketList reliable;
AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false
Future<Void> connect;
AsyncVar<bool> incompatibleDataRead;
bool compatible;
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
double reconnectionDelay;
explicit Peer( TransportData* transport, NetworkAddress const& destination, bool doConnect = true )
: transport(transport), destination(destination), outgoingConnectionIdle(!doConnect), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME)
: transport(transport), destination(destination), outgoingConnectionIdle(!doConnect), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true)
{
if(doConnect) {
connect = connectionKeeper(this);
@ -293,8 +297,7 @@ struct Peer : NonCopyable {
}
}
ACTOR static Future<Void> connectionMonitor( Peer* peer ) {
ACTOR static Future<Void> connectionMonitor( Peer *peer ) {
state RequestStream< ReplyPromise<Void> > remotePing( Endpoint( peer->destination, WLTOKEN_PING_PACKET ) );
loop {
@ -305,9 +308,11 @@ struct Peer : NonCopyable {
state ReplyPromise<Void> reply;
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePing.getEndpoint() );
peer->incompatibleDataRead.set(false);
choose {
when (Void _ = wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) { TraceEvent("ConnectionTimeout").detail("WithAddr", peer->destination); throw connection_failed(); }
when (Void _ = wait( reply.getFuture() )) {}
when (Void _ = wait( peer->incompatibleDataRead.onChange())) {}
}
}
}
@ -364,18 +369,17 @@ struct Peer : NonCopyable {
Reference<IConnection> _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference<IConnection>() ) );
if (_conn) {
conn = _conn;
TraceEvent("ConnEstablishedTo", conn->getDebugID()).detail("PeerAddr", self->destination);
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).detail("PeerAddr", self->destination);
self->prependConnectPacket();
} else {
TraceEvent("ConnTimedOut", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination);
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination);
throw connection_failed();
}
reader = connectionReader( self->transport, conn, true, Promise<NetworkAddress>() );
reader = connectionReader( self->transport, conn, self, Promise<Peer*>());
} else {
self->outgoingConnectionIdle = false;
}
self->transport->countConnEstablished++;
Void _ = wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) );
@ -389,12 +393,17 @@ struct Peer : NonCopyable {
self->discardUnreliablePackets();
reader = Future<Void>();
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed );
TraceEvent(ok ? SevInfo : SevError, "ConnectionClosed", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).error(e, true);
if (ok)
self->transport->countConnClosedWithoutError++;
else
self->transport->countConnClosedWithError++;
if(self->compatible) {
TraceEvent(ok ? SevInfo : SevError, "ConnectionClosed", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).error(e, true);
if (ok)
self->transport->countConnClosedWithoutError++;
else
self->transport->countConnClosedWithError++;
}
else {
TraceEvent(ok ? SevInfo : SevError, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).error(e, true);
}
if (conn) {
conn->close();
@ -408,8 +417,6 @@ struct Peer : NonCopyable {
}
};
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable );
ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket ) {
int priority = self->endpoints.getPriority(destination.token);
if (priority < TaskReadSocket || !inReadSocket) {
@ -533,7 +540,9 @@ static void scanPackets( TransportData* transport, uint8_t*& unprocessed_begin,
ACTOR static Future<Void> connectionReader(
TransportData* transport,
Reference<IConnection> conn,
bool isOutgoing, Promise<NetworkAddress> onPeerAddress ) {
Peer *peer,
Promise<Peer*> onConnected)
{
// This actor exists whenever there is an open or opening connection, whether incoming or outgoing
// For incoming connections conn is set and peer is initially NULL; for outgoing connections it is the reverse
@ -542,12 +551,14 @@ ACTOR static Future<Void> connectionReader(
state uint8_t* unprocessed_end = NULL;
state uint8_t* buffer_end = NULL;
state bool expectConnectPacket = true;
state bool compatible = false;
state NetworkAddress peerAddress;
state uint64_t peerProtocolVersion = 0;
peerAddress = conn->getPeerAddress();
if (!isOutgoing)
if (peer == nullptr) {
ASSERT( !peerAddress.isPublic() );
}
loop {
loop {
@ -592,7 +603,8 @@ ACTOR static Future<Void> connectionReader(
.detail("LocalVersion", currentProtocolVersion)
.detail("RejectedVersion", p->protocolVersion)
.detail("VersionMask", compatibleProtocolVersionMask)
.detail("Peer", p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress());
.detail("Peer", p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress())
.detail("ConnectionId", connectionId);
transport->lastIncompatibleMessage = now();
}
if(!transport->incompatiblePeers.count(addr)) {
@ -601,7 +613,20 @@ ACTOR static Future<Void> connectionReader(
} else if(connectionId > 1) {
transport->multiVersionConnections[connectionId] = now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT;
}
throw incompatible_protocol_version();
compatible = false;
if(p->protocolVersion < 0x0FDB00A551000000LL) {
// Older versions expected us to hang up. It may work even if we don't hang up here, but it's safer to keep the old behavior.
throw incompatible_protocol_version();
}
}
else {
compatible = true;
TraceEvent("ConnectionEstablished", conn->getDebugID())
.detail("Peer", conn->getPeerAddress())
.detail("ConnectionId", connectionId);
transport->countConnEstablished++;
}
if(connectionId > 1) {
@ -611,21 +636,29 @@ ACTOR static Future<Void> connectionReader(
expectConnectPacket = false;
peerProtocolVersion = p->protocolVersion;
if (isOutgoing) {
if (peer != nullptr) {
// Outgoing connection; port information should be what we expect
TraceEvent("ConnectedOutgoing").detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) );
peer->compatible = compatible;
ASSERT( p->canonicalRemotePort == peerAddress.port );
} else {
if (p->canonicalRemotePort) {
peerAddress = NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort, true, peerAddress.isTLS() );
}
onPeerAddress.send( peerAddress );
peer = transport->getPeer(peerAddress);
peer->compatible = compatible;
onConnected.send( peer );
Void _ = wait( delay(0) ); // Check for cancellation
}
}
}
if (!expectConnectPacket)
if (compatible) {
scanPackets( transport, unprocessed_begin, unprocessed_end, arena, peerAddress, peerProtocolVersion );
}
else if(!expectConnectPacket) {
unprocessed_begin = unprocessed_end;
peer->incompatibleDataRead.set(true);
}
if (readWillBlock)
break;
@ -640,12 +673,11 @@ ACTOR static Future<Void> connectionReader(
ACTOR static Future<Void> connectionIncoming( TransportData* self, Reference<IConnection> conn ) {
try {
state Promise<NetworkAddress> onPeerAddress;
state Future<Void> reader = connectionReader( self, conn, false, onPeerAddress );
state Promise<Peer*> onConnected;
state Future<Void> reader = connectionReader( self, conn, nullptr, onConnected );
choose {
when( Void _ = wait( reader ) ) { ASSERT(false); return Void(); }
when( NetworkAddress pa = wait( onPeerAddress.getFuture() ) ) {
Peer* p = self->getPeer( pa, false );
when( Peer *p = wait( onConnected.getFuture() ) ) {
p->onIncomingConnection( conn, reader );
}
when( Void _ = wait( delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT) ) ) {
@ -785,9 +817,9 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
Peer* peer = self->getPeer(destination.address);
// If there isn't an open connection or public address, we can't send
if (peer->outgoingConnectionIdle && !destination.address.isPublic()) {
TEST(true); // Can't send to private address without an open connection
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if ((peer->outgoingConnectionIdle && !destination.address.isPublic()) || (!peer->compatible && destination.token != WLTOKEN_PING_PACKET)) {
TEST(true); // Can't send to private address without a compatible open connection
return (PacketID)NULL;
}

View File

@ -25,10 +25,10 @@
#include "fdbclient/MutationList.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BackupAgent.h"
#include "fdbclient/Notified.h"
#include "IKeyValueStore.h"
#include "LogSystem.h"
#include "LogProtocolMessage.h"
#include "flow/Notified.h"
static bool isMetadataMutation(MutationRef const& m) {
// FIXME: This is conservative - not everything in system keyspace is necessarily processed by applyMetadataMutations

View File

@ -482,11 +482,12 @@ struct DDTeamCollection {
int teamSize,
IRepPolicyRef replicationPolicy,
KeyValueStoreType storeType,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges )
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges,
Future<Void> readyToStart )
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
teamSize( teamSize ), replicationPolicy(replicationPolicy), storeType( storeType ), serverChanges(serverChanges),
initialFailureReactionDelay( delay( BUGGIFY ? 0 : SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ),
initializationDoneActor(logOnCompletion(initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
unhealthyServers(0)
{
TraceEvent("DDTrackerStarting", masterId)
@ -1766,7 +1767,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
Future<Void> readyToStart )
{
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, teamSize, replicationPolicy, storeType, serverChanges );
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, teamSize, replicationPolicy, storeType, serverChanges, readyToStart );
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@ -2146,7 +2147,8 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
teamSize,
policy,
KeyValueStoreType(),
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>()
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>(),
Future<Void>(Void())
);
for(int id = 1; id <= processCount; id++) {

View File

@ -22,8 +22,8 @@
#include "IKeyValueStore.h"
#include "IDiskQueue.h"
#include "flow/IndexedSet.h"
#include "flow/Notified.h"
#include "flow/ActorCollection.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#define OP_DISK_OVERHEAD (sizeof(OpHeader) + 1)
@ -705,4 +705,4 @@ IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int
IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot ) {
return new KeyValueStoreMemory( queue, logID, memoryLimit, disableSnapshot );
}
}

View File

@ -31,9 +31,9 @@
#include "LogSystemDiskQueueAdapter.h"
#include "IKeyValueStore.h"
#include "fdbclient/SystemData.h"
#include "flow/Notified.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/batcher.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/KeyRangeMap.h"
#include "ConflictSet.h"
#include "flow/Stats.h"

View File

@ -23,11 +23,11 @@
#include "flow/Stats.h"
#include "flow/UnitTest.h"
#include "fdbclient/NativeAPI.h"
#include "fdbclient/Notified.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "WorkerInterface.h"
#include "TLogInterface.h"
#include "flow/Notified.h"
#include "Knobs.h"
#include "IKeyValueStore.h"
#include "flow/ActorCollection.h"
@ -1448,4 +1448,4 @@ namespace oldTLog {
Void _ = wait( tLogStart( &self, LogSystemConfig(), Version(0), Version(0), std::vector<Tag>(), true, tli, ReplyPromise<TLogInterface>(), recoveryCount ) || removed );
throw internal_error(); // tLogStart doesn't return without an error
}
}
}

View File

@ -27,8 +27,8 @@
#elif !defined(FDBSERVER_ORDERER_ACTOR_H)
#define FDBSERVER_ORDERER_ACTOR_H
#include "fdbclient/Notified.h"
#include "flow/actorcompiler.h"
#include "flow/Notified.h"
template <class Seq>
class Orderer {
@ -71,4 +71,4 @@ private:
Promise<Void> shutdown; // Never set, only broken on destruction
};
#endif
#endif

View File

@ -23,11 +23,11 @@
#include "flow/Stats.h"
#include "flow/UnitTest.h"
#include "fdbclient/NativeAPI.h"
#include "fdbclient/Notified.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "WorkerInterface.h"
#include "TLogInterface.h"
#include "flow/Notified.h"
#include "Knobs.h"
#include "IKeyValueStore.h"
#include "flow/ActorCollection.h"
@ -338,13 +338,14 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
TLogInterface tli;
PromiseStream<Future<Void>> addActor;
TLogData* tLogData;
Future<Void> recovery;
explicit LogData(TLogData* tLogData, TLogInterface interf) : tLogData(tLogData), knownCommittedVersion(0), tli(interf), logId(interf.id()),
cc("TLog", interf.id().toString()),
bytesInput("bytesInput", cc),
bytesDurable("bytesDurable", cc),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion)
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void())
{
startRole(interf.id(), UID(), "TLog");
@ -387,6 +388,7 @@ ACTOR Future<Void> tLogLock( TLogData* self, ReplyPromise< TLogLockResult > repl
TraceEvent("TLogStop", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get());
logData->stopped = true;
logData->recovery = Void();
// Lock once the current version has been committed
Void _ = wait( logData->queueCommittedVersion.whenAtLeast( stopVersion ) );
@ -1197,6 +1199,7 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
void removeLog( TLogData* self, Reference<LogData> logData ) {
TraceEvent("TLogRemoved", logData->logId).detail("input", logData->bytesInput.getValue()).detail("durable", logData->bytesDurable.getValue());
logData->stopped = true;
logData->recovery = Void();
logData->addActor = PromiseStream<Future<Void>>(); //there could be items still in the promise stream if one of the actors threw an error immediately
self->id_data.erase(logData->logId);
@ -1208,7 +1211,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
}
}
ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, Future<Void> recovery ) {
ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData ) {
if(logData->removed.isReady()) {
Void _ = wait(delay(0)); //to avoid iterator invalidation in restorePersistentState when removed is already ready
ASSERT(logData->removed.isError());
@ -1229,12 +1232,12 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, Future<
state Future<Void> warningCollector = timeoutWarningCollector( warningCollectorInput.getFuture(), 1.0, "TLogQueueCommitSlow", self->dbgid );
state Future<Void> error = actorCollection( logData->addActor.getFuture() );
if( recovery.isValid() && !recovery.isReady()) {
logData->addActor.send( recovery );
if( logData->recovery.isValid() && !logData->recovery.isReady()) {
logData->addActor.send( logData->recovery );
}
logData->addActor.send( waitFailureServer(logData->tli.waitFailure.getFuture()) );
logData->addActor.send( respondToRecovered(logData->tli, recovery) );
logData->addActor.send( respondToRecovered(logData->tli, logData->recovery) );
logData->addActor.send( logData->removed );
//FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, self->dbgid.toString() + "/TLogMetrics"));
@ -1436,7 +1439,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
TraceEvent("TLogZeroVersion", self->dbgid).detail("logId", it.first);
it.second->queueCommittedVersion.set(it.second->version.get());
}
self->sharedActors.send( tLogCore( self, it.second, Void() ) );
self->sharedActors.send( tLogCore( self, it.second ) );
}
return Void();
@ -1488,7 +1491,7 @@ ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> l
}
}
TraceEvent("LogRecoveringTagResults", logData->logId).detail("Tag", tag);
//TraceEvent("LogRecoveringTagResults", logData->logId).detail("Tag", tag);
Version ver = 0;
BinaryWriter wr( Unversioned() );
@ -1597,7 +1600,7 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
committing = self->persistentData->commit();
commitTimeout = delay(SERVER_KNOBS->LONG_TLOG_COMMIT_TIME);
uncommittedBytes->set(0);
TraceEvent("TLogCommitRecoveryData", self->dbgid).detail("MemoryUsage", DEBUG_DETERMINISM ? 0 : getMemoryUsage());
//TraceEvent("TLogCommitRecoveryData", self->dbgid).detail("MemoryUsage", DEBUG_DETERMINISM ? 0 : getMemoryUsage());
}
when(Void _ = wait(uncommittedBytes->onChange())) {
if(uncommittedBytes->get() >= SERVER_KNOBS->LARGE_TLOG_COMMIT_BYTES)
@ -1617,7 +1620,6 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
}
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state Future<Void> recovery = Void();
state TLogInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
@ -1631,6 +1633,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
for(auto it : self->id_data) {
it.second->stopped = true;
it.second->recovery = Void();
}
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited) );
@ -1658,7 +1661,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
state Promise<Void> copyComplete;
TraceEvent("TLogRecover", self->dbgid).detail("logId", logData->logId).detail("at", req.recoverAt).detail("known", req.knownCommittedVersion).detail("tags", describe(req.recoverTags));
recovery = recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete );
logData->recovery = recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete );
Void _ = wait(copyComplete.getFuture() || logData->removed );
} else {
// Brand new tlog, initialization has already been done by caller
@ -1683,7 +1686,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
req.reply.send( recruited );
Void _ = wait( tLogCore( self, logData, recovery ) );
Void _ = wait( tLogCore( self, logData ) );
return Void();
}
@ -1727,6 +1730,10 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
}
for( auto& it : self.id_data ) {
it.second->recovery = Void();
}
if (tlogTerminated( &self, persistentData, self.persistentQueue, e )) {
return Void();
} else {

View File

@ -294,7 +294,6 @@ Future<Void> tLog( class IKeyValueStore* const& persistentData, class IDiskQueue
Future<Void> debugQueryServer( DebugQueryRequest const& req );
Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& ccInterface, Reference<ClusterConnectionFile> const&, LocalityData const&, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<Void> resolver( ResolverInterface const& proxy, InitializeResolverRequest const&, Reference<AsyncVar<ServerDBInfo>> const& db );
Future<Void> runMetrics( Future<Database> const& fcx, Key const& metricsPrefix );
void registerThreadForProfiling();
void updateCpuProfiler(ProfilerRequest req);

View File

@ -48,7 +48,6 @@
<ClCompile Include="Knobs.cpp" />
<ActorCompiler Include="QuietDatabase.actor.cpp" />
<ActorCompiler Include="networktest.actor.cpp" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ActorCompiler Include="workloads\SaveAndKill.actor.cpp" />
<ActorCompiler Include="Resolver.actor.cpp" />
<ActorCompiler Include="LogSystemDiskQueueAdapter.actor.cpp" />

View File

@ -24,9 +24,9 @@
#include "flow/Trace.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/NativeAPI.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "ConflictSet.h"
#include "flow/Notified.h"
#include "DataDistribution.h"
#include "Knobs.h"
#include <iterator>

View File

@ -29,12 +29,12 @@
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.h"
#include "fdbclient/Notified.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "WorkerInterface.h"
#include "TLogInterface.h"
#include "MoveKeys.h"
#include "flow/Notified.h"
#include "Knobs.h"
#include "WaitFailure.h"
#include "IKeyValueStore.h"

View File

@ -24,6 +24,7 @@
#include "flow/TDMetric.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/NativeAPI.h"
#include "fdbclient/MetricLogger.h"
#include "WorkerInterface.h"
#include "IKeyValueStore.h"
#include "WaitFailure.h"

View File

@ -52,7 +52,7 @@ using namespace boost::asio::ip;
// These impact both communications and the deserialization of certain zookeeper, database and IKeyValueStore keys
// xyzdev
// vvvv
uint64_t currentProtocolVersion = 0x0FDB00A550010001LL;
uint64_t currentProtocolVersion = 0x0FDB00A551000001LL;
uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;

View File

@ -71,7 +71,6 @@
<ClInclude Include="IThreadPool.h" />
<ClInclude Include="Knobs.h" />
<ClInclude Include="Net2Packet.h" />
<ClInclude Include="Notified.h" />
<ClInclude Include="serialize.h" />
<ClInclude Include="SimpleOpt.h" />
<ClInclude Include="Stats.h" />

View File

@ -64,9 +64,8 @@
<ClInclude Include="network.h" />
<ClInclude Include="AsioReactor.h" />
<ClInclude Include="Net2Packet.h" />
<ClInclude Include="Notified.h" />
</ItemGroup>
<ItemGroup>
<None Include="no_intellisense.opt" />
</ItemGroup>
</Project>
</Project>

View File

@ -37,3 +37,27 @@ flow/hgVersion.h: FORCE
lib/libflow.a: bin/coverage.flow.xml
ifeq ($(RELEASE),true)
FLOWVER = $(VERSION)
else
FLOWVER = $(VERSION)-PRERELEASE
endif
packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH).tar.gz: flow
@echo "Packaging flow"
@rm -rf packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)
@mkdir -p packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/bin packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/lib packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/include/flow
@cp lib/libflow.a packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/lib
@cp bin/actorcompiler.exe packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/bin
@find flow -name '*.h' -exec cp {} packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)/include/flow \;
@tar czf packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH).tar.gz -C packages flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)
@rm -rf packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH)
FLOW: packages/flow-$(FLOWVER)-$(PLATFORM)-$(ARCH).tar.gz
FLOW_clean:
@echo "Cleaning flow"
@rm -rf packages/flow-*.tar.gz
packages: FLOW
packages_clean: FLOW_clean

View File

@ -21,10 +21,10 @@
package main
import (
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
"fmt"
)
@ -76,7 +76,7 @@ func read_blob(t fdb.ReadTransactor, blob_subspace subspace.Subspace) ([]byte, e
}
func main() {
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()

View File

@ -21,44 +21,44 @@
package main
import (
"fmt"
"fmt"
"io/ioutil"
"math/rand"
"encoding/json"
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)
func clear_subspace(trtr fdb.Transactor, sub subspace.Subspace) error {
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
}
func print_subspace(trtr fdb.Transactor, sub subspace.Subspace) {
trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
k := tr.GetRange(sub, fdb.RangeOptions{0, -1, false}).Iterator()
trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
k := tr.GetRange(sub, fdb.RangeOptions{0, -1, false}).Iterator()
for k.Advance() {
fmt.Println(_unpack(k.MustGet().Value))
}
return nil, nil
})
for k.Advance() {
fmt.Println(_unpack(k.MustGet().Value))
}
return nil, nil
})
}
func _pack(t interface{}) []byte {
return tuple.Tuple{t}.Pack()
return tuple.Tuple{t}.Pack()
}
func _unpack(t []byte) tuple.Tuple {
i, e := tuple.Unpack(t)
if e != nil {return nil}
return i
i, e := tuple.Unpack(t)
if e != nil {return nil}
return i
}
const EmptyObject int = -1
@ -67,16 +67,16 @@ const EmptyList int = -2
func ToTuples(item interface{}) []tuple.Tuple {
switch i := item.(type) {
case []interface{}:
if len(i) == 0 {return []tuple.Tuple{tuple.Tuple{EmptyList}}}
if len(i) == 0 {return []tuple.Tuple{tuple.Tuple{EmptyList}}}
tuples := make([]tuple.Tuple, 0)
for i, v := range i {
for _, t := range ToTuples(v) {
tuples = append(tuples, append(tuple.Tuple{i}, t...))
}
}
return tuples
return tuples
case map[string]interface{}:
if len(i) == 0 {return []tuple.Tuple{tuple.Tuple{EmptyObject}}}
if len(i) == 0 {return []tuple.Tuple{tuple.Tuple{EmptyObject}}}
tuples := make([]tuple.Tuple, 0)
for k, v := range i {
for _, t := range ToTuples(v) {
@ -91,44 +91,44 @@ func ToTuples(item interface{}) []tuple.Tuple {
}
func FromTuples(tuples []tuple.Tuple) interface{} {
//fmt.Println(tuples)
if len(tuples) == 0 {return nil}
first := tuples[0]
if len(first) == 1 {return first[0]}
if first[0] == EmptyObject {return make(map[string]interface{}, 0)}
if first[0] == EmptyList {return make([]interface{}, 0)}
//fmt.Println(tuples)
if len(tuples) == 0 {return nil}
first := tuples[0]
if len(first) == 1 {return first[0]}
if first[0] == EmptyObject {return make(map[string]interface{}, 0)}
if first[0] == EmptyList {return make([]interface{}, 0)}
group := make(map[string][]tuple.Tuple)
group := make(map[string][]tuple.Tuple)
for _, t := range tuples {
k := string(_pack(t[0]))
_, ok := group[k]
if !ok {group[k] = make([]tuple.Tuple, 0)}
group[k] = append(group[k], t[0:len(t)])
}
for _, t := range tuples {
k := string(_pack(t[0]))
_, ok := group[k]
if !ok {group[k] = make([]tuple.Tuple, 0)}
group[k] = append(group[k], t[0:len(t)])
}
switch first[0].(type) {
case int64:
res := make([]interface{}, 0)
for _, g := range group {
subtup := make([]tuple.Tuple, 0)
for _, t := range g {
subtup = append(subtup, t[1:len(t)])
}
res = append(res, FromTuples(subtup))
}
return res
default:
res := make(map[string]interface{})
for _, g := range group {
subtup := make([]tuple.Tuple, 0)
for _, t := range g {
subtup = append(subtup, t[1:len(t)])
}
res[g[0][0].(string)] = FromTuples(subtup)
}
return res
}
switch first[0].(type) {
case int64:
res := make([]interface{}, 0)
for _, g := range group {
subtup := make([]tuple.Tuple, 0)
for _, t := range g {
subtup = append(subtup, t[1:len(t)])
}
res = append(res, FromTuples(subtup))
}
return res
default:
res := make(map[string]interface{})
for _, g := range group {
subtup := make([]tuple.Tuple, 0)
for _, t := range g {
subtup = append(subtup, t[1:len(t)])
}
res[g[0][0].(string)] = FromTuples(subtup)
}
return res
}
}
type Doc struct {
@ -138,16 +138,16 @@ type Doc struct {
func (doc Doc) InsertDoc(trtr fdb.Transactor, docdata []byte) int {
var data interface{}
json.Unmarshal(docdata, &data)
docid := 0
docid := 0
switch d := data.(type) {
case map[string]interface{}:
temp, ok := d["doc_id"]
if !ok {
docid = doc._GetNewID(trtr)
d["doc_id"] = docid
} else {
docid = temp.(int)
}
docid = doc._GetNewID(trtr)
d["doc_id"] = docid
} else {
docid = temp.(int)
}
tuples := ToTuples(d)
trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
for _, t := range tuples {
@ -156,7 +156,7 @@ func (doc Doc) InsertDoc(trtr fdb.Transactor, docdata []byte) int {
return nil, nil
})
}
return docid
return docid
}
func (doc Doc) _GetNewID(trtr fdb.Transactor) int {
@ -178,49 +178,40 @@ func (doc Doc) _GetNewID(trtr fdb.Transactor) int {
}
func (doc Doc) GetDoc(trtr fdb.Transactor, doc_id int) interface{} {
tuples := make([]tuple.Tuple, 0)
trtr.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
kr, err := fdb.PrefixRange(doc.DocSS.Pack(tuple.Tuple{doc_id}))
if err != nil {panic(err)}
tuples := make([]tuple.Tuple, 0)
trtr.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
kr, err := fdb.PrefixRange(doc.DocSS.Pack(tuple.Tuple{doc_id}))
if err != nil {panic(err)}
items := tr.GetRange(kr, fdb.RangeOptions{}).Iterator()
items := tr.GetRange(kr, fdb.RangeOptions{}).Iterator()
for items.Advance() {
v := items.MustGet()
tup, err := doc.DocSS.Unpack(v.Key)
if err != nil {panic(err)}
tuples = append(tuples, append(tup[1:len(tup)], _unpack(v.Value)))
}
return nil, nil
})
for items.Advance() {
v := items.MustGet()
tup, err := doc.DocSS.Unpack(v.Key)
if err != nil {panic(err)}
tuples = append(tuples, append(tup[1:len(tup)], _unpack(v.Value)))
}
return nil, nil
})
return FromTuples(tuples)
return FromTuples(tuples)
}
func main() {
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()
DocDemoDir, err := directory.CreateOrOpen(db, []string{"docdemo"}, nil)
if err != nil {panic(err)}
clear_subspace(db, DocDemoDir)
clear_subspace(db, DocDemoDir)
mydoc := Doc{DocDemoDir}
docdata, err := ioutil.ReadFile("./doctestjson.json")
id := mydoc.InsertDoc(db, docdata)
fmt.Println(mydoc.GetDoc(db, id), id)
fmt.Println(mydoc.GetDoc(db, id), id)
}

View File

@ -21,10 +21,10 @@
package main
import (
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
"log"
"fmt"
)
@ -116,7 +116,7 @@ func (graph *Graph) get_in_neighbors(trtr fdb.Transactor, node int) ([]int, erro
}
func main() {
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()

View File

@ -21,20 +21,20 @@
package main
import (
"log"
"fmt"
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"log"
"fmt"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)
func clear_subspace(trtr fdb.Transactor, sub subspace.Subspace) error {
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
}
func print_subspace(trtr fdb.Transactor, sub subspace.Subspace) {
@ -49,13 +49,13 @@ func print_subspace(trtr fdb.Transactor, sub subspace.Subspace) {
}
func _pack(t interface{}) []byte {
return tuple.Tuple{t}.Pack()
return tuple.Tuple{t}.Pack()
}
func _unpack(t []byte) tuple.Tuple {
i, e := tuple.Unpack(t)
if e != nil {return nil}
return i
i, e := tuple.Unpack(t)
if e != nil {return nil}
return i
}
type Workspace struct {
@ -87,7 +87,7 @@ func (wrkspc Workspace) Session(foo func(directory.DirectorySubspace)) (err erro
}
func main() {
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()

View File

@ -24,10 +24,10 @@ import (
"fmt"
//"log"
//"time"
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)
func clear_subspace(db fdb.Transactor, ss subspace.Subspace) {
@ -70,21 +70,21 @@ func (multi MultiMap) MultiSubtract(trtr fdb.Transactor, index, value interface{
}
func (multi MultiMap) MultiGet(tr fdb.ReadTransactor, index int) (ret []interface{}, e error) {
_, e = tr.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
pr, err := fdb.PrefixRange(multi.MapSS.Pack(tuple.Tuple{index}))
if err != nil {return nil, err}
kvs := tr.GetRange(pr, fdb.RangeOptions{0, -1, false}).GetSliceOrPanic()
ret := make([]interface{}, len(kvs))
i := 0
for _, kv := range kvs {
temp, err := multi.MapSS.Unpack(kv.Key)
if err != nil {return nil, err}
ret[i] = temp[1]
i++
}
return nil, nil
})
return
_, e = tr.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
pr, err := fdb.PrefixRange(multi.MapSS.Pack(tuple.Tuple{index}))
if err != nil {return nil, err}
kvs := tr.GetRange(pr, fdb.RangeOptions{0, -1, false}).GetSliceOrPanic()
ret := make([]interface{}, len(kvs))
i := 0
for _, kv := range kvs {
temp, err := multi.MapSS.Unpack(kv.Key)
if err != nil {return nil, err}
ret[i] = temp[1]
i++
}
return nil, nil
})
return
}
func (multi MultiMap) MultiGetCounts(trtr fdb.Transactor, index interface{}) (map[interface{}]int, error) {
@ -124,7 +124,7 @@ func (multi MultiMap) MultiIsElement(trtr fdb.Transactor, index, value interface
func main() {
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()
@ -144,8 +144,3 @@ func main() {
fmt.Println(m.MultiIsElement(db, 1, 2))
fmt.Println(m.MultiGetCounts(db, 1))
}

View File

@ -21,31 +21,31 @@
package main
import (
"log"
"fmt"
"log"
"fmt"
"math/rand"
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)
func clear_subspace(trtr fdb.Transactor, sub subspace.Subspace) error {
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
}
func _pack(t interface{}) []byte {
return tuple.Tuple{t}.Pack()
return tuple.Tuple{t}.Pack()
}
func _unpack(t []byte) tuple.Tuple {
i, e := tuple.Unpack(t)
if e != nil {return nil}
return i
i, e := tuple.Unpack(t)
if e != nil {return nil}
return i
}
type Priority struct {
@ -100,7 +100,7 @@ func (prty Priority) Peek(trtr fdb.Transactor, max bool) interface{} {
func main() {
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()
@ -131,13 +131,3 @@ func main() {
fmt.Println(p.Pop(db, false))
fmt.Println(p.Pop(db, false))
}

View File

@ -21,10 +21,10 @@
package main
import (
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
"log"
"fmt"
)
@ -101,7 +101,7 @@ func (q *Queue) FirstItem(trtr fdb.Transactor) (interface{}, error) {
func main() {
fmt.Println("Queue Example Program")
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()

View File

@ -23,18 +23,18 @@ package main
import (
"log"
"fmt"
"github.com/FoundationDB/fdb-go/fdb"
"github.com/FoundationDB/fdb-go/fdb/directory"
"github.com/FoundationDB/fdb-go/fdb/subspace"
"github.com/FoundationDB/fdb-go/fdb/tuple"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)
func clear_subspace(trtr fdb.Transactor, sub subspace.Subspace) error {
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
_, err := trtr.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(sub)
return nil, nil
})
return err
}
func _pack(t interface{}) []byte {
@ -53,7 +53,7 @@ type Table struct {
func (tbl *Table) NewTable(ss subspace.Subspace) {
tbl.row = ss.Sub("row")
tbl.col = ss.Sub("col")
tbl.col = ss.Sub("col")
}
func (tbl Table) TableSetCell(trtr fdb.Transactor, row, column int, value interface{}) {
@ -128,9 +128,9 @@ func (tbl Table) TableGetCol(tr fdb.ReadTransactor, col int) ([]interface{}, err
}
func main() {
fdb.MustAPIVersion(300)
fdb.MustAPIVersion(500)
db := fdb.MustOpenDefault()
db := fdb.MustOpenDefault()
TableDemoDir, err := directory.CreateOrOpen(db, []string{"Graph"}, nil)
if err != nil {log.Fatal(err)}
@ -150,9 +150,3 @@ func main() {
g.TableSetRow(db, 1, "Hello", "World", "Again!", 1)
fmt.Println(g.TableGetRow(db, 1))
}