Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add http/grpc header support for py/js functions #40

Merged
merged 5 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Additionally, we provide scripts to read logs from your function and to wipe all

This tinyFaaS prototype only supports functions written for NodeJS 20, Python 3.9, and binary functions.
A good place to get started with writing functions is the selection of test functions in [`./test/fns`](./test/fns/).
HTTP headers and GRPC Metadata are accessible in NodeJS and Python functions as key values. Check the "show-headers" test functions for more information.

#### NodeJS 20

Expand Down
6 changes: 3 additions & 3 deletions pkg/coap/coap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/pfandzelter/go-coap"
)

const async = false

func Start(r *rproxy.RProxy, listenAddr string) {

h := coap.FuncHandler(
Expand All @@ -17,8 +19,6 @@ func Start(r *rproxy.RProxy, listenAddr string) {
log.Printf("is confirmable: %v", m.IsConfirmable())
log.Printf("path: %s", m.PathString())

async := false

p := m.PathString()

for p != "" && p[0] == '/' {
Expand All @@ -27,7 +27,7 @@ func Start(r *rproxy.RProxy, listenAddr string) {

log.Printf("have request for path: %s (async: %v)", p, async)

s, res := r.Call(p, m.Payload, async)
s, res := r.Call(p, m.Payload, async, nil)

mes := &coap.Message{
Type: coap.Acknowledgement,
Expand Down
5 changes: 4 additions & 1 deletion pkg/docker/runtimes/python3/functionhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ def do_POST(self) -> None:
if d == "":
d = None

# Read headers into a dictionary
headers: typing.Dict[str, str] = {k: v for k, v in self.headers.items()}

try:
res = fn.fn(d)
res = fn.fn(d, headers)
self.send_response(200)
self.end_headers()
if res is not None:
Expand Down
17 changes: 16 additions & 1 deletion pkg/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/OpenFogStack/tinyFaaS/pkg/grpc/tinyfaas"
"github.com/OpenFogStack/tinyFaaS/pkg/rproxy"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// GRPCServer is the grpc endpoint for this tinyFaaS instance.
Expand All @@ -21,7 +22,21 @@ func (gs *GRPCServer) Request(ctx context.Context, d *tinyfaas.Data) (*tinyfaas.

log.Printf("have request for path: %s (async: %v)", d.FunctionIdentifier, false)

s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false)
// Extract metadata from the gRPC context
md, ok := metadata.FromIncomingContext(ctx)
headers := make(map[string]string)
if ok {
// Convert metadata to map[string]string
for k, v := range md {
if len(v) > 0 {
headers[k] = v[0]
}
}
} else {
log.Print("failed to extract metadata from context, using empty headers GRPC request")
}

s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false, headers)

switch s {
case rproxy.StatusOK:
Expand Down
7 changes: 6 additions & 1 deletion pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ func Start(r *rproxy.RProxy, listenAddr string) {
return
}

s, res := r.Call(p, req_body, async)
headers := make(map[string]string)
for k, v := range req.Header {
headers[k] = v[0]
}

s, res := r.Call(p, req_body, async, headers)

switch s {
case rproxy.StatusOK:
Expand Down
31 changes: 22 additions & 9 deletions pkg/rproxy/rproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"math/rand"
"net/http"
"regexp"
"sync"
)

Expand Down Expand Up @@ -59,7 +60,7 @@ func (r *RProxy) Del(name string) error {
return nil
}

func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) {
func (r *RProxy) Call(name string, payload []byte, async bool, headers map[string]string) (Status, []byte) {

handler, ok := r.hosts[name]

Expand All @@ -75,27 +76,33 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte)

log.Printf("chosen handler: %s", h)

// call function
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s:8000/fn", h), bytes.NewBuffer(payload))
if err != nil {
log.Print(err)
return StatusError, nil
}
for k, v := range headers {
cleanedKey := cleanHeaderKey(k) // remove special chars from key
req.Header.Set(cleanedKey, v)
}

// call function asynchronously
if async {
log.Printf("async request accepted")
go func() {
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))

if err != nil {
resp, err2 := http.DefaultClient.Do(req)
if err2 != nil {
return
}

resp.Body.Close()

log.Printf("async request finished")
}()
return StatusAccepted, nil
}

// call function and return results
log.Printf("sync request starting")
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Print(err)
return StatusError, nil
Expand All @@ -115,3 +122,9 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte)

return StatusOK, res_body
}
func cleanHeaderKey(key string) string {
// a regex pattern to match special characters
re := regexp.MustCompile(`[:()<>@,;:\"/[\]?={} \t]`)
// Replace special characters with an empty string
return re.ReplaceAllString(key, "")
}
1 change: 1 addition & 0 deletions test/fns/echo-js/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

module.exports = (req, res) => {
const response = req.body;
const headers = req.headers; // headers from the http request or GRPC metadata

console.log(response);

Expand Down
2 changes: 1 addition & 1 deletion test/fns/echo/fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import typing

def fn(input: typing.Optional[str]) -> typing.Optional[str]:
def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]:
"""echo the input"""
return input
9 changes: 9 additions & 0 deletions test/fns/show-headers-js/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

module.exports = (req, res) => {
const body = req.body;
const headers = req.headers; // headers from the http request or GRPC metadata

console.log("Headers:", headers);

res.send(headers);
}
1 change: 1 addition & 0 deletions test/fns/show-headers-js/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"name": "fn", "version": "1.0.0", "main": "index.js"}
9 changes: 9 additions & 0 deletions test/fns/show-headers/fn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import json
import typing

def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]:
"""echo the input"""
if headers is not None:
return json.dumps(headers)
else:
return "{}"
Empty file.
2 changes: 1 addition & 1 deletion test/fns/sieve-of-eratosthenes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module.exports = (req, res) => {
}
}

