foundationdb/fdbkubernetesmonitor/monitor.go

386 lines
11 KiB
Go

// monitor.go
//
// This source file is part of the FoundationDB open source project
//
// Copyright 2021 Apple Inc. and the FoundationDB project authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package main
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"path"
"sync"
"syscall"
"time"
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
)
// errorBackoffSeconds is the time to wait after a process fails before starting
// another process.
// This delay will only be applied when there has been more than one failure
// within this time window.
const errorBackoffSeconds = 60
// Monitor provides the main monitor loop
type Monitor struct {
// ConfigFile defines the path to the config file to load.
ConfigFile string
// CustomEnvironment defines the custom environment variables to use when
// interpreting the monitor configuration.
CustomEnvironment map[string]string
// ActiveConfiguration defines the active process configuration.
ActiveConfiguration *api.ProcessConfiguration
// ActiveConfigurationBytes defines the source data for the active process
// configuration.
ActiveConfigurationBytes []byte
// LastConfigurationTime is the last time we successfully reloaded the
// configuration file.
LastConfigurationTime time.Time
// ProcessCount defines how many processes the
ProcessCount int
// ProcessIDs stores the PIDs of the processes that are running. A PID of
// zero will indicate that a process does not have a run loop. A PID of -1
// will indicate that a process has a run loop but is not currently running
// the subprocess.
ProcessIDs []int
// Mutex defines a mutex around working with configuration.
// This is used to synchronize access to local state like the active
// configuration and the process IDs from multiple goroutines.
Mutex sync.Mutex
// PodClient is a client for posting updates about this pod to
// Kubernetes.
PodClient *PodClient
// Logger is the logger instance for this monitor.
Logger logr.Logger
}
// StartMonitor starts the monitor loop.
func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int) {
podClient, err := CreatePodClient(logger)
if err != nil {
panic(err)
}
monitor := &Monitor{
ConfigFile: configFile,
PodClient: podClient,
Logger: logger,
CustomEnvironment: customEnvironment,
ProcessCount: processCount,
}
go func() { monitor.WatchPodTimestamps() }()
monitor.Run()
}
// LoadConfiguration loads the latest configuration from the config file.
func (monitor *Monitor) LoadConfiguration() {
file, err := os.Open(monitor.ConfigFile)
if err != nil {
monitor.Logger.Error(err, "Error reading monitor config file", "monitorConfigPath", monitor.ConfigFile)
return
}
defer file.Close()
configuration := &api.ProcessConfiguration{}
configurationBytes, err := io.ReadAll(file)
if err != nil {
monitor.Logger.Error(err, "Error reading monitor configuration", "monitorConfigPath", monitor.ConfigFile)
}
err = json.Unmarshal(configurationBytes, configuration)
if err != nil {
monitor.Logger.Error(err, "Error parsing monitor configuration", "rawConfiguration", string(configurationBytes))
return
}
if currentContainerVersion == configuration.Version {
configuration.BinaryPath = fdbserverPath
} else {
configuration.BinaryPath = path.Join(sharedBinaryDir, configuration.Version, "fdbserver")
}
err = checkOwnerExecutable(configuration.BinaryPath)
if err != nil {
monitor.Logger.Error(err, "Error with binary path for latest configuration", "configuration", configuration, "binaryPath", configuration.BinaryPath)
return
}
_, err = configuration.GenerateArguments(1, monitor.CustomEnvironment)
if err != nil {
monitor.Logger.Error(err, "Error generating arguments for latest configuration", "configuration", configuration, "binaryPath", configuration.BinaryPath)
return
}
monitor.acceptConfiguration(configuration, configurationBytes)
}
// checkOwnerExecutable validates that a path is a file that exists and is
// executable by its owner.
func checkOwnerExecutable(path string) error {
binaryStat, err := os.Stat(path)
if err != nil {
return err
}
if binaryStat.Mode()&0o100 == 0 {
return fmt.Errorf("Binary is not executable")
}
return nil
}
// acceptConfiguration is called when the monitor process parses and accepts
// a configuration from the local config file.
func (monitor *Monitor) acceptConfiguration(configuration *api.ProcessConfiguration, configurationBytes []byte) {
monitor.Mutex.Lock()
defer monitor.Mutex.Unlock()
monitor.Logger.Info("Received new configuration file", "configuration", configuration)
if monitor.ProcessIDs == nil {
monitor.ProcessIDs = make([]int, monitor.ProcessCount+1)
} else {
for len(monitor.ProcessIDs) <= monitor.ProcessCount {
monitor.ProcessIDs = append(monitor.ProcessIDs, 0)
}
}
monitor.ActiveConfiguration = configuration
monitor.ActiveConfigurationBytes = configurationBytes
monitor.LastConfigurationTime = time.Now()
for processNumber := 1; processNumber <= monitor.ProcessCount; processNumber++ {
if monitor.ProcessIDs[processNumber] == 0 {
monitor.ProcessIDs[processNumber] = -1
tempNumber := processNumber
go func() { monitor.RunProcess(tempNumber) }()
}
}
err := monitor.PodClient.UpdateAnnotations(monitor)
if err != nil {
monitor.Logger.Error(err, "Error updating pod annotations")
}
}
// RunProcess runs a loop to continually start and watch a process.
func (monitor *Monitor) RunProcess(processNumber int) {
pid := 0
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "RunProcess")
logger.Info("Starting run loop")
for {
if !monitor.checkProcessRequired(processNumber) {
return
}
arguments, err := monitor.ActiveConfiguration.GenerateArguments(processNumber, monitor.CustomEnvironment)
if err != nil {
logger.Error(err, "Error generating arguments for subprocess", "configuration", monitor.ActiveConfiguration)
time.Sleep(errorBackoffSeconds * time.Second)
}
cmd := exec.Cmd{
Path: arguments[0],
Args: arguments,
}
logger.Info("Starting subprocess", "arguments", arguments)
stdout, err := cmd.StdoutPipe()
if err != nil {
logger.Error(err, "Error getting stdout from subprocess")
}
stderr, err := cmd.StderrPipe()
if err != nil {
logger.Error(err, "Error getting stderr from subprocess")
}
err = cmd.Start()
if err != nil {
logger.Error(err, "Error starting subprocess")
time.Sleep(errorBackoffSeconds * time.Second)
continue
}
if cmd.Process != nil {
pid = cmd.Process.Pid
} else {
logger.Error(nil, "No Process information available for subprocess")
}
startTime := time.Now()
logger.Info("Subprocess started", "PID", pid)
monitor.updateProcessID(processNumber, pid)
if stdout != nil {
stdoutScanner := bufio.NewScanner(stdout)
go func() {
for stdoutScanner.Scan() {
logger.Info("Subprocess output", "msg", stdoutScanner.Text(), "PID", pid)
}
}()
}
if stderr != nil {
stderrScanner := bufio.NewScanner(stderr)
go func() {
for stderrScanner.Scan() {
logger.Error(nil, "Subprocess error log", "msg", stderrScanner.Text(), "PID", pid)
}
}()
}
err = cmd.Wait()
if err != nil {
logger.Error(err, "Error from subprocess", "PID", pid)
}
exitCode := -1
if cmd.ProcessState != nil {
exitCode = cmd.ProcessState.ExitCode()
}
logger.Info("Subprocess terminated", "exitCode", exitCode, "PID", pid)
endTime := time.Now()
monitor.updateProcessID(processNumber, -1)
processDuration := endTime.Sub(startTime)
if processDuration.Seconds() < errorBackoffSeconds {
logger.Info("Backing off from restarting subprocess", "backOffTimeSeconds", errorBackoffSeconds, "lastExecutionDurationSeconds", processDuration)
time.Sleep(errorBackoffSeconds * time.Second)
}
}
}
// checkProcessRequired determines if the latest configuration requires that a
// process stay running.
// If the process is no longer desired, this will remove it from the process ID
// list and return false. If the process is still desired, this will return
// true.
func (monitor *Monitor) checkProcessRequired(processNumber int) bool {
monitor.Mutex.Lock()
defer monitor.Mutex.Unlock()
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "checkProcessRequired")
runProcesses := monitor.ActiveConfiguration.RunServers
if monitor.ProcessCount < processNumber || (runProcesses != nil && !*runProcesses) {
logger.Info("Terminating run loop")
monitor.ProcessIDs[processNumber] = 0
return false
}
return true
}
// updateProcessID records a new Process ID from a newly launched process.
func (monitor *Monitor) updateProcessID(processNumber int, pid int) {
monitor.Mutex.Lock()
defer monitor.Mutex.Unlock()
monitor.ProcessIDs[processNumber] = pid
}
// WatchConfiguration detects changes to the monitor configuration file.
func (monitor *Monitor) WatchConfiguration(watcher *fsnotify.Watcher) {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
monitor.Logger.Info("Detected event on monitor conf file", "event", event)
if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create {
monitor.LoadConfiguration()
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
err := watcher.Add(monitor.ConfigFile)
if err != nil {
panic(err)
}
monitor.LoadConfiguration()
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
monitor.Logger.Error(err, "Error watching for file system events")
}
}
}
// Run runs the monitor loop.
func (monitor *Monitor) Run() {
done := make(chan bool, 1)
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func() {
latestSignal := <-signals
monitor.Logger.Info("Received system signal", "signal", latestSignal)
for processNumber, processID := range monitor.ProcessIDs {
if processID > 0 {
subprocessLogger := monitor.Logger.WithValues("processNumber", processNumber, "PID", processID)
process, err := os.FindProcess(processID)
if err != nil {
subprocessLogger.Error(err, "Error finding subprocess")
continue
}
subprocessLogger.Info("Sending signal to subprocess", "signal", latestSignal)
err = process.Signal(latestSignal)
if err != nil {
subprocessLogger.Error(err, "Error signaling subprocess")
continue
}
}
}
done <- true
}()
monitor.LoadConfiguration()
watcher, err := fsnotify.NewWatcher()
if err != nil {
panic(err)
}
err = watcher.Add(monitor.ConfigFile)
if err != nil {
panic(err)
}
defer watcher.Close()
go func() { monitor.WatchConfiguration(watcher) }()
<-done
}
func (monitor *Monitor) WatchPodTimestamps() {
for timestamp := range monitor.PodClient.TimestampFeed {
if timestamp > monitor.LastConfigurationTime.Unix() {
monitor.LoadConfiguration()
}
}
}