Skip to content

Commit

Permalink
chore: added recovery in crons recovery (#4592)
Browse files Browse the repository at this point in the history
* crons recovery

* renaming

* wire external

* changes
  • Loading branch information
subhashish-devtron authored Jan 25, 2024
1 parent 3636243 commit ed7dc4f
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 40 deletions.
2 changes: 2 additions & 0 deletions Wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ import (
repository10 "github.com/devtron-labs/devtron/pkg/variables/repository"
util2 "github.com/devtron-labs/devtron/util"
"github.com/devtron-labs/devtron/util/argo"
cron2 "github.com/devtron-labs/devtron/util/cron"
"github.com/devtron-labs/devtron/util/rbac"
"github.com/google/wire"
)
Expand Down Expand Up @@ -940,6 +941,7 @@ func InitializeApp() (*App, error) {

pipeline.NewPipelineConfigListenerServiceImpl,
wire.Bind(new(pipeline.PipelineConfigListenerService), new(*pipeline.PipelineConfigListenerServiceImpl)),
cron2.NewCronLoggerImpl,
)
return &App{}, nil
}
20 changes: 4 additions & 16 deletions client/cron/CdApplicationStatusUpdateHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/devtron-labs/devtron/pkg/appStore/deployment/service"
"github.com/devtron-labs/devtron/pkg/pipeline"
"github.com/devtron-labs/devtron/util"
cron2 "github.com/devtron-labs/devtron/util/cron"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -51,30 +52,17 @@ type CdApplicationStatusUpdateHandlerImpl struct {
installedAppVersionRepository repository2.InstalledAppRepository
}

type CronLoggerImpl struct {
logger *zap.SugaredLogger
}

func (impl *CronLoggerImpl) Info(msg string, keysAndValues ...interface{}) {
impl.logger.Infow(msg, keysAndValues...)
}

func (impl *CronLoggerImpl) Error(err error, msg string, keysAndValues ...interface{}) {
keysAndValues = append([]interface{}{"err", err}, keysAndValues...)
impl.logger.Errorw(msg, keysAndValues...)
}

