Use an write-and-rename pattern when copying files for atomicity.

Restructure the usage of the mutex in the monitor class.
This commit is contained in:
John Brownlee 2021-09-17 16:26:05 -07:00
parent f8ec3cc27d
commit 1a5069a047
4 changed files with 67 additions and 41 deletions

View File

@ -4,8 +4,8 @@ This package provides a launcher program for running FoundationDB in Kubernetes.
To test this, run the following commands from the root of the FoundationDB
repository:
docker build -t foundationdb/foundationdb-kubernetes:latest --build-arg FDB_VERSION=6.3.13 --build-arg FDB_LIBRARY_VERSIONS="6.3.13 6.2.30 6.1.13" -f packaging/docker/kubernetes/Dockerfile .
docker build -t foundationdb/foundationdb-kubernetes:latest-sidecar --build-arg FDB_VERSION=6.3.15 --build-arg FDB_LIBRARY_VERSIONS="6.3.15 6.2.30 6.1.13" -f packaging/docker/kubernetes/Dockerfile .
docker build -t foundationdb/foundationdb-kubernetes:6.3.13-local --build-arg FDB_VERSION=6.3.13 --build-arg FDB_LIBRARY_VERSIONS="6.3.13 6.2.30 6.1.13" -f packaging/docker/kubernetes/Dockerfile .
docker build -t foundationdb/foundationdb-kubernetes:6.3.15-local --build-arg FDB_VERSION=6.3.15 --build-arg FDB_LIBRARY_VERSIONS="6.3.15 6.2.30 6.1.13" -f packaging/docker/kubernetes/Dockerfile .
kubectl apply -f packaging/docker/kubernetes/test_config.yaml
# Wait for the pods to become ready
ips=$(kubectl get pod -l app=fdb-kubernetes-example -o json | jq -j '[[.items|.[]|select(.status.podIP!="")]|limit(3;.[])|.status.podIP+":4501"]|join(",")')
@ -14,6 +14,8 @@ repository:
# Watch the logs for the fdb-kubernetes-example pods to confirm that they have launched the fdbserver processes.
kubectl exec -it sts/fdb-kubernetes-example -- fdbcli --exec "configure new double ssd"
This will set up a cluster in your Kubernetes environment using a statefulset, to provide a simple subset of what the Kubernetes operator does to set up the cluster.
You can then make changes to the data in the config map and update the fdbserver processes:
cat packaging/docker/kubernetes/test_config.yaml | sed -e "s/fdb.cluster: \"\"/fdb.cluster: \"test:test@$ips\"/" -e "s/\"serverCount\": 0/\"serverCount\": 1/" | kubectl apply -f -

View File

