Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/mpga-development' into mpga-deve…
Browse files Browse the repository at this point in the history
…lopment

# Conflicts:
#	pkg/nexus/bulk/scheduler/bulk-scheduler.go
#	pkg/nexus/bulk/scheduler/bulk-scheduler_test.go
#	pkg/nexus/common/scheduler/base-nexus-scheduler.go
#	pkg/nexus/deadline/scheduler/deadline-scheduler_test.go
  • Loading branch information
KonsumGandalf committed Jan 25, 2024
2 parents 5f011c2 + 78b221a commit 2e7d532
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 94 deletions.
3 changes: 2 additions & 1 deletion pkg/nexus/bulk/scheduler/bulk-scheduler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package scheduler

import (
"github.com/nuclio/nuclio/pkg/nexus/common/models/structs"
"log"
"time"

"github.com/nuclio/nuclio/pkg/nexus/bulk/models"
"github.com/nuclio/nuclio/pkg/nexus/common/models/interfaces"
"github.com/nuclio/nuclio/pkg/nexus/common/models/structs"
common "github.com/nuclio/nuclio/pkg/nexus/common/scheduler"
)

Expand Down
81 changes: 24 additions & 57 deletions pkg/nexus/bulk/scheduler/bulk-scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package scheduler
package scheduler_test

import (
"net/http"
"testing"
"time"

"github.com/nuclio/nuclio/pkg/nexus/bulk/models"
bulk "github.com/nuclio/nuclio/pkg/nexus/bulk/scheduler"
"github.com/nuclio/nuclio/pkg/nexus/common/models/config"
"github.com/nuclio/nuclio/pkg/nexus/common/models/structs"
common "github.com/nuclio/nuclio/pkg/nexus/common/queue"
scheduler "github.com/nuclio/nuclio/pkg/nexus/common/scheduler"
utils "github.com/nuclio/nuclio/pkg/nexus/utils"
"github.com/stretchr/testify/suite"
"net/http"
"net/url"
"testing"
"time"
)

