2021-08-22 11:40:20 +08:00
// monitor.go
//
// This source file is part of the FoundationDB open source project
//
2024-06-18 00:15:36 +08:00
// Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
2021-08-22 11:40:20 +08:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package main
import (
2021-08-23 12:19:10 +08:00
"bufio"
2024-04-13 01:09:57 +08:00
"context"
2024-06-14 23:39:52 +08:00
"crypto/tls"
2021-08-22 11:40:20 +08:00
"encoding/json"
2024-06-18 00:15:36 +08:00
"errors"
2021-09-22 03:12:43 +08:00
"fmt"
2021-08-22 11:40:20 +08:00
"io"
2022-06-22 15:07:48 +08:00
"net/http"
"net/http/pprof"
2021-08-22 11:40:20 +08:00
"os"
"os/exec"
"os/signal"
2021-08-23 15:31:18 +08:00
"path"
2024-06-24 23:01:23 +08:00
"strconv"
2024-06-18 00:15:36 +08:00
"strings"
2021-08-22 11:40:20 +08:00
"sync"
"syscall"
"time"
2021-11-13 03:58:38 +08:00
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
2024-06-14 23:59:39 +08:00
"github.com/apple/foundationdb/fdbkubernetesmonitor/internal/certloader"
2021-08-22 11:40:20 +08:00
"github.com/fsnotify/fsnotify"
2021-08-23 12:19:10 +08:00
"github.com/go-logr/logr"
2024-06-18 00:15:36 +08:00
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/utils/pointer"
2021-08-22 11:40:20 +08:00
)
2024-06-18 00:15:36 +08:00
const (
	// maxErrorBackoffSeconds caps the delay between a process failure and the next start attempt. Despite its
	// name the value is a time.Duration, not a bare number of seconds. The delay that is actually applied is
	// derived from the observed errors and grows until it reaches this cap.
	maxErrorBackoffSeconds = time.Minute

	// fdbClusterFilePath is the default location of the fdb cluster file that holds the current connection
	// string. The fdbserver processes manage this file themselves and rewrite it whenever the coordinators
	// have changed.
	fdbClusterFilePath = "/var/fdb/data/fdb.cluster"
)
2021-08-22 11:40:20 +08:00
2024-06-24 23:01:23 +08:00
// monitor provides the main monitor loop
type monitor struct {
// configFile defines the path to the config file to load.
configFile string
2021-08-22 11:40:20 +08:00
2024-06-24 23:01:23 +08:00
// currentContainerVersion defines the version of the container. This will be the same as the fdbserver version.
currentContainerVersion api . Version
2024-04-13 01:09:57 +08:00
2024-06-24 23:01:23 +08:00
// customEnvironment defines the custom environment variables to use when
2021-08-23 16:11:25 +08:00
// interpreting the monitor configuration.
2024-06-24 23:01:23 +08:00
customEnvironment map [ string ] string
2021-08-22 11:40:20 +08:00
2024-06-24 23:01:23 +08:00
// activeConfiguration defines the active process configuration.
activeConfiguration * api . ProcessConfiguration
2021-08-22 11:40:20 +08:00
2024-06-24 23:01:23 +08:00
// activeConfigurationBytes defines the source data for the active process
2021-08-22 11:40:20 +08:00
// configuration.
2024-06-24 23:01:23 +08:00
activeConfigurationBytes [ ] byte
2021-08-22 11:40:20 +08:00
2024-06-24 23:01:23 +08:00
// lastConfigurationTime is the last time we successfully reloaded the
2021-08-22 16:28:09 +08:00
// configuration file.
2024-06-24 23:01:23 +08:00
lastConfigurationTime time . Time
2021-08-22 16:28:09 +08:00
2024-06-24 23:01:23 +08:00
// processCount defines how many processes the
processCount int
2021-11-13 03:58:38 +08:00
2024-06-24 23:01:23 +08:00
// processIDs stores the PIDs of the processes that are running. A PID of
2021-08-22 11:40:20 +08:00
// zero will indicate that a process does not have a run loop. A PID of -1
// will indicate that a process has a run loop but is not currently running
// the subprocess.
2024-06-24 23:01:23 +08:00
processIDs [ ] int
2021-08-22 11:40:20 +08:00
2024-06-24 23:01:23 +08:00
// mutex defines a mutex around working with configuration.
2021-09-18 07:26:05 +08:00
// This is used to synchronize access to local state like the active
// configuration and the process IDs from multiple goroutines.
2024-06-24 23:01:23 +08:00
mutex sync . Mutex
2021-08-22 16:28:09 +08:00
2024-06-24 23:01:23 +08:00
// podClient is a client for posting updates about this pod to
2021-08-22 16:28:09 +08:00
// Kubernetes.
2024-06-24 23:01:23 +08:00
podClient * kubernetesClient
2021-08-23 12:19:10 +08:00
2024-06-24 23:01:23 +08:00
// logger is the logger instance for this monitor.
logger logr . Logger
2024-06-18 00:15:36 +08:00
// metrics represents the prometheus monitor metrics.
metrics * metrics
2021-08-22 11:40:20 +08:00
}
2024-06-05 18:51:48 +08:00
// httpConfig bundles the settings of the HTTP(S) endpoint that startMonitor exposes.
type httpConfig struct {
	// listenAddr is the address the server listens on.
	listenAddr string
	// certPath is the path of the TLS certificate. The endpoint is served over TLS when certPath or keyPath is set.
	certPath string
	// keyPath is the path of the TLS private key.
	keyPath string
	// rootCaPath is presumably the path of a root CA bundle; startMonitor does not read it.
	rootCaPath string
}
2024-06-24 23:01:23 +08:00
// startMonitor starts the monitor loop.
func startMonitor ( ctx context . Context , logger logr . Logger , configFile string , customEnvironment map [ string ] string , processCount int , promConfig httpConfig , enableDebug bool , currentContainerVersion api . Version , enableNodeWatcher bool ) {
client , err := createPodClient ( ctx , logger , enableNodeWatcher , setupCache )
2021-08-22 16:28:09 +08:00
if err != nil {
2024-04-13 01:09:57 +08:00
logger . Error ( err , "could not create Pod client" )
os . Exit ( 1 )
2021-08-22 16:28:09 +08:00
}
2024-06-24 23:01:23 +08:00
mon := & monitor {
configFile : configFile ,
podClient : client ,
logger : logger ,
customEnvironment : customEnvironment ,
processCount : processCount ,
processIDs : make ( [ ] int , processCount + 1 ) ,
currentContainerVersion : currentContainerVersion ,
2021-08-22 16:28:09 +08:00
}
2024-06-24 23:01:23 +08:00
go func ( ) { mon . watchPodTimestamps ( ) } ( )
2022-06-22 15:07:48 +08:00
mux := http . NewServeMux ( )
// Enable pprof endpoints for debugging purposes.
if enableDebug {
mux . Handle ( "/debug/pprof/heap" , pprof . Handler ( "heap" ) )
mux . Handle ( "/debug/pprof/goroutine" , pprof . Handler ( "goroutine" ) )
mux . Handle ( "/debug/pprof/threadcreate" , pprof . Handler ( "threadcreate" ) )
mux . Handle ( "/debug/pprof/allocs" , pprof . Handler ( "allocs" ) )
mux . Handle ( "/debug/pprof/block" , pprof . Handler ( "block" ) )
mux . Handle ( "/debug/pprof/mutex" , pprof . Handler ( "mutex" ) )
mux . HandleFunc ( "/debug/pprof/" , pprof . Index )
mux . HandleFunc ( "/debug/pprof/profile" , pprof . Profile )
mux . HandleFunc ( "/debug/pprof/symbol" , pprof . Symbol )
mux . HandleFunc ( "/debug/pprof/cmdline" , pprof . Cmdline )
mux . HandleFunc ( "/debug/pprof/trace" , pprof . Trace )
}
2024-06-18 00:15:36 +08:00
reg := prometheus . NewRegistry ( )
// Enable the default go metrics.
reg . MustRegister ( collectors . NewGoCollector ( ) )
monitorMetrics := registerMetrics ( reg )
2024-06-24 23:01:23 +08:00
mon . metrics = monitorMetrics
2024-06-18 00:15:36 +08:00
promHandler := promhttp . HandlerFor ( reg , promhttp . HandlerOpts { } )
2022-06-22 15:07:48 +08:00
// Add Prometheus support
2024-06-18 00:15:36 +08:00
mux . Handle ( "/metrics" , promHandler )
2022-06-22 15:07:48 +08:00
go func ( ) {
2024-06-14 23:39:52 +08:00
if promConfig . keyPath != "" || promConfig . certPath != "" {
certLoader := certloader . NewCertLoader ( logger , promConfig . certPath , promConfig . keyPath )
tlsConfig := & tls . Config {
GetCertificate : certLoader . GetCertificate ,
}
server := & http . Server {
Addr : promConfig . listenAddr ,
Handler : mux ,
TLSConfig : tlsConfig ,
}
err = server . ListenAndServeTLS ( "" , "" )
2024-06-05 18:51:48 +08:00
if err != nil {
logger . Error ( err , "could not start HTTPS server" )
os . Exit ( 1 )
}
}
err := http . ListenAndServe ( promConfig . listenAddr , mux )
2022-06-22 15:07:48 +08:00
if err != nil {
logger . Error ( err , "could not start HTTP server" )
os . Exit ( 1 )
}
} ( )
2024-06-24 23:01:23 +08:00
mon . run ( )
2021-08-22 11:40:20 +08:00
}
2024-06-18 00:15:36 +08:00
// updateCustomEnvironment will add the node labels and their values to the custom environment map. All the generated
// environment variables will start with NODE_LABEL and "/" and "." will be replaced in the key as "_", e.g. from the
// label "foundationdb.org/testing = awesome" the env variables NODE_LABEL_FOUNDATIONDB_ORG_TESTING = awesome" will be
// generated.
2024-06-24 23:01:23 +08:00
func ( monitor * monitor ) updateCustomEnvironmentFromNodeMetadata ( ) {
if monitor . podClient . nodeMetadata == nil {
2024-06-18 00:15:36 +08:00
return
}
2024-06-24 23:01:23 +08:00
nodeLabels := monitor . podClient . nodeMetadata . Labels
2024-06-18 00:15:36 +08:00
for key , value := range nodeLabels {
sanitizedKey := strings . ReplaceAll ( key , "/" , "_" )
sanitizedKey = strings . ReplaceAll ( sanitizedKey , "." , "_" )
envKey := "NODE_LABEL_" + strings . ToUpper ( sanitizedKey )
2024-06-24 23:01:23 +08:00
currentValue , ok := monitor . customEnvironment [ envKey ]
2024-06-18 00:15:36 +08:00
if ! ok {
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "adding new custom environment variable from node labels" , "key" , envKey , "value" , value )
monitor . customEnvironment [ envKey ] = value
2024-06-18 00:15:36 +08:00
continue
}
if currentValue == value {
continue
}
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "update custom environment variable from node labels" , "key" , envKey , "newValue" , value , "currentValue" , currentValue )
monitor . customEnvironment [ envKey ] = value
2024-06-18 00:15:36 +08:00
continue
}
}
2024-06-24 23:01:23 +08:00
// readConfiguration reads the latest configuration from the monitor file.
func ( monitor * monitor ) readConfiguration ( ) ( * api . ProcessConfiguration , [ ] byte ) {
file , err := os . Open ( monitor . configFile )
2021-08-22 11:40:20 +08:00
if err != nil {
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "Error reading monitor config file" , "monitorConfigPath" , monitor . configFile )
return nil , nil
2021-08-22 11:40:20 +08:00
}
2024-06-18 00:15:36 +08:00
defer func ( ) {
err := file . Close ( )
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "Error could not close file" , "monitorConfigPath" , monitor . configFile )
2024-06-18 00:15:36 +08:00
} ( )
2021-11-13 03:58:38 +08:00
configuration := & api . ProcessConfiguration { }
2021-08-22 11:40:20 +08:00
configurationBytes , err := io . ReadAll ( file )
if err != nil {
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "Error reading monitor configuration" , "monitorConfigPath" , monitor . configFile )
2021-08-22 11:40:20 +08:00
}
err = json . Unmarshal ( configurationBytes , configuration )
if err != nil {
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "Error parsing monitor configuration" , "rawConfiguration" , string ( configurationBytes ) )
return nil , nil
}
if configuration . Version == nil {
monitor . logger . Error ( err , "Error could not parse configured version" , "rawConfiguration" , string ( configurationBytes ) )
return nil , nil
2021-08-22 11:40:20 +08:00
}
2024-06-24 23:01:23 +08:00
// If the versions are protocol compatible don't try to point to another binary path. Otherwise, the processes will
// cannot restart when a process crashes during a patch upgrade.
if monitor . currentContainerVersion . IsProtocolCompatible ( * configuration . Version ) {
2021-08-23 16:11:25 +08:00
configuration . BinaryPath = fdbserverPath
2021-08-23 15:31:18 +08:00
} else {
2024-06-24 23:01:23 +08:00
configuration . BinaryPath = path . Join ( sharedBinaryDir , configuration . Version . String ( ) , "fdbserver" )
2021-08-23 15:31:18 +08:00
}
2021-09-22 03:12:43 +08:00
err = checkOwnerExecutable ( configuration . BinaryPath )
2021-08-23 15:31:18 +08:00
if err != nil {
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "Error with binary path for latest configuration" , "configuration" , configuration , "binaryPath" , configuration . BinaryPath )
return nil , nil
2021-08-23 15:31:18 +08:00
}
2024-06-18 00:15:36 +08:00
monitor . updateCustomEnvironmentFromNodeMetadata ( )
2024-06-24 23:01:23 +08:00
_ , err = configuration . GenerateArguments ( 1 , monitor . customEnvironment )
2021-08-22 11:40:20 +08:00
if err != nil {
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "Error generating arguments for latest configuration" , "configuration" , configuration , "binaryPath" , configuration . BinaryPath )
return nil , nil
}
if configuration . ShouldRunServers ( ) {
// In case that the process is isolated we don't want to start the servers and we should terminate the running fdbserver
// instances.
if monitor . processIsIsolated ( ) {
configuration . RunServers = pointer . Bool ( false )
}
}
return configuration , configurationBytes
}
// loadConfiguration loads the latest configuration from the config file.
func ( monitor * monitor ) loadConfiguration ( ) {
configuration , configurationBytes := monitor . readConfiguration ( )
if configuration == nil || len ( configurationBytes ) == 0 {
2021-08-22 11:40:20 +08:00
return
}
2021-09-18 07:26:05 +08:00
monitor . acceptConfiguration ( configuration , configurationBytes )
2024-07-04 17:28:14 +08:00
// Always update the annotations if needed to handle cases where the fdb-kubernetes-monitor
// has loaded the new configuration but was not able to update the annotations.
err := monitor . podClient . updateAnnotations ( monitor )
if err != nil {
monitor . logger . Error ( err , "Error updating pod annotations" )
}
2021-09-18 07:26:05 +08:00
}
2021-09-22 03:12:43 +08:00
// checkOwnerExecutable validates that a path is a file that exists and is
// executable by its owner.
//
// It returns the error from os.Stat if the path cannot be inspected, and an
// error if the path is a directory or if the owner-execute bit is not set.
func checkOwnerExecutable(path string) error {
	binaryStat, err := os.Stat(path)
	if err != nil {
		return err
	}

	// Directories usually carry the execute bit as well, but they can never be
	// started as a binary.
	if binaryStat.IsDir() {
		return fmt.Errorf("binary path is a directory")
	}

	// 0o100 is the execute permission bit of the owner.
	if binaryStat.Mode()&0o100 == 0 {
		return fmt.Errorf("binary is not executable")
	}

	return nil
}
2021-09-18 07:26:05 +08:00
// acceptConfiguration is called when the monitor process parses and accepts
// a configuration from the local config file.
2024-06-24 23:01:23 +08:00
func ( monitor * monitor ) acceptConfiguration ( configuration * api . ProcessConfiguration , configurationBytes [ ] byte ) {
monitor . mutex . Lock ( )
defer monitor . mutex . Unlock ( )
2021-08-22 11:40:20 +08:00
2024-06-18 00:15:36 +08:00
// If the configuration hasn't changed ignore those events to prevent noisy logging.
2024-06-24 23:01:23 +08:00
if equality . Semantic . DeepEqual ( monitor . activeConfiguration , configuration ) {
2024-06-18 00:15:36 +08:00
return
2021-08-22 11:40:20 +08:00
}
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "Received new configuration file" , "configuration" , configuration )
monitor . activeConfiguration = configuration
monitor . activeConfigurationBytes = configurationBytes
monitor . lastConfigurationTime = time . Now ( )
2024-06-18 00:15:36 +08:00
// Update the prometheus metrics.
2024-06-24 23:01:23 +08:00
monitor . metrics . registerConfigurationChange ( configuration . Version . String ( ) )
2021-08-22 11:40:20 +08:00
2024-06-24 23:01:23 +08:00
var hasRunningProcesses bool
for processNumber := 1 ; processNumber <= monitor . processCount ; processNumber ++ {
if monitor . processIDs [ processNumber ] == 0 {
monitor . processIDs [ processNumber ] = - 1
2021-08-22 11:40:20 +08:00
tempNumber := processNumber
2024-06-24 23:01:23 +08:00
go func ( ) { monitor . runProcess ( tempNumber ) } ( )
continue
2021-08-22 11:40:20 +08:00
}
2024-06-24 23:01:23 +08:00
hasRunningProcesses = true
}
// If the monitor has running processes but the processes shouldn't be running, kill them with SIGTERM.
if hasRunningProcesses && ! monitor . activeConfiguration . ShouldRunServers ( ) {
monitor . sendSignalToProcesses ( syscall . SIGTERM )
2021-08-22 11:40:20 +08:00
}
}
2024-06-12 21:08:39 +08:00
// getBackoffDuration returns the backoff duration for the given number of successive errors. The backoff grows
// quadratically (errorCounter squared, in seconds) and is capped at maxErrorBackoffSeconds.
func getBackoffDuration(errorCounter int) time.Duration {
	// Every counter whose magnitude exceeds the cap expressed in seconds has a square above the cap as well.
	// Returning early keeps the multiplication below from overflowing for very large counters.
	maxCounter := int(maxErrorBackoffSeconds / time.Second)
	if errorCounter > maxCounter || errorCounter < -maxCounter {
		return maxErrorBackoffSeconds
	}

	timeToBackoff := time.Duration(errorCounter*errorCounter) * time.Second
	if timeToBackoff > maxErrorBackoffSeconds {
		return maxErrorBackoffSeconds
	}

	return timeToBackoff
}
2024-06-24 23:01:23 +08:00
// runProcess runs a loop to continually start and watch a process.
func ( monitor * monitor ) runProcess ( processNumber int ) {
2021-08-23 12:19:10 +08:00
pid := 0
2024-06-24 23:01:23 +08:00
logger := monitor . logger . WithValues ( "processNumber" , processNumber , "area" , "runProcess" )
2021-08-23 12:19:10 +08:00
logger . Info ( "Starting run loop" )
2024-06-12 21:08:39 +08:00
startTime := time . Now ( )
// Counts the successive errors that occurred during process start up. Based on the error count the backoff time
// will be calculated.
var errorCounter int
2021-08-22 11:40:20 +08:00
for {
2024-06-18 00:15:36 +08:00
if ! monitor . processRequired ( processNumber ) {
2021-08-22 13:11:53 +08:00
return
}
2024-06-12 21:08:39 +08:00
durationSinceLastStart := time . Since ( startTime )
// If for more than 5 minutes no error have occurred we reset the error counter to reset the backoff time.
if durationSinceLastStart > 5 * time . Minute {
errorCounter = 0
}
2024-06-24 23:01:23 +08:00
arguments , err := monitor . activeConfiguration . GenerateArguments ( processNumber , monitor . customEnvironment )
2021-08-22 11:40:20 +08:00
if err != nil {
2024-06-12 21:08:39 +08:00
backoffDuration := getBackoffDuration ( errorCounter )
2024-06-24 23:01:23 +08:00
logger . Error ( err , "Error generating arguments for subprocess" , "configuration" , monitor . activeConfiguration , "errorCounter" , errorCounter , "backoffDuration" , backoffDuration . String ( ) )
2024-06-12 21:08:39 +08:00
time . Sleep ( backoffDuration )
errorCounter ++
continue
2021-08-22 11:40:20 +08:00
}
cmd := exec . Cmd {
2021-08-23 12:19:10 +08:00
Path : arguments [ 0 ] ,
Args : arguments ,
}
logger . Info ( "Starting subprocess" , "arguments" , arguments )
stdout , err := cmd . StdoutPipe ( )
if err != nil {
logger . Error ( err , "Error getting stdout from subprocess" )
}
stderr , err := cmd . StderrPipe ( )
if err != nil {
logger . Error ( err , "Error getting stderr from subprocess" )
2021-08-22 11:40:20 +08:00
}
err = cmd . Start ( )
if err != nil {
2024-06-12 21:08:39 +08:00
backoffDuration := getBackoffDuration ( errorCounter )
logger . Error ( err , "Error starting subprocess" , "backoffDuration" , backoffDuration . String ( ) )
time . Sleep ( backoffDuration )
errorCounter ++
2021-08-22 11:40:20 +08:00
continue
}
2024-06-18 00:15:36 +08:00
// Update the prometheus metrics for the process.
2024-06-24 23:01:23 +08:00
monitor . metrics . registerProcessStartup ( processNumber , monitor . activeConfiguration . Version . String ( ) )
2024-06-18 00:15:36 +08:00
2021-08-23 12:19:10 +08:00
if cmd . Process != nil {
pid = cmd . Process . Pid
} else {
2021-09-22 03:12:43 +08:00
logger . Error ( nil , "No Process information available for subprocess" )
2021-08-23 12:19:10 +08:00
}
2024-06-12 21:08:39 +08:00
startTime = time . Now ( )
2021-08-23 12:19:10 +08:00
logger . Info ( "Subprocess started" , "PID" , pid )
2021-09-18 07:26:05 +08:00
monitor . updateProcessID ( processNumber , pid )
2021-08-22 11:40:20 +08:00
2021-08-23 12:19:10 +08:00
if stdout != nil {
stdoutScanner := bufio . NewScanner ( stdout )
go func ( ) {
for stdoutScanner . Scan ( ) {
logger . Info ( "Subprocess output" , "msg" , stdoutScanner . Text ( ) , "PID" , pid )
}
} ( )
}
if stderr != nil {
stderrScanner := bufio . NewScanner ( stderr )
go func ( ) {
for stderrScanner . Scan ( ) {
logger . Error ( nil , "Subprocess error log" , "msg" , stderrScanner . Text ( ) , "PID" , pid )
}
} ( )
}
2021-08-22 11:40:20 +08:00
2021-08-23 12:19:10 +08:00
err = cmd . Wait ( )
2021-08-22 11:40:20 +08:00
if err != nil {
2021-08-23 12:19:10 +08:00
logger . Error ( err , "Error from subprocess" , "PID" , pid )
}
exitCode := - 1
if cmd . ProcessState != nil {
exitCode = cmd . ProcessState . ExitCode ( )
2021-08-22 11:40:20 +08:00
}
2024-06-12 21:08:39 +08:00
processDuration := time . Since ( startTime )
logger . Info ( "Subprocess terminated" , "exitCode" , exitCode , "PID" , pid , "lastExecutionDurationSeconds" , processDuration . String ( ) )
2021-09-18 07:26:05 +08:00
monitor . updateProcessID ( processNumber , - 1 )
2021-08-22 11:40:20 +08:00
2024-06-12 21:08:39 +08:00
// Only backoff if the exit code is non-zero.
if exitCode != 0 {
backoffDuration := getBackoffDuration ( errorCounter )
logger . Info ( "Backing off from restarting subprocess" , "backoffDuration" , backoffDuration . String ( ) , "lastExecutionDurationSeconds" , processDuration . String ( ) , "errorCounter" , errorCounter , "exitCode" , exitCode )
time . Sleep ( backoffDuration )
errorCounter ++
2021-08-23 12:19:10 +08:00
}
2021-08-22 11:40:20 +08:00
}
}
2024-06-18 00:15:36 +08:00
// processRequired determines if the latest configuration requires that a
2021-09-18 07:26:05 +08:00
// process stay running.
// If the process is no longer desired, this will remove it from the process ID
// list and return false. If the process is still desired, this will return
// true.
2024-06-24 23:01:23 +08:00
func ( monitor * monitor ) processRequired ( processNumber int ) bool {
monitor . mutex . Lock ( )
defer monitor . mutex . Unlock ( )
logger := monitor . logger . WithValues ( "processNumber" , processNumber , "area" , "processRequired" )
if monitor . processCount < processNumber || ! monitor . activeConfiguration . ShouldRunServers ( ) {
if monitor . processIDs [ processNumber ] != 0 {
2024-06-18 00:15:36 +08:00
logger . Info ( "Terminating run loop" )
2024-06-24 23:01:23 +08:00
monitor . processIDs [ processNumber ] = 0
2024-06-18 00:15:36 +08:00
}
2021-09-18 07:26:05 +08:00
return false
}
2022-06-16 02:41:01 +08:00
2021-09-18 07:26:05 +08:00
return true
}
2024-06-24 23:01:23 +08:00
// processIsIsolated returns true if the IsolateProcessGroupAnnotation is present on the pod metadata and its value
// parses as true with strconv.ParseBool (e.g. "true", "1" or "t"). A missing pod metadata, a missing annotation or a
// value that cannot be parsed all result in false.
func (monitor *monitor) processIsIsolated() bool {
	if monitor.podClient.podMetadata == nil {
		return false
	}

	if monitor.podClient.podMetadata.Annotations == nil {
		return false
	}

	val, ok := monitor.podClient.podMetadata.Annotations[api.IsolateProcessGroupAnnotation]
	if !ok {
		return false
	}

	isolated, err := strconv.ParseBool(val)
	if err != nil {
		// The logger takes key/value pairs and does not expand format verbs, so the annotation name and the
		// offending value are passed as structured fields.
		monitor.logger.Error(err, "could not parse the value of the isolate process group annotation", "annotation", api.IsolateProcessGroupAnnotation, "value", val)
		return false
	}

	return isolated
}
2021-09-18 07:26:05 +08:00
// updateProcessID records a new Process ID from a newly launched process.
2024-06-24 23:01:23 +08:00
func ( monitor * monitor ) updateProcessID ( processNumber int , pid int ) {
monitor . mutex . Lock ( )
defer monitor . mutex . Unlock ( )
monitor . processIDs [ processNumber ] = pid
2021-09-18 07:26:05 +08:00
}
2024-06-24 23:01:23 +08:00
// watchConfiguration detects changes to the monitor configuration file.
func ( monitor * monitor ) watchConfiguration ( watcher * fsnotify . Watcher ) {
2021-08-22 11:40:20 +08:00
for {
select {
case event , ok := <- watcher . Events :
if ! ok {
return
}
2024-06-18 00:15:36 +08:00
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "Detected event on monitor conf file or cluster file" , "event" , event )
2021-08-22 13:11:53 +08:00
if event . Op & fsnotify . Write == fsnotify . Write || event . Op & fsnotify . Create == fsnotify . Create {
2024-06-18 00:15:36 +08:00
monitor . handleFileChange ( event . Name )
2021-08-22 13:11:53 +08:00
} else if event . Op & fsnotify . Remove == fsnotify . Remove {
2024-06-18 00:15:36 +08:00
err := watcher . Add ( event . Name )
2021-08-22 13:11:53 +08:00
if err != nil {
panic ( err )
}
2024-06-18 00:15:36 +08:00
monitor . handleFileChange ( event . Name )
2021-08-22 11:40:20 +08:00
}
case err , ok := <- watcher . Errors :
if ! ok {
return
}
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "Error watching for file system events" )
2021-08-22 11:40:20 +08:00
}
}
}
2024-06-18 00:15:36 +08:00
// handleFileChange will perform the required action based on the changed/modified file.
2024-06-24 23:01:23 +08:00
func ( monitor * monitor ) handleFileChange ( changedFile string ) {
2024-06-18 00:15:36 +08:00
if changedFile == fdbClusterFilePath {
2024-06-24 23:01:23 +08:00
err := monitor . podClient . updateFdbClusterTimestampAnnotation ( )
2024-06-18 00:15:36 +08:00
if err != nil {
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , fmt . Sprintf ( "could not update %s annotation" , api . ClusterFileChangeDetectedAnnotation ) )
2024-06-18 00:15:36 +08:00
}
return
}
2024-06-24 23:01:23 +08:00
monitor . loadConfiguration ( )
2024-06-18 00:15:36 +08:00
}
2024-06-24 23:01:23 +08:00
// sendSignalToProcesses sends the given signal to every subprocess that has a positive PID in the process ID list.
// Entries without a running subprocess (PID 0 or -1) are skipped and failures are only logged. The method does not
// take the monitor mutex itself.
func (monitor *monitor) sendSignalToProcesses(signal os.Signal) {
	for processNumber, processID := range monitor.processIDs {
		if processID <= 0 {
			continue
		}

		subprocessLogger := monitor.logger.WithValues("processNumber", processNumber, "PID", processID)

		process, findErr := os.FindProcess(processID)
		if findErr != nil {
			subprocessLogger.Error(findErr, "Error finding subprocess")
			continue
		}

		subprocessLogger.Info("Sending signal to subprocess", "signal", signal)
		if signalErr := process.Signal(signal); signalErr != nil {
			subprocessLogger.Error(signalErr, "Error signaling subprocess")
		}
	}
}
// run runs the monitor loop.
func ( monitor * monitor ) run ( ) {
2021-08-22 11:40:20 +08:00
done := make ( chan bool , 1 )
signals := make ( chan os . Signal , 1 )
signal . Notify ( signals , syscall . SIGINT , syscall . SIGTERM )
go func ( ) {
latestSignal := <- signals
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "Received system signal" , "signal" , latestSignal )
2024-06-18 00:15:36 +08:00
2024-06-24 23:01:23 +08:00
// Reset the processCount to 0 to make sure the monitor doesn't try to restart the processes.
monitor . processCount = 0
monitor . sendSignalToProcesses ( latestSignal )
2024-04-13 01:09:57 +08:00
2024-06-24 23:01:23 +08:00
annotations := monitor . podClient . podMetadata . Annotations
2024-04-13 01:09:57 +08:00
if len ( annotations ) > 0 {
2024-06-24 23:01:23 +08:00
delayValue , ok := annotations [ api . DelayShutdownAnnotation ]
2024-04-13 01:09:57 +08:00
if ok {
delay , err := time . ParseDuration ( delayValue )
if err == nil {
time . Sleep ( delay )
}
}
}
2021-08-22 11:40:20 +08:00
done <- true
} ( )
2024-06-24 23:01:23 +08:00
monitor . loadConfiguration ( )
2021-08-22 11:40:20 +08:00
watcher , err := fsnotify . NewWatcher ( )
if err != nil {
panic ( err )
}
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "adding watch for file" , "path" , path . Base ( monitor . configFile ) )
err = watcher . Add ( monitor . configFile )
2021-08-22 11:40:20 +08:00
if err != nil {
panic ( err )
}
2022-06-16 02:41:01 +08:00
defer func ( watcher * fsnotify . Watcher ) {
err := watcher . Close ( )
if err != nil {
2024-06-24 23:01:23 +08:00
monitor . logger . Error ( err , "could not close watcher" )
2022-06-16 02:41:01 +08:00
}
} ( watcher )
2024-06-24 23:01:23 +08:00
go func ( ) { monitor . watchConfiguration ( watcher ) } ( )
2021-08-22 11:40:20 +08:00
2024-06-18 00:15:36 +08:00
// The cluster file will be created and managed by the fdbserver processes, so we have to wait until the fdbserver
2024-06-18 17:02:09 +08:00
// processes have been started. Except for the initial cluster creation this file should be present as soon as the
2024-06-18 00:15:36 +08:00
// monitor starts the processes.
for {
_ , err = os . Stat ( fdbClusterFilePath )
if errors . Is ( err , os . ErrNotExist ) {
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "waiting for file to be created" , "path" , fdbClusterFilePath )
2024-06-18 00:15:36 +08:00
time . Sleep ( 5 * time . Second )
continue
}
2024-06-24 23:01:23 +08:00
monitor . logger . Info ( "adding watch for file" , "path" , fdbClusterFilePath )
2024-06-18 00:15:36 +08:00
err = watcher . Add ( fdbClusterFilePath )
if err != nil {
panic ( err )
}
break
}
2021-08-22 11:40:20 +08:00
<- done
}
2021-08-22 16:28:09 +08:00
2024-06-24 23:01:23 +08:00
// watchPodTimestamps watches the timestamp feed to reload the configuration.
func ( monitor * monitor ) watchPodTimestamps ( ) {
for timestamp := range monitor . podClient . TimestampFeed {
if timestamp > monitor . lastConfigurationTime . Unix ( ) {
monitor . loadConfiguration ( )
2021-08-22 16:28:09 +08:00
}
}
}