Skip to content

Commit

Permalink
Merge pull request #2 from proxima-one/add-preprocces
Browse files Browse the repository at this point in the history
refactor, add interface for easy consuming without chan interface
  • Loading branch information
EugeneSkrebnev authored May 7, 2022
2 parents 842136e + 881fd39 commit 50e78a7
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 262 deletions.
17 changes: 9 additions & 8 deletions client/proxima_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"log"
)

const defaultChannelSize = 8000

type ProximaClient struct {
config *config.Config
config config.Config
conn *grpc.ClientConn
grpc pb.MessagesServiceClient
}

func NewProximaClient(config *config.Config) *ProximaClient {
func NewProximaClient(config config.Config) *ProximaClient {
//todo: log about unused config fields
return &ProximaClient{
config: config,
}
Expand All @@ -26,7 +29,7 @@ func NewProximaClient(config *config.Config) *ProximaClient {
func (client *ProximaClient) Connect() error {
address := client.config.GetFullAddress()
dialOption := grpc.WithInsecure()
if client.config.Port == 443 {
if client.config.GetPort() == 443 {
dialOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))
}
conn, err := grpc.Dial(address, dialOption)
Expand Down Expand Up @@ -77,8 +80,7 @@ func (client *ProximaClient) GetTransitionsAfter(ctx context.Context,
}

func (client *ProximaClient) GetStream(ctx context.Context,
streamState model.StreamState,
bufferSize int) (<-chan *model.Transition, <-chan error, error) {
streamState model.StreamState) (<-chan *model.Transition, <-chan error, error) {

if ctx == nil {
ctx = context.Background()
Expand All @@ -88,18 +90,17 @@ func (client *ProximaClient) GetStream(ctx context.Context,
LastMessageId: streamState.State.Id,
})
if err != nil {
log.Fatalf("Error while getting stream, %v", err)
return nil, nil, err
}
result := make(chan *model.Transition, bufferSize)
result := make(chan *model.Transition, defaultChannelSize)
errc := make(chan error, 1)
go func() {
defer close(result)
defer close(errc)
for {
//todo: check context cancellation works
messages, err := streamClient.Recv()
if err != nil {
log.Printf("Error while reading stream, %v\n", err)
errc <- err
break
}
Expand Down
118 changes: 83 additions & 35 deletions client/stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,24 @@ import (
)

type StreamReader struct {
config *config.Config
config config.Config
client *ProximaClient
lastState model.State
mutex sync.Mutex
preprocessFunc model.TransitionPreprocessingFunc
bufferLoad float32
}
preprocessFunc TransitionPreprocessingFunc

type ProximaStreamObject struct {
Transition *model.Transition
Preprocess *model.TransitionPreprocessingResult
}
processedCount int
workingChan *chan *ProximaStreamObject

func NewStreamReader(config config.Config,
lastState model.State,
preprocess model.TransitionPreprocessingFunc) (*StreamReader, error) {
runningStream *chan *ProximaStreamObject
errStream *chan error
ctx context.Context
}

func NewStreamReader(config config.Config, preprocess TransitionPreprocessingFunc) (*StreamReader, error) {
res := &StreamReader{
config: &config,
lastState: lastState,
config: config,
lastState: config.GetState(),
preprocessFunc: preprocess,
}
res.client = NewProximaClient(res.config)
Expand All @@ -41,12 +39,20 @@ func NewStreamReader(config config.Config,
return res, nil
}

func (reader *StreamReader) Close() error {
return reader.client.Close()
func (reader *StreamReader) GetBufferLoadPercent() int {
if reader.workingChan == nil {
return 0
}
load := float32(len(*reader.workingChan)) / float32(reader.config.GetChannelSize()) * 100
return int(load)
}

func (reader *StreamReader) Metrics() (int, int) {
return reader.processedCount, reader.GetBufferLoadPercent()
}

func (reader *StreamReader) GetStreamID() string {
return reader.config.StreamID
return reader.config.GetStreamID()
}

func (reader *StreamReader) FetchNextTransitions(ctx context.Context, count int) ([]*model.Transition, error) {
Expand All @@ -55,7 +61,7 @@ func (reader *StreamReader) FetchNextTransitions(ctx context.Context, count int)
}
reader.mutex.Lock()
transitions, err := reader.client.GetTransitionsAfter(ctx, model.StreamState{
StreamID: reader.config.StreamID,
StreamID: reader.config.GetStreamID(),
State: reader.lastState,
}, count)
if err != nil {
Expand All @@ -66,22 +72,22 @@ func (reader *StreamReader) FetchNextTransitions(ctx context.Context, count int)
reader.mutex.Unlock()
return nil, errors.New("no transitions found")
}
//todo: transitions is not empty
reader.lastState = transitions[len(transitions)-1].NewState
reader.processedCount += len(transitions)
reader.mutex.Unlock()
return transitions, err
}

func (reader *StreamReader) GetRawStreamFromState(ctx context.Context,
state model.State, buffer int) (<-chan *model.Transition, <-chan error, error) {
state model.State) (<-chan *model.Transition, <-chan error, error) {

if ctx == nil {
ctx = context.Background()
}
stream, errc, err := reader.client.GetStream(ctx, model.StreamState{
StreamID: reader.config.StreamID,
StreamID: reader.config.GetStreamID(),
State: state,
}, buffer)
})
return stream, errc, err
}

Expand All @@ -94,69 +100,111 @@ func (reader *StreamReader) streamObjForTransition(transition *model.Transition)
}
return &ProximaStreamObject{
Transition: transition,
Preprocess: model.NewTransitionPreprocessingResult(transition, reader.preprocessFunc),
Preprocess: NewTransitionPreprocessingResult(transition, reader.preprocessFunc),
}
}

func (reader *StreamReader) GetRawStream(ctx context.Context,
buffer int) (<-chan *ProximaStreamObject, <-chan error, error) {
func (reader *StreamReader) saveWorkingChan(newChan *chan *ProximaStreamObject) {
if reader.workingChan != nil {
log.Println("Warning: previous stream is still working, cancel context to close it")
panic("reader is already working") //todo: add log levels and close previous stream, cancel context
}
reader.workingChan = newChan
}

func (reader *StreamReader) StartGrpcStreamChannel(ctx context.Context) (<-chan *ProximaStreamObject, <-chan error, error) {
if ctx == nil {
ctx = context.Background()
}
stream, errc, err := reader.GetRawStreamFromState(ctx, reader.lastState, buffer)
stream, errc, err := reader.GetRawStreamFromState(ctx, reader.lastState)
buffer := reader.config.GetChannelSize()
result := make(chan *ProximaStreamObject, buffer)
errChan := make(chan error, 1)
reader.saveWorkingChan(&result)
go func() {
defer close(result)
for {
select {
case <-ctx.Done():
reader.workingChan = nil
return
case msg, ok := <-stream:
if !ok {
return
}
reader.lastState = msg.NewState
reader.bufferLoad = float32(len(result)) / float32(buffer)
result <- reader.streamObjForTransition(msg)
reader.lastState = msg.NewState
reader.processedCount++

case err := <-errc:
errChan <- err
}
}
}()
reader.ctx = ctx
reader.runningStream = &result
reader.errStream = &errChan
return result, errc, err
}

func (reader *StreamReader) GetStreamBufferLoad() float32 {
return reader.bufferLoad
}

func (reader *StreamReader) GetBatchedStream(ctx context.Context,
buffer int, count int) (<-chan *ProximaStreamObject, <-chan error, error) {
func (reader *StreamReader) StartGrpcRpcChannel(ctx context.Context,
count int) (<-chan *ProximaStreamObject, <-chan error, error) {

if ctx == nil {
ctx = context.Background()
}
buffer := reader.config.GetChannelSize()
result := make(chan *ProximaStreamObject, buffer)
errc := make(chan error, 1)
reader.saveWorkingChan(&result)
go func() {
defer close(result)
defer close(errc)
for {
select {
case <-ctx.Done():
reader.workingChan = nil
return
default:
messages, err := reader.FetchNextTransitions(ctx, count)
if err != nil {
log.Printf("Error while receiving stream, %v\n", err)
errc <- err
return
}
reader.bufferLoad = float32(len(result)) / float32(buffer)
for _, msg := range messages {
result <- reader.streamObjForTransition(msg)
}
}
}
}()
reader.ctx = ctx
reader.runningStream = &result
reader.errStream = &errc
return result, errc, nil
}

func (reader *StreamReader) Start(ctx context.Context) error {
_, _, err := reader.StartGrpcStreamChannel(ctx) //default implementation uses grpc stream
return err
}

func (reader *StreamReader) ReadNext() (*ProximaStreamObject, error) {
select {
case <-reader.ctx.Done():
return nil, reader.ctx.Err()

case obj := <-*reader.runningStream:
if obj.Preprocess.err != nil {
return nil, obj.Preprocess.err
}
return obj, nil

case err := <-*reader.errStream:
return nil, err

}
}

func (reader *StreamReader) Close() error {
return reader.client.Close()
}
56 changes: 56 additions & 0 deletions client/stream_reader_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package client

import (
"context"
"encoding/json"
"github.com/proxima-one/streamdb-client-go/model"
"sync"
)

type TransitionPreprocessingResult struct {
wg sync.WaitGroup
res any
err error
}

type TransitionPreprocessingFunc func(t *model.Transition) (any, error)

func NewTransitionPreprocessingResult(t *model.Transition, f TransitionPreprocessingFunc) *TransitionPreprocessingResult {
res := &TransitionPreprocessingResult{
wg: sync.WaitGroup{},
res: nil,
err: nil,
}
res.wg.Add(1)
go func() {
res.res, res.err = f(t)
res.wg.Done()
}()
return res
}

func (tr *TransitionPreprocessingResult) PreprocessingResult() (any, error) {
tr.wg.Wait()
return tr.res, tr.err
}

type ProximaStreamObject struct {
Transition *model.Transition
Preprocess *TransitionPreprocessingResult
}

func JsonParsingPreprocessFunc(transition *model.Transition) (any, error) {
m := make(map[string]interface{})
err := json.Unmarshal(*transition.Event.Payload, &m)
return m, err
}

type ProximaStreamSimpleReaderInterface interface {
Start(ctx context.Context) error
ReadNext() (*ProximaStreamObject, error)
}

type ProximaStreamProviderInterface interface {
StartGrpcStreamChannel(ctx context.Context) (<-chan *ProximaStreamObject, <-chan error, error) //uses grpc stream inside
StartGrpcRpcChannel(ctx context.Context, countPerRequest int) (<-chan *ProximaStreamObject, <-chan error, error) //uses grpc rpc call inside
}
Loading

0 comments on commit 50e78a7

Please sign in to comment.