response = ("Found " + primes.length + " primes under " + max);
let response = ("Found " + primes.length + " primes under " + max);

console.log(response);

Expand Down
133 changes: 132 additions & 1 deletion test/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import unittest

import json
import os
import os.path as path
import signal
Expand Down Expand Up @@ -48,7 +49,7 @@ def setUpModule() -> None:
uname = os.uname()
if uname.machine == "x86_64":
arch = "amd64"
elif uname.machine == "arm64":
elif uname.machine == "arm64" or uname.machine == "aarch64":
arch = "arm64"
else:
raise Exception(f"Unsupported architecture: {uname.machine}")
Expand Down Expand Up @@ -534,6 +535,136 @@ def test_invoke_grpc(self) -> None:
self.assertIsNotNone(response)
self.assertEqual(response.response, payload)

class TestShowHeadersJS(TinyFaaSTest):
fn = ""

@classmethod
def setUpClass(cls) -> None:
super(TestShowHeadersJS, cls).setUpClass()
cls.fn = startFunction(path.join(fn_path, "show-headers-js"), "headersjs", "nodejs", 1)

def setUp(self) -> None:
super(TestShowHeadersJS, self).setUp()
self.fn = TestShowHeadersJS.fn

def test_invoke_http(self) -> None:
"""invoke a function"""

# make a request to the function with a custom headers
req = urllib.request.Request(
f"http://{self.host}:{self.http_port}/{self.fn}",
headers={"lab": "scalable_software_systems_group"},
)

res = urllib.request.urlopen(req, timeout=10)

# check the response
self.assertEqual(res.status, 200)
response_body = res.read().decode("utf-8")
response_json = json.loads(response_body)
self.assertIn("lab", response_json)
self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header
self.assertIn("user-agent", response_json)
self.assertEqual(response_json["user-agent"], "Python-urllib/3.11") # python client

return

# def test_invoke_coap(self) -> None: # CoAP does not support headers


def test_invoke_grpc(self) -> None:
"""invoke a function"""
try:
import grpc
except ImportError:
self.skipTest(
"grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt"
)

import tinyfaas_pb2
import tinyfaas_pb2_grpc

# make a request to the function with a payload
payload = ""
metadata = (("lab", "scalable_software_systems_group"),)

with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel:
stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel)
response = stub.Request(
tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata
)

response_json = json.loads(response.response)
self.assertIn("lab", response_json)
self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header
self.assertIn("user-agent", response_json)
self.assertIn("grpc-python/1.64.1", response_json["user-agent"]) # client header

class TestShowHeaders(TinyFaaSTest): # Note: In Python, the http.server module (and many other HTTP libraries) automatically capitalizes the first character of each word in the header keys.
fn = ""

@classmethod
def setUpClass(cls) -> None:
super(TestShowHeaders, cls).setUpClass()
cls.fn = startFunction(path.join(fn_path, "show-headers"), "headers", "python3", 1)

def setUp(self) -> None:
super(TestShowHeaders, self).setUp()
self.fn = TestShowHeaders.fn

def test_invoke_http(self) -> None:
"""invoke a function"""

# make a request to the function with a custom headers
req = urllib.request.Request(
f"http://{self.host}:{self.http_port}/{self.fn}",
headers={"Lab": "scalable_software_systems_group"},
)

res = urllib.request.urlopen(req, timeout=10)

# check the response
self.assertEqual(res.status, 200)
response_body = res.read().decode("utf-8")
response_json = json.loads(response_body)
self.assertIn("Lab", response_json)
self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header
self.assertIn("User-Agent", response_json)
self.assertIn("Python-urllib", response_json["User-Agent"]) # python client

return

# def test_invoke_coap(self) -> None: # CoAP does not support headers, instead you have

def test_invoke_grpc(self) -> None:
"""invoke a function"""
try:
import grpc
except ImportError:
self.skipTest(
"grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt"
)

import tinyfaas_pb2
import tinyfaas_pb2_grpc

# make a request to the function with a payload
payload = ""
metadata = (("lab", "scalable_software_systems_group"),)

with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel:
stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel)
response = stub.Request(
tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata
)

response_json = json.loads(response.response)

self.assertIn("Lab", response_json)
self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header
self.assertIn("User-Agent", response_json)
self.assertIn("grpc-python", response_json["User-Agent"]) # client header


if __name__ == "__main__":
# check that make is installed
Expand Down
Loading