Merge pull request #5993 from brownleej/new-image-fixes

Updates to fdb-kubernetes-monitor to ease integration with the Kubernetes operator
This commit is contained in:
John Brownlee 2021-11-17 11:56:19 -08:00 committed by GitHub
commit 01c37a053a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 39 additions and 26 deletions

View File

@ -10,7 +10,7 @@ docker build -t foundationdb/foundationdb-kubernetes:6.3.15-local --build-arg FD
kubectl apply -f packaging/docker/kubernetes/test_config.yaml kubectl apply -f packaging/docker/kubernetes/test_config.yaml
# Wait for the pods to become ready # Wait for the pods to become ready
ips=$(kubectl get pod -l app=fdb-kubernetes-example -o json | jq -j '[[.items|.[]|select(.status.podIP!="")]|limit(3;.[])|.status.podIP+":4501"]|join(",")') ips=$(kubectl get pod -l app=fdb-kubernetes-example -o json | jq -j '[[.items|.[]|select(.status.podIP!="")]|limit(3;.[])|.status.podIP+":4501"]|join(",")')
sed -e "s/fdb.cluster: \"\"/fdb.cluster: \"test:test@$ips\"/" -e "s/\"serverCount\": 0/\"serverCount\": 1/" packaging/docker/kubernetes/test_config.yaml | kubectl apply -f - sed -e "s/fdb.cluster: \"\"/fdb.cluster: \"test:test@$ips\"/" -e "s/\"runProcesses\": false/\"runProcesses\": true/" packaging/docker/kubernetes/test_config.yaml | kubectl apply -f -
kubectl get pod -l app=fdb-kubernetes-example -o name | xargs -I {} kubectl annotate {} foundationdb.org/outdated-config-map-seen=$(date +%s) --overwrite kubectl get pod -l app=fdb-kubernetes-example -o name | xargs -I {} kubectl annotate {} foundationdb.org/outdated-config-map-seen=$(date +%s) --overwrite
# Watch the logs for the fdb-kubernetes-example pods to confirm that they have launched the fdbserver processes. # Watch the logs for the fdb-kubernetes-example pods to confirm that they have launched the fdbserver processes.
kubectl exec -it sts/fdb-kubernetes-example -- fdbcli --exec "configure new double ssd" kubectl exec -it sts/fdb-kubernetes-example -- fdbcli --exec "configure new double ssd"
@ -21,7 +21,7 @@ This will set up a cluster in your Kubernetes environment using a statefulset, t
You can then make changes to the data in the config map and update the fdbserver processes: You can then make changes to the data in the config map and update the fdbserver processes:
```bash ```bash
sed -e "s/fdb.cluster: \"\"/fdb.cluster: \"test:test@$ips\"/" -e "s/\"serverCount\": 0/\"serverCount\": 1/" packaging/docker/kubernetes/test_config.yaml | kubectl apply -f - sed -e "s/fdb.cluster: \"\"/fdb.cluster: \"test:test@$ips\"/" -e "s/\"runProcesses\": false/\"runProcesses\": true/" packaging/docker/kubernetes/test_config.yaml | kubectl apply -f -
# You can apply an annotation to speed up the propagation of config # You can apply an annotation to speed up the propagation of config
kubectl get pod -l app=fdb-kubernetes-example -o name | xargs -I {} kubectl annotate {} foundationdb.org/outdated-config-map-seen=$(date +%s) --overwrite kubectl get pod -l app=fdb-kubernetes-example -o name | xargs -I {} kubectl annotate {} foundationdb.org/outdated-config-map-seen=$(date +%s) --overwrite

View File