@ -21,17 +21,13 @@ package main
import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"github.com/go-logr/logr"
)
const (
bufferSize = 1024
)
// copyFile copies a file into the output directory.
func copyFile(logger logr.Logger, inputPath string, outputPath string, required bool) error {
logger.Info("Copying file", "inputPath", inputPath, "outputPath", outputPath)
@ -52,29 +48,34 @@ func copyFile(logger logr.Logger, inputPath string, outputPath string, required
return fmt.Errorf("File %s is empty", inputPath)
}
outputFile, err := os.OpenFile(outputPath, os.O_CREATE|os.O_WRONLY, inputInfo.Mode())
outputDir := path.Dir(outputPath)
tempFile, err := ioutil.TempFile(outputDir, "")
if err != nil {
return err
}
defer outputFile.Close()
defer tempFile.Close()
var buffer = make([]byte, bufferSize)
for {
readLength, readError := inputFile.Read(buffer)
if readError == io.EOF {
break
}
if readError != nil {
logger.Error(readError, "Error reading file", "path", inputPath)
return readError
_, err = tempFile.ReadFrom(inputFile)
if err != nil {
return err
}
_, writeError := outputFile.Write(buffer[:readLength])
if writeError != nil {
logger.Error(writeError, "Error writing file", "path", outputPath)
return writeError
err = tempFile.Close()
if err != nil {
return err
}
err = os.Chmod(tempFile.Name(), inputInfo.Mode())
if err != nil {
return err
}
err = os.Rename(tempFile.Name(), outputPath)
if err != nil {
return err
}
return nil
}

View File

@ -68,6 +68,8 @@ type Monitor struct {
ProcessIDs []int
// Mutex defines a mutex around working with configuration.
// This is used to synchronize access to local state like the active
// configuration and the process IDs from multiple goroutines.
Mutex sync.Mutex
// PodClient is a client for posting updates about this pod to
@ -137,9 +139,15 @@ func (monitor *Monitor) LoadConfiguration() {
return
}
monitor.Logger.Info("Received new configuration file", "configuration", configuration)
monitor.acceptConfiguration(configuration, configurationBytes)
}
// acceptConfiguration is called when the monitor process parses and accepts
// a configuration from the local config file.
func (monitor *Monitor) acceptConfiguration(configuration *ProcessConfiguration, configurationBytes []byte) {
monitor.Mutex.Lock()
defer monitor.Mutex.Unlock()
monitor.Logger.Info("Received new configuration file", "configuration", configuration)
if monitor.ProcessIDs == nil {
monitor.ProcessIDs = make([]int, configuration.ServerCount+1)
@ -161,7 +169,7 @@ func (monitor *Monitor) LoadConfiguration() {
}
}
err = monitor.PodClient.UpdateAnnotations(monitor)
err := monitor.PodClient.UpdateAnnotations(monitor)
if err != nil {
monitor.Logger.Error(err, "Error updating pod annotations")
}
@ -173,14 +181,9 @@ func (monitor *Monitor) RunProcess(processNumber int) {
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "RunProcess")
logger.Info("Starting run loop")
for {
monitor.Mutex.Lock()
if monitor.ActiveConfiguration.ServerCount < processNumber {
logger.Info("Terminating run loop")
monitor.ProcessIDs[processNumber] = 0
monitor.Mutex.Unlock()
if !monitor.checkProcessRequired(processNumber) {
return
}
monitor.Mutex.Unlock()
arguments, err := monitor.ActiveConfiguration.GenerateArguments(processNumber, monitor.CustomEnvironment)
if err != nil {
@ -220,9 +223,7 @@ func (monitor *Monitor) RunProcess(processNumber int) {
startTime := time.Now()
logger.Info("Subprocess started", "PID", pid)
monitor.Mutex.Lock()
monitor.ProcessIDs[processNumber] = pid
monitor.Mutex.Unlock()
monitor.updateProcessID(processNumber, pid)
if stdout != nil {
stdoutScanner := bufio.NewScanner(stdout)
@ -254,9 +255,7 @@ func (monitor *Monitor) RunProcess(processNumber int) {
logger.Info("Subprocess terminated", "exitCode", exitCode, "PID", pid)
endTime := time.Now()
monitor.Mutex.Lock()
monitor.ProcessIDs[processNumber] = -1
monitor.Mutex.Unlock()
monitor.updateProcessID(processNumber, -1)
processDuration := endTime.Sub(startTime)
if processDuration.Seconds() < errorBackoffSeconds {
@ -266,6 +265,30 @@ func (monitor *Monitor) RunProcess(processNumber int) {
}
}
// checkProcessRequired determines if the latest configuration requires that a
// process stay running.
// If the process is no longer desired, this will remove it from the process ID
// list and return false. If the process is still desired, this will return
// true.
func (monitor *Monitor) checkProcessRequired(processNumber int) bool {
monitor.Mutex.Lock()
defer monitor.Mutex.Unlock()
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "checkProcessRequired")
if monitor.ActiveConfiguration.ServerCount < processNumber {
logger.Info("Terminating run loop")
monitor.ProcessIDs[processNumber] = 0
return false
}
return true
}
// updateProcessID records a new Process ID from a newly launched process.
func (monitor *Monitor) updateProcessID(processNumber int, pid int) {
monitor.Mutex.Lock()
defer monitor.Mutex.Unlock()
monitor.ProcessIDs[processNumber] = pid
}
// WatchConfiguration detects changes to the monitor configuration file.
func (monitor *Monitor) WatchConfiguration(watcher *fsnotify.Watcher) {
for {

View File

@ -43,7 +43,7 @@ spec:
spec:
containers:
- name: foundationdb
image: foundationdb/foundationdb-kubernetes:latest
image: foundationdb/foundationdb-kubernetes:6.3.13-local
imagePullPolicy: IfNotPresent
args:
- --input-dir
@ -91,7 +91,7 @@ spec:
- name: logs
mountPath: /var/fdb/logs
- name: foundationdb-sidecar
image: foundationdb/foundationdb-kubernetes:latest-sidecar
image: foundationdb/foundationdb-kubernetes:6.3.15-local
imagePullPolicy: IfNotPresent
args:
- --mode