func NewCdApplicationStatusUpdateHandlerImpl(logger *zap.SugaredLogger, appService app.AppService,
workflowDagExecutor pipeline.WorkflowDagExecutor, installedAppService service.InstalledAppService,
CdHandler pipeline.CdHandler, AppStatusConfig *app.AppServiceConfig, pubsubClient *pubsub.PubSubClientServiceImpl,
pipelineStatusTimelineRepository pipelineConfig.PipelineStatusTimelineRepository,
eventClient client2.EventClient, appListingRepository repository.AppListingRepository,
cdWorkflowRepository pipelineConfig.CdWorkflowRepository,
pipelineRepository pipelineConfig.PipelineRepository, installedAppVersionHistoryRepository repository2.InstalledAppVersionHistoryRepository,
installedAppVersionRepository repository2.InstalledAppRepository) *CdApplicationStatusUpdateHandlerImpl {
cronLogger := &CronLoggerImpl{logger: logger}
installedAppVersionRepository repository2.InstalledAppRepository, cronLogger *cron2.CronLoggerImpl) *CdApplicationStatusUpdateHandlerImpl {

cron := cron.New(
cron.WithChain(cron.SkipIfStillRunning(cronLogger)))
cron.WithChain(cron.SkipIfStillRunning(cronLogger), cron.Recover(cronLogger)))
cron.Start()
impl := &CdApplicationStatusUpdateHandlerImpl{
logger: logger,
Expand Down
5 changes: 3 additions & 2 deletions client/cron/CiStatusUpdateCron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
"github.com/devtron-labs/devtron/pkg/app"
"github.com/devtron-labs/devtron/pkg/pipeline"
cron2 "github.com/devtron-labs/devtron/util/cron"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"strconv"
Expand All @@ -26,9 +27,9 @@ type CiStatusUpdateCronImpl struct {

func NewCiStatusUpdateCronImpl(logger *zap.SugaredLogger, appService app.AppService,
ciWorkflowStatusUpdateConfig *CiWorkflowStatusUpdateConfig, ciPipelineRepository pipelineConfig.CiPipelineRepository,
ciHandler pipeline.CiHandler) *CiStatusUpdateCronImpl {
ciHandler pipeline.CiHandler, cronLogger *cron2.CronLoggerImpl) *CiStatusUpdateCronImpl {
cron := cron.New(
cron.WithChain())
cron.WithChain(cron.Recover(cronLogger)))
cron.Start()
impl := &CiStatusUpdateCronImpl{
logger: logger,
Expand Down
5 changes: 3 additions & 2 deletions client/cron/CiTriggerCron.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
bean2 "github.com/devtron-labs/devtron/pkg/pipeline/bean"
"github.com/devtron-labs/devtron/pkg/pipeline/repository"
repository3 "github.com/devtron-labs/devtron/pkg/plugin/repository"
cron2 "github.com/devtron-labs/devtron/util/cron"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
)
Expand All @@ -28,9 +29,9 @@ type CiTriggerCronImpl struct {
}

func NewCiTriggerCronImpl(logger *zap.SugaredLogger, cfg *CiTriggerCronConfig, pipelineStageRepository repository.PipelineStageRepository,
ciHandler pipeline.CiHandler, ciArtifactRepository repository2.CiArtifactRepository, globalPluginRepository repository3.GlobalPluginRepository) *CiTriggerCronImpl {
ciHandler pipeline.CiHandler, ciArtifactRepository repository2.CiArtifactRepository, globalPluginRepository repository3.GlobalPluginRepository, cronLogger *cron2.CronLoggerImpl) *CiTriggerCronImpl {
cron := cron.New(
cron.WithChain())
cron.WithChain(cron.Recover(cronLogger)))
cron.Start()
impl := &CiTriggerCronImpl{
logger: logger,
Expand Down
5 changes: 3 additions & 2 deletions client/telemetry/TelemetryEventClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
cloudProviderIdentifier "github.com/devtron-labs/common-lib/cloud-provider-identifier"
cron3 "github.com/devtron-labs/devtron/util/cron"
"net/http"
"time"

Expand Down Expand Up @@ -73,9 +74,9 @@ func NewTelemetryEventClientImpl(logger *zap.SugaredLogger, client *http.Client,
K8sUtil *k8s.K8sServiceImpl, aCDAuthConfig *util3.ACDAuthConfig, userService user2.UserService,
attributeRepo repository.AttributesRepository, ssoLoginService sso.SSOLoginService,
PosthogClient *PosthogClient, moduleRepository moduleRepo.ModuleRepository, serverDataStore *serverDataStore.ServerDataStore, userAuditService user2.UserAuditService, helmAppClient client.HelmAppClient, InstalledAppRepository repository2.InstalledAppRepository,
cloudProviderIdentifierService cloudProviderIdentifier.ProviderIdentifierService) (*TelemetryEventClientImpl, error) {
cloudProviderIdentifierService cloudProviderIdentifier.ProviderIdentifierService, cronLogger *cron3.CronLoggerImpl) (*TelemetryEventClientImpl, error) {
cron := cron.New(
cron.WithChain())
cron.WithChain(cron.Recover(cronLogger)))
cron.Start()
watcher := &TelemetryEventClientImpl{
cron: cron,
Expand Down
5 changes: 3 additions & 2 deletions client/telemetry/TelemetryEventClientExtended.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package telemetry
import (
"encoding/json"
cloudProviderIdentifier "github.com/devtron-labs/common-lib/cloud-provider-identifier"
cron3 "github.com/devtron-labs/devtron/util/cron"
"net/http"
"time"

Expand Down Expand Up @@ -61,10 +62,10 @@ func NewTelemetryEventClientImplExtended(logger *zap.SugaredLogger, client *http
chartRepository chartRepoRepository.ChartRepository, userAuditService user2.UserAuditService,
ciBuildConfigService pipeline.CiBuildConfigService, moduleRepository moduleRepo.ModuleRepository, serverDataStore *serverDataStore.ServerDataStore,
helmAppClient client.HelmAppClient, InstalledAppRepository repository2.InstalledAppRepository, userAttributesRepository repository.UserAttributesRepository,
cloudProviderIdentifierService cloudProviderIdentifier.ProviderIdentifierService) (*TelemetryEventClientImplExtended, error) {
cloudProviderIdentifierService cloudProviderIdentifier.ProviderIdentifierService, cronLogger *cron3.CronLoggerImpl) (*TelemetryEventClientImplExtended, error) {

cron := cron.New(
cron.WithChain())
cron.WithChain(cron.Recover(cronLogger)))
cron.Start()
watcher := &TelemetryEventClientImplExtended{
environmentService: environmentService,
Expand Down
2 changes: 2 additions & 0 deletions cmd/external-app/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
util2 "github.com/devtron-labs/devtron/pkg/util"
util3 "github.com/devtron-labs/devtron/util"
"github.com/devtron-labs/devtron/util/argo"
"github.com/devtron-labs/devtron/util/cron"
"github.com/devtron-labs/devtron/util/rbac"
"github.com/google/wire"
)
Expand Down Expand Up @@ -210,6 +211,7 @@ func InitializeApp() (*App, error) {
// chart group repository layer wire injection ended

// end: docker registry wire set injection
cron.NewCronLoggerImpl,
)
return &App{}, nil
}
5 changes: 3 additions & 2 deletions pkg/cluster/ClusterCronService.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cluster
import (
"fmt"
"github.com/caarlos0/env/v6"
cron2 "github.com/devtron-labs/devtron/util/cron"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
)
Expand All @@ -19,13 +20,13 @@ type ClusterStatusConfig struct {
ClusterStatusCronTime int `env:"CLUSTER_STATUS_CRON_TIME" envDefault:"15"`
}

func NewClusterCronServiceImpl(logger *zap.SugaredLogger, clusterService ClusterService) (*ClusterCronServiceImpl, error) {
func NewClusterCronServiceImpl(logger *zap.SugaredLogger, clusterService ClusterService, cronLogger *cron2.CronLoggerImpl) (*ClusterCronServiceImpl, error) {
clusterCronServiceImpl := &ClusterCronServiceImpl{
logger: logger,
clusterService: clusterService,
}
// initialise cron
newCron := cron.New(cron.WithChain())
newCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
newCron.Start()
cfg := &ClusterStatusConfig{}
err := env.Parse(cfg)
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterTerminalAccess/UserTerminalAccessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/devtron-labs/devtron/pkg/k8s/capacity"
"github.com/devtron-labs/devtron/pkg/terminal"
"github.com/devtron-labs/devtron/util"
cron3 "github.com/devtron-labs/devtron/util/cron"
"github.com/go-pg/pg"
"github.com/robfig/cron/v3"
"github.com/yannh/kubeconform/pkg/resource"
Expand Down Expand Up @@ -79,9 +80,9 @@ func GetTerminalAccessConfig() (*models.UserTerminalSessionConfig, error) {
return config, err
}

func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessRepository repository.TerminalAccessRepository, config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, terminalSessionHandler terminal.TerminalSessionHandler, K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl) (*UserTerminalAccessServiceImpl, error) {
func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessRepository repository.TerminalAccessRepository, config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, terminalSessionHandler terminal.TerminalSessionHandler, K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl, cronLogger *cron3.CronLoggerImpl) (*UserTerminalAccessServiceImpl, error) {
//fetches all running and starting entities from db and start SyncStatus
podStatusSyncCron := cron.New(cron.WithChain())
podStatusSyncCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
terminalAccessDataArrayMutex := &sync.RWMutex{}
map1 := make(map[int]*UserTerminalAccessSessionData)
accessServiceImpl := &UserTerminalAccessServiceImpl{
Expand Down
5 changes: 3 additions & 2 deletions pkg/module/ModuleCronService.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
serverBean "github.com/devtron-labs/devtron/pkg/server/bean"
serverEnvConfig "github.com/devtron-labs/devtron/pkg/server/config"
"github.com/devtron-labs/devtron/util"
cron2 "github.com/devtron-labs/devtron/util/cron"
"github.com/go-pg/pg"
"github.com/robfig/cron/v3"
"github.com/tidwall/gjson"
Expand All @@ -52,7 +53,7 @@ type ModuleCronServiceImpl struct {

func NewModuleCronServiceImpl(logger *zap.SugaredLogger, moduleEnvConfig *ModuleEnvConfig, moduleRepository moduleRepo.ModuleRepository,
serverEnvConfig *serverEnvConfig.ServerEnvConfig, helmAppService client.HelmAppService, moduleServiceHelper ModuleServiceHelper, moduleResourceStatusRepository moduleRepo.ModuleResourceStatusRepository,
moduleDataStore *moduleDataStore.ModuleDataStore) (*ModuleCronServiceImpl, error) {
moduleDataStore *moduleDataStore.ModuleDataStore, cronLogger *cron2.CronLoggerImpl) (*ModuleCronServiceImpl, error) {

moduleCronServiceImpl := &ModuleCronServiceImpl{
logger: logger,
Expand All @@ -70,7 +71,7 @@ func NewModuleCronServiceImpl(logger *zap.SugaredLogger, moduleEnvConfig *Module
// cron job to update module status
// initialise cron
cron := cron.New(
cron.WithChain())
cron.WithChain(cron.Recover(cronLogger)))
cron.Start()

// add function into cron
Expand Down
23 changes: 23 additions & 0 deletions util/cron/CronLogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package cron

import (
"github.com/devtron-labs/common-lib/constants"
"go.uber.org/zap"
)

type CronLoggerImpl struct {
logger *zap.SugaredLogger
}

func (impl *CronLoggerImpl) Info(msg string, keysAndValues ...interface{}) {
impl.logger.Infow(msg, keysAndValues...)
}

func (impl *CronLoggerImpl) Error(err error, msg string, keysAndValues ...interface{}) {
keysAndValues = append([]interface{}{"err", err}, keysAndValues...)
impl.logger.Errorw(constants.PanicLogIdentifier+": "+msg, keysAndValues...)
}

func NewCronLoggerImpl(logger *zap.SugaredLogger) *CronLoggerImpl {
return &CronLoggerImpl{logger: logger}
}
Loading

0 comments on commit ed7dc4f

Please sign in to comment.