Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinator #44

Merged
merged 4 commits into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions atlante/atlante.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"image/png"
"io"
"log"
"os"
"path/filepath"
"strings"
Expand All @@ -17,9 +16,13 @@ import (
fsmulti "github.com/go-spatial/maptoolkit/atlante/filestore/multi"
"github.com/go-spatial/maptoolkit/atlante/grids"
"github.com/go-spatial/maptoolkit/atlante/internal/resolution"
"github.com/go-spatial/maptoolkit/atlante/notifiers"
_ "github.com/go-spatial/maptoolkit/atlante/notifiers/http"
"github.com/go-spatial/maptoolkit/atlante/server/coordinator/field"
"github.com/go-spatial/maptoolkit/mbgl/bounds"
"github.com/go-spatial/maptoolkit/mbgl/image"
"github.com/go-spatial/maptoolkit/svg2pdf"
"github.com/prometheus/common/log"
)

// ImgStruct is a wrapper around an image that makes the image available to the
Expand Down Expand Up @@ -182,6 +185,10 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames
return ErrNilGrid
}

if sheet.Emitter != nil {
sheet.Emitter.Emit(field.Started{})
}

// TODO(gdey): use MdgID once we move to partial templates system
// grp := grid.MdgID.String(), an empty group is current directory
grp := ""
Expand All @@ -202,7 +209,7 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames
}
}

log.Println("filenames: ", filenames.IMG, filenames.SVG, filenames.PDF)
log.Infoln("filenames: ", filenames.IMG, filenames.SVG, filenames.PDF)

const tilesize = 4096 / 2

Expand All @@ -226,8 +233,8 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames
prj := bounds.ESPG3857
width, height := grid.WidthHeightForZoom(zoom)
latLngCenterPt := grid.CenterPtForZoom(zoom)
log.Println("width", width, "height", height)
log.Println("zoom", zoom, "Scale", sheet.Scale, "dpi", sheet.DPI, "ground measure", nground)
log.Infoln("width", width, "height", height)
log.Infoln("zoom", zoom, "Scale", sheet.Scale, "dpi", sheet.DPI, "ground measure", nground)

centerPt := bounds.LatLngToPoint(prj, latLngCenterPt[0], latLngCenterPt[1], zoom, tilesize)
dstimg, err := image.New(
Expand Down Expand Up @@ -258,6 +265,11 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames
img.Close()
}()

if sheet.Emitter != nil {
sheet.Emitter.Emit(field.Processing{
Description: fmt.Sprintf("intermediate file: %v ", filenames.SVG),
})
}
file, err := multiWriter.Writer(filenames.SVG, true)
defer file.Close()

Expand All @@ -271,7 +283,7 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames
Grid: grid,
})
if err != nil {
log.Printf("error trying to fillout sheet template")
log.Warnf("error trying to fillout sheet template")
return err
}

Expand All @@ -289,6 +301,11 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames
pdffn := assetsWriter.Path(filenames.PDF)
svgfn := assetsWriter.Path(filenames.SVG)

if sheet.Emitter != nil {
sheet.Emitter.Emit(field.Processing{
Description: fmt.Sprintf("file: %v ", filenames.PDF),
})
}
if err = svg2pdf.GeneratePDF(svgfn, pdffn, 2028, 2607); err != nil {
return err
}
Expand Down Expand Up @@ -318,6 +335,8 @@ type Atlante struct {
workDirectory string
sLock sync.RWMutex
sheets map[string]*Sheet
Notifier notifiers.Provider
JobID string
}