@ -1,6 +1,5 @@
{ {
"version": "6.3.15", "version": "6.3.15",
"serverCount": 1,
"arguments": [ "arguments": [
{"value": "--cluster_file"}, {"value": "--cluster_file"},
{"value": ".testdata/fdb.cluster"}, {"value": ".testdata/fdb.cluster"},

View File

@ -17,7 +17,7 @@
// limitations under the License. // limitations under the License.
// //
package main package api
import ( import (
"fmt" "fmt"
@ -31,8 +31,10 @@ type ProcessConfiguration struct {
// Version provides the version of FoundationDB the process should run. // Version provides the version of FoundationDB the process should run.
Version string `json:"version"` Version string `json:"version"`
// ServerCount defines the number of processes to start. // RunServers defines whether we should run the server processes.
ServerCount int `json:"serverCount,omitempty"` // This defaults to true, but you can set it to false to prevent starting
// new fdbserver processes.
RunServers *bool `json:"runServers,omitempty"`
// BinaryPath provides the path to the binary to launch. // BinaryPath provides the path to the binary to launch.
BinaryPath string `json:"-"` BinaryPath string `json:"-"`

View File

@ -17,7 +17,7 @@
// limitations under the License. // limitations under the License.
// //
package main package api
import ( import (
"encoding/json" "encoding/json"

View File

@ -24,8 +24,10 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"path"
"strconv" "strconv"
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
"github.com/go-logr/logr" "github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -67,7 +69,7 @@ type PodClient struct {
} }
// CreatePodClient creates a new client for working with the pod object. // CreatePodClient creates a new client for working with the pod object.
func CreatePodClient() (*PodClient, error) { func CreatePodClient(logger logr.Logger) (*PodClient, error) {
config, err := rest.InClusterConfig() config, err := rest.InClusterConfig()
if err != nil { if err != nil {
return nil, err return nil, err
@ -83,7 +85,7 @@ func CreatePodClient() (*PodClient, error) {
return nil, err return nil, err
} }
podClient := &PodClient{podApi: podApi, pod: pod, TimestampFeed: make(chan int64, 10)} podClient := &PodClient{podApi: podApi, pod: pod, TimestampFeed: make(chan int64, 10), Logger: logger}
err = podClient.watchPod() err = podClient.watchPod()
if err != nil { if err != nil {
return nil, err return nil, err
@ -94,7 +96,7 @@ func CreatePodClient() (*PodClient, error) {
// retrieveEnvironmentVariables extracts the environment variables we have for // retrieveEnvironmentVariables extracts the environment variables we have for
// an argument into a map. // an argument into a map.
func retrieveEnvironmentVariables(argument Argument, target map[string]string) { func retrieveEnvironmentVariables(argument api.Argument, target map[string]string) {
if argument.Source != "" { if argument.Source != "" {
target[argument.Source] = os.Getenv(argument.Source) target[argument.Source] = os.Getenv(argument.Source)
} }
@ -112,6 +114,7 @@ func (client *PodClient) UpdateAnnotations(monitor *Monitor) error {
for _, argument := range monitor.ActiveConfiguration.Arguments { for _, argument := range monitor.ActiveConfiguration.Arguments {
retrieveEnvironmentVariables(argument, environment) retrieveEnvironmentVariables(argument, environment)
} }
environment["BINARY_DIR"] = path.Dir(monitor.ActiveConfiguration.BinaryPath)
jsonEnvironment, err := json.Marshal(environment) jsonEnvironment, err := json.Marshal(environment)
if err != nil { if err != nil {
return err return err
@ -180,7 +183,7 @@ func (client *PodClient) processPodUpdate(pod *corev1.Pod) {
} }
timestamp, err := strconv.ParseInt(annotation, 10, 64) timestamp, err := strconv.ParseInt(annotation, 10, 64)
if err != nil { if err != nil {
client.Logger.Error(err, "Error parsing annotation", "key", OutdatedConfigMapAnnotation, "rawAnnotation", annotation, err) client.Logger.Error(err, "Error parsing annotation", "key", OutdatedConfigMapAnnotation, "rawAnnotation", annotation)
return return
} }

View File

@ -51,6 +51,7 @@ var (
mainContainerVersion string mainContainerVersion string
currentContainerVersion string currentContainerVersion string
additionalEnvFile string additionalEnvFile string
processCount int
) )
type executionMode string type executionMode string
@ -78,6 +79,7 @@ func main() {
pflag.StringArrayVar(&requiredCopyFiles, "require-not-empty", nil, "When copying this file, exit with an error if the file is empty") pflag.StringArrayVar(&requiredCopyFiles, "require-not-empty", nil, "When copying this file, exit with an error if the file is empty")
pflag.StringVar(&mainContainerVersion, "main-container-version", "", "For sidecar mode, this specifies the version of the main container. If this is equal to the current container version, no files will be copied") pflag.StringVar(&mainContainerVersion, "main-container-version", "", "For sidecar mode, this specifies the version of the main container. If this is equal to the current container version, no files will be copied")
pflag.StringVar(&additionalEnvFile, "additional-env-file", "", "A file with additional environment variables to use when interpreting the monitor configuration") pflag.StringVar(&additionalEnvFile, "additional-env-file", "", "A file with additional environment variables to use when interpreting the monitor configuration")
pflag.IntVar(&processCount, "process-count", 1, "The number of processes to start")
pflag.Parse() pflag.Parse()
zapConfig := zap.NewProductionConfig() zapConfig := zap.NewProductionConfig()
@ -110,7 +112,7 @@ func main() {
logger.Error(err, "Error loading additional environment") logger.Error(err, "Error loading additional environment")
os.Exit(1) os.Exit(1)
} }
StartMonitor(logger, fmt.Sprintf("%s/%s", inputDir, monitorConfFile), customEnvironment) StartMonitor(logger, fmt.Sprintf("%s/%s", inputDir, monitorConfFile), customEnvironment, processCount)
case executionModeInit: case executionModeInit:
err = CopyFiles(logger, outputDir, copyDetails, requiredCopies) err = CopyFiles(logger, outputDir, copyDetails, requiredCopies)
if err != nil { if err != nil {
@ -124,9 +126,10 @@ func main() {
logger.Error(err, "Error copying files") logger.Error(err, "Error copying files")
os.Exit(1) os.Exit(1)
} }
done := make(chan bool)
<-done
} }
logger.Info("Waiting for process to be terminated")
done := make(chan bool)
<-done
default: default:
logger.Error(nil, "Unknown execution mode", "mode", mode) logger.Error(nil, "Unknown execution mode", "mode", mode)
os.Exit(1) os.Exit(1)

View File

@ -32,6 +32,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/go-logr/logr" "github.com/go-logr/logr"
) )
@ -52,7 +53,7 @@ type Monitor struct {
CustomEnvironment map[string]string CustomEnvironment map[string]string
// ActiveConfiguration defines the active process configuration. // ActiveConfiguration defines the active process configuration.
ActiveConfiguration *ProcessConfiguration ActiveConfiguration *api.ProcessConfiguration
// ActiveConfigurationBytes defines the source data for the active process // ActiveConfigurationBytes defines the source data for the active process
// configuration. // configuration.
@ -62,6 +63,9 @@ type Monitor struct {
// configuration file. // configuration file.
LastConfigurationTime time.Time LastConfigurationTime time.Time
// ProcessCount defines how many processes the
ProcessCount int
// ProcessIDs stores the PIDs of the processes that are running. A PID of // ProcessIDs stores the PIDs of the processes that are running. A PID of
// zero will indicate that a process does not have a run loop. A PID of -1 // zero will indicate that a process does not have a run loop. A PID of -1
// will indicate that a process has a run loop but is not currently running // will indicate that a process has a run loop but is not currently running
@ -82,8 +86,8 @@ type Monitor struct {
} }
// StartMonitor starts the monitor loop. // StartMonitor starts the monitor loop.
func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[string]string) { func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int) {
podClient, err := CreatePodClient() podClient, err := CreatePodClient(logger)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -93,6 +97,7 @@ func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[s
PodClient: podClient, PodClient: podClient,
Logger: logger, Logger: logger,
CustomEnvironment: customEnvironment, CustomEnvironment: customEnvironment,
ProcessCount: processCount,
} }
go func() { monitor.WatchPodTimestamps() }() go func() { monitor.WatchPodTimestamps() }()
@ -107,7 +112,7 @@ func (monitor *Monitor) LoadConfiguration() {
return return
} }
defer file.Close() defer file.Close()
configuration := &ProcessConfiguration{} configuration := &api.ProcessConfiguration{}
configurationBytes, err := io.ReadAll(file) configurationBytes, err := io.ReadAll(file)
if err != nil { if err != nil {
monitor.Logger.Error(err, "Error reading monitor configuration", "monitorConfigPath", monitor.ConfigFile) monitor.Logger.Error(err, "Error reading monitor configuration", "monitorConfigPath", monitor.ConfigFile)
@ -154,15 +159,15 @@ func checkOwnerExecutable(path string) error {
// acceptConfiguration is called when the monitor process parses and accepts // acceptConfiguration is called when the monitor process parses and accepts
// a configuration from the local config file. // a configuration from the local config file.
func (monitor *Monitor) acceptConfiguration(configuration *ProcessConfiguration, configurationBytes []byte) { func (monitor *Monitor) acceptConfiguration(configuration *api.ProcessConfiguration, configurationBytes []byte) {
monitor.Mutex.Lock() monitor.Mutex.Lock()
defer monitor.Mutex.Unlock() defer monitor.Mutex.Unlock()
monitor.Logger.Info("Received new configuration file", "configuration", configuration) monitor.Logger.Info("Received new configuration file", "configuration", configuration)
if monitor.ProcessIDs == nil { if monitor.ProcessIDs == nil {
monitor.ProcessIDs = make([]int, configuration.ServerCount+1) monitor.ProcessIDs = make([]int, monitor.ProcessCount+1)
} else { } else {
for len(monitor.ProcessIDs) <= configuration.ServerCount { for len(monitor.ProcessIDs) <= monitor.ProcessCount {
monitor.ProcessIDs = append(monitor.ProcessIDs, 0) monitor.ProcessIDs = append(monitor.ProcessIDs, 0)
} }
} }
@ -171,7 +176,7 @@ func (monitor *Monitor) acceptConfiguration(configuration *ProcessConfiguration,
monitor.ActiveConfigurationBytes = configurationBytes monitor.ActiveConfigurationBytes = configurationBytes
monitor.LastConfigurationTime = time.Now() monitor.LastConfigurationTime = time.Now()
for processNumber := 1; processNumber <= configuration.ServerCount; processNumber++ { for processNumber := 1; processNumber <= monitor.ProcessCount; processNumber++ {
if monitor.ProcessIDs[processNumber] == 0 { if monitor.ProcessIDs[processNumber] == 0 {
monitor.ProcessIDs[processNumber] = -1 monitor.ProcessIDs[processNumber] = -1
tempNumber := processNumber tempNumber := processNumber
@ -284,7 +289,8 @@ func (monitor *Monitor) checkProcessRequired(processNumber int) bool {
monitor.Mutex.Lock() monitor.Mutex.Lock()
defer monitor.Mutex.Unlock() defer monitor.Mutex.Unlock()
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "checkProcessRequired") logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "checkProcessRequired")
if monitor.ActiveConfiguration.ServerCount < processNumber { runProcesses := monitor.ActiveConfiguration.RunServers
if monitor.ProcessCount < processNumber || (runProcesses != nil && !*runProcesses) {
logger.Info("Terminating run loop") logger.Info("Terminating run loop")
monitor.ProcessIDs[processNumber] = 0 monitor.ProcessIDs[processNumber] = 0
return false return false

View File

@ -26,7 +26,7 @@ FROM golang:1.16.7-bullseye AS go-build
COPY fdbkubernetesmonitor/ /fdbkubernetesmonitor COPY fdbkubernetesmonitor/ /fdbkubernetesmonitor
WORKDIR /fdbkubernetesmonitor WORKDIR /fdbkubernetesmonitor
RUN go build -o /fdb-kubernetes-monitor ./... RUN go build -o /fdb-kubernetes-monitor *.go
# Build the main image # Build the main image

View File

@ -138,7 +138,7 @@ data:
fdb.cluster: "" fdb.cluster: ""
config.json: | config.json: |
{ {
"serverCount": 0, "runProcesses": false,
"version": "6.3.13", "version": "6.3.13",
"arguments": [ "arguments": [
{"value": "--cluster_file"}, {"value": "--cluster_file"},
@ -239,7 +239,7 @@ spec:
emptyDir: {} emptyDir: {}
initContainers: initContainers:
- name: foundationdb-kubernetes-init - name: foundationdb-kubernetes-init
image: foundationdb/foundationdb-kubernetes:latest image: foundationdb/foundationdb-kubernetes:6.3.13-local
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
args: args:
- "--mode" - "--mode"