Skip to content

Commit

Permalink
add http/grpc header support for py/js functions (#40)
Browse files Browse the repository at this point in the history
* add http/grpc header support for py/js functions

* fix review comments

* fix tests for raspberry | bypass special chars in request headers

* the missing tests for the headers func + update sieve func
  • Loading branch information
ChaosRez authored and pfandzelter committed Aug 19, 2024
1 parent 6d59f22 commit 191f39d
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 18 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Additionally, we provide scripts to read logs from your function and to wipe all

This tinyFaaS prototype only supports functions written for NodeJS 20, Python 3.9, and binary functions.
A good place to get started with writing functions is the selection of test functions in [`./test/fns`](./test/fns/).
HTTP headers and GRPC Metadata are accessible in NodeJS and Python functions as key values. Check the "show-headers" test functions for more information.

#### NodeJS 20

Expand Down
6 changes: 3 additions & 3 deletions pkg/coap/coap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/pfandzelter/go-coap"
)

const async = false

func Start(r *rproxy.RProxy, listenAddr string) {

h := coap.FuncHandler(
Expand All @@ -17,8 +19,6 @@ func Start(r *rproxy.RProxy, listenAddr string) {
log.Printf("is confirmable: %v", m.IsConfirmable())
log.Printf("path: %s", m.PathString())

async := false

p := m.PathString()

for p != "" && p[0] == '/' {
Expand All @@ -27,7 +27,7 @@ func Start(r *rproxy.RProxy, listenAddr string) {

log.Printf("have request for path: %s (async: %v)", p, async)

s, res := r.Call(p, m.Payload, async)
s, res := r.Call(p, m.Payload, async, nil)

mes := &coap.Message{
Type: coap.Acknowledgement,
Expand Down
5 changes: 4 additions & 1 deletion pkg/docker/runtimes/python3/functionhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ def do_POST(self) -> None:
if d == "":
d = None

# Read headers into a dictionary
headers: typing.Dict[str, str] = {k: v for k, v in self.headers.items()}

try:
res = fn.fn(d)
res = fn.fn(d, headers)
self.send_response(200)
self.end_headers()
if res is not None:
Expand Down
17 changes: 16 additions & 1 deletion pkg/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/OpenFogStack/tinyFaaS/pkg/grpc/tinyfaas"
"github.com/OpenFogStack/tinyFaaS/pkg/rproxy"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// GRPCServer is the grpc endpoint for this tinyFaaS instance.
Expand All @@ -21,7 +22,21 @@ func (gs *GRPCServer) Request(ctx context.Context, d *tinyfaas.Data) (*tinyfaas.

log.Printf("have request for path: %s (async: %v)", d.FunctionIdentifier, false)

s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false)
// Extract metadata from the gRPC context
md, ok := metadata.FromIncomingContext(ctx)
headers := make(map[string]string)
if ok {
// Convert metadata to map[string]string
for k, v := range md {
if len(v) > 0 {
headers[k] = v[0]
}
}
} else {
log.Print("failed to extract metadata from context, using empty headers GRPC request")
}

s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false, headers)

switch s {
case rproxy.StatusOK:
Expand Down
7 changes: 6 additions & 1 deletion pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ func Start(r *rproxy.RProxy, listenAddr string) {
return
}

s, res := r.Call(p, req_body, async)
headers := make(map[string]string)
for k, v := range req.Header {
headers[k] = v[0]
}

s, res := r.Call(p, req_body, async, headers)

switch s {
case rproxy.StatusOK:
Expand Down
31 changes: 22 additions & 9 deletions pkg/rproxy/rproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"math/rand"
"net/http"
"regexp"
"sync"
)

Expand Down Expand Up @@ -59,7 +60,7 @@ func (r *RProxy) Del(name string) error {
return nil
}

func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) {
func (r *RProxy) Call(name string, payload []byte, async bool, headers map[string]string) (Status, []byte) {

handler, ok := r.hosts[name]

Expand All @@ -75,27 +76,33 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte)

log.Printf("chosen handler: %s", h)

// call function
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s:8000/fn", h), bytes.NewBuffer(payload))
if err != nil {
log.Print(err)
return StatusError, nil
}
for k, v := range headers {
cleanedKey := cleanHeaderKey(k) // remove special chars from key
req.Header.Set(cleanedKey, v)
}

// call function asynchronously
if async {
log.Printf("async request accepted")
go func() {
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))

if err != nil {
resp, err2 := http.DefaultClient.Do(req)
if err2 != nil {
return
}

resp.Body.Close()

log.Printf("async request finished")
}()
return StatusAccepted, nil
}

// call function and return results
log.Printf("sync request starting")
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Print(err)
return StatusError, nil
Expand All @@ -115,3 +122,9 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte)

return StatusOK, res_body
}
func cleanHeaderKey(key string) string {
// a regex pattern to match special characters
re := regexp.MustCompile(`[:()<>@,;:\"/[\]?={} \t]`)
// Replace special characters with an empty string
return re.ReplaceAllString(key, "")
}
1 change: 1 addition & 0 deletions test/fns/echo-js/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

module.exports = (req, res) => {
const response = req.body;
const headers = req.headers; // headers from the http request or GRPC metadata

console.log(response);

Expand Down
2 changes: 1 addition & 1 deletion test/fns/echo/fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import typing

def fn(input: typing.Optional[str]) -> typing.Optional[str]:
def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]:
"""echo the input"""
return input
9 changes: 9 additions & 0 deletions test/fns/show-headers-js/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

module.exports = (req, res) => {
const body = req.body;
const headers = req.headers; // headers from the http request or GRPC metadata

console.log("Headers:", headers);

res.send(headers);
}
1 change: 1 addition & 0 deletions test/fns/show-headers-js/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"name": "fn", "version": "1.0.0", "main": "index.js"}
9 changes: 9 additions & 0 deletions test/fns/show-headers/fn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import json
import typing

def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]:
"""echo the input"""
if headers is not None:
return json.dumps(headers)
else:
return "{}"
Empty file.
2 changes: 1 addition & 1 deletion test/fns/sieve-of-eratosthenes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module.exports = (req, res) => {
}
}