func (a *Atlante) Shutdown() {}
Expand Down Expand Up @@ -366,7 +385,6 @@ func (a *Atlante) GeneratePDFLatLng(ctx context.Context, sheetName string, lat,
if err != nil {
return nil, err
}

err = GeneratePDF(ctx, provider, cell, filenames)
return filenames, err

Expand All @@ -377,7 +395,21 @@ func (a *Atlante) generatePDF(ctx context.Context, sheet *Sheet, grid *grids.Cel
if err != nil {
return nil, err
}
if a.Notifier != nil && a.JobID != "" {
sheet.Emitter, err = a.Notifier.NewEmitter(a.JobID)
if err != nil {
sheet.Emitter = nil
log.Warnf("Failed to init emitter: %v", err)
}
}
err = GeneratePDF(ctx, sheet, grid, filenames)
if sheet.Emitter != nil {
if err != nil {
sheet.Emitter.Emit(field.Failed{Error: err})
} else {
sheet.Emitter.Emit(field.Completed{})
}
}
return filenames, err
}

Expand All @@ -387,6 +419,7 @@ func (a *Atlante) GeneratePDFJob(ctx context.Context, job Job, filenameTemplate
if err != nil {
return nil, err
}
a.JobID = job.MetaData["job_id"]
return a.generatePDF(ctx, sheet, cell, filenameTemplate)
}

Expand Down
15 changes: 9 additions & 6 deletions atlante/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Config struct {
// Webserver is the configuration for the webserver
Webserver Webserver `toml:"webserver"`

Notifier env.Dict `toml:"notifier"`

Providers []env.Dict `toml:"providers"`
Sheets []Sheet `toml:"sheets"`

Expand All @@ -38,11 +40,13 @@ type Config struct {
// Webserver represents the config values for the webserver potion
// of the application.
type Webserver struct {
HostName env.String `toml:"hostname"`
Port env.String `toml:"port"`
Scheme env.String `toml:"scheme"`
Headers map[string]string `toml:"headers"`
Queue env.Dict `toml:"queue"`
HostName env.String `toml:"hostname"`
Port env.String `toml:"port"`
Scheme env.String `toml:"scheme"`
Headers map[string]string `toml:"headers"`
Queue env.Dict `toml:"queue"`
DisableNotificationEP bool `toml:"disable_notification_endpoint"`
Coordinator env.Dict `toml:"coordinator"`
}

// Sheet models a sheet in the config file
Expand All @@ -53,7 +57,6 @@ type Sheet struct {
DPI env.Int `toml:"dpi"`
Template env.String `toml:"template"`
Style env.String `toml:"style"`
Notifier env.String `toml:"notifier"`
Description env.String `toml:"description"`
}

Expand Down
2 changes: 1 addition & 1 deletion atlante/grids/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Register(providerType string, init InitFunc, cleanup CleanupFunc) error {
init: init,
cleanup: cleanup,
}
log.Infof("registerd grid provider: %v", providerType)
log.Infof("registered grid provider: %v", providerType)
return nil
}

Expand Down
105 changes: 105 additions & 0 deletions atlante/notifiers/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package http

import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"text/template"

"github.com/gdey/errors"
"github.com/go-spatial/maptoolkit/atlante/notifiers"
"github.com/go-spatial/maptoolkit/atlante/server/coordinator/field"
"github.com/prometheus/common/log"
)

const (
TYPE = "http"

DefaultContentType = "application/json"
DefaultURLTemplate = "/job/{{.job_id}}/status"

ConfigKeyContentType = "content_type"
ConfigKeyURLTemplate = "url_template"
)

func initFunc(cfg notifiers.Config) (notifiers.Provider, error) {
var err error
contentType := DefaultContentType
contentType, err = cfg.String(ConfigKeyContentType, &contentType)
if err != nil {
return nil, err
}
urlTemplate := DefaultURLTemplate
urlTemplate, err = cfg.String(ConfigKeyURLTemplate, &urlTemplate)
t, err := template.New("url").Parse(urlTemplate)
if err != nil {
return nil, err
}
log.Infof("configured notifier %v", TYPE)
return &Provider{
contentType: contentType,
urlTpl: t,
}, nil
}

func init() {
notifiers.Register(TYPE, initFunc, nil)
}

type Provider struct {
contentType string
urlTpl *template.Template
}

func (p *Provider) NewEmitter(jobid string) (notifiers.Emitter, error) {
var str strings.Builder
var ctx = struct {
JobID string
}{
JobID: jobid,
}
if err := p.urlTpl.Execute(&str, ctx); err != nil {
return nil, err
}

return &emitter{
contentType: p.contentType,
url: str.String(),
}, nil
}

type emitter struct {
jobid string
contentType string
url string
}

func (e *emitter) Emit(se field.StatusEnum) error {
if e == nil {
return errors.String("emitter is nil")
}
bdy, err := json.Marshal(field.Status{se})
if err != nil {
return err
}
buff := bytes.NewBuffer(bdy)
// Don't care about the response
gdey marked this conversation as resolved.
Show resolved Hide resolved
log.Infof("posting to %v:%s", e.url, string(bdy))
resp, err := http.Post(e.url, e.contentType, buff)
if err != nil {
log.Warnf("error posting to (%v): %v", e.url, err)
}
// If the status code was a Client Error or a Server Error we should log
// the code and body.
if resp.StatusCode >= 400 {
codetype := "client error"
if resp.StatusCode >= 500 {
codetype = "server error"
}
bdy, _ := ioutil.ReadAll(resp.Body)
log.Infof("%v (%v): %v", codetype, resp.StatusCode, bdy)
}
return err
}
9 changes: 8 additions & 1 deletion atlante/notifiers/notifiers.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
package notifiers

type Interface interface{
import "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field"

type Emitter interface {
Emit(field.StatusEnum) error
}

type Provider interface {
NewEmitter(jobid string) (Emitter, error)
}
42 changes: 26 additions & 16 deletions atlante/notifiers/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,30 @@ import (
"github.com/go-spatial/tegola/dict"
)

type ErrNotifierAlreadyExists string
type ErrAlreadyExists string

func (err ErrNotifierAlreadyExists) Error() string {
func (err ErrAlreadyExists) Error() string {
return "notifier (" + string(err) + ") already exists"
}
func (err ErrNotifierAlreadyExists) Cause() error { return nil }
func (err ErrAlreadyExists) Cause() error { return nil }

const (
ErrNoNotifiersRegistered = errors.String("no notifiers registered")
ErrKey = errors.String("bad key provided")
ErrNoneRegistered = errors.String("no notifiers registered")
ErrKey = errors.String("bad key provided")

// ConfigKeyType is the name for the config key
ConfigKeyType = "type"
)

type NotifierConfiger interface {
type Config interface {
dict.Dicter
// NamedNotifierProvider returns a configured Notifer for the provided key.
// if the named provider does not exist ErrNoNotifiersRegistered will be returned
NamedNotifierProvider(name string) (Notifier, error)
}

/*****************************************************************************/

// InitFunc initilizes a notifier given the config.
// InitFunc should validate the config, and report any errors.
type InitFunc func(NotifierConfiger) (Notifer, error)
type InitFunc func(Config) (Provider, error)

// CleanupFunc is called when the system is shutting down.
// this allows the provider do any needed cleanup.
Expand All @@ -56,9 +56,9 @@ func Register(notifierType string, init InitFunc, cleanup CleanupFunc) error {
notifiers = make(map[string]funcs)
}
if _, ok := notifiers[notifierType]; ok {
return ErrNotifierAlreadyExists(notifierType)
return ErrAlreadyExists(notifierType)
}
notifiers[notiferType] = funcs{
notifiers[notifierType] = funcs{
init: init,
cleanup: cleanup,
}
Expand All @@ -70,7 +70,7 @@ func Unregister(notifierType string) {
notifiersLock.Lock()
defer notifiersLock.Unlock()

n, ok := notifiers[notifiersType]
n, ok := notifiers[notifierType]
if !ok {
return // nothing to do.
}
Expand All @@ -95,19 +95,29 @@ func Registered() (n []string) {
}

// For function returns a configured provider of the given type, and provided the correct config.
func For(notifierType string, config NotifierConfig) (Notifier, error) {
func For(notifierType string, config Config) (Provider, error) {
notifiersLock.RLock()
defer notifiersLock.RUnlock()
if notifiers == nil {
return nil, ErrNoNotifiersRegistered
return nil, ErrNoneRegistered
}
n, ok := notifiers[notifierType]
if !ok {
return nil, ErrNotifierNotRegistered(notifierType)
return nil, ErrNoneRegistered
}
return n.init(config)
}

// From is like For but assumes that the config has a ConfigKeyType value informing the type
// of provider being configured
func From(config Config) (Provider, error) {
cType, err := config.String(ConfigKeyType, nil)
if err != nil {
return nil, err
}
return For(cType, config)
}

// Cleanup should be called when the system is shutting down. This given each provider
// a chance to do any needed cleanup. This will unregister all providers.
func Cleanup() {
Expand Down
Loading