diff --git a/README.md b/README.md index 711148d..d90e651 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ Additionally, we provide scripts to read logs from your function and to wipe all This tinyFaaS prototype only supports functions written for NodeJS 20, Python 3.9, and binary functions. A good place to get started with writing functions is the selection of test functions in [`./test/fns`](./test/fns/). +HTTP headers and GRPC Metadata are accessible in NodeJS and Python functions as key values. Check the "show-headers" test functions for more information. #### NodeJS 20 diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go index 4a82cd0..6d29d32 100644 --- a/pkg/coap/coap.go +++ b/pkg/coap/coap.go @@ -8,6 +8,8 @@ import ( "github.com/pfandzelter/go-coap" ) +const async = false + func Start(r *rproxy.RProxy, listenAddr string) { h := coap.FuncHandler( @@ -17,8 +19,6 @@ func Start(r *rproxy.RProxy, listenAddr string) { log.Printf("is confirmable: %v", m.IsConfirmable()) log.Printf("path: %s", m.PathString()) - async := false - p := m.PathString() for p != "" && p[0] == '/' { @@ -27,7 +27,7 @@ func Start(r *rproxy.RProxy, listenAddr string) { log.Printf("have request for path: %s (async: %v)", p, async) - s, res := r.Call(p, m.Payload, async) + s, res := r.Call(p, m.Payload, async, nil) mes := &coap.Message{ Type: coap.Acknowledgement, diff --git a/pkg/docker/runtimes/python3/functionhandler.py b/pkg/docker/runtimes/python3/functionhandler.py index 748e596..08af6a8 100644 --- a/pkg/docker/runtimes/python3/functionhandler.py +++ b/pkg/docker/runtimes/python3/functionhandler.py @@ -32,8 +32,11 @@ def do_POST(self) -> None: if d == "": d = None + # Read headers into a dictionary + headers: typing.Dict[str, str] = {k: v for k, v in self.headers.items()} + try: - res = fn.fn(d) + res = fn.fn(d, headers) self.send_response(200) self.end_headers() if res is not None: diff --git a/pkg/grpc/grpc.go b/pkg/grpc/grpc.go index 91d32db..73ed25f 100644 --- a/pkg/grpc/grpc.go +++ b/pkg/grpc/grpc.go @@ -9,6 +9,7 @@ import ( "github.com/OpenFogStack/tinyFaaS/pkg/grpc/tinyfaas" "github.com/OpenFogStack/tinyFaaS/pkg/rproxy" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // GRPCServer is the grpc endpoint for this tinyFaaS instance. @@ -21,7 +22,21 @@ func (gs *GRPCServer) Request(ctx context.Context, d *tinyfaas.Data) (*tinyfaas. log.Printf("have request for path: %s (async: %v)", d.FunctionIdentifier, false) - s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false) + // Extract metadata from the gRPC context + md, ok := metadata.FromIncomingContext(ctx) + headers := make(map[string]string) + if ok { + // Convert metadata to map[string]string + for k, v := range md { + if len(v) > 0 { + headers[k] = v[0] + } + } + } else { + log.Print("failed to extract metadata from context, using empty headers GRPC request") + } + + s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false, headers) switch s { case rproxy.StatusOK: diff --git a/pkg/http/http.go b/pkg/http/http.go index 4bd9bae..81af12d 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -31,7 +31,12 @@ func Start(r *rproxy.RProxy, listenAddr string) { return } - s, res := r.Call(p, req_body, async) + headers := make(map[string]string) + for k, v := range req.Header { + headers[k] = v[0] + } + + s, res := r.Call(p, req_body, async, headers) switch s { case rproxy.StatusOK: diff --git a/pkg/rproxy/rproxy.go b/pkg/rproxy/rproxy.go index 4ddec27..af4137f 100644 --- a/pkg/rproxy/rproxy.go +++ b/pkg/rproxy/rproxy.go @@ -7,6 +7,7 @@ import ( "log" "math/rand" "net/http" + "regexp" "sync" ) @@ -59,7 +60,7 @@ func (r *RProxy) Del(name string) error { return nil } -func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) { +func (r *RProxy) Call(name string, payload []byte, async bool, headers map[string]string) (Status, []byte) { handler, ok := r.hosts[name] @@ -75,18 +76,25 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) log.Printf("chosen handler: %s", h) - // call function + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s:8000/fn", h), bytes.NewBuffer(payload)) + if err != nil { + log.Print(err) + return StatusError, nil + } + for k, v := range headers { + cleanedKey := cleanHeaderKey(k) // remove special chars from key + req.Header.Set(cleanedKey, v) + } + + // call function asynchronously if async { log.Printf("async request accepted") go func() { - resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload)) - - if err != nil { + resp, err2 := http.DefaultClient.Do(req) + if err2 != nil { return } - resp.Body.Close() - log.Printf("async request finished") }() return StatusAccepted, nil @@ -94,8 +102,7 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) // call function and return results log.Printf("sync request starting") - resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload)) - + resp, err := http.DefaultClient.Do(req) if err != nil { log.Print(err) return StatusError, nil @@ -115,3 +122,9 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) return StatusOK, res_body } +func cleanHeaderKey(key string) string { + // a regex pattern to match special characters + re := regexp.MustCompile(`[:()<>@,;:\"/[\]?={} \t]`) + // Replace special characters with an empty string + return re.ReplaceAllString(key, "") +} diff --git a/test/fns/echo-js/index.js b/test/fns/echo-js/index.js index 414500e..493ff92 100644 --- a/test/fns/echo-js/index.js +++ b/test/fns/echo-js/index.js @@ -1,6 +1,7 @@ module.exports = (req, res) => { const response = req.body; + const headers = req.headers; // headers from the http request or GRPC metadata console.log(response); diff --git a/test/fns/echo/fn.py b/test/fns/echo/fn.py index d934c9f..7b8231a 100644 --- a/test/fns/echo/fn.py +++ b/test/fns/echo/fn.py @@ -2,6 +2,6 @@ import typing -def fn(input: typing.Optional[str]) -> typing.Optional[str]: +def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]: """echo the input""" return input diff --git a/test/fns/show-headers-js/index.js b/test/fns/show-headers-js/index.js new file mode 100644 index 0000000..e50a16f --- /dev/null +++ b/test/fns/show-headers-js/index.js @@ -0,0 +1,9 @@ + +module.exports = (req, res) => { + const body = req.body; + const headers = req.headers; // headers from the http request or GRPC metadata + + console.log("Headers:", headers); + + res.send(headers); +} diff --git a/test/fns/show-headers-js/package.json b/test/fns/show-headers-js/package.json new file mode 100644 index 0000000..f04076a --- /dev/null +++ b/test/fns/show-headers-js/package.json @@ -0,0 +1 @@ +{"name": "fn", "version": "1.0.0", "main": "index.js"} \ No newline at end of file diff --git a/test/fns/show-headers/fn.py b/test/fns/show-headers/fn.py new file mode 100644 index 0000000..775f3d2 --- /dev/null +++ b/test/fns/show-headers/fn.py @@ -0,0 +1,9 @@ +import json +import typing + +def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]: + """echo the input""" + if headers is not None: + return json.dumps(headers) + else: + return "{}" \ No newline at end of file diff --git a/test/fns/show-headers/requirements.txt b/test/fns/show-headers/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/test/fns/sieve-of-eratosthenes/index.js b/test/fns/sieve-of-eratosthenes/index.js index 9a6b99f..7b95446 100644 --- a/test/fns/sieve-of-eratosthenes/index.js +++ b/test/fns/sieve-of-eratosthenes/index.js @@ -12,7 +12,7 @@ module.exports = (req, res) => { } } - response = ("Found " + primes.length + " primes under " + max); + let response = ("Found " + primes.length + " primes under " + max); console.log(response); diff --git a/test/test_all.py b/test/test_all.py index e050a62..11a5fa4 100755 --- a/test/test_all.py +++ b/test/test_all.py @@ -2,6 +2,7 @@ import unittest +import json import os import os.path as path import signal @@ -48,7 +49,7 @@ def setUpModule() -> None: uname = os.uname() if uname.machine == "x86_64": arch = "amd64" - elif uname.machine == "arm64": + elif uname.machine == "arm64" or uname.machine == "aarch64": arch = "arm64" else: raise Exception(f"Unsupported architecture: {uname.machine}") @@ -534,6 +535,136 @@ def test_invoke_grpc(self) -> None: self.assertIsNotNone(response) self.assertEqual(response.response, payload) +class TestShowHeadersJS(TinyFaaSTest): + fn = "" + + @classmethod + def setUpClass(cls) -> None: + super(TestShowHeadersJS, cls).setUpClass() + cls.fn = startFunction(path.join(fn_path, "show-headers-js"), "headersjs", "nodejs", 1) + + def setUp(self) -> None: + super(TestShowHeadersJS, self).setUp() + self.fn = TestShowHeadersJS.fn + + def test_invoke_http(self) -> None: + """invoke a function""" + + # make a request to the function with a custom headers + req = urllib.request.Request( + f"http://{self.host}:{self.http_port}/{self.fn}", + headers={"lab": "scalable_software_systems_group"}, + ) + + res = urllib.request.urlopen(req, timeout=10) + + # check the response + self.assertEqual(res.status, 200) + response_body = res.read().decode("utf-8") + response_json = json.loads(response_body) + self.assertIn("lab", response_json) + self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header + self.assertIn("user-agent", response_json) + self.assertEqual(response_json["user-agent"], "Python-urllib/3.11") # python client + + return + +# def test_invoke_coap(self) -> None: # CoAP does not support headers + + + def test_invoke_grpc(self) -> None: + """invoke a function""" + try: + import grpc + except ImportError: + self.skipTest( + "grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt" + ) + + import tinyfaas_pb2 + import tinyfaas_pb2_grpc + + # make a request to the function with a payload + payload = "" + metadata = (("lab", "scalable_software_systems_group"),) + + with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel: + stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel) + response = stub.Request( + tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata + ) + + response_json = json.loads(response.response) + self.assertIn("lab", response_json) + self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header + self.assertIn("user-agent", response_json) + self.assertIn("grpc-python/1.64.1", response_json["user-agent"]) # client header + +class TestShowHeaders(TinyFaaSTest): # Note: In Python, the http.server module (and many other HTTP libraries) automatically capitalizes the first character of each word in the header keys. + fn = "" + + @classmethod + def setUpClass(cls) -> None: + super(TestShowHeaders, cls).setUpClass() + cls.fn = startFunction(path.join(fn_path, "show-headers"), "headers", "python3", 1) + + def setUp(self) -> None: + super(TestShowHeaders, self).setUp() + self.fn = TestShowHeaders.fn + + def test_invoke_http(self) -> None: + """invoke a function""" + + # make a request to the function with a custom headers + req = urllib.request.Request( + f"http://{self.host}:{self.http_port}/{self.fn}", + headers={"Lab": "scalable_software_systems_group"}, + ) + + res = urllib.request.urlopen(req, timeout=10) + + # check the response + self.assertEqual(res.status, 200) + response_body = res.read().decode("utf-8") + response_json = json.loads(response_body) + self.assertIn("Lab", response_json) + self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header + self.assertIn("User-Agent", response_json) + self.assertIn("Python-urllib", response_json["User-Agent"]) # python client + + return + +# def test_invoke_coap(self) -> None: # CoAP does not support headers, instead you have + + def test_invoke_grpc(self) -> None: + """invoke a function""" + try: + import grpc + except ImportError: + self.skipTest( + "grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt" + ) + + import tinyfaas_pb2 + import tinyfaas_pb2_grpc + + # make a request to the function with a payload + payload = "" + metadata = (("lab", "scalable_software_systems_group"),) + + with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel: + stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel) + response = stub.Request( + tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata + ) + + response_json = json.loads(response.response) + + self.assertIn("Lab", response_json) + self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header + self.assertIn("User-Agent", response_json) + self.assertIn("grpc-python", response_json["User-Agent"]) # client header + if __name__ == "__main__": # check that make is installed