Skip to content

Commit

Permalink
feat: market shredder
Browse files Browse the repository at this point in the history
  • Loading branch information
antmat committed Jan 29, 2019
1 parent dadc712 commit d3990de
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 32 deletions.
4 changes: 0 additions & 4 deletions etc/node.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
migration_dwh:
endpoint:


# Local Node settings
node:
# Node's port to listen for client connection
Expand Down
43 changes: 35 additions & 8 deletions insonmnia/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,69 @@ package migration
import (
"context"
"crypto/ecdsa"
"fmt"

"github.com/ethereum/go-ethereum/crypto"
"github.com/sonm-io/core/blockchain"
"github.com/sonm-io/core/insonmnia/dwh"
"github.com/sonm-io/core/proto"
"github.com/sonm-io/core/util/multierror"
"github.com/sonm-io/core/util/xgrpc"
"google.golang.org/grpc/credentials"
)

type MarketShredder struct {
type MarketShredder interface {
WeDontNeedNoWaterLetTheMothefuckerBurn(ctx context.Context, pKey *ecdsa.PrivateKey) error
}

type nilMarketShredder struct {
}

func (m *nilMarketShredder) WeDontNeedNoWaterLetTheMothefuckerBurn(ctx context.Context, pKey *ecdsa.PrivateKey) error {
return nil
}

type marketShredder struct {
api blockchain.API
dwh sonm.DWHClient
}

func NewV1MarketShredder(ctx context.Context, bcCfg *blockchain.Config, dwhCfg dwh.YAMLConfig) (*MarketShredder, error) {
api, err := blockchain.NewAPI(ctx, blockchain.WithConfig(bcCfg), blockchain.WithVersion(1))
type MigrationConfig struct {
Blockchain *blockchain.Config
MigrationDWH *dwh.YAMLConfig
Enabled *bool `default:"true"`
Version uint
}

func NewV1MarketShredder(ctx context.Context, cfg *MigrationConfig, credentials credentials.TransportCredentials) (MarketShredder, error) {
if !*cfg.Enabled {
return &nilMarketShredder{}, nil
}
if cfg.MigrationDWH == nil {
return nil, fmt.Errorf("dwh config is required for enabled migrartion")
}
api, err := blockchain.NewAPI(ctx, blockchain.WithConfig(cfg.Blockchain), blockchain.WithVersion(1))
if err != nil {
return nil, err
}

cc, err := xgrpc.NewClient(ctx, dwhCfg.Endpoint, credentials)
cc, err := xgrpc.NewClient(ctx, cfg.MigrationDWH.Endpoint, credentials)
if err != nil {
return err
return nil, err
}

dwh := sonm.NewDWHClient(cc)
return NewMarketShredder(api, dwh), nil
}

func NewMarketShredder(api blockchain.API, dwh sonm.DWHClient) *MarketShredder {
return &MarketShredder{
func NewMarketShredder(api blockchain.API, dwh sonm.DWHClient) *marketShredder {
return &marketShredder{
api: api,
dwh: dwh,
}
}

func (m *MarketShredder) WeDontNeedNoWaterLetTheMothefuckerBurn(ctx context.Context, pKey *ecdsa.PrivateKey) error {
func (m *marketShredder) WeDontNeedNoWaterLetTheMothefuckerBurn(ctx context.Context, pKey *ecdsa.PrivateKey) error {
author := crypto.PubkeyToAddress(pKey.PublicKey)
ordersRequest := &sonm.OrdersRequest{
AuthorID: sonm.NewEthAddress(author),
Expand Down
48 changes: 28 additions & 20 deletions insonmnia/worker/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package worker

import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/jinzhu/configor"
"github.com/opencontainers/runtime-spec/specs-go"
Expand All @@ -9,6 +11,7 @@ import (
"github.com/sonm-io/core/insonmnia/dwh"
"github.com/sonm-io/core/insonmnia/logging"
"github.com/sonm-io/core/insonmnia/matcher"
"github.com/sonm-io/core/insonmnia/migration"
"github.com/sonm-io/core/insonmnia/npp"
"github.com/sonm-io/core/insonmnia/state"
"github.com/sonm-io/core/insonmnia/worker/plugin"
Expand Down Expand Up @@ -37,26 +40,27 @@ type DevConfig struct {
}

type Config struct {
Endpoint string `yaml:"endpoint" required:"true"`
Logging logging.Config `yaml:"logging"`
Resources *ResourcesConfig `yaml:"resources" required:"false"`
Blockchain *blockchain.Config `yaml:"blockchain"`
NPP npp.Config `yaml:"npp"`
SSH *SSHConfig `yaml:"ssh" required:"false" `
PublicIPs []string `yaml:"public_ip_addrs" required:"false" `
Plugins plugin.Config `yaml:"plugins"`
Storage state.StorageConfig `yaml:"store"`
Benchmarks benchmarks.Config `yaml:"benchmarks"`
Whitelist WhitelistConfig `yaml:"whitelist"`
MetricsListenAddr string `yaml:"metrics_listen_addr" default:"127.0.0.1:14000"`
DWH dwh.YAMLConfig `yaml:"dwh"`
Matcher *matcher.YAMLConfig `yaml:"matcher"`
Salesman salesman.YAMLConfig `yaml:"salesman"`
Master common.Address `yaml:"master" required:"true"`
Development DevConfig `yaml:"development"`
Admin *common.Address `yaml:"admin"`
MetricsCollector *common.Address `yaml:"metrics_collector"`
Debug *debug.Config `yaml:"debug"`
Endpoint string `yaml:"endpoint" required:"true"`
Logging logging.Config `yaml:"logging"`
Resources *ResourcesConfig `yaml:"resources" required:"false"`
Blockchain *blockchain.Config `yaml:"blockchain"`
NPP npp.Config `yaml:"npp"`
SSH *SSHConfig `yaml:"ssh" required:"false" `
PublicIPs []string `yaml:"public_ip_addrs" required:"false" `
Plugins plugin.Config `yaml:"plugins"`
Storage state.StorageConfig `yaml:"store"`
Benchmarks benchmarks.Config `yaml:"benchmarks"`
Whitelist WhitelistConfig `yaml:"whitelist"`
MetricsListenAddr string `yaml:"metrics_listen_addr" default:"127.0.0.1:14000"`
DWH dwh.YAMLConfig `yaml:"dwh"`
Matcher *matcher.YAMLConfig `yaml:"matcher"`
Salesman salesman.YAMLConfig `yaml:"salesman"`
Master common.Address `yaml:"master" required:"true"`
Development DevConfig `yaml:"development"`
Admin *common.Address `yaml:"admin"`
MetricsCollector *common.Address `yaml:"metrics_collector"`
Debug *debug.Config `yaml:"debug"`
Migration *migration.MigrationConfig `yaml:"migration" required:"true"`
}

// NewConfig creates a new Worker config from the specified YAML file.
Expand All @@ -73,5 +77,9 @@ func NewConfig(path string) (*Config, error) {
cfg.Whitelist.PrivilegedIdentityLevel = sonm.IdentityLevel_ANONYMOUS
}

if cfg.Migration.MigrationDWH == nil && *cfg.Migration.Enabled {
return nil, fmt.Errorf("dwh config is required for enabled migrartion")
}

return cfg, nil
}
25 changes: 25 additions & 0 deletions insonmnia/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/sonm-io/core/insonmnia/inspect"
"github.com/sonm-io/core/insonmnia/logging"
"github.com/sonm-io/core/insonmnia/matcher"
"github.com/sonm-io/core/insonmnia/migration"
"github.com/sonm-io/core/insonmnia/npp"
"github.com/sonm-io/core/insonmnia/resource"
"github.com/sonm-io/core/insonmnia/state"
Expand Down Expand Up @@ -198,6 +199,8 @@ type Worker struct {
metrics *metrics.Handler
// Embedded inspection service.
*inspect.InspectService

shredder migration.MarketShredder
}

func NewWorker(cfg *Config, storage *state.Storage, options ...Option) (*Worker, error) {
Expand Down Expand Up @@ -275,6 +278,10 @@ func NewWorker(cfg *Config, storage *state.Storage, options ...Option) (*Worker,
return nil, err
}

if err := m.setupMarketShredder(); err != nil {
return nil, err
}

dg.CancelExec()

return m, nil
Expand Down Expand Up @@ -571,6 +578,15 @@ func (m *Worker) setupInspectService(cfg *Config, authWatcher *auth.AnyOfTranspo
return nil
}

func (m *Worker) setupMarketShredder() error {
shredder, err := migration.NewV1MarketShredder(m.ctx, m.cfg.Migration, m.credentials)
if err != nil {
return err
}
m.shredder = shredder
return nil
}

func (m *Worker) loadOrGenerateSSHSigner() (ssh.Signer, error) {
var privateKeyData []byte
ok, err := m.storage.Load(sshPrivateKeyKey, &privateKeyData)
Expand Down Expand Up @@ -632,6 +648,15 @@ func (m *Worker) Serve() error {

return m.externalGrpc.Serve(m.listener)
})
wg.Go(func() error {
log.S(ctx).Infof("shredding old market")
if err := m.shredder.WeDontNeedNoWaterLetTheMothefuckerBurn(ctx, m.key); err != nil {
log.S(ctx).Warnf("failed to shred old market")
return err
}
log.S(ctx).Infof("successfully shredded old market")
return nil
})

<-ctx.Done()
m.Close()
Expand Down

0 comments on commit d3990de

Please sign in to comment.