Skip to content

Commit

Permalink
Merge pull request #14 from alexandrevilain/improve-persistence-recon…
Browse files Browse the repository at this point in the history
…cilations

fix: improve persistence reconciliation by relying on the cluster status
  • Loading branch information
alexandrevilain authored May 30, 2022
2 parents e8307c0 + 64cd59f commit a930793
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 52 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ out
*.swp
*.swo
*~
.vscode
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go

FROM alpine/git as cloner
WORKDIR /workspace
RUN git clone --depth 1 -b v1.16.0 https://github.com/temporalio/temporal.git
RUN git clone --depth 1 -b v1.16.2 https://github.com/temporalio/temporal.git

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
Expand Down
19 changes: 16 additions & 3 deletions api/v1alpha1/temporalcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *TemporalDatastoreSpec) GetDatastoreType() (DatastoreType, error) {
return MySQLDatastore, nil
}
}
return DatastoreType(""), errors.New("can't specify datastore type from current spec")
return DatastoreType(""), errors.New("can't get datastore type from current spec")
}

// GetTLSKeyFileMountPath returns the client TLS cert mount path.
Expand Down Expand Up @@ -244,12 +244,22 @@ type ServiceStatus struct {
Version string `json:"version"`
}

// PersistenceStatus reports datastores schema versions.
type PersistenceStatus struct {
// DefaultStoreSchemaVersion holds the current schema version for the default store.
DefaultStoreSchemaVersion string `json:"defaultStoreSchemaVersion"`
// VisibilityStoreSchemaVersion holds the current schema version for the visibility store.
VisibilityStoreSchemaVersion string `json:"visibilityStoreSchemaVersion"`
}

// TemporalClusterStatus defines the observed state of TemporalCluster.
type TemporalClusterStatus struct {
// Version holds the current temporal version.
Version string `json:"version"`
Version string `json:"version,omitempty"`
// Persistence holds the persistence status.
Persistence PersistenceStatus `json:"persistence,omitempty"`
// Services holds all services statuses.
Services []ServiceStatus `json:"components"`
Services []ServiceStatus `json:"services,omitempty"`
// TODO(alexandrevilain): add conditions
}