const (
Expand All @@ -21,17 +22,7 @@ const (

type BulkSchedulerTestSuite struct {
suite.Suite
bs *BulkScheduler
}

var mockRequest = &http.Request{
Method: "GET",
URL: &url.URL{
Path: "/api",
Scheme: "http",
Host: "localhost:8070",
},
Header: make(http.Header),
bs *bulk.BulkScheduler
}

func (suite *BulkSchedulerTestSuite) SetupTest() {
Expand All @@ -47,44 +38,26 @@ func (suite *BulkSchedulerTestSuite) SetupTest() {
baseSchedulerConfig := config.NewBaseNexusSchedulerConfig(true, sleepDuration)
nexusConfig := config.NewDefaultNexusConfig()

baseScheduler := scheduler.NewBaseNexusScheduler(defaultQueue, &baseSchedulerConfig, &nexusConfig, nil)

suite.bs = NewScheduler(baseScheduler, bulkConfig)
}

func (suite *BulkSchedulerTestSuite) pushTasksToQueue() {
// Normally tasks with the same name would have different Values
task1_1 := &structs.NexusItem{
Name: task_1,
Request: mockRequest,
}
suite.bs.Push(task1_1)
task1_2 := &structs.NexusItem{
Name: task_1,
Request: mockRequest,
Client := &http.Client{
Transport: &utils.MockRoundTripper{},
}
suite.bs.Push(task1_2)

task2_1 := &structs.NexusItem{
Name: task_2,
Request: mockRequest,
}
suite.bs.Push(task2_1)
task2_2 := &structs.NexusItem{
Name: task_2,
Request: mockRequest,
}
suite.bs.Push(task2_2)
baseScheduler := scheduler.NewBaseNexusScheduler(defaultQueue, &baseSchedulerConfig, &nexusConfig, Client, nil)

task3_1 := &structs.NexusItem{
Name: task_3,
Request: mockRequest,
}
suite.bs.Push(task3_1)
suite.bs = bulk.NewScheduler(baseScheduler, bulkConfig)
}

func (suite *BulkSchedulerTestSuite) TestBulkScheduler() {
suite.pushTasksToQueue()

names := []string{
task_1,
task_1,
task_2,
task_2,
task_3,
}

utils.PushMockedTasksToQueue(&suite.bs.BaseNexusScheduler, names, 2)

// Start scheduling to remove tasks that have passed their deadline
go suite.bs.Start()
Expand All @@ -94,10 +67,7 @@ func (suite *BulkSchedulerTestSuite) TestBulkScheduler() {

suite.Equal(5, suite.bs.Queue.Len())

suite.bs.Push(&structs.NexusItem{
Name: task_2,
Request: mockRequest,
})
utils.PushMockedTasksToQueue(&suite.bs.BaseNexusScheduler, []string{task_2}, 2)

suite.Equal(6, suite.bs.Queue.Len())

Expand All @@ -110,10 +80,7 @@ func (suite *BulkSchedulerTestSuite) TestBulkScheduler() {
suite.bs.MaxParallelRequests.Store(0)

for i := 0; i < suite.bs.MinAmountOfBulkItems; i++ {
suite.bs.Push(&structs.NexusItem{
Name: task_2,
Request: mockRequest,
})
utils.PushMockedTasksToQueue(&suite.bs.BaseNexusScheduler, []string{task_2}, 2)
}

suite.Equal(6, suite.bs.Queue.Len())
Expand Down
12 changes: 6 additions & 6 deletions pkg/nexus/common/scheduler/base-nexus-scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
elastic_deploy "github.com/nuclio/nuclio/pkg/nexus/elastic-deploy"
"net/http"
"net/url"
"time"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/nuclio/nuclio/pkg/nexus/common/models/structs"
queue "github.com/nuclio/nuclio/pkg/nexus/common/queue"
"github.com/nuclio/nuclio/pkg/nexus/common/utils"
elastic_deploy "github.com/nuclio/nuclio/pkg/nexus/elastic-deploy"
)

type BaseNexusScheduler struct {
Expand All @@ -26,29 +26,29 @@ type BaseNexusScheduler struct {
deployer *elastic_deploy.ProElasticDeploy
}

func NewBaseNexusScheduler(queue *queue.NexusQueue, config *config.BaseNexusSchedulerConfig, nexusConfig *config.NexusConfig, deployer *elastic_deploy.ProElasticDeploy) *BaseNexusScheduler {
func NewBaseNexusScheduler(queue *queue.NexusQueue, config *config.BaseNexusSchedulerConfig, nexusConfig *config.NexusConfig, client *http.Client, deployer *elastic_deploy.ProElasticDeploy) *BaseNexusScheduler {
return &BaseNexusScheduler{
BaseNexusSchedulerConfig: config,
Queue: queue,
requestUrl: models.NUCLIO_NEXUS_REQUEST_URL,
client: &http.Client{},
client: client,
NexusConfig: nexusConfig,
deployer: deployer,
}
}

func NewDefaultBaseNexusScheduler(queue *queue.NexusQueue, nexusConfig *config.NexusConfig, deployer *elastic_deploy.ProElasticDeploy) *BaseNexusScheduler {
baseSchedulerConfig := config.NewDefaultBaseNexusSchedulerConfig()
return NewBaseNexusScheduler(queue, &baseSchedulerConfig, nexusConfig, deployer)
return NewBaseNexusScheduler(queue, &baseSchedulerConfig, nexusConfig, &http.Client{}, deployer)
}

func (bns *BaseNexusScheduler) Push(elem *structs.NexusItem) {
bns.Queue.Push(elem)
}

func (bns *BaseNexusScheduler) Pop() (nexusItem *structs.NexusItem) {
bns.NexusConfig.MaxParallelRequests.Add(-1)
defer bns.NexusConfig.MaxParallelRequests.Add(1)
bns.MaxParallelRequests.Add(-1)
defer bns.MaxParallelRequests.Add(1)

nexusItem = bns.Queue.Pop()

Expand Down
9 changes: 8 additions & 1 deletion pkg/nexus/deadline/scheduler/deadline-scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/nuclio/nuclio/pkg/nexus/common/models/interfaces"
structs "github.com/nuclio/nuclio/pkg/nexus/common/models/structs"
common "github.com/nuclio/nuclio/pkg/nexus/common/scheduler"
"github.com/nuclio/nuclio/pkg/nexus/deadline/models"
)
Expand Down Expand Up @@ -58,7 +59,13 @@ func (ds *DeadlineScheduler) executeSchedule() {
for ds.Queue.Len() > 0 &&
ds.Queue.Peek().Deadline.Before(removeUntil) {

ds.Pop()
ds.MaxParallelRequests.Add(-1)
task := ds.Queue.Pop()

go func(taskInFunction *structs.NexusItem) {
defer ds.MaxParallelRequests.Add(1)
ds.CallSynchronized(taskInFunction)
}(task)
}

fmt.Println("Sleeping:", time.Until(nextWakeUpTime).Seconds(), "seconds")
Expand Down
45 changes: 19 additions & 26 deletions pkg/nexus/deadline/scheduler/deadline-scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package deadline
package deadline_test

import (
"net/http"
"testing"
"time"
"fmt"

"github.com/nuclio/nuclio/pkg/nexus/common/models/config"
structsCommon "github.com/nuclio/nuclio/pkg/nexus/common/models/structs"
common "github.com/nuclio/nuclio/pkg/nexus/common/queue"
"github.com/nuclio/nuclio/pkg/nexus/common/scheduler"
deadline "github.com/nuclio/nuclio/pkg/nexus/deadline/models"
models "github.com/nuclio/nuclio/pkg/nexus/deadline/models"
deadline "github.com/nuclio/nuclio/pkg/nexus/deadline/scheduler"
utils "github.com/nuclio/nuclio/pkg/nexus/utils"
"github.com/stretchr/testify/suite"
"net/http"
"net/url"
"testing"
"time"
)

type DeadlineSchedulerTestSuite struct {
suite.Suite
ds *DeadlineScheduler
ds *deadline.DeadlineScheduler
}

type MockDeployer struct{}
Expand All @@ -28,35 +29,27 @@ func (bns *MockDeployer) Unpause(functionName string) {
func (suite *DeadlineSchedulerTestSuite) SetupTest() {
deadlineRemovalThreshold, sleepDuration := 2*time.Millisecond, 1*time.Millisecond

deadlineConfig := deadline.DeadlineSchedulerConfig{
deadlineConfig := models.DeadlineSchedulerConfig{
DeadlineRemovalThreshold: deadlineRemovalThreshold,
}

defaultQueue := common.Initialize()
baseSchedulerConfig := config.NewBaseNexusSchedulerConfig(true, sleepDuration)
nexusConfig := config.NewDefaultNexusConfig()

baseScheduler := scheduler.NewBaseNexusScheduler(defaultQueue, &baseSchedulerConfig, &nexusConfig, nil)
Client := &http.Client{
Transport: &utils.MockRoundTripper{},
}

suite.ds = NewScheduler(baseScheduler, deadlineConfig)
baseScheduler := scheduler.NewBaseNexusScheduler(defaultQueue, &baseSchedulerConfig, &nexusConfig, Client, nil)

suite.ds = deadline.NewScheduler(baseScheduler, deadlineConfig)
}

func (suite *DeadlineSchedulerTestSuite) TestDeadlineScheduler() {
mockTask := &structsCommon.NexusItem{
Request: &http.Request{
Method: "GET",
URL: &url.URL{
Path: "/api",
Scheme: "http",
Host: "localhost:8070",
},
Header: make(http.Header),
},
Deadline: time.Now().Add(2 * time.Millisecond),
}

// Push a task to the queue
suite.ds.Push(mockTask)
utils.PushMockedTasksToQueue(&suite.ds.BaseNexusScheduler, []string{"task1"}, 2)

// Start scheduling to remove tasks that have passed their deadline
go suite.ds.Start()
Expand All @@ -65,7 +58,7 @@ func (suite *DeadlineSchedulerTestSuite) TestDeadlineScheduler() {
time.Sleep(suite.ds.DeadlineRemovalThreshold + 1*time.Millisecond)

// Push another task to the queue which is expected not to be removed in time since the scheduler currently sleeps for 2 seconds
suite.ds.Push(mockTask)
utils.PushMockedTasksToQueue(&suite.ds.BaseNexusScheduler, []string{"task1"}, 2)

time.Sleep(1 * time.Microsecond)

Expand All @@ -78,7 +71,7 @@ func (suite *DeadlineSchedulerTestSuite) TestDeadlineScheduler() {

// Pause the scheduler
suite.ds.Stop()
suite.ds.Push(mockTask)
utils.PushMockedTasksToQueue(&suite.ds.BaseNexusScheduler, []string{"task1"}, 2)

time.Sleep(suite.ds.DeadlineRemovalThreshold + 200*time.Millisecond)
suite.Equal(1, suite.ds.Queue.Len())
Expand Down
59 changes: 59 additions & 0 deletions pkg/nexus/idle/scheduler/idle-scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package idle

import (
"time"

"github.com/nuclio/nuclio/pkg/nexus/common/models/interfaces"
structs "github.com/nuclio/nuclio/pkg/nexus/common/models/structs"
common "github.com/nuclio/nuclio/pkg/nexus/common/scheduler"
)

type IdleScheduler struct {
common.BaseNexusScheduler
}

func NewScheduler(baseNexusScheduler *common.BaseNexusScheduler) *IdleScheduler {
return &IdleScheduler{
BaseNexusScheduler: *baseNexusScheduler,
}
}

func NewDefaultScheduler(baseNexusScheduler *common.BaseNexusScheduler) *IdleScheduler {
return NewScheduler(baseNexusScheduler)
}

func (ds *IdleScheduler) Start() {
ds.RunFlag = true

ds.executeSchedule()
}

func (ds *IdleScheduler) Stop() {
ds.RunFlag = false
}

func (ds *IdleScheduler) GetStatus() interfaces.SchedulerStatus {
if ds.RunFlag {
return interfaces.Running
} else {
return interfaces.Stopped
}
}

func (ds *IdleScheduler) executeSchedule() {
for ds.RunFlag {
nextWakeUpTime := time.Now().Add(ds.SleepDuration)

for ds.Queue.Len() > 0 && ds.MaxParallelRequests.Load() > 0 {
ds.MaxParallelRequests.Add(-1)
task := ds.Queue.Pop()
go func(taskInFunction *structs.NexusItem) {
defer ds.MaxParallelRequests.Add(1)
ds.CallSynchronized(taskInFunction)
}(task)
}

// sleep until the next wake up time
time.Sleep(time.Until(nextWakeUpTime))
}
}
Loading

0 comments on commit 2e7d532

Please sign in to comment.