diff --git a/.gitignore b/.gitignore index 0618ccef16..d1d861a777 100644 --- a/.gitignore +++ b/.gitignore @@ -7,17 +7,18 @@ *.out .DS_Store *.pem +*.db .idea/ .vscode/ +./skywire-config.json ./skywire.json ./build/ build/ ./apps/ ./skywire/ -./local/* -./local*/* +local*/ ./transport_logs ./dmsgpty ./rewards @@ -26,16 +27,16 @@ rewards ./pkg/visor/apps/ ./pkg/visor/bar/ ./pkg/visor/foo/ +./skywire-config.json ./bin ./node -./users.db ./hypervisor ./*-node ./*-visor ./*-cli ./*-server -./*.json +*.json !/dmsghttp-config.json !/services-config.json ./*.sh diff --git a/cmd/skywire-cli/commands/net/connect.go b/cmd/skywire-cli/commands/net/connect.go new file mode 100644 index 0000000000..c34e386bfd --- /dev/null +++ b/cmd/skywire-cli/commands/net/connect.go @@ -0,0 +1,157 @@ +// Package net cmd/skywire-cli/commands/net/connect.go +package net + +import ( + "bytes" + "fmt" + "os" + "strconv" + "strings" + "text/tabwriter" + + "github.com/google/uuid" + "github.com/spf13/cobra" + + "github.com/skycoin/skywire-utilities/pkg/cipher" + clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc" + "github.com/skycoin/skywire/cmd/skywire-cli/internal" + "github.com/skycoin/skywire/pkg/app/appnet" +) + +var ( + localPort int +) + +func init() { + conCmd.Flags().IntVarP(&localPort, "port", "p", 0, "local port to serve the remote app on") + conCmd.Flags().StringVarP(&netType, "type", "t", "http", "type of the remote app connection (http, tcp, udp)") + + conCmd.AddCommand(lsCmd) + conCmd.AddCommand(stopCmd) + RootCmd.AddCommand(conCmd) +} + +// conCmd contains commands to connect to a published app on the skywire network +var conCmd = &cobra.Command{ + Use: "con [flags]", + Short: "Connect to a published app on the skywire network", + Long: "Connect to a published app on the skywire network.\n This will allow you to access the remote app via the skywire network.", + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + + if len(args) == 0 { + cmd.Help() //nolint + os.Exit(0) + } + + var remotePK cipher.PubKey + var remotePort int + + parts := strings.Split(args[0], ":") + + if len(parts) != 2 { + cmd.Help() //nolint + os.Exit(0) + } + + internal.Catch(cmd.Flags(), remotePK.Set(parts[0])) + + if remotePort, err = strconv.Atoi(parts[1]); err != nil { + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("invalid port: %s", parts[1])) + } + + if remotePort == 0 || localPort == 0 { + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be 0")) + } + //65535 is the highest TCP port number + if 65536 < remotePort || 65536 < localPort { + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be greater than 65535")) + } + + var appType appnet.AppType + + switch netType { + case "http": + appType = appnet.HTTP + case "tcp": + appType = appnet.TCP + case "udp": + appType = appnet.UDP + default: + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("invalid type")) + } + + connInfo, err := rpcClient.Connect(remotePK, remotePort, localPort, appType) + internal.Catch(cmd.Flags(), err) + + internal.PrintOutput(cmd.Flags(), connInfo, fmt.Sprintf("Connected to %s with ID: %s\n", connInfo.RemoteAddr.String(), connInfo.ID.String())) + internal.PrintOutput(cmd.Flags(), connInfo, fmt.Sprintf("%v available on localhost:%d\n", connInfo.AppType, connInfo.WebPort)) + }, +} + +// lsCmd contains commands to list connected apps on the skywire network +var lsCmd = &cobra.Command{ + Use: "ls", + Short: "List connected apps on the skywire network", + Long: "List connected apps on the skywire network.\nThis will show you the ID, address, and web port of the connected apps.", + Args: cobra.MinimumNArgs(0), + Run: func(cmd *cobra.Command, args []string) { + + if len(args) != 0 { + cmd.Help() //nolint + os.Exit(0) + } + + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + + connectConns, err := rpcClient.ListConnected() + internal.Catch(cmd.Flags(), err) + + var b bytes.Buffer + w := tabwriter.NewWriter(&b, 0, 0, 3, ' ', tabwriter.TabIndent) + _, err = fmt.Fprintln(w, "id\taddr\tweb_port\tapp_type") + internal.Catch(cmd.Flags(), err) + + for _, connectConn := range connectConns { + _, err = fmt.Fprintf(w, "%v\t%v\t%v\t%v\n", connectConn.ID, connectConn.RemoteAddr, + connectConn.WebPort, connectConn.AppType) + internal.Catch(cmd.Flags(), err) + } + internal.Catch(cmd.Flags(), w.Flush()) + internal.PrintOutput(cmd.Flags(), connectConns, b.String()) + }, +} + +// stopCmd contains commands to stop a connection to a published app on the skywire network +var stopCmd = &cobra.Command{ + Use: "stop ", + Short: "Stop a connection to a published app on the skywire network", + Long: "Stop a connection to a published app on the skywire network.\nThis will disconnect you from the remote app.", + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + + if len(args) == 0 { + cmd.Help() //nolint + os.Exit(0) + } + + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + + id, err := uuid.Parse(args[0]) + internal.Catch(cmd.Flags(), err) + err = rpcClient.Disconnect(id) + internal.Catch(cmd.Flags(), err) + internal.PrintOutput(cmd.Flags(), "OK", "OK\n") + }, +} diff --git a/cmd/skywire-cli/commands/net/publish.go b/cmd/skywire-cli/commands/net/publish.go new file mode 100644 index 0000000000..c7ec85ec06 --- /dev/null +++ b/cmd/skywire-cli/commands/net/publish.go @@ -0,0 +1,149 @@ +// Package net cmd/skywire-cli/commands/net/publish.go +package net + +import ( + "bytes" + "fmt" + "os" + "text/tabwriter" + + "github.com/google/uuid" + "github.com/spf13/cobra" + + clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc" + "github.com/skycoin/skywire/cmd/skywire-cli/internal" + "github.com/skycoin/skywire/pkg/app/appnet" +) + +var ( + netType string + skyPort int +) + +func init() { + pubCmd.PersistentFlags().IntVarP(&localPort, "port", "p", 0, "local port of the external (http, tcp, udp) app") + pubCmd.PersistentFlags().IntVarP(&skyPort, "skyport", "s", localPort, "skywire port for the external (http, tcp, udp) app") + pubCmd.PersistentFlags().StringVarP(&netType, "type", "t", "http", "type of the external app connection (http, tcp, udp)") + + pubCmd.AddCommand(lsPubCmd) + pubCmd.AddCommand(stopPubCmd) + RootCmd.AddCommand(pubCmd) +} + +// pubCmd contains commands to publish over the skywire network +var pubCmd = &cobra.Command{ + Use: "pub [flags]", + Short: "Publish over skywire network", + Long: "Publish over skywire network\nPublish a local port over the skywire network. This will allow other nodes to access the local port via the skywire network.", + Args: cobra.MinimumNArgs(0), + Run: func(cmd *cobra.Command, _ []string) { + + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + + if skyPort == 0 { + skyPort = localPort + } + + if localPort == 0 && skyPort == 0 { + cmd.Help() //nolint + os.Exit(0) + } + + //port 0 is reserved / not usable + if localPort == 0 { + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be 0")) + } + + //skyPort 0 is reserved / not usable + if skyPort == 0 { + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("skyPort cannot be 0")) + } + + //65535 is the highest TCP port number + if 65536 < localPort { + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be greater than 65535")) + } + + var appType appnet.AppType + + switch netType { + case "http": + appType = appnet.HTTP + case "tcp": + appType = appnet.TCP + case "udp": + appType = appnet.UDP + default: + internal.PrintFatalError(cmd.Flags(), fmt.Errorf("invalid type")) + } + + internal.Catch(cmd.Flags(), err) + pubInfo, err := rpcClient.Publish(localPort, skyPort, appType) + internal.Catch(cmd.Flags(), err) + + internal.PrintOutput(cmd.Flags(), pubInfo, fmt.Sprintf("Published on %s with ID: %s\n", pubInfo.SkyAddr.String(), pubInfo.ID.String())) + + }, +} + +// lsPubCmd lists all the publised apps on the skywire network by the visor +var lsPubCmd = &cobra.Command{ + Use: "ls", + Short: "List published apps on the skywire network by the visor", + Long: "List published apps on the skywire network by the visor\nThe list contains the id and the local port of the published app.", + Args: cobra.MinimumNArgs(0), + Run: func(cmd *cobra.Command, args []string) { + + if len(args) != 0 { + cmd.Help() //nolint + os.Exit(0) + } + + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + + liss, err := rpcClient.ListPublished() + internal.Catch(cmd.Flags(), err) + var b bytes.Buffer + w := tabwriter.NewWriter(&b, 0, 0, 2, ' ', tabwriter.TabIndent) + _, err = fmt.Fprintln(w, "id\tsky_port\tlocal_port\tapp_type") + internal.Catch(cmd.Flags(), err) + for _, lis := range liss { + _, err = fmt.Fprintf(w, "%v\t%v\t%v\t%v\n", lis.ID, lis.SkyAddr.GetPort(), lis.LocalPort, lis.AppType) + internal.Catch(cmd.Flags(), err) + } + internal.Catch(cmd.Flags(), w.Flush()) + internal.PrintOutput(cmd.Flags(), liss, b.String()) + }, +} + +// stopPubCmd stops a published app on the skywire network published by the visor +var stopPubCmd = &cobra.Command{ + Use: "stop ", + Short: "Stop a published app on the skywire network by the visor", + Long: "Stop a published app on the skywire network by the visor.\nThis will stop the published app and remove it from the skywire network.", + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + + if len(args) == 0 { + cmd.Help() //nolint + os.Exit(0) + } + + rpcClient, err := clirpc.Client(cmd.Flags()) + if err != nil { + os.Exit(1) + } + + id, err := uuid.Parse(args[0]) + internal.Catch(cmd.Flags(), err) + err = rpcClient.Depublish(id) + internal.Catch(cmd.Flags(), err) + internal.PrintOutput(cmd.Flags(), "OK", "OK\n") + }, +} diff --git a/cmd/skywire-cli/commands/net/root.go b/cmd/skywire-cli/commands/net/root.go new file mode 100644 index 0000000000..71d0933efb --- /dev/null +++ b/cmd/skywire-cli/commands/net/root.go @@ -0,0 +1,18 @@ +// Package net cmd/skywire-cli/commands/net/root.go +package net + +import ( + "github.com/spf13/cobra" + + clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc" +) + +func init() { + RootCmd.PersistentFlags().StringVar(&clirpc.Addr, "rpc", "localhost:3435", "RPC server address") +} + +// RootCmd contains commands that interact with the skywire network +var RootCmd = &cobra.Command{ + Use: "net", + Short: "Publish and connect to skywire network", +} diff --git a/cmd/skywire-cli/commands/root.go b/cmd/skywire-cli/commands/root.go index dee47484c0..f3249c6288 100644 --- a/cmd/skywire-cli/commands/root.go +++ b/cmd/skywire-cli/commands/root.go @@ -23,8 +23,8 @@ import ( clireward "github.com/skycoin/skywire/cmd/skywire-cli/commands/reward" clirewards "github.com/skycoin/skywire/cmd/skywire-cli/commands/rewards" cliroute "github.com/skycoin/skywire/cmd/skywire-cli/commands/route" - cliskyfwd "github.com/skycoin/skywire/cmd/skywire-cli/commands/skyfwd" - cliskyrev "github.com/skycoin/skywire/cmd/skywire-cli/commands/skyrev" + + clinet "github.com/skycoin/skywire/cmd/skywire-cli/commands/net" clisurvey "github.com/skycoin/skywire/cmd/skywire-cli/commands/survey" clitp "github.com/skycoin/skywire/cmd/skywire-cli/commands/tp" cliut "github.com/skycoin/skywire/cmd/skywire-cli/commands/ut" @@ -40,8 +40,7 @@ func init() { clivisor.RootCmd, clivpn.RootCmd, cliut.RootCmd, - cliskyfwd.RootCmd, - cliskyrev.RootCmd, + clinet.RootCmd, clireward.RootCmd, clirewards.RootCmd, clisurvey.RootCmd, diff --git a/cmd/skywire-cli/commands/skyfwd/root.go b/cmd/skywire-cli/commands/skyfwd/root.go deleted file mode 100644 index 3cd4420897..0000000000 --- a/cmd/skywire-cli/commands/skyfwd/root.go +++ /dev/null @@ -1,89 +0,0 @@ -// Package skyfwd cmd/skywire-cli/commands/skyfwd/root.go -package skyfwd - -import ( - "bytes" - "fmt" - "os" - "strconv" - "text/tabwriter" - - "github.com/spf13/cobra" - - clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc" - "github.com/skycoin/skywire/cmd/skywire-cli/internal" -) - -var ( - portNo int - deregister bool - lsPorts bool -) - -func init() { - RootCmd.PersistentFlags().IntVarP(&portNo, "port", "p", 0, "local port of the external (http) app") - RootCmd.PersistentFlags().BoolVarP(&deregister, "deregister", "d", false, "deregister local port of the external (http) app") - RootCmd.PersistentFlags().BoolVarP(&lsPorts, "ls", "l", false, "list registered local ports") - -} - -// RootCmd contains commands that interact with the skyforwarding -var RootCmd = &cobra.Command{ - Use: "fwd", - Short: "Control skyforwarding", - Long: "Control skyforwarding\n forward local ports over skywire", - Args: cobra.MinimumNArgs(0), - Run: func(cmd *cobra.Command, args []string) { - - rpcClient, err := clirpc.Client(cmd.Flags()) - if err != nil { - os.Exit(1) - } - - if lsPorts { - ports, err := rpcClient.ListHTTPPorts() - internal.Catch(cmd.Flags(), err) - var b bytes.Buffer - w := tabwriter.NewWriter(&b, 0, 0, 2, ' ', tabwriter.TabIndent) - _, err = fmt.Fprintln(w, "id\tlocal_port") - internal.Catch(cmd.Flags(), err) - for id, port := range ports { - _, err = fmt.Fprintf(w, "%v\t%v\n", id, port) - internal.Catch(cmd.Flags(), err) - } - internal.Catch(cmd.Flags(), w.Flush()) - internal.PrintOutput(cmd.Flags(), ports, b.String()) - os.Exit(0) - } - - if len(args) == 0 && portNo == 0 { - cmd.Help() //nolint - os.Exit(0) - } - - //if port is specified via flag, argument will override - if len(args) > 0 { - portNo, err = strconv.Atoi(args[0]) - internal.Catch(cmd.Flags(), err) - } - - //port 0 is reserved / not usable - if portNo == 0 { - internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be 0")) - } - - //65535 is the highest TCP port number - if 65536 < portNo { - internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be greater than 65535")) - } - - if deregister { - err = rpcClient.DeregisterHTTPPort(portNo) - } else { - err = rpcClient.RegisterHTTPPort(portNo) - } - internal.Catch(cmd.Flags(), err) - internal.PrintOutput(cmd.Flags(), "OK", "OK\n") - - }, -} diff --git a/cmd/skywire-cli/commands/skyrev/root.go b/cmd/skywire-cli/commands/skyrev/root.go deleted file mode 100644 index b8fccc903e..0000000000 --- a/cmd/skywire-cli/commands/skyrev/root.go +++ /dev/null @@ -1,104 +0,0 @@ -// Package skyrev cmd/skywire-cli/commands/skyfwd/root.go -package skyrev - -import ( - "bytes" - "fmt" - "os" - "strconv" - "text/tabwriter" - - "github.com/google/uuid" - "github.com/spf13/cobra" - - "github.com/skycoin/skywire-utilities/pkg/cipher" - clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc" - "github.com/skycoin/skywire/cmd/skywire-cli/internal" -) - -var ( - remotePort int - remotePk string - localPort int - lsPorts bool - disconnect string -) - -func init() { - RootCmd.Flags().IntVarP(&remotePort, "remote", "r", 0, "remote port to read from") - RootCmd.Flags().StringVarP(&remotePk, "pk", "k", "", "remote public key to connect to") - RootCmd.Flags().IntVarP(&localPort, "port", "p", 0, "local port to reverse proxy") - RootCmd.Flags().BoolVarP(&lsPorts, "ls", "l", false, "list configured connections") - RootCmd.Flags().StringVarP(&disconnect, "stop", "d", "", "disconnect from specified ") -} - -// RootCmd contains commands that interact with the skyforwarding -var RootCmd = &cobra.Command{ - Use: "rev", - Short: "reverse proxy skyfwd", - Long: "connect or disconnect from remote ports", - Args: cobra.MinimumNArgs(0), - Run: func(cmd *cobra.Command, args []string) { - - rpcClient, err := clirpc.Client(cmd.Flags()) - if err != nil { - os.Exit(1) - } - - if lsPorts { - forwardConns, err := rpcClient.List() - internal.Catch(cmd.Flags(), err) - - var b bytes.Buffer - w := tabwriter.NewWriter(&b, 0, 0, 3, ' ', tabwriter.TabIndent) - _, err = fmt.Fprintln(w, "id\tlocal_port\tremote_port") - internal.Catch(cmd.Flags(), err) - - for _, forwardConn := range forwardConns { - _, err = fmt.Fprintf(w, "%s\t%s\t%s\n", forwardConn.ID, strconv.Itoa(int(forwardConn.LocalPort)), - strconv.Itoa(int(forwardConn.RemotePort))) - internal.Catch(cmd.Flags(), err) - } - internal.Catch(cmd.Flags(), w.Flush()) - internal.PrintOutput(cmd.Flags(), forwardConns, b.String()) - os.Exit(0) - } - - if disconnect != "" { - id, err := uuid.Parse(disconnect) - internal.Catch(cmd.Flags(), err) - err = rpcClient.Disconnect(id) - internal.Catch(cmd.Flags(), err) - internal.PrintOutput(cmd.Flags(), "OK", "OK\n") - os.Exit(0) - } - - if len(args) == 0 && remotePk == "" { - cmd.Help() //nolint - os.Exit(0) - } - - var remotePK cipher.PubKey - - //if pk is specified via flag, argument will override - if len(args) > 0 { - internal.Catch(cmd.Flags(), remotePK.Set(args[0])) - } else { - if remotePk != "" { - internal.Catch(cmd.Flags(), remotePK.Set(remotePk)) - } - } - - if remotePort == 0 || localPort == 0 { - internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be 0")) - } - //65535 is the highest TCP port number - if 65536 < remotePort || 65536 < localPort { - internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be greater than 65535")) - } - - id, err := rpcClient.Connect(remotePK, remotePort, localPort) - internal.Catch(cmd.Flags(), err) - internal.PrintOutput(cmd.Flags(), id, fmt.Sprintln(id)) - }, -} diff --git a/docs/skywire_forwarding.md b/docs/skywire_forwarding.md index 5d6abfeb52..618bd576e3 100644 --- a/docs/skywire_forwarding.md +++ b/docs/skywire_forwarding.md @@ -36,8 +36,8 @@ func client() (visor.API, error) { ``` And then use the created RPC conn to register and deregister the server ``` -err = rpcClient.RegisterHTTPPort(port) -err = rpcClient.DeregisterHTTPPort(port) +id, err := rpcClient.Publish(port) +err = rpcClient.Depublish(id) ``` [Example](../example/http-server/README.md) diff --git a/example/http-server/server.go b/example/http-server/server.go index 4e231a39ed..cb2d762a86 100644 --- a/example/http-server/server.go +++ b/example/http-server/server.go @@ -12,6 +12,7 @@ import ( "github.com/skycoin/skywire-utilities/pkg/logging" "github.com/skycoin/skywire/example/http-server/html" + "github.com/skycoin/skywire/pkg/app/appnet" "github.com/skycoin/skywire/pkg/visor" ) @@ -60,7 +61,9 @@ func main() { fmt.Printf("error serving: %v\n", err) } - err = rpcClient.RegisterHTTPPort(port) + skyPort := port + + pubInfo, err := rpcClient.Publish(port, skyPort, appnet.HTTP) if err != nil { log.Errorf("error closing server: %v", err) } @@ -70,7 +73,7 @@ func main() { if err != nil { log.Errorf("error closing server: %v", err) } - err = rpcClient.DeregisterHTTPPort(port) + err = rpcClient.Depublish(pubInfo.ID) if err != nil { log.Errorf("error closing server: %v", err) } diff --git a/pkg/app/appnet/addr.go b/pkg/app/appnet/addr.go index 37d512bcf9..eab9bdfa5e 100644 --- a/pkg/app/appnet/addr.go +++ b/pkg/app/appnet/addr.go @@ -44,6 +44,11 @@ func (a Addr) PK() cipher.PubKey { return a.PubKey } +// GetPort returns port in the Addr. +func (a Addr) GetPort() routing.Port { + return a.Port +} + // ConvertAddr asserts type of the passed `net.Addr` and converts it // to `Addr` if possible. func ConvertAddr(addr net.Addr) (Addr, error) { diff --git a/pkg/app/appnet/connect.go b/pkg/app/appnet/connect.go new file mode 100644 index 0000000000..c1d9030962 --- /dev/null +++ b/pkg/app/appnet/connect.go @@ -0,0 +1,223 @@ +// Package appnet pkg/app/appnet/connect.go +package appnet + +import ( + "errors" + "fmt" + "io" + "net" + "net/http" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/skycoin/skywire/pkg/routing" +) + +// ConnectInfo represents the information of a connected connection +type ConnectInfo struct { + ID uuid.UUID `json:"id"` + WebPort int `json:"web_port"` + RemoteAddr Addr `json:"remote_addr"` + AppType AppType `json:"app_type"` +} + +// ConnectConn represents a connection that is connected to a published app +type ConnectConn struct { + ConnectInfo + skyConn net.Conn + srv *http.Server + lis net.Listener + closeOnce sync.Once + log *logging.Logger + nm *NetManager +} + +// NewConnectConn creates a new ConnectConn +func NewConnectConn(log *logging.Logger, nm *NetManager, remoteConn net.Conn, remoteAddr Addr, webPort int, appType AppType) (*ConnectConn, error) { + var srv *http.Server + var lis net.Listener + + switch appType { + case HTTP: + srv = newHTTPConnectServer(log, remoteConn, remoteAddr, webPort) + case TCP: + // lis = newTCPConnectListner(log, webPort) + return nil, errors.New("app type TCP is not supported yet") + case UDP: + return nil, errors.New("app type UDP is not supported yet") + } + + conn := &ConnectConn{ + ConnectInfo: ConnectInfo{ + ID: uuid.New(), + WebPort: webPort, + RemoteAddr: remoteAddr, + AppType: appType, + }, + skyConn: remoteConn, + log: log, + srv: srv, + lis: lis, + nm: nm, + } + + if err := nm.AddConnect(conn); err != nil { + return nil, err + } + + return conn, nil +} + +// Serve starts the server based on the AppType of the ConnectConn. +func (f *ConnectConn) Serve() error { + switch f.AppType { + case HTTP: + go func() { + err := f.srv.ListenAndServe() //nolint + if err != nil { + // don't print error if local server is closed + if !errors.Is(err, http.ErrServerClosed) { + f.log.WithError(err).Error("Error listening and serving app forwarding.") + } + } + }() + case TCP: + // go func() { + // handleConnectTCPConnection(f.lis, f.remoteConn, f.log) + // }() + return errors.New("app type TCP is not supported yet") + case UDP: + return errors.New("app type UDP is not supported yet") + } + f.log.Debugf("Serving on localhost:%v", f.WebPort) + return nil +} + +// Close closes the server, listener and remote connection. +func (f *ConnectConn) Close() (err error) { + f.closeOnce.Do(func() { + + switch f.AppType { + case HTTP: + err = f.srv.Close() + case TCP: + // err = f.lis.Close() + return + case UDP: + return + } + err = f.skyConn.Close() + f.nm.RemoveConnectConn(f.ID) + }) + return err +} + +func handleConnectFunc(httpC *http.Client, remotePK cipher.PubKey, remotePort routing.Port, mu *sync.Mutex) func(c *gin.Context) { + return func(c *gin.Context) { + mu.Lock() + defer mu.Unlock() + + var urlStr string + urlStr = fmt.Sprintf("sky://%s:%v%s", remotePK, remotePort, c.Param("path")) + if c.Request.URL.RawQuery != "" { + urlStr = fmt.Sprintf("%s?%s", urlStr, c.Request.URL.RawQuery) + } + + fmt.Printf("Proxying request: %s %s\n", c.Request.Method, urlStr) + req, err := http.NewRequest(c.Request.Method, urlStr, c.Request.Body) + if err != nil { + c.String(http.StatusInternalServerError, "Failed to create HTTP request") + return + } + + for header, values := range c.Request.Header { + for _, value := range values { + req.Header.Add(header, value) + } + } + + resp, err := httpC.Do(req) + if err != nil { + c.String(http.StatusInternalServerError, "Failed to connect to HTTP server") + fmt.Printf("Error: %v\n", err) + return + } + defer resp.Body.Close() //nolint + + for header, values := range resp.Header { + for _, value := range values { + c.Writer.Header().Add(header, value) + } + } + + c.Status(resp.StatusCode) + if _, err := io.Copy(c.Writer, resp.Body); err != nil { + c.String(http.StatusInternalServerError, "Failed to copy response body") + fmt.Printf("Error copying response body: %v\n", err) + } + } +} + +func newHTTPConnectServer(log *logging.Logger, remoteConn net.Conn, remoteAddr Addr, webPort int) *http.Server { + + httpC := &http.Client{Transport: MakeHTTPTransport(remoteConn, log)} + mu := new(sync.Mutex) + + r := gin.New() + + r.Use(gin.Recovery()) + + r.Use(loggingMiddleware()) + + r.Any("/*path", handleConnectFunc(httpC, remoteAddr.PK(), remoteAddr.GetPort(), mu)) + + srv := &http.Server{ + Addr: fmt.Sprint(":", webPort), + ReadHeaderTimeout: 5 * time.Second, + Handler: r, + } + return srv +} + +func newTCPConnectListner(log *logging.Logger, webPort int) net.Listener { + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", webPort)) + if err != nil { + log.Errorf("Failed to start TCP listener on port %d: %v", webPort, err) + } + return listener +} + +func handleConnectTCPConnection(listener net.Listener, remoteConn net.Conn, log *logging.Logger) { + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Failed to accept connection: %v", err) + continue + } + + go func(conn net.Conn) { + defer conn.Close() //nolint + + go func() { + _, err := io.Copy(remoteConn, conn) + if err != nil { + log.Printf("Error copying data to dmsg server: %v", err) + } + remoteConn.Close() //nolint + }() + + go func() { + _, err := io.Copy(conn, remoteConn) + if err != nil { + log.Printf("Error copying data from dmsg server: %v", err) + } + conn.Close() //nolint + }() + }(conn) + } +} diff --git a/pkg/app/appnet/forwarding.go b/pkg/app/appnet/forwarding.go deleted file mode 100644 index fdc9d48d83..0000000000 --- a/pkg/app/appnet/forwarding.go +++ /dev/null @@ -1,179 +0,0 @@ -// Package appnet pkg/app/appnet/forwarding.go -package appnet - -import ( - "errors" - "fmt" - "io" - "net" - "net/http" - "sync" - "time" - - "github.com/google/uuid" - - "github.com/skycoin/skywire-utilities/pkg/logging" -) - -// nolint: gochecknoglobals -var ( - forwardConns = make(map[uuid.UUID]*ForwardConn) - forwardConnsMu sync.Mutex -) - -// AddForwarding adds ForwardConn to with it's ID -func AddForwarding(fwd *ForwardConn) { - forwardConnsMu.Lock() - defer forwardConnsMu.Unlock() - forwardConns[fwd.ID] = fwd -} - -// GetForwardConn get's a ForwardConn by ID -func GetForwardConn(id uuid.UUID) *ForwardConn { - forwardConnsMu.Lock() - defer forwardConnsMu.Unlock() - - return forwardConns[id] -} - -// GetAllForwardConns gets all ForwardConns -func GetAllForwardConns() map[uuid.UUID]*ForwardConn { - forwardConnsMu.Lock() - defer forwardConnsMu.Unlock() - - return forwardConns -} - -// RemoveForwardConn removes a ForwardConn by ID -func RemoveForwardConn(id uuid.UUID) { - forwardConnsMu.Lock() - defer forwardConnsMu.Unlock() - delete(forwardConns, id) -} - -// ForwardConn ... -type ForwardConn struct { - ID uuid.UUID - LocalPort int - RemotePort int - remoteConn net.Conn - closeOnce sync.Once - srv *http.Server - closeChan chan struct{} - log *logging.Logger -} - -// NewForwardConn creates a new forwarding conn -func NewForwardConn(log *logging.Logger, remoteConn net.Conn, remotePort, localPort int) *ForwardConn { - closeChan := make(chan struct{}) - var once sync.Once - handler := http.NewServeMux() - var lock sync.Mutex - handler.HandleFunc("/", handleFunc(remoteConn, log, closeChan, once, &lock)) - - srv := &http.Server{ - Addr: fmt.Sprintf(":%v", localPort), - Handler: handler, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - MaxHeaderBytes: 1 << 20, - } - fwdConn := &ForwardConn{ - ID: uuid.New(), - remoteConn: remoteConn, - srv: srv, - LocalPort: localPort, - RemotePort: remotePort, - closeChan: closeChan, - log: log, - } - AddForwarding(fwdConn) - return fwdConn -} - -// Serve serves a HTTP forward conn that accepts all requests and forwards them directly to the remote server over the specified net.Conn. -func (f *ForwardConn) Serve() { - go func() { - err := f.srv.ListenAndServe() - if err != nil { - // don't print error if local server is closed - if !errors.Is(err, http.ErrServerClosed) { - f.log.WithError(err).Error("Error listening and serving app forwarding.") - } - } - }() - go func() { - <-f.closeChan - err := f.Close() - if err != nil { - f.log.Error(err) - } - }() - f.log.Debugf("Serving on localhost:%v", f.LocalPort) -} - -// Close closes the server and remote connection. -func (f *ForwardConn) Close() (err error) { - f.closeOnce.Do(func() { - err = f.srv.Close() - err = f.remoteConn.Close() - RemoveForwardConn(f.ID) - }) - return err -} - -func isClosed(c chan struct{}) bool { - select { - case <-c: - return true - default: - return false - } -} - -func handleFunc(remoteConn net.Conn, log *logging.Logger, closeChan chan struct{}, once sync.Once, lock *sync.Mutex) func(w http.ResponseWriter, req *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - lock.Lock() - defer lock.Unlock() - - if isClosed(closeChan) { - return - } - client := http.Client{Transport: MakeHTTPTransport(remoteConn, log)} - // Forward request to remote server - resp, err := client.Transport.RoundTrip(r) - if err != nil { - http.Error(w, "Could not reach remote server", 500) - log.WithError(err).Errorf("Could not reach remote server %v", resp) - once.Do(func() { - close(closeChan) - }) - return - } - - defer func() { - if err := resp.Body.Close(); err != nil { - log.WithError(err).Errorln("Failed to close forwarding response body") - } - }() - for key, value := range resp.Header { - for _, v := range value { - w.Header().Set(key, v) - } - } - w.WriteHeader(resp.StatusCode) - // Transfer response from remote server -> client - if resp.ContentLength > 0 { - if _, err := io.CopyN(w, resp.Body, resp.ContentLength); err != nil { - log.Warn(err) - } - } else if resp.Close { - // Copy until EOF or some other error occurs - for { - if _, err := io.Copy(w, resp.Body); err != nil { - break - } - } - } - } -} diff --git a/pkg/app/appnet/publish.go b/pkg/app/appnet/publish.go new file mode 100644 index 0000000000..a1c7decb6a --- /dev/null +++ b/pkg/app/appnet/publish.go @@ -0,0 +1,176 @@ +// Package appnet pkg/app/appnet/publish.go +package appnet + +import ( + "errors" + "fmt" + "io" + "net" + "net/http" + "net/http/httputil" + "net/url" + "sync" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "github.com/skycoin/skywire-utilities/pkg/logging" +) + +// PublishInfo represents the information of a published listener +type PublishInfo struct { + ID uuid.UUID `json:"id"` + SkyAddr Addr `json:"sky_addr"` + LocalPort int `json:"local_port"` + AppType AppType `json:"app_type"` +} + +// PublishLis represents a listner that is published on the skywire network +type PublishLis struct { + PublishInfo + skyLis net.Listener + closeOnce sync.Once + srv *http.Server + conn net.Conn + log *logging.Logger + nm *NetManager +} + +// NewPublishListener creates a new publishListener +func NewPublishListener(log *logging.Logger, nm *NetManager, skyLis net.Listener, localPort int, skyAddr Addr, appType AppType) (*PublishLis, error) { + + var srv *http.Server + var conn net.Conn + switch appType { + case HTTP: + srv = newHTTPPublishServer(localPort) + case TCP: + // conn = newTCPPublishConn(log, localPort) + return nil, errors.New("app type TCP is not supported yet") + case UDP: + return nil, errors.New("app type UDP is not supported yet") + } + + pubLis := &PublishLis{ + PublishInfo: PublishInfo{ + ID: uuid.New(), + SkyAddr: skyAddr, + LocalPort: localPort, + AppType: appType, + }, + skyLis: skyLis, + srv: srv, + conn: conn, + log: log, + nm: nm, + } + + if err := nm.AddPublish(pubLis); err != nil { + return nil, err + } + + return pubLis, nil +} + +// Listen initializes the server based on AppType of the PublishLis. +func (f *PublishLis) Listen() error { + switch f.AppType { + case HTTP: + go func() { + err := f.srv.Serve(f.skyLis) + if err != nil { + // don't print error if local server is closed + if !errors.Is(err, http.ErrServerClosed) { + f.log.WithError(err).Error("error listening and serving app forwarding.") + } + } + }() + case TCP: + // go func() { + // for { + // conn, err := f.skyLis.Accept() + // if err != nil { + // f.log.Errorf("error accepting connection: %v", err) + // return + // } + + // go f.handlePublishTCPConnection(conn) + // } + // }() + return errors.New("app type TCP is not supported yet") + case UDP: + return errors.New("app type UDP is not supported yet") + } + + f.log.Debugf("Serving local HTTP port: %v on SKY Addr %s", f.LocalPort, f.skyLis.Addr().String()) + return nil +} + +// Close closes the server and publish listner. +func (f *PublishLis) Close() (err error) { + f.closeOnce.Do(func() { + switch f.AppType { + case HTTP: + err = f.srv.Close() + case TCP: + // err = f.conn.Close() + return + case UDP: + return + } + err = f.skyLis.Close() + f.nm.RemovePublishListener(f.ID) + }) + return err +} + +func newHTTPPublishServer(localPort int) *http.Server { + r := gin.New() + r.Use(gin.Recovery()) + r.Use(loggingMiddleware()) + authRoute := r.Group("/") + authRoute.Any("/*path", func(c *gin.Context) { + targetURL, _ := url.Parse(fmt.Sprintf("http://127.0.0.1:%v%s?%s", localPort, c.Request.URL.Path, c.Request.URL.RawQuery)) //nolint + proxy := httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL = targetURL + req.Host = targetURL.Host + req.Method = c.Request.Method + }, + Transport: &http.Transport{}, + } + proxy.ServeHTTP(c.Writer, c.Request) + }) + // #nosec G112 -- Ignoring potential Slowloris attacks as it the connection to close if the skynet connect is too slow to send the request + srv := &http.Server{ + Handler: r, + // todo(ersonp): Consider setting ReadHeaderTimeout to a reasonable value to address the Slowloris attack vector + } + + return srv +} + +func newTCPPublishConn(log *logging.Logger, localPort int) net.Conn { + + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", localPort)) + if err != nil { + log.Printf("Error connecting to local port %d: %v", localPort, err) + return nil + } + + return conn +} + +func (f *PublishLis) handlePublishTCPConnection(conn net.Conn) { + defer conn.Close() //nolint + + copyConn := func(dst net.Conn, src net.Conn) { + _, err := io.Copy(dst, src) + if err != nil { + f.log.Printf("Error during copy: %v", err) + } + } + + go copyConn(conn, f.conn) + go copyConn(f.conn, conn) +} diff --git a/pkg/app/appnet/skynet.go b/pkg/app/appnet/skynet.go new file mode 100644 index 0000000000..1609fd920b --- /dev/null +++ b/pkg/app/appnet/skynet.go @@ -0,0 +1,256 @@ +// Package appnet pkg/app/appnet/skynet.go +package appnet + +import ( + "fmt" + "net/http" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// AppType is the type of the network of the external app that is being published or connected to +type AppType string + +const ( + // TCP is the type of network of the external app that is being published or connected to + TCP AppType = "TCP" + // UDP is the type of network of the external app that is being published or connected to + UDP AppType = "UDP" + // HTTP is the type of network of the external app that is being published or connected to + HTTP AppType = "HTTP" +) + +// NetManager manages all the connections and listeners +type NetManager struct { + listeners map[uuid.UUID]*PublishLis + conns map[uuid.UUID]*ConnectConn + mu sync.Mutex +} + +// NewNetManager creates a new NetManager +func NewNetManager() *NetManager { + return &NetManager{ + listeners: make(map[uuid.UUID]*PublishLis), + conns: make(map[uuid.UUID]*ConnectConn), + } +} + +func (nm *NetManager) isPublishPortAvailable(addr Addr, localPort int, appType AppType) error { + + for _, l := range nm.listeners { + if l.SkyAddr.GetPort() == addr.GetPort() { + return fmt.Errorf("skyport %d is already in use for app type %v", addr.GetPort(), appType) + } + if l.LocalPort == localPort { + return fmt.Errorf("local port %d is already in use for app type %v", localPort, appType) + } + } + return nil +} + +// IsPublishPortAvailable checks if a port and apptype is available for publishing +func (nm *NetManager) IsPublishPortAvailable(addr Addr, localPort int, appType AppType) error { + nm.mu.Lock() + defer nm.mu.Unlock() + + return nm.isPublishPortAvailable(addr, localPort, appType) +} + +// AddPublish adds publishListener to the NetManager +func (nm *NetManager) AddPublish(lis *PublishLis) error { + nm.mu.Lock() + defer nm.mu.Unlock() + + if err := nm.isPublishPortAvailable(lis.SkyAddr, lis.LocalPort, lis.AppType); err != nil { + return err + } + + nm.listeners[lis.ID] = lis + return nil +} + +// GetPublishListener get's a publishListener by ID +func (nm *NetManager) GetPublishListener(id uuid.UUID) *PublishLis { + nm.mu.Lock() + defer nm.mu.Unlock() + + return nm.listeners[id] +} + +// GetAllPublishListeners gets all publishListeners +func (nm *NetManager) GetAllPublishListeners() map[uuid.UUID]*PublishLis { + nm.mu.Lock() + defer nm.mu.Unlock() + + return nm.listeners +} + +// RemovePublishListener removes a publishListener by ID +func (nm *NetManager) RemovePublishListener(id uuid.UUID) { + nm.mu.Lock() + defer nm.mu.Unlock() + + delete(nm.listeners, id) +} + +func (nm *NetManager) isConnectPortAvailable(webPort int) error { + + for _, c := range nm.conns { + if c.WebPort == webPort { + return fmt.Errorf("web port %d is already in use", webPort) + } + } + return nil +} + +// IsConnectPortAvailable checks if a web port is available +func (nm *NetManager) IsConnectPortAvailable(webPort int) error { + nm.mu.Lock() + defer nm.mu.Unlock() + + return nm.isConnectPortAvailable(webPort) +} + +// AddConnect adds ConnectConn to the NetManager +func (nm *NetManager) AddConnect(conn *ConnectConn) error { + nm.mu.Lock() + defer nm.mu.Unlock() + + if err := nm.isConnectPortAvailable(conn.WebPort); err != nil { + return err + } + + nm.conns[conn.ID] = conn + return nil +} + +// GetConnectConn get's a ConnectConn by ID +func (nm *NetManager) GetConnectConn(id uuid.UUID) *ConnectConn { + nm.mu.Lock() + defer nm.mu.Unlock() + + return nm.conns[id] +} + +// GetAllConnectConns gets all ConnectConns +func (nm *NetManager) GetAllConnectConns() map[uuid.UUID]*ConnectConn { + nm.mu.Lock() + defer nm.mu.Unlock() + + return nm.conns +} + +// RemoveConnectConn removes a ConnectConn by ID +func (nm *NetManager) RemoveConnectConn(id uuid.UUID) { + nm.mu.Lock() + defer nm.mu.Unlock() + + delete(nm.conns, id) +} + +// Close closes all the connections and listeners +func (nm *NetManager) Close() error { + nm.mu.Lock() + defer nm.mu.Unlock() + + for _, conn := range nm.conns { + err := conn.Close() + if err != nil { + return err + } + } + + for _, lis := range nm.listeners { + err := lis.Close() + if err != nil { + return err + } + } + + return nil +} + +func loggingMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + c.Next() + latency := time.Since(start) + if latency > time.Minute { + latency = latency.Truncate(time.Second) + } + statusCode := c.Writer.Status() + method := c.Request.Method + path := c.Request.URL.Path + // Get the background color based on the status code + statusCodeBackgroundColor := getBackgroundColor(statusCode) + // Get the method color + methodColor := getMethodColor(method) + // Print the logging in a custom format which includes the publickeyfrom c.Request.RemoteAddr ex.: + // [SKYNET] 2023/05/18 - 19:43:15 | 200 | 10.80885ms | | 02b5ee5333aa6b7f5fc623b7d5f35f505cb7f974e98a70751cf41962f84c8c4637:49153 | GET /node-info.json + fmt.Printf("[SKYNET] %s |%s %3d %s| %13v | %15s | %72s |%s %-7s %s %s\n", + time.Now().Format("2006/01/02 - 15:04:05"), + statusCodeBackgroundColor, + statusCode, + resetColor(), + latency, + c.ClientIP(), + c.Request.RemoteAddr, + methodColor, + method, + resetColor(), + path, + ) + } +} + +func getBackgroundColor(statusCode int) string { + switch { + case statusCode >= http.StatusOK && statusCode < http.StatusMultipleChoices: + return green + case statusCode >= http.StatusMultipleChoices && statusCode < http.StatusBadRequest: + return white + case statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError: + return yellow + default: + return red + } +} + +func getMethodColor(method string) string { + switch method { + case http.MethodGet: + return blue + case http.MethodPost: + return cyan + case http.MethodPut: + return yellow + case http.MethodDelete: + return red + case http.MethodPatch: + return green + case http.MethodHead: + return magenta + case http.MethodOptions: + return white + default: + return reset + } +} + +func resetColor() string { + return reset +} + +const ( + green = "\033[97;42m" + white = "\033[90;47m" + yellow = "\033[90;43m" + red = "\033[97;41m" + blue = "\033[97;44m" + magenta = "\033[97;45m" + cyan = "\033[97;46m" + reset = "\033[0m" +) diff --git a/pkg/skyenv/skyenv.go b/pkg/skyenv/skyenv.go index 518a592a5c..59a1e47a73 100644 --- a/pkg/skyenv/skyenv.go +++ b/pkg/skyenv/skyenv.go @@ -59,15 +59,13 @@ const ( // TODO(darkrengarius): this one's not needed for the app to run but lack of it causes errors - VPNClientPort uint16 = 43 // VPNClientPort ... - ExampleServerName = "example-server-app" // ExampleServerName ... - ExampleServerPort uint16 = 45 // ExampleServerPort ... - ExampleClientName = "example-client-app" // ExampleClientName ... - ExampleClientPort uint16 = 46 // ExampleClientPort ... - SkyForwardingServerName = "sky-forwarding" // SkyForwardingServerName ... - SkyForwardingServerPort uint16 = 47 // SkyForwardingServerPort ... - SkyPingName = "sky-ping" // SkyPingName ... - SkyPingPort uint16 = 48 // SkyPingPort ... + VPNClientPort uint16 = 43 // VPNClientPort ... + ExampleServerName = "example-server-app" // ExampleServerName ... + ExampleServerPort uint16 = 45 // ExampleServerPort ... + ExampleClientName = "example-client-app" // ExampleClientName ... + ExampleClientPort uint16 = 46 // ExampleClientPort ... + SkyPingName = "sky-ping" // SkyPingName ... + SkyPingPort uint16 = 48 // SkyPingPort ... // RPC constants. diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 8e63206da8..c66c7f1c65 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -116,12 +116,12 @@ type API interface { RouteGroups() ([]RouteGroupInfo, error) SetMinHops(uint16) error - RegisterHTTPPort(localPort int) error - DeregisterHTTPPort(localPort int) error - ListHTTPPorts() ([]int, error) - Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid.UUID, error) + Publish(localPort, skyPort int, appType appnet.AppType) (appnet.PublishInfo, error) + Depublish(id uuid.UUID) error + ListPublished() ([]appnet.PublishInfo, error) + Connect(remotePK cipher.PubKey, remotePort, localPort int, appType appnet.AppType) (appnet.ConnectInfo, error) Disconnect(id uuid.UUID) error - List() (map[uuid.UUID]*appnet.ForwardConn, error) + ListConnected() ([]appnet.ConnectInfo, error) DialPing(config PingConfig) error Ping(config PingConfig) ([]time.Duration, error) StopPing(pk cipher.PubKey) error @@ -400,7 +400,7 @@ func (v *Visor) DeleteRewardAddress() error { path := v.conf.LocalPath + "/" + visorconfig.RewardFile err := os.Remove(path) if err != nil { - return fmt.Errorf("Error deleting file. err=%v", err) + return fmt.Errorf("error deleting file. err=%v", err) } return nil } @@ -585,7 +585,7 @@ func (v *Visor) FetchUptimeTrackerData(pk string) ([]byte, error) { if pk != "" { err := pubkey.Set(pk) if err != nil { - return body, fmt.Errorf("Invalid or missing public key") + return body, fmt.Errorf("invalid or missing public key") } } if v.uptimeTracker == nil { @@ -611,7 +611,7 @@ func (v *Visor) StartSkysocksClient(serverKey string) error { for index, app := range v.conf.Launcher.Apps { if app.Name == visorconfig.SkysocksClientName { if v.GetSkysocksClientAddress() == "" && serverKey == "" { - return errors.New("Skysocks server pub key is missing") + return errors.New("skysocks server pub key is missing") } if serverKey != "" { @@ -1279,7 +1279,7 @@ func (v *Visor) DialPing(conf PingConfig) error { skywireConn, isSkywireConn := conn.(*appnet.SkywireConn) if !isSkywireConn { - return fmt.Errorf("Can't get such info from this conn") + return fmt.Errorf("can't get such info from this conn") } v.pingConnMx.Lock() v.pingConns[conf.PK] = ping{ @@ -1543,132 +1543,105 @@ func (v *Visor) IsDMSGClientReady() (bool, error) { return false, errors.New("dmsg client is not ready") } -// RegisterHTTPPort implements API. -func (v *Visor) RegisterHTTPPort(localPort int) error { - v.allowedMX.Lock() - defer v.allowedMX.Unlock() - ok := isPortAvailable(v.log, localPort) - if ok { - return fmt.Errorf("No connection on local port :%v", localPort) - } - if v.allowedPorts[localPort] { - return fmt.Errorf("Port :%v already registered", localPort) - } - v.allowedPorts[localPort] = true - return nil -} - -// DeregisterHTTPPort implements API. -func (v *Visor) DeregisterHTTPPort(localPort int) error { - v.allowedMX.Lock() - defer v.allowedMX.Unlock() - if !v.allowedPorts[localPort] { - return fmt.Errorf("Port :%v not registered", localPort) - } - delete(v.allowedPorts, localPort) - return nil -} +// Connect implements API. +func (v *Visor) Connect(remotePK cipher.PubKey, remotePort, webPort int, appType appnet.AppType) (appnet.ConnectInfo, error) { -// ListHTTPPorts implements API. -func (v *Visor) ListHTTPPorts() ([]int, error) { - v.allowedMX.Lock() - defer v.allowedMX.Unlock() - keys := make([]int, 0, len(v.allowedPorts)) - for k := range v.allowedPorts { - keys = append(keys, k) + if err := v.nM.IsConnectPortAvailable(webPort); err != nil { + return appnet.ConnectInfo{}, err } - return keys, nil -} -// Connect implements API. -func (v *Visor) Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid.UUID, error) { - ok := isPortAvailable(v.log, localPort) - if !ok { - return uuid.UUID{}, fmt.Errorf(":%v local port already in use", localPort) - } - connApp := appnet.Addr{ + remoteAddr := appnet.Addr{ Net: appnet.TypeSkynet, PubKey: remotePK, - Port: routing.Port(skyenv.SkyForwardingServerPort), - } - conn, err := appnet.Dial(connApp) - if err != nil { - return uuid.UUID{}, err - } - remoteConn, err := appnet.WrapConn(conn) - if err != nil { - return uuid.UUID{}, err - } - - cMsg := clientMsg{ - Port: remotePort, + Port: routing.Port(remotePort), } - clientMsg, err := json.Marshal(cMsg) + conn, err := appnet.Dial(remoteAddr) if err != nil { - return uuid.UUID{}, err + return appnet.ConnectInfo{}, err } - _, err = remoteConn.Write([]byte(clientMsg)) + remoteConn, err := appnet.WrapConn(conn) if err != nil { - return uuid.UUID{}, err + return appnet.ConnectInfo{}, err } - v.log.Debugf("Msg sent %s", clientMsg) - buf := make([]byte, 32*1024) - n, err := remoteConn.Read(buf) + connectConn, err := appnet.NewConnectConn(v.log, v.nM, remoteConn, remoteAddr, webPort, appType) if err != nil { - return uuid.UUID{}, err + return appnet.ConnectInfo{}, err } - var sReply serverReply - err = json.Unmarshal(buf[:n], &sReply) + err = connectConn.Serve() if err != nil { - return uuid.UUID{}, err + return appnet.ConnectInfo{}, err } - v.log.Debugf("Received: %v", sReply) - if sReply.Error != nil { - sErr := sReply.Error - v.log.WithError(fmt.Errorf(*sErr)).Error("Server closed with error") - return uuid.UUID{}, fmt.Errorf(*sErr) - } - forwardConn := appnet.NewForwardConn(v.log, remoteConn, remotePort, localPort) - forwardConn.Serve() - return forwardConn.ID, nil + return connectConn.ConnectInfo, nil } // Disconnect implements API. func (v *Visor) Disconnect(id uuid.UUID) error { - forwardConn := appnet.GetForwardConn(id) - return forwardConn.Close() + connectConn := v.nM.GetConnectConn(id) + if connectConn == nil { + return ErrNotFound + } + return connectConn.Close() } -// List implements API. -func (v *Visor) List() (map[uuid.UUID]*appnet.ForwardConn, error) { - return appnet.GetAllForwardConns(), nil +// ListConnected implements API. +func (v *Visor) ListConnected() ([]appnet.ConnectInfo, error) { + cons := v.nM.GetAllConnectConns() + var connectInfos []appnet.ConnectInfo + for _, con := range cons { + connectInfos = append(connectInfos, con.ConnectInfo) + } + return connectInfos, nil } -func isPortAvailable(log *logging.Logger, port int) bool { - timeout := time.Second - conn, err := net.DialTimeout("tcp", fmt.Sprintf(":%v", port), timeout) +// Publish implements API. +func (v *Visor) Publish(localPort, skyPort int, appType appnet.AppType) (appnet.PublishInfo, error) { + + addr := appnet.Addr{ + Net: appnet.TypeSkynet, + PubKey: v.conf.PK, + Port: routing.Port(skyPort), + } + + if err := v.nM.IsPublishPortAvailable(addr, localPort, appType); err != nil { + return appnet.PublishInfo{}, err + } + + lis, err := appnet.Listen(addr) if err != nil { - return true + return appnet.PublishInfo{}, err } - if conn != nil { - defer closeConn(log, conn) - return false + + publishLis, err := appnet.NewPublishListener(v.log, v.nM, lis, localPort, addr, appType) + if err != nil { + return appnet.PublishInfo{}, err } - return true -} -func isPortRegistered(port int, v *Visor) bool { - ports, err := v.ListHTTPPorts() + err = publishLis.Listen() if err != nil { - return false + return appnet.PublishInfo{}, err } - for _, p := range ports { - if p == port { - return true - } + + return publishLis.PublishInfo, nil +} + +// Depublish implements API. +func (v *Visor) Depublish(id uuid.UUID) error { + publishConn := v.nM.GetPublishListener(id) + if publishConn == nil { + return ErrNotFound + } + return publishConn.Close() +} + +// ListPublished implements API. +func (v *Visor) ListPublished() ([]appnet.PublishInfo, error) { + liss := v.nM.GetAllPublishListeners() + var publishInfos []appnet.PublishInfo + for _, lis := range liss { + publishInfos = append(publishInfos, lis.PublishInfo) } - return false + return publishInfos, nil } diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 4f341c1b88..2c4c074617 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -2,8 +2,6 @@ package visor import ( - "bufio" - "bytes" "context" "encoding/json" "errors" @@ -130,8 +128,8 @@ var ( dmsgHTTP vinit.Module // Dmsg trackers module dmsgTrackers vinit.Module - // Skywire Forwarding conn module - skyFwd vinit.Module + // Skywire Net module + skyNet vinit.Module // Ping module pi vinit.Module // visor that groups all modules together @@ -175,12 +173,12 @@ func registerModules(logger *logging.MasterLogger) { trs = maker("transport_setup", initTransportSetup, &dmsgC, &tr) tm = vinit.MakeModule("transports", vinit.DoNothing, logger, &sc, &sudphC, &dmsgCtrl, &dmsgHTTPLogServer, &dmsgTrackers, &launch) pvs = maker("public_visor", initPublicVisor, &tr, &ar, &disc, &stcprC) - skyFwd = maker("sky_forward_conn", initSkywireForwardConn, &dmsgC, &dmsgCtrl, &tr, &launch) + skyNet = maker("sky_net", initSkywireNet, &dmsgC, &dmsgCtrl, &tr, &launch) pi = maker("ping", initPing, &dmsgC, &tm) tc = maker("transportable", initEnsureVisorIsTransportable, &dmsgC, &tm) tpdco = maker("tpd_concurrency", initEnsureTPDConcurrency, &dmsgC, &tm) vis = vinit.MakeModule("visor", vinit.DoNothing, logger, &ebc, &ar, &disc, &pty, - &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC, &skyFwd, &pi, &systemSurvey, &tc, &tpdco) + &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC, &skyNet, &pi, &systemSurvey, &tc, &tpdco) hv = maker("hypervisor", initHypervisor, &vis) } @@ -612,182 +610,26 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro return nil } -func initSkywireForwardConn(ctx context.Context, v *Visor, log *logging.Logger) error { - ctx, cancel := context.WithCancel(ctx) +func initSkywireNet(_ context.Context, v *Visor, log *logging.Logger) error { // waiting for at least one transport to initialize <-v.tpM.Ready() - connApp := appnet.Addr{ - Net: appnet.TypeSkynet, - PubKey: v.conf.PK, - Port: routing.Port(skyenv.SkyForwardingServerPort), - } - l, err := appnet.ListenContext(ctx, connApp) - if err != nil { - cancel() - return err - } - v.pushCloseStack("sky_forwarding", func() error { - cancel() - if cErr := l.Close(); cErr != nil { + nM := appnet.NewNetManager() + + v.pushCloseStack("sky_net", func() error { + if cErr := nM.Close(); cErr != nil { log.WithError(cErr).Error("Error closing listener.") } return nil }) - go func() { - for { - log.Debug("Accepting sky forwarding conn...") - conn, err := l.Accept() - if err != nil { - if !errors.Is(appnet.ErrClosedConn, err) { - log.WithError(err).Error("Failed to accept conn") - } - return - } - log.Debug("Accepted sky forwarding conn") - - v.pushCloseStack("sky_forwarding", func() error { - cancel() - if cErr := conn.Close(); cErr != nil { - log.WithError(cErr).Error("Error closing conn.") - } - return nil - }) - - log.Debug("Wrapping conn...") - wrappedConn, err := appnet.WrapConn(conn) - if err != nil { - log.WithError(err).Error("Failed to wrap conn") - return - } - - rAddr := wrappedConn.RemoteAddr().(appnet.Addr) - log.Debugf("Accepted sky forwarding conn on %s from %s", wrappedConn.LocalAddr(), rAddr.PubKey) - go handleServerConn(log, wrappedConn, v) - } - }() + v.initLock.Lock() + v.nM = nM + v.initLock.Unlock() return nil } -func handleServerConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { - buf := make([]byte, 32*1024) - n, err := remoteConn.Read(buf) - if err != nil { - log.WithError(err).Error("Failed to read packet") - return - } - - var cMsg clientMsg - err = json.Unmarshal(buf[:n], &cMsg) - if err != nil { - log.WithError(err).Error("Failed to marshal json") - sendError(log, remoteConn, err) - return - } - log.Debugf("Received: %v", cMsg) - - lHost := fmt.Sprintf("localhost:%v", cMsg.Port) - ok := isPortRegistered(cMsg.Port, v) - if !ok { - log.Errorf("Port :%v not registered", cMsg.Port) - sendError(log, remoteConn, fmt.Errorf("Port :%v not registered", cMsg.Port)) - return - } - - ok = isPortAvailable(log, cMsg.Port) - if ok { - log.Errorf("Failed to dial port %v", cMsg.Port) - sendError(log, remoteConn, fmt.Errorf("Failed to dial port %v", cMsg.Port)) - return - } - - log.Debugf("Forwarding %s", lHost) - - // send nil error to indicate to the remote connection that everything is ok - sendError(log, remoteConn, nil) - - go forward(log, remoteConn, lHost) -} - -// forward reads a http.Request from the remote conn of the requesting visor forwards that request -// to the requested local server and forwards the http.Response from the local server to the requesting -// visor via the remote conn -func forward(log *logging.Logger, remoteConn net.Conn, lHost string) { - for { - buf := make([]byte, 32*1024) - n, err := remoteConn.Read(buf) - if err != nil { - log.WithError(err).Error("Failed to read packet") - closeConn(log, remoteConn) - return - } - req, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(buf[:n]))) - if err != nil { - log.WithError(err).Error("Failed to ReadRequest") - closeConn(log, remoteConn) - return - } - req.RequestURI = "" - req.URL.Scheme = "http" - req.URL.Host = lHost - client := http.Client{} - resp, err := client.Do(req) - if err != nil { - log.WithError(err).Error("Failed to Do req") - closeConn(log, remoteConn) - return - } - err = resp.Write(remoteConn) - if err != nil { - log.WithError(err).Error("Failed to Write") - closeConn(log, remoteConn) - return - } - } -} - -func sendError(log *logging.Logger, remoteConn net.Conn, sendErr error) { - var sReply serverReply - if sendErr != nil { - sErr := sendErr.Error() - sReply = serverReply{ - Error: &sErr, - } - } - - srvReply, err := json.Marshal(sReply) - if err != nil { - log.WithError(err).Error("Failed to unmarshal json") - } - - _, err = remoteConn.Write([]byte(srvReply)) - if err != nil { - log.WithError(err).Error("Failed write server msg") - } - - log.Debugf("Server reply sent %s", srvReply) - // close conn if we send an error - if sendErr != nil { - closeConn(log, remoteConn) - } -} - -func closeConn(log *logging.Logger, conn net.Conn) { - if err := conn.Close(); err != nil { - log.WithError(err).Errorf("Error closing client %s connection", conn.RemoteAddr()) - } -} - -type clientMsg struct { - Port int `json:"port"` -} - -type serverReply struct { - Error *string `json:"error,omitempty"` -} - func initPing(ctx context.Context, v *Visor, log *logging.Logger) error { ctx, cancel := context.WithCancel(ctx) // waiting for at least one transport to initialize diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 2b713bf8a6..e106727f47 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -727,38 +727,19 @@ func (r *RPC) IsDMSGClientReady(_ *struct{}, out *bool) (err error) { return err } -// RegisterHTTPPort registers the local port to be accessed by remote visors -func (r *RPC) RegisterHTTPPort(port *int, _ *struct{}) (err error) { - defer rpcutil.LogCall(r.log, "RegisterHTTPPort", port)(nil, &err) - return r.visor.RegisterHTTPPort(*port) -} - -// DeregisterHTTPPort deregisters the local port that can be accessed by remote visors -func (r *RPC) DeregisterHTTPPort(port *int, _ *struct{}) (err error) { - defer rpcutil.LogCall(r.log, "DeregisterHTTPPort", port)(nil, &err) - return r.visor.DeregisterHTTPPort(*port) -} - -// ListHTTPPorts lists all the local por that can be accessed by remote visors -func (r *RPC) ListHTTPPorts(_ *struct{}, out *[]int) (err error) { - defer rpcutil.LogCall(r.log, "ListHTTPPorts", nil)(out, &err) - ports, err := r.visor.ListHTTPPorts() - *out = ports - return err -} - // ConnectIn is input for Connect. type ConnectIn struct { RemotePK cipher.PubKey RemotePort int LocalPort int + AppType appnet.AppType } // Connect creates a connection with the remote visor to listen on the remote port and serve that on the local port -func (r *RPC) Connect(in *ConnectIn, out *uuid.UUID) (err error) { +func (r *RPC) Connect(in *ConnectIn, out *appnet.ConnectInfo) (err error) { defer rpcutil.LogCall(r.log, "Connect", in)(out, &err) - id, err := r.visor.Connect(in.RemotePK, in.RemotePort, in.LocalPort) + id, err := r.visor.Connect(in.RemotePK, in.RemotePort, in.LocalPort, in.AppType) *out = id return err } @@ -770,11 +751,42 @@ func (r *RPC) Disconnect(id *uuid.UUID, _ *struct{}) (err error) { return err } -// List returns all the ongoing skyforwarding connections -func (r *RPC) List(_ *struct{}, out *map[uuid.UUID]*appnet.ForwardConn) (err error) { - defer rpcutil.LogCall(r.log, "List", nil)(out, &err) - proxies, err := r.visor.List() - *out = proxies +// ListConnected lists all the sky connections that are connected +func (r *RPC) ListConnected(_ *struct{}, out *[]appnet.ConnectInfo) (err error) { + defer rpcutil.LogCall(r.log, "ListConnected", nil)(out, &err) + conns, err := r.visor.ListConnected() + *out = conns + return err +} + +// PublishIn is input for Publish. +type PublishIn struct { + LocalPort int + SkyPort int + AppType appnet.AppType +} + +// Publish publishes a listner for the local port to the skyport +func (r *RPC) Publish(in *PublishIn, out *appnet.PublishInfo) (err error) { + defer rpcutil.LogCall(r.log, "Publish", in)(out, &err) + + id, err := r.visor.Publish(in.LocalPort, in.SkyPort, in.AppType) + *out = id + return err +} + +// Depublish removes the published port with the given id +func (r *RPC) Depublish(id *uuid.UUID, _ *struct{}) (err error) { + defer rpcutil.LogCall(r.log, "Depublish", id)(nil, &err) + err = r.visor.Depublish(*id) + return err +} + +// ListPublished lists all the local ports that are being published +func (r *RPC) ListPublished(_ *struct{}, out *[]appnet.PublishInfo) (err error) { + defer rpcutil.LogCall(r.log, "ListPublished", nil)(out, &err) + liss, err := r.visor.ListPublished() + *out = liss return err } diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index ab8381a6b2..a89fe06682 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -556,12 +556,13 @@ func (rc *rpcClient) IsDMSGClientReady() (bool, error) { } // Connect calls Connect. -func (rc *rpcClient) Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid.UUID, error) { - var out uuid.UUID +func (rc *rpcClient) Connect(remotePK cipher.PubKey, remotePort, localPort int, appType appnet.AppType) (appnet.ConnectInfo, error) { + var out appnet.ConnectInfo err := rc.Call("Connect", &ConnectIn{ RemotePK: remotePK, RemotePort: remotePort, LocalPort: localPort, + AppType: appType, }, &out) return out, err } @@ -572,28 +573,34 @@ func (rc *rpcClient) Disconnect(id uuid.UUID) error { return err } -// List calls List. -func (rc *rpcClient) List() (map[uuid.UUID]*appnet.ForwardConn, error) { - var out map[uuid.UUID]*appnet.ForwardConn - err := rc.Call("List", &struct{}{}, &out) +// Publish calls Publish. +func (rc *rpcClient) Publish(localPort int, skyPort int, appType appnet.AppType) (appnet.PublishInfo, error) { + var out appnet.PublishInfo + err := rc.Call("Publish", &PublishIn{ + LocalPort: localPort, + SkyPort: skyPort, + AppType: appType, + }, &out) return out, err } -// RegisterHTTPPort calls RegisterHTTPPort. -func (rc *rpcClient) RegisterHTTPPort(localPort int) error { - return rc.Call("RegisterHTTPPort", &localPort, &struct{}{}) +// Depublish calls Depublish. +func (rc *rpcClient) Depublish(id uuid.UUID) error { + err := rc.Call("Depublish", &id, &struct{}{}) + return err } -// DeregisterHTTPPort calls DeregisterHTTPPort. -func (rc *rpcClient) DeregisterHTTPPort(localPort int) error { - err := rc.Call("DeregisterHTTPPort", &localPort, &struct{}{}) - return err +// ListPublished calls ListPublished. +func (rc *rpcClient) ListPublished() ([]appnet.PublishInfo, error) { + var out []appnet.PublishInfo + err := rc.Call("ListPublished", &struct{}{}, &out) + return out, err } -// ListHTTPPorts calls ListHTTPPorts. -func (rc *rpcClient) ListHTTPPorts() ([]int, error) { - var out []int - err := rc.Call("ListHTTPPorts", &struct{}{}, &out) +// ListConnected calls ListConnected. +func (rc *rpcClient) ListConnected() ([]appnet.ConnectInfo, error) { + var out []appnet.ConnectInfo + err := rc.Call("ListConnected", &struct{}{}, &out) return out, err } @@ -1311,8 +1318,8 @@ func (mc *mockRPCClient) IsDMSGClientReady() (bool, error) { } // Connect implements API. -func (mc *mockRPCClient) Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid.UUID, error) { //nolint:all - return uuid.UUID{}, nil +func (mc *mockRPCClient) Connect(remotePK cipher.PubKey, remotePort, localPort int, appType appnet.AppType) (appnet.ConnectInfo, error) { //nolint:all + return appnet.ConnectInfo{}, nil } // Disconnect implements API. @@ -1320,23 +1327,23 @@ func (mc *mockRPCClient) Disconnect(id uuid.UUID) error { //nolint:all return nil } -// List implements API. -func (mc *mockRPCClient) List() (map[uuid.UUID]*appnet.ForwardConn, error) { - return nil, nil +// Publish implements API. +func (mc *mockRPCClient) Publish(localPort int, skyPort int, appType appnet.AppType) (appnet.PublishInfo, error) { //nolint:all + return appnet.PublishInfo{}, nil } -// RegisterHTTPPort implements API. -func (mc *mockRPCClient) RegisterHTTPPort(localPort int) error { //nolint:all +// Depublish implements API. +func (mc *mockRPCClient) Depublish(id uuid.UUID) error { //nolint:all return nil } -// DeregisterHTTPPort implements API. -func (mc *mockRPCClient) DeregisterHTTPPort(localPort int) error { //nolint:all - return nil +// List implements API. +func (mc *mockRPCClient) ListConnected() ([]appnet.ConnectInfo, error) { + return nil, nil } // ListHTTPPorts implements API. -func (mc *mockRPCClient) ListHTTPPorts() ([]int, error) { +func (mc *mockRPCClient) ListPublished() ([]appnet.PublishInfo, error) { return nil, nil } diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 4fb66884a6..208244072a 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -93,6 +93,7 @@ type Visor struct { arClient addrresolver.APIClient router router.Router rfClient rfclient.Client + nM *appnet.NetManager procM appserver.ProcManager // proc manager appL *launcher.AppLauncher // app launcher @@ -406,15 +407,6 @@ func (v *Visor) Close() error { log := v.MasterLogger().PackageLogger("visor:shutdown") log.Info("Begin shutdown.") - // Cleanly close ongoing forward conns - for _, forwardConn := range appnet.GetAllForwardConns() { - err := forwardConn.Close() - if err != nil { - log.WithError(err).Warn("Forward conn stopped with unexpected result.") - continue - } - } - for i := len(v.closeStack) - 1; i >= 0; i-- { cl := v.closeStack[i] diff --git a/pkg/visor/visorconfig/values.go b/pkg/visor/visorconfig/values.go index df50d143fd..8db28f0193 100644 --- a/pkg/visor/visorconfig/values.go +++ b/pkg/visor/visorconfig/values.go @@ -87,14 +87,12 @@ var ( VPNClientPort = skyenv.VPNClientPort // VPNClientPort ... - ExampleServerName = skyenv.ExampleServerName // ExampleServerName ... - ExampleServerPort = skyenv.ExampleServerPort // ExampleServerPort ... - ExampleClientName = skyenv.ExampleClientName // ExampleClientName ... - ExampleClientPort = skyenv.ExampleClientPort // ExampleClientPort ... - SkyForwardingServerName = skyenv.SkyForwardingServerName // SkyForwardingServerName ... - SkyForwardingServerPort = skyenv.SkyForwardingServerPort // SkyForwardingServerPort ... - SkyPingName = skyenv.SkyPingName // SkyPingName ... - SkyPingPort = skyenv.SkyPingPort // SkyPingPort ... + ExampleServerName = skyenv.ExampleServerName // ExampleServerName ... + ExampleServerPort = skyenv.ExampleServerPort // ExampleServerPort ... + ExampleClientName = skyenv.ExampleClientName // ExampleClientName ... + ExampleClientPort = skyenv.ExampleClientPort // ExampleClientPort ... + SkyPingName = skyenv.SkyPingName // SkyPingName ... + SkyPingPort = skyenv.SkyPingPort // SkyPingPort ... // RPC constants.