Expand Down Expand Up @@ -291,6 +301,9 @@ func (c *TemporalCluster) ChildResourceName(resource string) string {

// Default sets default values on the temporal Cluster.
func (c *TemporalCluster) Default() {
if c.Spec.Version == "" {
c.Spec.Version = "1.16.2"
}
if c.Spec.Image == "" {
c.Spec.Image = "temporalio/server"
}
Expand Down
16 changes: 16 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions config/crd/bases/apps.alexandrevilain.dev_temporalclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,18 @@ spec:
status:
description: TemporalClusterStatus defines the observed state of TemporalCluster.
properties:
components:
persistence:
description: Persistence holds the persistence status.
properties:
defaultStoreSchemaVersion:
type: string
visibilityStoreSchemaVersion:
type: string
required:
- defaultStoreSchemaVersion
- visibilityStoreSchemaVersion
type: object
services:
description: Services holds all services statuses.
items:
description: ServiceStatus reports a service status.
Expand All @@ -292,16 +303,9 @@ spec:
- version
type: object
type: array
schemaVersion:
description: SchemaVersion holds the current schema version.
type: string
version:
description: Version holds the current temporal version.
type: string
required:
- components
- schemaVersion
- version
type: object
type: object
served: true
Expand Down
97 changes: 91 additions & 6 deletions controllers/temporalcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers

import (
"context"
"errors"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,6 +38,8 @@ import (
"github.com/alexandrevilain/temporal-operator/pkg/persistence"
"github.com/alexandrevilain/temporal-operator/pkg/resource"
"github.com/alexandrevilain/temporal-operator/pkg/status"
"github.com/alexandrevilain/temporal-operator/pkg/version"
"github.com/blang/semver/v4"
)

const (
Expand Down Expand Up @@ -81,7 +84,14 @@ func (r *TemporalClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
// Set defaults on unfiled fields.
temporalCluster.Default()

if err := r.reconcilePersistence(ctx, temporalCluster); err != nil {
// Validate that the cluster version is a supported one.
// TODO(alexandrevilain): this should be moved to an AdmissionWebhook.
clusterVersion, err := version.ParseAndValidateTemporalVersion(temporalCluster.Spec.Version)
if err != nil {
return ctrl.Result{}, err
}

if err := r.reconcilePersistence(ctx, temporalCluster, clusterVersion); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -92,16 +102,81 @@ func (r *TemporalClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}

func (r *TemporalClusterReconciler) reconcilePersistence(ctx context.Context, temporalCluster *appsv1alpha1.TemporalCluster) error {
if err := r.PersistenceManager.RunDefaultStoreSchemaTasks(ctx, temporalCluster); err != nil {
// reconcilePersistence tries to reconcile the cluster persistence.
// If first checks if the schema status field for both of the default and visibility stores are empty. If empty it tries to setup the stores' schemas.
// Then it compares the current schema version (from the cluster's status) and determine if a schema upgrade is needed.
func (r *TemporalClusterReconciler) reconcilePersistence(ctx context.Context, temporalCluster *appsv1alpha1.TemporalCluster, clusterVersion semver.Version) error {
defaultStore, found := temporalCluster.GetDefaultDatastore()
if !found {
return errors.New("default datastore not found")
}

visibilityStore, found := temporalCluster.GetVisibilityDatastore()
if !found {
return errors.New("visibility datastore not found")
}

if temporalCluster.Status.Persistence.DefaultStoreSchemaVersion == "" {
err := r.PersistenceManager.RunStoreSetupTask(ctx, temporalCluster, defaultStore)
if err != nil {
return err
}
temporalCluster.Status.Persistence.DefaultStoreSchemaVersion = "0.0.0"
}

if temporalCluster.Status.Persistence.VisibilityStoreSchemaVersion == "" {
err := r.PersistenceManager.RunStoreSetupTask(ctx, temporalCluster, visibilityStore)
if err != nil {
return err
}
temporalCluster.Status.Persistence.VisibilityStoreSchemaVersion = "0.0.0"
}

defaultStoreType, err := defaultStore.GetDatastoreType()
if err != nil {
return err
}

if err := r.PersistenceManager.RunVisibilityStoreSchemaTasks(ctx, temporalCluster); err != nil {
visibilityStoreType, err := visibilityStore.GetDatastoreType()
if err != nil {
return err
}

return nil
expectedSchemaVersions, err := version.GetExpectedSchemaVersions(clusterVersion)
if err != nil {
return err
}

expectedDefaultStoreSchemaVersion := expectedSchemaVersions[defaultStoreType]
expectedVisibilityStoreSchemaVersion := expectedSchemaVersions[visibilityStoreType]

currentDefaultStoreSchemaVersion, err := version.Parse(temporalCluster.Status.Persistence.DefaultStoreSchemaVersion)
if err != nil {
return err
}

currentVisibilityStoreSchemaVersion, err := version.Parse(temporalCluster.Status.Persistence.VisibilityStoreSchemaVersion)
if err != nil {
return err
}

if expectedDefaultStoreSchemaVersion.GT(currentDefaultStoreSchemaVersion) {
err := r.PersistenceManager.RunDefaultStoreUpdateTask(ctx, temporalCluster, defaultStore, expectedDefaultStoreSchemaVersion)
if err != nil {
return err
}
temporalCluster.Status.Persistence.DefaultStoreSchemaVersion = expectedDefaultStoreSchemaVersion.String()
}

if expectedVisibilityStoreSchemaVersion.GT(currentVisibilityStoreSchemaVersion) {
err := r.PersistenceManager.RunVisibilityStoreUpdateTask(ctx, temporalCluster, visibilityStore, expectedVisibilityStoreSchemaVersion)
if err != nil {
return err
}
temporalCluster.Status.Persistence.VisibilityStoreSchemaVersion = expectedVisibilityStoreSchemaVersion.String()
}

return r.updateTemporalClusterStatus(ctx, temporalCluster)
}

func (r *TemporalClusterReconciler) reconcileResources(ctx context.Context, temporalCluster *appsv1alpha1.TemporalCluster) error {
Expand Down Expand Up @@ -164,7 +239,17 @@ func (r *TemporalClusterReconciler) reconcileResources(ctx context.Context, temp
temporalCluster.Status.Version = temporalCluster.Spec.Version
}

return r.Status().Update(ctx, temporalCluster)
return r.updateTemporalClusterStatus(ctx, temporalCluster)
}

func (r *TemporalClusterReconciler) updateTemporalClusterStatus(ctx context.Context, temporalCluster *appsv1alpha1.TemporalCluster) error {
err := r.Status().Update(ctx, temporalCluster)
if err != nil {
return err
}
// Set back defaults as the status update retrieve the object from the API server.
temporalCluster.Default()
return nil
}

func (r *TemporalClusterReconciler) operationResultToAction(operationResult controllerutil.OperationResult) string {
Expand Down
Loading

0 comments on commit a930793

Please sign in to comment.