Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 73 additions & 37 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gophernaut

import (
"bufio"
"bytes"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -29,39 +30,34 @@ func copyToLog(dst *log.Logger, src io.Reader) {
}
}

func startProcess(control <-chan Event, events chan<- Event, executable string) {
procLog := log.New(os.Stdout, fmt.Sprintf("gopher-worker(%s) ", executable), log.Ldate|log.Ltime)

commandParts := strings.Split(executable, " ")
command := exec.Command(commandParts[0], commandParts[1:]...)
log.Printf("Command: %v\n", command)

stdout, err := command.StdoutPipe()
if err != nil {
procLog.Fatalln("Unable to connect to stdout from command...")
}
stderr, err := command.StderrPipe()
// http://stackoverflow.com/questions/31879817/golang-os-exec-realtime-memory-usage
func calculateMemory(pid int) (uint64, error) {
// only works on linux, mac os x doesn't have a proc fs
f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid))
if err != nil {
procLog.Fatalln("Unable to connect to stderr from command...")
return 0, err
}

go copyToLog(procLog, stdout)
go copyToLog(procLog, stderr)

// Actually start the subprocess and wait for it to startup
command.Start()

// Push an event onto the channel for manage processes to track the new process
events <- Start
for {
_, ok := <-control
if !ok {
fmt.Println("Killing worker process after receiving close event.")
command.Process.Kill()
events <- Shutdown
break
defer f.Close()

res := uint64(0)
pfx := []byte("Pss:")
r := bufio.NewScanner(f)
for r.Scan() {
line := r.Bytes()
if bytes.HasPrefix(line, pfx) {
var size uint64
_, err := fmt.Sscanf(string(line[4:]), "%d", &size)
if err != nil {
return 0, err
}
res += size
}
}
if err := r.Err(); err != nil {
return 0, err
}

return res, nil
}

// Worker ...
Expand All @@ -70,6 +66,7 @@ type Worker struct {
requestCount int
pool *Pool
busy bool
command *exec.Cmd
}

// StartRequest marks this work as busy
Expand All @@ -83,7 +80,11 @@ func (w *Worker) StartRequest() {
func (w *Worker) CompleteRequest() {
w.busy = false
w.pool.workerChannel <- w
log.Printf("Worker %s request %d complete!\n", w.Hostname, w.requestCount)
// if the process has ended you can use this... so not helpful to us now
// memoryUsage := w.command.ProcessState.SysUsage().(*syscall.Rusage).Maxrss
// Instead we need to do something like:
memory, _ := calculateMemory(w.command.Process.Pid)
log.Printf("Worker %s request %d complete! (%v)\n", w.Hostname, w.requestCount, memory)
}

// GetRequestCount accessor
Expand All @@ -108,6 +109,46 @@ type Pool struct {
eventsChannel chan Event
}

func (p *Pool) startProcess(control <-chan Event, events chan<- Event, executable string, hostname string) {
procLog := log.New(os.Stdout, fmt.Sprintf("gopher-worker(%s) ", executable), log.Ldate|log.Ltime)

commandParts := strings.Split(executable, " ")
command := exec.Command(commandParts[0], commandParts[1:]...)
log.Printf("Command: %v\n", command)

stdout, err := command.StdoutPipe()
if err != nil {
procLog.Fatalln("Unable to connect to stdout from command...")
}
stderr, err := command.StderrPipe()
if err != nil {
procLog.Fatalln("Unable to connect to stderr from command...")
}

go copyToLog(procLog, stdout)
go copyToLog(procLog, stderr)

// Actually start the subprocess and wait for it to startup
command.Start()
log.Printf("creating worker...\n")
w := Worker{Hostname: hostname, pool: p, command: command}
log.Printf("Queuing worker...\n")
p.workerChannel <- &w
p.Workers = append(p.Workers, &w)

// Push an event onto the channel for manage processes to track the new process
events <- Start
for {
_, ok := <-control
if !ok {
fmt.Println("Killing worker process after receiving close event.")
command.Process.Kill()
events <- Shutdown
break
}
}
}

// Start up the pool
func (p *Pool) Start() {
p.controlChannel = make(chan Event)
Expand All @@ -126,16 +167,11 @@ func (p *Pool) Start() {
signal.Stop(receivedSignals)
}()

log.Printf("Starting processes... %d\n", len(p.Executables))
log.Printf("Starting workers... %d\n", len(p.Executables))
// Actually start some processes
for index, executable := range p.Executables {
log.Printf("creating worker...\n")
w := Worker{Hostname: p.Hostnames[index], pool: p}
log.Printf("Queuing worker...\n")
p.workerChannel <- &w
p.Workers = append(p.Workers, &w)
log.Printf("Starting worker process...\n")
go startProcess(p.controlChannel, p.eventsChannel, executable)
go p.startProcess(p.controlChannel, p.eventsChannel, executable, p.Hostnames[index])
}
}

Expand Down