diff --git a/pool.go b/pool.go index 77fb17e..dc51088 100644 --- a/pool.go +++ b/pool.go @@ -2,6 +2,7 @@ package gophernaut import ( "bufio" + "bytes" "fmt" "io" "log" @@ -29,39 +30,34 @@ func copyToLog(dst *log.Logger, src io.Reader) { } } -func startProcess(control <-chan Event, events chan<- Event, executable string) { - procLog := log.New(os.Stdout, fmt.Sprintf("gopher-worker(%s) ", executable), log.Ldate|log.Ltime) - - commandParts := strings.Split(executable, " ") - command := exec.Command(commandParts[0], commandParts[1:]...) - log.Printf("Command: %v\n", command) - - stdout, err := command.StdoutPipe() - if err != nil { - procLog.Fatalln("Unable to connect to stdout from command...") - } - stderr, err := command.StderrPipe() +// http://stackoverflow.com/questions/31879817/golang-os-exec-realtime-memory-usage +func calculateMemory(pid int) (uint64, error) { + // only works on linux, mac os x doesn't have a proc fs + f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid)) if err != nil { - procLog.Fatalln("Unable to connect to stderr from command...") + return 0, err } - - go copyToLog(procLog, stdout) - go copyToLog(procLog, stderr) - - // Actually start the subprocess and wait for it to startup - command.Start() - - // Push an event onto the channel for manage processes to track the new process - events <- Start - for { - _, ok := <-control - if !ok { - fmt.Println("Killing worker process after receiving close event.") - command.Process.Kill() - events <- Shutdown - break + defer f.Close() + + res := uint64(0) + pfx := []byte("Pss:") + r := bufio.NewScanner(f) + for r.Scan() { + line := r.Bytes() + if bytes.HasPrefix(line, pfx) { + var size uint64 + _, err := fmt.Sscanf(string(line[4:]), "%d", &size) + if err != nil { + return 0, err + } + res += size } } + if err := r.Err(); err != nil { + return 0, err + } + + return res, nil } // Worker ... @@ -70,6 +66,7 @@ type Worker struct { requestCount int pool *Pool busy bool + command *exec.Cmd } // StartRequest marks this work as busy @@ -83,7 +80,11 @@ func (w *Worker) StartRequest() { func (w *Worker) CompleteRequest() { w.busy = false w.pool.workerChannel <- w - log.Printf("Worker %s request %d complete!\n", w.Hostname, w.requestCount) + // if the process has ended you can use this... so not helpful to us now + // memoryUsage := w.command.ProcessState.SysUsage().(*syscall.Rusage).Maxrss + // Instead we need to do something like: + memory, _ := calculateMemory(w.command.Process.Pid) + log.Printf("Worker %s request %d complete! (%v)\n", w.Hostname, w.requestCount, memory) } // GetRequestCount accessor @@ -108,6 +109,46 @@ type Pool struct { eventsChannel chan Event } +func (p *Pool) startProcess(control <-chan Event, events chan<- Event, executable string, hostname string) { + procLog := log.New(os.Stdout, fmt.Sprintf("gopher-worker(%s) ", executable), log.Ldate|log.Ltime) + + commandParts := strings.Split(executable, " ") + command := exec.Command(commandParts[0], commandParts[1:]...) + log.Printf("Command: %v\n", command) + + stdout, err := command.StdoutPipe() + if err != nil { + procLog.Fatalln("Unable to connect to stdout from command...") + } + stderr, err := command.StderrPipe() + if err != nil { + procLog.Fatalln("Unable to connect to stderr from command...") + } + + go copyToLog(procLog, stdout) + go copyToLog(procLog, stderr) + + // Actually start the subprocess and wait for it to startup + command.Start() + log.Printf("creating worker...\n") + w := Worker{Hostname: hostname, pool: p, command: command} + log.Printf("Queuing worker...\n") + p.workerChannel <- &w + p.Workers = append(p.Workers, &w) + + // Push an event onto the channel for manage processes to track the new process + events <- Start + for { + _, ok := <-control + if !ok { + fmt.Println("Killing worker process after receiving close event.") + command.Process.Kill() + events <- Shutdown + break + } + } +} + // Start up the pool func (p *Pool) Start() { p.controlChannel = make(chan Event) @@ -126,16 +167,11 @@ func (p *Pool) Start() { signal.Stop(receivedSignals) }() - log.Printf("Starting processes... %d\n", len(p.Executables)) + log.Printf("Starting workers... %d\n", len(p.Executables)) // Actually start some processes for index, executable := range p.Executables { - log.Printf("creating worker...\n") - w := Worker{Hostname: p.Hostnames[index], pool: p} - log.Printf("Queuing worker...\n") - p.workerChannel <- &w - p.Workers = append(p.Workers, &w) log.Printf("Starting worker process...\n") - go startProcess(p.controlChannel, p.eventsChannel, executable) + go p.startProcess(p.controlChannel, p.eventsChannel, executable, p.Hostnames[index]) } }