response = ("Found " + primes.length + " primes under " + max);
let response = ("Found " + primes.length + " primes under " + max);

console.log(response);

Expand Down
133 changes: 132 additions & 1 deletion test/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import unittest

import json
import os
import os.path as path
import signal
Expand Down Expand Up @@ -48,7 +49,7 @@ def setUpModule() -> None:
uname = os.uname()
if uname.machine == "x86_64":
arch = "amd64"
elif uname.machine == "arm64":
elif uname.machine == "arm64" or uname.machine == "aarch64":
arch = "arm64"
else:
raise Exception(f"Unsupported architecture: {uname.machine}")
Expand Down Expand Up @@ -534,6 +535,136 @@ def test_invoke_grpc(self) -> None:
self.assertIsNotNone(response)
self.assertEqual(response.response, payload)

class TestShowHeadersJS(TinyFaaSTest):
fn = ""

@classmethod
def setUpClass(cls) -> None:
super(TestShowHeadersJS, cls).setUpClass()
cls.fn = startFunction(path.join(fn_path, "show-headers-js"), "headersjs", "nodejs", 1)

def setUp(self) -> None:
super(TestShowHeadersJS, self).setUp()
self.fn = TestShowHeadersJS.fn

def test_invoke_http(self) -> None:
"""invoke a function"""

# make a request to the function with a custom headers
req = urllib.request.Request(
f"http://{self.host}:{self.http_port}/{self.fn}",
headers={"lab": "scalable_software_systems_group"},
)

res = urllib.request.urlopen(req, timeout=10)

# check the response
self.assertEqual(res.status, 200)
response_body = res.read().decode("utf-8")
response_json = json.loads(response_body)
self.assertIn("lab", response_json)
self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header
self.assertIn("user-agent", response_json)
self.assertEqual(response_json["user-agent"], "Python-urllib/3.11") # python client

return

# def test_invoke_coap(self) -> None: # CoAP does not support headers


def test_invoke_grpc(self) -> None:
"""invoke a function"""
try:
import grpc
except ImportError:
self.skipTest(
"grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt"
)

import tinyfaas_pb2
import tinyfaas_pb2_grpc

# make a request to the function with a payload
payload = ""
metadata = (("lab", "scalable_software_systems_group"),)

with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel:
stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel)
response = stub.Request(
tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata
)

response_json = json.loads(response.response)
self.assertIn("lab", response_json)
self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header
self.assertIn("user-agent", response_json)
self.assertIn("grpc-python/1.64.1", response_json["user-agent"]) # client header

class TestShowHeaders(TinyFaaSTest): # Note: In Python, the http.server module (and many other HTTP libraries) automatically capitalizes the first character of each word in the header keys.
fn = ""

@classmethod
def setUpClass(cls) -> None:
super(TestShowHeaders, cls).setUpClass()
cls.fn = startFunction(path.join(fn_path, "show-headers"), "headers", "python3", 1)

def setUp(self) -> None:
super(TestShowHeaders, self).setUp()
self.fn = TestShowHeaders.fn

def test_invoke_http(self) -> None:
"""invoke a function"""

# make a request to the function with a custom headers
req = urllib.request.Request(
f"http://{self.host}:{self.http_port}/{self.fn}",
headers={"Lab": "scalable_software_systems_group"},
)

res = urllib.request.urlopen(req, timeout=10)

# check the response
self.assertEqual(res.status, 200)
response_body = res.read().decode("utf-8")
response_json = json.loads(response_body)
self.assertIn("Lab", response_json)
self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header
self.assertIn("User-Agent", response_json)
self.assertIn("Python-urllib", response_json["User-Agent"]) # python client

return

# def test_invoke_coap(self) -> None: # CoAP does not support headers, instead you have

def test_invoke_grpc(self) -> None:
"""invoke a function"""
try:
import grpc
except ImportError:
self.skipTest(
"grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt"
)

import tinyfaas_pb2
import tinyfaas_pb2_grpc

# make a request to the function with a payload
payload = ""
metadata = (("lab", "scalable_software_systems_group"),)

with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel:
stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel)
response = stub.Request(
tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata
)

response_json = json.loads(response.response)

self.assertIn("Lab", response_json)
self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header
self.assertIn("User-Agent", response_json)
self.assertIn("grpc-python", response_json["User-Agent"]) # client header


if __name__ == "__main__":
# check that make is installed
Expand Down

0 comments on commit 191f39d

Please sign in to comment.