Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add http/grpc header support for py/js functions #40

Merged
merged 5 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Additionally, we provide scripts to read logs from your function and to wipe all
### Writing Functions

This tinyFaaS prototype only supports functions written for NodeJS 20, Python 3.9, and binary functions.
HTTP headers and GRPC Metadata are accessible in NodeJS and Python functions as key values. Check the example functions below for more information.

#### NodeJS 20

Expand Down
6 changes: 3 additions & 3 deletions pkg/coap/coap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/pfandzelter/go-coap"
)

const async = false

func Start(r *rproxy.RProxy, listenAddr string) {

h := coap.FuncHandler(
Expand All @@ -17,8 +19,6 @@ func Start(r *rproxy.RProxy, listenAddr string) {
log.Printf("is confirmable: %v", m.IsConfirmable())
log.Printf("path: %s", m.PathString())

async := false

p := m.PathString()

for p != "" && p[0] == '/' {
Expand All @@ -27,7 +27,7 @@ func Start(r *rproxy.RProxy, listenAddr string) {

log.Printf("have request for path: %s (async: %v)", p, async)

s, res := r.Call(p, m.Payload, async)
s, res := r.Call(p, m.Payload, async, nil)

mes := &coap.Message{
Type: coap.Acknowledgement,
Expand Down
5 changes: 4 additions & 1 deletion pkg/docker/runtimes/python3/functionhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ def do_POST(self) -> None:
if d == "":
d = None

# Read headers into a dictionary
headers: typing.Dict[str, str] = {k: v for k, v in self.headers.items()}

try:
res = fn.fn(d)
res = fn.fn(d, headers)
self.send_response(200)
self.end_headers()
if res is not None:
Expand Down
17 changes: 16 additions & 1 deletion pkg/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/OpenFogStack/tinyFaaS/pkg/grpc/tinyfaas"
"github.com/OpenFogStack/tinyFaaS/pkg/rproxy"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// GRPCServer is the grpc endpoint for this tinyFaaS instance.
Expand All @@ -21,7 +22,21 @@ func (gs *GRPCServer) Request(ctx context.Context, d *tinyfaas.Data) (*tinyfaas.

log.Printf("have request for path: %s (async: %v)", d.FunctionIdentifier, false)

s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false)
// Extract metadata from the gRPC context
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("failed to extract metadata from context")
ChaosRez marked this conversation as resolved.
Show resolved Hide resolved
}

// Convert metadata to map[string]string
headers := make(map[string]string)
for k, v := range md {
if len(v) > 0 {
headers[k] = v[0]
}
}

s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false, headers)

switch s {
case rproxy.StatusOK:
Expand Down
7 changes: 6 additions & 1 deletion pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ func Start(r *rproxy.RProxy, listenAddr string) {
return
}

s, res := r.Call(p, req_body, async)
headers := make(map[string]string)
for k, v := range req.Header {
headers[k] = v[0]
}

s, res := r.Call(p, req_body, async, headers)

switch s {
case rproxy.StatusOK:
Expand Down
25 changes: 18 additions & 7 deletions pkg/rproxy/rproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (r *RProxy) Del(name string) error {
return nil
}

func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) {
func (r *RProxy) Call(name string, payload []byte, async bool, headers map[string]string) (Status, []byte) {

handler, ok := r.hosts[name]

Expand All @@ -79,23 +79,34 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte)
if async {
log.Printf("async request accepted")
go func() {
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))

req, err := http.NewRequest("POST", fmt.Sprintf("http://%s:8000/fn", h), bytes.NewBuffer(payload))
if err != nil {
return
}
for k, v := range headers {
req.Header.Set(k, v)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}

resp.Body.Close()

log.Printf("async request finished")
}()
return StatusAccepted, nil
}

// call function and return results
log.Printf("sync request starting")
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))

req, err := http.NewRequest("POST", fmt.Sprintf("http://%s:8000/fn", h), bytes.NewBuffer(payload))
ChaosRez marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Print(err)
return StatusError, nil
}
for k, v := range headers {
req.Header.Set(k, v)
}
resp, err := http.DefaultClient.Do(req) // send the request
if err != nil {
log.Print(err)
return StatusError, nil
Expand Down
2 changes: 2 additions & 0 deletions test/fns/echo-js/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@

module.exports = (req, res) => {
const response = req.body;
const headers = req.headers; // headers from the http request or GRPC metadata

console.log("Headers:", headers);
console.log(response);

res.send(response);
Expand Down
5 changes: 4 additions & 1 deletion test/fns/echo/fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import typing

def fn(input: typing.Optional[str]) -> typing.Optional[str]:
def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]:
"""echo the input"""
if headers is not None:
print ("headers: " + str(headers))
ChaosRez marked this conversation as resolved.
Show resolved Hide resolved

return input
2 changes: 2 additions & 0 deletions test/fns/sieve-of-eratosthenes/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

module.exports = (req, res) => {
const data = req.body;
const headers = req.headers; // headers from the http request or GRPC metadata
const max = 10000;
let sieve = [], i, j, primes = [];
for (i = 2; i <= max; ++i) {
Expand Down
Loading