Skip to content

Commit

Permalink
Merge pull request #2 from Ja7ad/feature/grpc
Browse files Browse the repository at this point in the history
feat: grpc server forker
  • Loading branch information
Ja7ad authored Oct 12, 2022
2 parents fb625f9 + 68f0c2f commit ac98684
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 138 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,34 @@ func GreetingHandler() http.HandlerFunc {
}
```

### grpc server

[benchmark](_example/grpc)

```go
package main

import (
"github.com/Ja7ad/forker"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"log"
)

func main() {
f := forker.NewGrpcForker(nil)
srv := f.GetGrpcServer()

grpc_health_v1.RegisterHealthServer(srv, health.NewServer())

reflection.Register(srv)

log.Fatalln(f.ServeGrpc(":9090"))
}

```

### echo framework

[benchmark](_example/echo)
Expand Down
39 changes: 39 additions & 0 deletions _example/grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Benchmark grpc forker

```shell
ghz -c 500 -n 200000 --insecure --call grpc.health.v1.Health.Check 0.0.0.0:9090
```

### with forker

```shell
Summary:
Count: 200000
Total: 14.38 s
Slowest: 142.50 ms
Fastest: 0.14 ms
Average: 21.14 ms
Requests/sec: 13910.00

Response time histogram:
0.136 [1] |
14.372 [61253] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
28.609 [92588] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
42.845 [36131] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
57.081 [7548] |∎∎∎
71.318 [1670] |
85.554 [599] |
99.790 [100] |
114.026 [39] |
128.263 [12] |
142.499 [59] |

Latency distribution:
10 % in 6.91 ms
25 % in 12.65 ms
50 % in 19.31 ms
75 % in 27.64 ms
90 % in 36.87 ms
95 % in 42.87 ms
99 % in 59.01 ms
```
20 changes: 20 additions & 0 deletions _example/grpc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"github.com/Ja7ad/forker"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"log"
)

func main() {
f := forker.NewGrpcForker(nil)
srv := f.GetGrpcServer()

grpc_health_v1.RegisterHealthServer(srv, health.NewServer())

reflection.Register(srv)

log.Fatalln(f.ServeGrpc(":9090"))
}
141 changes: 141 additions & 0 deletions fork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package forker

import (
"flag"
"github.com/valyala/fasthttp/reuseport"
"net"
"os"
"os/exec"
"runtime"
)

const (
childFlag = "-child"
_defaultNetwork = TCP4
)

type processSignal struct {
pid int
err error
}

func init() {
flag.Bool(childFlag[1:], false, "is forker child process")
}

func (f *Fork) forker(address string) (err error) {
if !f.ReusePort {
if runtime.GOOS == "windows" {
return ErrReuseportOnWindows
}

if err = f.setTCPListenerFiles(address); err != nil {
return
}

defer func() {
if e := f.ln.Close(); e != nil {
err = e
}
}()
}

goMaxProcess := runtime.GOMAXPROCS(0)
sigCh := make(chan processSignal, goMaxProcess)
childProcess := make(map[int]*exec.Cmd)

defer func() {
for _, proc := range childProcess {
_ = proc.Process.Kill()
}
}()

for i := 0; i < goMaxProcess; i++ {
var cmd *exec.Cmd
cmd, err = f.doCmd()
if err != nil {
return err
}

pid := cmd.Process.Pid
childProcess[pid] = cmd
f.childsPid = append(f.childsPid, pid)

go func() {
sigCh <- processSignal{cmd.Process.Pid, cmd.Wait()}
}()
}

var exitedProcess int
for sig := range sigCh {
delete(childProcess, sig.pid)

if exitedProcess++; exitedProcess > f.recoverChild {
err = ErrOverRecovery
break
}

var cmd *exec.Cmd
if cmd, err = f.doCmd(); err != nil {
break
}

childProcess[cmd.Process.Pid] = cmd
go func() {
sigCh <- processSignal{cmd.Process.Pid, cmd.Wait()}
}()
}

return nil
}

func (f *Fork) listen(address string) (net.Listener, error) {
runtime.GOMAXPROCS(1)

if f.ReusePort {
return reuseport.Listen(f.Network.String(), address)
}

return net.FileListener(os.NewFile(3, ""))
}

func (f *Fork) setTCPListenerFiles(address string) error {

tcpAddr, err := net.ResolveTCPAddr(f.Network.String(), address)
if err != nil {
return err
}

tcpListener, err := net.ListenTCP(f.Network.String(), tcpAddr)
if err != nil {
return err
}

f.ln = tcpListener

file, err := tcpListener.File()
if err != nil {
return err
}

f.files = []*os.File{file}

return nil
}

func (f *Fork) doCmd() (*exec.Cmd, error) {
cmd := exec.Command(os.Args[0], append(os.Args[1:], childFlag)...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.ExtraFiles = f.files
return cmd, cmd.Start()
}

func isChild() bool {
for _, arg := range os.Args[1:] {
if arg == childFlag {
return true
}
}
return false
}
Loading

0 comments on commit ac98684

Please sign in to comment.