Skip to content

Commit

Permalink
Merge pull request #3 from proxima-one/PRX-553
Browse files Browse the repository at this point in the history
refactor, add self recovery logic (watchdog, auto reconnect), add readme
  • Loading branch information
EugeneSkrebnev authored May 20, 2022
2 parents 50e78a7 + 72be624 commit e209319
Show file tree
Hide file tree
Showing 5 changed files with 463 additions and 184 deletions.
67 changes: 57 additions & 10 deletions client/proxima_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"log"
"time"
)

const defaultChannelSize = 8000
const defaultChannelSize = 10000
const defaultRpcChannelSize = 1000
const defaultRpcSleepTime = 200 * time.Millisecond

type ProximaClient struct {
config config.Config
Expand All @@ -20,7 +23,13 @@ type ProximaClient struct {
}

func NewProximaClient(config config.Config) *ProximaClient {
//todo: log about unused config fields
if config.User != "" || config.Password != "" || config.Token != "" {
log.Println("[WARN] Authorization is not supported yet")
log.Println("[WARN] User is not used in the client yet")
log.Println("[WARN] Password is not used in the client yet")
log.Println("[WARN] Token is not used in the client yet")
}

return &ProximaClient{
config: config,
}
Expand Down Expand Up @@ -98,18 +107,56 @@ func (client *ProximaClient) GetStream(ctx context.Context,
defer close(result)
defer close(errc)
for {
//todo: check context cancellation works
messages, err := streamClient.Recv()
if err != nil {
errc <- err
break
select {
case <-ctx.Done():
return
default:
messages, err := streamClient.Recv()
if err != nil {
errc <- err
return
}
for _, msg := range messages.Messages {
result <- streamMessageToModel(msg)
}
}
}
}()

return result, errc, nil
}

for _, msg := range messages.Messages {
result <- streamMessageToModel(msg)
func (client *ProximaClient) GetStreamBasedOnRpc(ctx context.Context,
streamState model.StreamState) (<-chan *model.Transition, <-chan error, error) {
if ctx == nil {
ctx = context.Background()
}

result := make(chan *model.Transition, defaultChannelSize)
errc := make(chan error, 1)
go func() {
defer close(result)
defer close(errc)
for {
select {
case <-ctx.Done():
return
default:
messages, err := client.GetTransitionsAfter(ctx, streamState, defaultRpcChannelSize)
if err != nil {
errc <- err
return
}
if len(messages) == 0 {
time.Sleep(defaultRpcSleepTime)
continue
}
for _, msg := range messages {
result <- msg
}
streamState.State = messages[len(messages)-1].NewState
}
}
}()

return result, errc, nil
}
Loading

0 comments on commit e209319

Please sign in to comment.