Skip to content

Commit

Permalink
refact(process): drop gopsutil (it is fat)
Browse files Browse the repository at this point in the history
  • Loading branch information
karitra committed Jan 25, 2018
1 parent cf5aa80 commit a2e0be3
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 64 deletions.
13 changes: 3 additions & 10 deletions isolate/process/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ type codeStorage interface {
type workerInfo struct {
*exec.Cmd
uuid string
startTime time.Time
}

type taskInfo struct {
uuid string
startTime time.Time
}

type Box struct {
Expand Down Expand Up @@ -285,7 +279,6 @@ func (b *Box) Spawn(ctx context.Context, config isolate.SpawnConfig, output io.W
b.children[pr.cmd.Process.Pid] = workerInfo{
Cmd: pr.cmd,
uuid: config.Args["--uuid"],
startTime: newProcStarted,
}
b.mu.Unlock()

Expand Down Expand Up @@ -347,15 +340,15 @@ func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) {
return
}

func (b *Box) getIdUuidMapping() (result map[int]taskInfo) {
func (b *Box) getIdUuidMapping() (result map[int]string) {
// TODO: is len(b.children) safe to use as `expectedWorkersCount`
result = make(map[int]taskInfo, expectedWorkersCount)
result = make(map[int]string, expectedWorkersCount)

b.mu.Lock()
defer b.mu.Unlock()

for pid, kid := range b.children {
result[pid] = taskInfo{uuid: kid.uuid, startTime: kid.startTime}
result[pid] = kid.uuid
}

return
Expand Down
229 changes: 175 additions & 54 deletions isolate/process/metrics_gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,128 +4,249 @@
package process

import (
"bytes"
"bufio"
"context"
"fmt"
"io/ioutil"
"regexp"
"strconv"
"strings"
"syscall"
"time"

"github.com/noxiouz/stout/isolate"
"github.com/noxiouz/stout/pkg/log"

gopsnet "github.com/shirou/gopsutil/net"
gopsutil "github.com/shirou/gopsutil/process"
)

const eachIface = true
const clockTicks = 100 // sysconf(_SC_CLK_TCK)

var (
pageSize = uint64(syscall.Getpagesize())
spacesRegexp, _ = regexp.Compile("[ ]+")
)

func makeNiceName(name string) string {
return spacesRegexp.ReplaceAllString(name, "_")
type (
memStat struct {
vms uint64
rss uint64
}
)

// /proc/<pid>/statm fields (see `man proc` for details)
const (
statmVMS = iota
statmRSS

statmShare
statmText
statmLib
statmData
statmDt

statmFieldsCount
)

const (
statUtime = 13
statStime = 14

statStartTime = 21

statFieldsCount = 44
)

const (
pairKey = iota
pairVal
pairLen
)

func readLines(b []byte) (text []string) {
reader := bytes.NewBuffer(b)
scanner := bufio.NewScanner(reader)

for scanner.Scan() {
text = append(text, scanner.Text())
}

return
}

func generateNetStat(net []gopsnet.IOCountersStat) (out map[string]isolate.NetStat) {
out = make(map[string]isolate.NetStat, len(net))
func loadSysBootTime() (bt uint64, err error) {
var b []byte
if b, err = ioutil.ReadFile("/proc/stat"); err != nil {
return
}

for _, ln := range readLines(b) {
if strings.HasPrefix(ln, "btime") {
fields := strings.Fields(ln)
if len(fields) < pairLen {
return bt, fmt.Errorf("incorrect count of fields in `btime` record: %d", len(fields))
}

for _, c := range net {
out[c.Name] = isolate.NetStat{
RxBytes: c.BytesRecv,
TxBytes: c.BytesSent,
return strconv.ParseUint(fields[pairVal], 10, 64)
}
}

return
}

func readProcStat(pid int, uptimeSeconds uint64) (isolate.WorkerMetrics, error) {
func getProcPath(pid int, file string) string {
return fmt.Sprintf("/proc/%d/%s", pid, file)
}

var (
process *gopsutil.Process
func getProcContent(pid int, file string) (content string, err error) {
var b []byte

cpuload float64
// netstat []gopsnet.IOCountersStat
memstat *gopsutil.MemoryInfoStat
if b, err = ioutil.ReadFile(getProcPath(pid, file)); err != nil {
return
}

errStub isolate.WorkerMetrics
err error
)
content = string(b)
return
}

if process, err = gopsutil.NewProcess(int32(pid)); err != nil {
return errStub, err
func readMemStat(pid int) (mstat memStat, err error) {
var content string
if content, err = getProcContent(pid, "statm"); err != nil {
return
}

if cpuload, err = process.CPUPercent(); err != nil {
return errStub, err
fields := strings.Fields(content)
if len(fields) < statmFieldsCount {
err = fmt.Errorf("wrong number of fields in `statm` file: %d, but shoud be greater or equal to %d", len(fields), statmFieldsCount)
return
}

if memstat, err = process.MemoryInfo(); err != nil {
return errStub, err
var vms, rss uint64
vms, err = strconv.ParseUint(fields[statmVMS], 10, 64)
if err != nil {
return
}

rss, err = strconv.ParseUint(fields[statmRSS], 10, 64)
if err != nil {
return
}

mstat = memStat{
vms: vms * pageSize,
rss: rss * pageSize,
}

//
// TODO:
// There is no per process network stat yet in gopsutil,
// Per process view of system stat is in `netstat` slice.
//
// Most commonly used (the only?) way to take per process network
// stats is by libpcap.
//
// if netstat, err = process.NetIOCounters(eachIface); err != nil {
//
if _, err = process.NetIOCounters(eachIface); err != nil {
return errStub, err
return
}

func readCPUPercent(pid int, bootTime uint64) (cpu float32, uptime uint64, err error) {
var content string
if content, err = getProcContent(pid, "stat"); err != nil {
return
}

return isolate.WorkerMetrics{
fields := strings.Fields(content)
if len(fields) < statFieldsCount {
err = fmt.Errorf("wrong number of fields in `statm` file: %d, but shoud be greater or equal to %d", len(fields), statFieldsCount)
return
}

var utime, stime, startedAt uint64
if utime, err = strconv.ParseUint(fields[statUtime], 10, 64); err != nil {
return
}

if stime, err = strconv.ParseUint(fields[statStime], 10, 64); err != nil {
return
}

if startedAt, err = strconv.ParseUint(fields[statStartTime], 10, 64); err != nil {
return
}

utimeSec := float64(utime) / clockTicks
stimeSec := float64(stime) / clockTicks

startedAt = bootTime + startedAt / clockTicks
created := time.Unix(0, int64(startedAt * uint64(time.Second)))

total := float64(utimeSec + stimeSec)
if runtime := time.Since(created).Seconds(); runtime > 0 {
uptime = uint64(runtime)
cpu = float32(100 * total / runtime)
}

return
}

func makeNiceName(name string) string {
return spacesRegexp.ReplaceAllString(name, "_")
}

func readProcStat(pid int, bootTime uint64) (stat isolate.WorkerMetrics,err error) {
var (
cpuload float32
uptimeSeconds uint64
memstat memStat
)

if cpuload, uptimeSeconds, err = readCPUPercent(pid, bootTime); err != nil {
return
}

if memstat, err = readMemStat(pid); err != nil {
return
}

stat = isolate.WorkerMetrics{
UptimeSec: uptimeSeconds,
// CpuUsageSec:

CpuLoad: cpuload,
Mem: memstat.VMS,
Mem: memstat.vms,

// Per process net io stat is unimplemented.
// Net: generateNetStat(netstat),
}, nil
}

return
}

func (b *Box) gatherMetrics(ctx context.Context) {
func (b *Box) gatherMetrics(ctx context.Context, bootTime uint64) {
ids := b.getIdUuidMapping()
metrics := make(map[string]*isolate.WorkerMetrics, len(ids))

now := time.Now()

for pid, taskInfo := range ids {
uptimeSeconds := uint64(now.Sub(taskInfo.startTime).Seconds())

if state, err := readProcStat(pid, uptimeSeconds); err != nil {
log.G(ctx).Errorf("Failed to read stat for process with pid %d", pid)
continue
for pid, uuid := range ids {
if stat, err := readProcStat(pid, bootTime); err == nil {
metrics[uuid] = &stat
} else {
metrics[taskInfo.uuid] = &state
log.G(ctx).Errorf("Failed to read stat, pid: %d, err: %v", pid, err)
}
} // for each taskInfo

b.setMetricsMapping(metrics)
}

func (b *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) {

if interval == 0 {
log.G(ctx).Info("Process metrics gatherer disabled (use config to setup)")
return
}

log.G(ctx).Infof("Initializing Process metrics gather loop with %v duration", interval)

bootTime, err := loadSysBootTime()
if err != nil {
log.G(ctx).Errorf("Error while reading system boot time %v", err)
return
}

for {
select {
case <-ctx.Done():
return
case <-time.After(interval):
b.gatherMetrics(ctx)
b.gatherMetrics(ctx, bootTime)
}
}

Expand Down

0 comments on commit a2e0be3

Please sign in to comment.