diff --git a/atlante/atlante.go b/atlante/atlante.go index dce31f6..89345e9 100644 --- a/atlante/atlante.go +++ b/atlante/atlante.go @@ -5,7 +5,6 @@ import ( "fmt" "image/png" "io" - "log" "os" "path/filepath" "strings" @@ -17,9 +16,13 @@ import ( fsmulti "github.com/go-spatial/maptoolkit/atlante/filestore/multi" "github.com/go-spatial/maptoolkit/atlante/grids" "github.com/go-spatial/maptoolkit/atlante/internal/resolution" + "github.com/go-spatial/maptoolkit/atlante/notifiers" + _ "github.com/go-spatial/maptoolkit/atlante/notifiers/http" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" "github.com/go-spatial/maptoolkit/mbgl/bounds" "github.com/go-spatial/maptoolkit/mbgl/image" "github.com/go-spatial/maptoolkit/svg2pdf" + "github.com/prometheus/common/log" ) // ImgStruct is a wrapper around an image that makes the image available to the @@ -182,6 +185,10 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames return ErrNilGrid } + if sheet.Emitter != nil { + sheet.Emitter.Emit(field.Started{}) + } + // TODO(gdey): use MdgID once we move to partial templates system // grp := grid.MdgID.String(), an empty group is current directory grp := "" @@ -202,7 +209,7 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames } } - log.Println("filenames: ", filenames.IMG, filenames.SVG, filenames.PDF) + log.Infoln("filenames: ", filenames.IMG, filenames.SVG, filenames.PDF) const tilesize = 4096 / 2 @@ -226,8 +233,8 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames prj := bounds.ESPG3857 width, height := grid.WidthHeightForZoom(zoom) latLngCenterPt := grid.CenterPtForZoom(zoom) - log.Println("width", width, "height", height) - log.Println("zoom", zoom, "Scale", sheet.Scale, "dpi", sheet.DPI, "ground measure", nground) + log.Infoln("width", width, "height", height) + log.Infoln("zoom", zoom, "Scale", sheet.Scale, "dpi", sheet.DPI, "ground measure", nground) centerPt := 
bounds.LatLngToPoint(prj, latLngCenterPt[0], latLngCenterPt[1], zoom, tilesize) dstimg, err := image.New( @@ -258,6 +265,11 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames img.Close() }() + if sheet.Emitter != nil { + sheet.Emitter.Emit(field.Processing{ + Description: fmt.Sprintf("intermediate file: %v ", filenames.SVG), + }) + } file, err := multiWriter.Writer(filenames.SVG, true) defer file.Close() @@ -271,7 +283,7 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames Grid: grid, }) if err != nil { - log.Printf("error trying to fillout sheet template") + log.Warnf("error trying to fillout sheet template") return err } @@ -289,6 +301,11 @@ func GeneratePDF(ctx context.Context, sheet *Sheet, grid *grids.Cell, filenames pdffn := assetsWriter.Path(filenames.PDF) svgfn := assetsWriter.Path(filenames.SVG) + if sheet.Emitter != nil { + sheet.Emitter.Emit(field.Processing{ + Description: fmt.Sprintf("file: %v ", filenames.PDF), + }) + } if err = svg2pdf.GeneratePDF(svgfn, pdffn, 2028, 2607); err != nil { return err } @@ -318,6 +335,8 @@ type Atlante struct { workDirectory string sLock sync.RWMutex sheets map[string]*Sheet + Notifier notifiers.Provider + JobID string } func (a *Atlante) Shutdown() {} @@ -366,7 +385,6 @@ func (a *Atlante) GeneratePDFLatLng(ctx context.Context, sheetName string, lat, if err != nil { return nil, err } - err = GeneratePDF(ctx, provider, cell, filenames) return filenames, err @@ -377,7 +395,21 @@ func (a *Atlante) generatePDF(ctx context.Context, sheet *Sheet, grid *grids.Cel if err != nil { return nil, err } + if a.Notifier != nil && a.JobID != "" { + sheet.Emitter, err = a.Notifier.NewEmitter(a.JobID) + if err != nil { + sheet.Emitter = nil + log.Warnf("Failed to init emitter: %v", err) + } + } err = GeneratePDF(ctx, sheet, grid, filenames) + if sheet.Emitter != nil { + if err != nil { + sheet.Emitter.Emit(field.Failed{Error: err}) + } else { + 
sheet.Emitter.Emit(field.Completed{}) + } + } return filenames, err } @@ -387,6 +419,7 @@ func (a *Atlante) GeneratePDFJob(ctx context.Context, job Job, filenameTemplate if err != nil { return nil, err } + a.JobID = job.MetaData["job_id"] return a.generatePDF(ctx, sheet, cell, filenameTemplate) } diff --git a/atlante/config/config.go b/atlante/config/config.go index 9e7030a..f3b55a8 100644 --- a/atlante/config/config.go +++ b/atlante/config/config.go @@ -20,6 +20,8 @@ type Config struct { // Webserver is the configuration for the webserver Webserver Webserver `toml:"webserver"` + Notifier env.Dict `toml:"notifier"` + Providers []env.Dict `toml:"providers"` Sheets []Sheet `toml:"sheets"` @@ -38,11 +40,13 @@ type Config struct { // Webserver represents the config values for the webserver potion // of the application. type Webserver struct { - HostName env.String `toml:"hostname"` - Port env.String `toml:"port"` - Scheme env.String `toml:"scheme"` - Headers map[string]string `toml:"headers"` - Queue env.Dict `toml:"queue"` + HostName env.String `toml:"hostname"` + Port env.String `toml:"port"` + Scheme env.String `toml:"scheme"` + Headers map[string]string `toml:"headers"` + Queue env.Dict `toml:"queue"` + DisableNotificationEP bool `toml:"disable_notification_endpoint"` + Coordinator env.Dict `toml:"coordinator"` } // Sheet models a sheet in the config file @@ -53,7 +57,6 @@ type Sheet struct { DPI env.Int `toml:"dpi"` Template env.String `toml:"template"` Style env.String `toml:"style"` - Notifier env.String `toml:"notifier"` Description env.String `toml:"description"` } diff --git a/atlante/grids/register.go b/atlante/grids/register.go index 5695479..974db69 100644 --- a/atlante/grids/register.go +++ b/atlante/grids/register.go @@ -69,7 +69,7 @@ func Register(providerType string, init InitFunc, cleanup CleanupFunc) error { init: init, cleanup: cleanup, } - log.Infof("registerd grid provider: %v", providerType) + log.Infof("registered grid provider: %v", 
providerType) return nil } diff --git a/atlante/notifiers/http/http.go b/atlante/notifiers/http/http.go new file mode 100644 index 0000000..5f6a7ee --- /dev/null +++ b/atlante/notifiers/http/http.go @@ -0,0 +1,105 @@ +package http + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "strings" + "text/template" + + "github.com/gdey/errors" + "github.com/go-spatial/maptoolkit/atlante/notifiers" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" + "github.com/prometheus/common/log" +) + +const ( + TYPE = "http" + + DefaultContentType = "application/json" + DefaultURLTemplate = "/job/{{.job_id}}/status" + + ConfigKeyContentType = "content_type" + ConfigKeyURLTemplate = "url_template" +) + +func initFunc(cfg notifiers.Config) (notifiers.Provider, error) { + var err error + contentType := DefaultContentType + contentType, err = cfg.String(ConfigKeyContentType, &contentType) + if err != nil { + return nil, err + } + urlTemplate := DefaultURLTemplate + urlTemplate, err = cfg.String(ConfigKeyURLTemplate, &urlTemplate) + t, err := template.New("url").Parse(urlTemplate) + if err != nil { + return nil, err + } + log.Infof("configured notifier %v", TYPE) + return &Provider{ + contentType: contentType, + urlTpl: t, + }, nil +} + +func init() { + notifiers.Register(TYPE, initFunc, nil) +} + +type Provider struct { + contentType string + urlTpl *template.Template +} + +func (p *Provider) NewEmitter(jobid string) (notifiers.Emitter, error) { + var str strings.Builder + var ctx = struct { + JobID string + }{ + JobID: jobid, + } + if err := p.urlTpl.Execute(&str, ctx); err != nil { + return nil, err + } + + return &emitter{ + contentType: p.contentType, + url: str.String(), + }, nil +} + +type emitter struct { + jobid string + contentType string + url string +} + +func (e *emitter) Emit(se field.StatusEnum) error { + if e == nil { + return errors.String("emitter is nil") + } + bdy, err := json.Marshal(field.Status{se}) + if err != nil { + return err 
+ } + buff := bytes.NewBuffer(bdy) + // Don't care about the response + log.Infof("posting to %v:%s", e.url, string(bdy)) + resp, err := http.Post(e.url, e.contentType, buff) + if err != nil { + log.Warnf("error posting to (%v): %v", e.url, err) + } + // If the status code was a Client Error or a Server Error we should log + // the code and body. + if resp != nil && resp.StatusCode >= 400 { + codetype := "client error" + if resp.StatusCode >= 500 { + codetype = "server error" + } + bdy, _ := ioutil.ReadAll(resp.Body) + log.Infof("%v (%v): %v", codetype, resp.StatusCode, string(bdy)) + } + return err +} diff --git a/atlante/notifiers/notifiers.go b/atlante/notifiers/notifiers.go index f704ff0..77182db 100644 --- a/atlante/notifiers/notifiers.go +++ b/atlante/notifiers/notifiers.go @@ -1,4 +1,11 @@ package notifiers -type Interface interface{ +import "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" + +type Emitter interface { + Emit(field.StatusEnum) error +} + +type Provider interface { + NewEmitter(jobid string) (Emitter, error) } diff --git a/atlante/notifiers/register.go b/atlante/notifiers/register.go index fd11d92..925069e 100644 --- a/atlante/notifiers/register.go +++ b/atlante/notifiers/register.go @@ -8,30 +8,30 @@ import ( "github.com/go-spatial/tegola/dict" ) -type ErrNotifierAlreadyExists string +type ErrAlreadyExists string -func (err ErrNotifierAlreadyExists) Error() string { +func (err ErrAlreadyExists) Error() string { return "notifier (" + string(err) + ") already exists" } -func (err ErrNotifierAlreadyExists) Cause() error { return nil } +func (err ErrAlreadyExists) Cause() error { return nil } const ( - ErrNoNotifiersRegistered = errors.String("no notifiers registered") - ErrKey = errors.String("bad key provided") + ErrNoneRegistered = errors.String("no notifiers registered") + ErrKey = errors.String("bad key provided") + + // ConfigKeyType is the name for the config key + ConfigKeyType = "type" ) -type NotifierConfiger interface { +type Config 
interface { dict.Dicter - // NamedNotifierProvider returns a configured Notifer for the provided key. - // if the named provider does not exist ErrNoNotifiersRegistered will be returned - NamedNotifierProvider(name string) (Notifier, error) } /*****************************************************************************/ // InitFunc initilizes a notifier given the config. // InitFunc should validate the config, and report any errors. -type InitFunc func(NotifierConfiger) (Notifer, error) +type InitFunc func(Config) (Provider, error) // CleanupFunc is called when the system is shutting down. // this allows the provider do any needed cleanup. @@ -56,9 +56,9 @@ func Register(notifierType string, init InitFunc, cleanup CleanupFunc) error { notifiers = make(map[string]funcs) } if _, ok := notifiers[notifierType]; ok { - return ErrNotifierAlreadyExists(notifierType) + return ErrAlreadyExists(notifierType) } - notifiers[notiferType] = funcs{ + notifiers[notifierType] = funcs{ init: init, cleanup: cleanup, } @@ -70,7 +70,7 @@ func Unregister(notifierType string) { notifiersLock.Lock() defer notifiersLock.Unlock() - n, ok := notifiers[notifiersType] + n, ok := notifiers[notifierType] if !ok { return // nothing to do. } @@ -95,19 +95,29 @@ func Registered() (n []string) { } // For function returns a configured provider of the given type, and provided the correct config. 
-func For(notifierType string, config NotifierConfig) (Notifier, error) { +func For(notifierType string, config Config) (Provider, error) { notifiersLock.RLock() defer notifiersLock.RUnlock() if notifiers == nil { - return nil, ErrNoNotifiersRegistered + return nil, ErrNoneRegistered } n, ok := notifiers[notifierType] if !ok { - return nil, ErrNotifierNotRegistered(notifierType) + return nil, ErrNoneRegistered } return n.init(config) } +// From is like For but assumes that the config has a ConfigKeyType value informing the type +// of provider being configured +func From(config Config) (Provider, error) { + cType, err := config.String(ConfigKeyType, nil) + if err != nil { + return nil, err + } + return For(cType, config) +} + // Cleanup should be called when the system is shutting down. This given each provider // a chance to do any needed cleanup. This will unregister all providers. func Cleanup() { diff --git a/atlante/server/coordinator/coordinator.go b/atlante/server/coordinator/coordinator.go new file mode 100644 index 0000000..a6a5024 --- /dev/null +++ b/atlante/server/coordinator/coordinator.go @@ -0,0 +1,70 @@ +package coordinator + +import ( + "time" + + "github.com/gdey/errors" + "github.com/go-spatial/maptoolkit/atlante" + "github.com/go-spatial/maptoolkit/atlante/grids" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" +) + +const ( + ErrNilAtlanteJob = errors.String("nil atlante job provided") + ErrNilJob = errors.String("nil job provided") +) + +type Job struct { + JobID string `json:"job_id"` + // QJobID is the job id returned by the queue when + // the item was enqueued + QJobID string `json:"-"` + MdgID string `json:"mdgid"` + MdgIDPart uint32 `json:"sheet_number,omitempty"` + SheetName string `json:"sheet_name"` + Status field.Status `json:"status"` + EnqueuedAt time.Time `json:"enqueued_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type Provider interface { + + // NewJob is expected to return a Job with the ID, JobID, 
MdgID, MdgIDPart, Status, EnquedAt, and UpdatedAt fields filled in. + // If there is already a job in the system then it should return that job, otherwise it should return a new job + NewJob(job *atlante.Job) (jb *Job, err error) + + // FindJob will look for a job described by the given atlante.Job (MDGID/SheetName) and return it, or return nil, and a false for + // found + FindByJob(job *atlante.Job) (jb *Job, found bool) + + // FindJobID will attempt to locate the job by the given jobId, if a job is found non-nil job will be returned. If an error + // occurs then err will be non-nil. If job is not found, the both jb and err will be nil + FindByJobID(jobid string) (jb *Job, found bool) + + // UpdateField will attempt to update the job field info for the given job. + UpdateField(job *Job, fields ...field.Value) error +} + +// NewJob is a helper function that will create a new job object with basic fields filled in. +func NewJob(jobID string, ajob *atlante.Job) *Job { + t := time.Now() + var mdgid grids.MDGID + var sheetName string + + if ajob != nil { + if ajob.Cell.Mdgid != nil { + mdgid = *ajob.Cell.Mdgid + } + sheetName = ajob.SheetName + } + + return &Job{ + JobID: jobID, + MdgID: mdgid.Id, + MdgIDPart: mdgid.Part, + SheetName: sheetName, + Status: field.Status{field.Requested{}}, + EnqueuedAt: t, + UpdatedAt: t, + } +} diff --git a/atlante/server/coordinator/field/field.go b/atlante/server/coordinator/field/field.go new file mode 100644 index 0000000..71380da --- /dev/null +++ b/atlante/server/coordinator/field/field.go @@ -0,0 +1,14 @@ +package field + +type Value interface { + field() +} + +// QJobID is used to update the Queue Job ID field +type QJobID string + +func (QJobID) field() {} + +type JobData string + +func (JobData) field() {} diff --git a/atlante/server/coordinator/field/status.go b/atlante/server/coordinator/field/status.go new file mode 100644 index 0000000..cf7b5b8 --- /dev/null +++ b/atlante/server/coordinator/field/status.go @@ -0,0 +1,179 
@@ +package field + +import ( + "encoding/json" + "errors" + "fmt" + "strings" +) + +const ( + requested = "requested" + completed = "completed" + started = "started" + processing = "processing" + failed = "failed" + + errorKey = "error" + descriptionKey = "description" + statusKey = "status" +) + +type ( + + // Status is used to hold a status for searilization + Status struct { + Status StatusEnum + } + + // StatusEnum is the status reference type + StatusEnum interface { + fmt.Stringer + + statusenum() + } + + // Requested is the status of the job when it is first requested + Requested struct{} + // Started is the status of a started job + Started struct{} + // Processing is the status of a job that is processing + Processing struct { + // Description of what processing is being done. + Description string `json:"description"` + } + // Failed is the status of a job that failed + Failed struct { + // Error as to why it failed + Error error `json:"error"` + } + // Completed is the status of a successful completed job + Completed struct{} +) + +func (s Status) String() string { return s.Status.String() } +func (s Status) field() {} + +// MarshalJSON implements the json.Marshaler interface +func (s Status) MarshalJSON() ([]byte, error) { + + if s.Status == nil { + return json.Marshal(nil) + } + + type sentinalEnum struct { + Type string `json:"status"` + } + type processingEnum struct { + Type string `json:"status"` + Description string `json:"description"` + } + type failedEnum struct { + Type string `json:"status"` + Error string `json:"error"` + } + + var jsonval interface{} + switch senum := s.Status.(type) { + case Started: + jsonval = sentinalEnum{ + Type: started, + } + case Requested: + jsonval = sentinalEnum{ + Type: requested, + } + case Processing: + jsonval = processingEnum{ + Type: processing, + Description: senum.Description, + } + case Failed: + jsonval = failedEnum{ + Type: failed, + Error: senum.Error.Error(), + } + case Completed: + jsonval = 
sentinalEnum{ + Type: completed, + } + default: + return []byte{}, fmt.Errorf("Unknown type %T", s.Status) + + } + return json.Marshal(jsonval) +} + +// UnmarshalJSON implements the json.Unmarshaler interface +func (s *Status) UnmarshalJSON(b []byte) error { + var obj map[string]json.RawMessage + if err := json.Unmarshal(b, &obj); err != nil { + return err + } + var typ string + if err := json.Unmarshal(obj[statusKey], &typ); err != nil { + return err + } + + switch typ { + case started: + s.Status = Started{} + case requested: + s.Status = Requested{} + case processing: + var p Processing + if err := json.Unmarshal(obj[descriptionKey], &p.Description); err != nil { + return err + } + s.Status = p + case failed: + + var errStr string + if err := json.Unmarshal(obj[errorKey], &errStr); err != nil { + return err + } + s.Status = Failed{ + Error: errors.New(errStr), + } + + case completed: + s.Status = Completed{} + + default: + return fmt.Errorf("Unknown status type: %v", typ) + + } + return nil +} + +func NewStatusFor(status, desc string) (StatusEnum, error) { + switch strings.ToLower(status) { + case started: + return Started{}, nil + case requested: + return Requested{}, nil + case completed: + return Completed{}, nil + case processing: + return Processing{Description: desc}, nil + case failed: + return Failed{Error: errors.New(desc)}, nil + default: + return nil, fmt.Errorf("Unknown status type: %v", status) + } +} + +func (Requested) statusenum() {} +func (Requested) String() string { return requested } + +func (Started) statusenum() {} +func (Started) String() string { return started } + +func (p Processing) statusenum() {} +func (p Processing) String() string { return processing + ":" + p.Description } + +func (f Failed) statusenum() {} +func (f Failed) String() string { return failed + ":" + f.Error.Error() } + +func (Completed) statusenum() {} +func (Completed) String() string { return completed } diff --git a/atlante/server/coordinator/logger/logger.go 
b/atlante/server/coordinator/logger/logger.go new file mode 100644 index 0000000..f0131a7 --- /dev/null +++ b/atlante/server/coordinator/logger/logger.go @@ -0,0 +1,118 @@ +package logger + +import ( + "fmt" + + "github.com/gdey/errors" + "github.com/go-spatial/maptoolkit/atlante" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" + "github.com/go-spatial/tegola/dict" + "github.com/prometheus/common/log" +) + +const ( + // TYPE is the name of the provider + TYPE = "logger" + + // ConfigKeyProvider is the key used in the config to select the Name of the provider to proxy calls to. + ConfigKeyProvider = "provider" +) + +func initFunc(cfg coordinator.Config) (coordinator.Provider, error) { + var ( + pConfig dict.Dicter + err error + ) + pConfig, err = cfg.Map(ConfigKeyProvider) + if err != nil { + return &Provider{}, err + } + + subProvider, err := coordinator.From(pConfig) + if err != nil { + return nil, err + } + log.Infof("initializing log coordinator with: %T ", subProvider) + return &Provider{ + Provider: subProvider, + }, nil +} + +func init() { + coordinator.Register(TYPE, initFunc, nil) +} + +type Provider struct { + Provider coordinator.Provider +} + +func (p *Provider) NewJob(job *atlante.Job) (jb *coordinator.Job, err error) { + if job == nil { + log.Infof("job is nil") + return nil, coordinator.ErrNilAtlanteJob + } + jbID := fmt.Sprintf("%v:%v", job.SheetName, job.Cell.Mdgid.AsString()) + log.Infof("created a new jobID: %v", jbID) + if p != nil && p.Provider != nil { + return p.Provider.NewJob(job) + } + return coordinator.NewJob(jbID, job), nil +} + +func (p *Provider) UpdateField(job *coordinator.Job, fields ...field.Value) error { + if job == nil { + log.Infof("job is nil") + return coordinator.ErrNilJob + } + + log.Infof("update fields in job: %v", job.JobID) + for i, f := range fields { + switch fld := f.(type) { + case field.QJobID: + log.Infof("update q job id to: %v", 
string(fld)) + case field.Status: + switch status := fld.Status.(type) { + case field.Requested: + log.Infof("update status to requested") + case field.Started: + log.Infof("update status to started") + case field.Processing: + log.Infof("update status to processing %v", status.Description) + case field.Failed: + log.Infof("update status to failed - reason %v", status.Error) + default: + log.Infof("unknown status: %T", status) + } + default: + log.Infof("unknown field[%v] %T", i, fld) + return errors.String("unknown field type") + } + } + if p != nil && p.Provider != nil { + return p.Provider.UpdateField(job, fields...) + } + return nil +} + +func (p *Provider) FindByJob(job *atlante.Job) (jb *coordinator.Job, found bool) { + if job == nil { + log.Infof("job is nil") + return nil, false + } + log.Infof("looking for job via sheet: %v mdgid: %v ", job.SheetName, job.Cell.Mdgid.AsString()) + if p != nil && p.Provider != nil { + return p.Provider.FindByJob(job) + } + return nil, false +} + +func (p *Provider) FindByJobID(jobid string) (jb *coordinator.Job, found bool) { + log.Infof("looking for job : %v ", jobid) + if p != nil && p.Provider != nil { + return p.Provider.FindByJobID(jobid) + } + return nil, false +} + +var _ = coordinator.Provider(&Provider{}) diff --git a/atlante/server/coordinator/null/null.go b/atlante/server/coordinator/null/null.go new file mode 100644 index 0000000..19bfcf5 --- /dev/null +++ b/atlante/server/coordinator/null/null.go @@ -0,0 +1,44 @@ +package null + +import ( + "fmt" + + "github.com/go-spatial/maptoolkit/atlante" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" +) + +const ( + // TYPE is the name of the provider + TYPE = "null" +) + +func initFunc(cfg coordinator.Config) (coordinator.Provider, error) { return &Provider{}, nil } + +func init() { + coordinator.Register(TYPE, initFunc, nil) +} + +type Provider struct{} + +func (Provider) NewJob(job 
*atlante.Job) (jb *coordinator.Job, err error) { + if job == nil { + return nil, coordinator.ErrNilAtlanteJob + } + jbID := fmt.Sprintf("%v:%v", job.SheetName, job.Cell.Mdgid.AsString()) + return coordinator.NewJob(jbID, job), nil +} + +func (Provider) UpdateField(job *coordinator.Job, fields ...field.Value) error { + return nil +} + +func (Provider) FindByJob(job *atlante.Job) (jb *coordinator.Job, found bool) { + return nil, false +} + +func (Provider) FindByJobID(jobid string) (jb *coordinator.Job, found bool) { + return nil, false +} + +var _ = coordinator.Provider(Provider{}) diff --git a/atlante/server/coordinator/postgresql/README.md b/atlante/server/coordinator/postgresql/README.md new file mode 100644 index 0000000..30bc185 --- /dev/null +++ b/atlante/server/coordinator/postgresql/README.md @@ -0,0 +1,149 @@ +# postgresql + +A coordinator provider that can connect to a Postgres Database. + +```toml + +[[webserver.coordinator]] + type = "postgresql" + host = "docker.for.mac.localhost" + database = "management" + user = "postgis" + password = "password" + +``` + +# Properties + +The provider supports the following properties + +* `type` (string) : [required] should be 'postgresql' +* `name` (string) : [required] the name of the provider (this will be normalized to the lowercase) +* `host` (string) : [required] the database host +* `port` (string) : [required] the database port +* `user` (string) : [required] the user for the database +* `password` (string) : [required] the user password +* `ssl_mode` (string) : [optional] the ssl mode for postgres SSL +* `ssl_key` (string) : [optional] the ssl key for postgres SSL +* `ssl_cert` (string) : [optional] the ssl cert for postgres SSL +* `ssl_root_cert` (string) : [optional] the ssl root cert +* `max_connections` (number) : [optional] the max number of connections to keep in the pool + +## SQL options. +The following options are used to specify the sql that is used to manage the +database. (note this can be brittle.)
+ +* `query_new_job` (string): the sql is run to create a new job. + +Default SQL: + +```sql + +INSERT INTO jobs( + mdgid, + sheet_number, + sheet_name, + bounds +) +VALUES($1,$2,$3,ST_GeometryFromText($4, $5)) +RETURNING id; + +``` + + * $1 will be the mdgid (string) + * $2 will be a sheet number (uint32) + * $3 will be the sheet name (string) + * $4 will be wkt of the bounds of the grid + * $5 will be the srid -- hardcode to 4326 for now + +* `query_update_queue_job_id` (string): the sql is run to update the queue job id. +Default SQL: + +```sql + +UPDATE jobs +SET queue_id=$2 +WHERE id=$1 + +``` + + * $1 will be the jobid (int) + * $2 will be the queue_id (string) + +* `query_update_job_data` (string): the sql is run to update the job data +Default SQL: + +```sql + +UPDATE jobs +SET job_data=$2 +WHERE id=$1 + +``` + * $1 will be the jobid (int) + * $2 will be the job_data (string) + + +* `query_insert_status` (string): the sql is run to insert a new status for a job + +```sql + +INSERT INTO statuses( + job_id, + status, + description +) +VALUES($1,$2,$3); + +``` + * $1 will be the jobid (int) + * $2 will be the status (string) + * $3 will be the description (string) + +* `query_select_job_id` (string): the sql is used to find job for a job_id + +```sql +SELECT + job.mdgid, + job.sheet_number, + job.sheet_name, + job.queue_id, + job.created as enqueued, + jobstatus.status, + jobstatus.description, + jobstatus.created as updated +FROM jobs AS job +JOIN statuses AS jobstatus ON job.id = jobstatus.job_id +WHERE job.id = $1 +ORDER BY jobstatus.id desc limit 1; +``` + * $1 will be the jobid (int) + + The list order is the order in which the items need to occure. + The system is expect the sql to return zero or one row only. 
+ + +* `query_select_mdgid_sheetname` (string): the sql is used to find jobs for an mdgid/sheetname + +```sql +SELECT + job.id, + job.queue_id, + job.created as enqueued, + jobstatus.status, + jobstatus.description, + jobstatus.created as updated +FROM jobs AS job +JOIN statuses AS jobstatus ON job.id = jobstatus.job_id +WHERE job.mdgid = $1 AND job.sheet_number = $2 AND job.sheet_name = $3 +ORDER BY jobstatus.id desc limit 1; +``` + + * $1 will be the mdgid (string) + * $2 will be the sheet number (int) + * $3 will be the sheet name (string) + + The list order is the order in which the items need to occure. + The system is expect the sql to return zero or one row only. + +Create sqls for the original tables can be found in the [docs/jobs.sql folder.](doc/jobs.sql) diff --git a/atlante/server/coordinator/postgresql/docs/jobs.sql b/atlante/server/coordinator/postgresql/docs/jobs.sql new file mode 100644 index 0000000..d16c090 --- /dev/null +++ b/atlante/server/coordinator/postgresql/docs/jobs.sql @@ -0,0 +1,49 @@ +CREATE EXTENSION IF NOT EXISTS postgis ; + +CREATE TABLE jobs ( + id SERIAL PRIMARY KEY, + mdgid TEXT NOT NULL, + sheet_number integer DEFAULT 0, + sheet_name TEXT NOT NULL, + queue_id TEXT, + job_data TEXT, + bounds geometry(Polygon, 4326) NOT NULL, + created TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX ON jobs (mdgid); + +CREATE INDEX ON jobs (sheet_number); + +CREATE INDEX ON jobs (sheet_name); + +CREATE INDEX ON jobs (queue_id); + +CREATE INDEX bounds_polygon_idx ON jobs USING GIST (bounds); + + +CREATE TABLE statuses ( + id SERIAL PRIMARY KEY, + job_id INTEGER NOT NULL, + status TEXT NOT NULL, + description TEXT NOT NULL, + created TIMESTAMP WITH TIME ZONE default NOW() +); + + +CREATE INDEX ON statuses (job_id); + +-- Find job by job id +SELECT + job.mdgid, + job.sheet_number, + job.sheet_name, + job.queue_id, + job.created as enqueued, + jobstatus.status, + jobstatus.description, + jobstatus.created as updated +FROM jobs AS job +JOIN 
statuses AS jobstatus ON job.id = jobstatus.job_id +WHERE job.id = $1 +ORDER BY jobstatus.id desc limit 1; diff --git a/atlante/server/coordinator/postgresql/postgresql.go b/atlante/server/coordinator/postgresql/postgresql.go new file mode 100644 index 0000000..970b0a4 --- /dev/null +++ b/atlante/server/coordinator/postgresql/postgresql.go @@ -0,0 +1,524 @@ +package postgresql + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "strconv" + "sync" + "time" + + "github.com/go-spatial/geom" + "github.com/go-spatial/geom/encoding/wkt" + "github.com/go-spatial/maptoolkit/atlante" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" + "github.com/prometheus/common/log" + + "github.com/go-spatial/tegola" + "github.com/jackc/pgx" +) + +// Name is the name of the provider type +const TYPE = "postgresql" + +// AppName is shown by the pqclient +var AppName = "atlante" + +// Provider implements the Grid.Provider interface +type Provider struct { + config pgx.ConnPoolConfig + pool *pgx.ConnPool + srid uint + QueryNewJob string + QueryUpdateQueueJobID string + QueryUpdateJobData string + QueryInsertStatus string + QuerySelectMDGIDSheetName string + QuerySelectJobID string +} + +const ( + // DefaultSRID is the assumed srid of data unless specified + DefaultSRID = tegola.WebMercator + // DefaultPort is the default port for postgis + DefaultPort = 5432 + // DefaultMaxConn is the max number of connections to attempt + DefaultMaxConn = 100 + // DefaultSSLMode by default ssl is disabled + DefaultSSLMode = "disable" + // DefaultSSLKey by default is empty + DefaultSSLKey = "" + // DefaultSSLCert by default is empty + DefaultSSLCert = "" + // DefaultEditDateFormat the time format to expect + DefaultEditDateFormat = time.RFC3339 + // DefaultEditBy who edited the content if not provided + DefaultEditBy = "" +) + +const ( + // ConfigKeyHost is the config key for the postgres host + ConfigKeyHost 
= "host" + // ConfigKeyPort is the config key for the postgres port + ConfigKeyPort = "port" + // ConfigKeyDB is the config key for the postgres db + ConfigKeyDB = "database" + // ConfigKeyUser is the config key for the postgres user + ConfigKeyUser = "user" + // ConfigKeyPassword is the config key for the postgres user's password + ConfigKeyPassword = "password" + // ConfigKeySSLMode is the config key for the postgres SSL + ConfigKeySSLMode = "ssl_mode" + // ConfigKeySSLKey is the config key for the postgres SSL + ConfigKeySSLKey = "ssl_key" + // ConfigKeySSLCert is the config key for the postgres SSL + ConfigKeySSLCert = "ssl_cert" + // ConfigKeySSLRootCert is the config key for the postgres SSL + ConfigKeySSLRootCert = "ssl_root_cert" + // ConfigKeyMaxConn is the max number of connections to keep in the pool + ConfigKeyMaxConn = "max_connections" + // ConfigKeySRID is the srid of the data + ConfigKeySRID = "srid" +) + +// ErrInvalidSSLMode is returned when something is wrong with SSL configuration +type ErrInvalidSSLMode string + +func (e ErrInvalidSSLMode) Error() string { + return fmt.Sprintf("postgis: invalid ssl mode (%v)", string(e)) +} + +func init() { + coordinator.Register(TYPE, initFunc, cleanup) +} + +// initFunc returns a new provider based on the postgresql database +func initFunc(config coordinator.Config) (coordinator.Provider, error) { + var emptystr string + + host, err := config.String(ConfigKeyHost, nil) + if err != nil { + return nil, err + } + + db, err := config.String(ConfigKeyDB, nil) + if err != nil { + return nil, err + } + + user, err := config.String(ConfigKeyUser, nil) + if err != nil { + return nil, err + } + + password, err := config.String(ConfigKeyPassword, nil) + if err != nil { + return nil, err + } + + sslmode := DefaultSSLMode + sslmode, err = config.String(ConfigKeySSLMode, &sslmode) + + sslkey := DefaultSSLKey + sslkey, err = config.String(ConfigKeySSLKey, &sslkey) + if err != nil { + return nil, err + } + + sslcert := 
DefaultSSLCert + sslcert, err = config.String(ConfigKeySSLCert, &sslcert) + if err != nil { + return nil, err + } + + sslrootcert := DefaultSSLCert + sslrootcert, err = config.String(ConfigKeySSLRootCert, &sslrootcert) + if err != nil { + return nil, err + } + + port := DefaultPort + if port, err = config.Int(ConfigKeyPort, &port); err != nil { + return nil, err + } + + maxcon := DefaultMaxConn + if maxcon, err = config.Int(ConfigKeyMaxConn, &maxcon); err != nil { + return nil, err + } + + srid := DefaultSRID + if srid, err = config.Int(ConfigKeySRID, &srid); err != nil { + return nil, err + } + + connConfig := pgx.ConnConfig{ + Host: host, + Port: uint16(port), + Database: db, + User: user, + Password: password, + LogLevel: pgx.LogLevelWarn, + RuntimeParams: map[string]string{ + // "default_transaction_read_only": "TRUE", + "application_name": AppName, + }, + } + + err = ConfigTLS(sslmode, sslkey, sslcert, sslrootcert, &connConfig) + if err != nil { + return nil, err + } + + p := Provider{ + config: pgx.ConnPoolConfig{ + ConnConfig: connConfig, + MaxConnections: int(maxcon), + }, + srid: uint(srid), + } + if p.pool, err = pgx.NewConnPool(p.config); err != nil { + return nil, fmt.Errorf("Failed while creating connection pool: %v", err) + } + + // Check and step up all the queries. 
+ p.QueryNewJob, _ = config.String("query_new_job", &emptystr) + p.QueryUpdateQueueJobID, _ = config.String("query_update_queue_job_id", &emptystr) + p.QueryUpdateJobData, _ = config.String("query_update_job_data", &emptystr) + p.QueryInsertStatus, _ = config.String("query_insert_status", &emptystr) + p.QuerySelectMDGIDSheetName, _ = config.String("query_select_mdgid_sheetname", &emptystr) + p.QuerySelectJobID, _ = config.String("query_select_job_id", &emptystr) + + // track the provider so we can clean it up later + pLock.Lock() + providers = append(providers, p) + pLock.Unlock() + return &p, nil +} + +// ConfigTLS is used to configure TLS +// derived from github.com/jackc/pgx configTLS (https://github.com/jackc/pgx/blob/master/conn.go) +func ConfigTLS(sslMode string, sslKey string, sslCert string, sslRootCert string, cc *pgx.ConnConfig) error { + + switch sslMode { + case "disable": + cc.UseFallbackTLS = false + cc.TLSConfig = nil + cc.FallbackTLSConfig = nil + return nil + case "allow": + cc.UseFallbackTLS = true + cc.FallbackTLSConfig = &tls.Config{InsecureSkipVerify: true} + case "prefer": + cc.TLSConfig = &tls.Config{InsecureSkipVerify: true} + cc.UseFallbackTLS = true + cc.FallbackTLSConfig = nil + case "require": + cc.TLSConfig = &tls.Config{InsecureSkipVerify: true} + case "verify-ca", "verify-full": + cc.TLSConfig = &tls.Config{ + ServerName: cc.Host, + } + default: + return ErrInvalidSSLMode(sslMode) + } + + if sslRootCert != "" { + caCertPool := x509.NewCertPool() + + caCert, err := ioutil.ReadFile(sslRootCert) + if err != nil { + return fmt.Errorf("unable to read CA file (%q): %v", sslRootCert, err) + } + + if !caCertPool.AppendCertsFromPEM(caCert) { + return fmt.Errorf("unable to add CA to cert pool") + } + + cc.TLSConfig.RootCAs = caCertPool + cc.TLSConfig.ClientCAs = caCertPool + } + + if (sslCert == "") != (sslKey == "") { + return fmt.Errorf("both 'sslcert' and 'sslkey' are required") + } else if sslCert != "" { // we must have both now + cert, 
err := tls.LoadX509KeyPair(sslCert, sslKey) + if err != nil { + return fmt.Errorf("unable to read cert: %v", err) + } + + cc.TLSConfig.Certificates = []tls.Certificate{cert} + } + + return nil +} + +func (p *Provider) NewJob(job *atlante.Job) (jb *coordinator.Job, err error) { + if job == nil { + return nil, coordinator.ErrNilAtlanteJob + } + + const insertQuery = ` +INSERT INTO jobs( + mdgid, + sheet_number, + sheet_name, + bounds +) +VALUES($1,$2,$3,ST_GeometryFromText($4,$5)) +RETURNING id; +` + + query := insertQuery + if p.QueryNewJob != "" { + query = p.QueryNewJob + } + var id int + + // TODO(gdey):Bug in wkt.Encode it should close the ring + // bounds, _ := wkt.Encode(job.Cell.Hull().AsPolygon()) + h := job.Cell.Hull().Vertices() + h = append(h, h[0]) + bounds, _ := wkt.Encode(geom.Polygon{h}) + + row := p.pool.QueryRow( + query, + job.Cell.Mdgid.Id, + job.Cell.Mdgid.Part, + job.SheetName, + bounds, + 4326, + ) + if err := row.Scan(&id); err != nil { + return nil, err + } + return coordinator.NewJob(fmt.Sprintf("%v", id), job), nil +} + +func (p *Provider) UpdateField(job *coordinator.Job, fields ...field.Value) error { + const updateQJobIDQuery = ` +UPDATE jobs +SET queue_id=$2 +WHERE id=$1 + ` + const updateJobDataQuery = ` +UPDATE jobs +SET job_data=$2 +WHERE id=$1 + ` + const insertStatusQuery = ` +INSERT INTO statuses( + job_id, + status, + description +) +VALUES($1,$2,$3); + ` + + var err error + for _, f := range fields { + switch fld := f.(type) { + case field.QJobID: + query := updateQJobIDQuery + if p.QueryUpdateQueueJobID != "" { + query = p.QueryUpdateQueueJobID + } + qjbid := string(fld) + _, err = p.pool.Exec(query, job.JobID, qjbid) + case field.JobData: + query := updateJobDataQuery + if p.QueryUpdateJobData != "" { + query = p.QueryUpdateJobData + } + jbdata := string(fld) + _, err = p.pool.Exec(query, job.JobID, jbdata) + + case field.Status: + query := insertStatusQuery + if p.QueryInsertStatus != "" { + query = p.QueryInsertStatus + } + 
switch status := fld.Status.(type) { + case field.Requested, field.Started, field.Completed: + _, err = p.pool.Exec( + query, + job.JobID, + fld.Status.String(), + "", + ) + case field.Processing: + _, err = p.pool.Exec( + query, + job.JobID, + "processing", + status.Description, + ) + case field.Failed: + _, err = p.pool.Exec( + query, + job.JobID, + "failed", + status.Error.Error(), + ) + } + } + if err != nil { + return err + } + } + return nil +} + +func (p *Provider) FindByJob(job *atlante.Job) (jb *coordinator.Job, found bool) { + + const selectQuery = ` +SELECT + job.id, + job.queue_id, + job.created as enqueued, + jobstatus.status, + jobstatus.description, + jobstatus.created as updated +FROM jobs AS job +JOIN statuses AS jobstatus ON job.id = jobstatus.job_id +WHERE job.mdgid = $1 AND job.sheet_number = $2 AND job.sheet_name = $3 +ORDER BY jobstatus.id desc limit 1; + ` + query := selectQuery + if p.QuerySelectMDGIDSheetName != "" { + query = p.QuerySelectMDGIDSheetName + } + if job == nil { + return nil, false + } + mdgid := job.Cell.Mdgid.Id + sheetNumber := job.Cell.Mdgid.Part + sheetName := job.SheetName + + var ( + jobid int + queueID string + enqueued time.Time + status string + desc string + updated time.Time + ) + + row := p.pool.QueryRow(query, mdgid, sheetNumber, sheetName) + if err := row.Scan( + &jobid, + &queueID, + &enqueued, + &status, + &desc, + &updated, + ); err != nil { + log.Warnf("got error finding job %v-%v-%v: %v -- \n%v", mdgid, sheetNumber, sheetName, err, selectQuery) + return nil, false + } + s, err := field.NewStatusFor(status, desc) + if err != nil { + log.Warnf("jobid(%v) got bad status from database:%v -- %v", jobid, s, err) + } + cjb := &coordinator.Job{ + JobID: fmt.Sprintf("%v", jobid), + QJobID: queueID, + MdgID: mdgid, + MdgIDPart: uint32(sheetNumber), + SheetName: sheetName, + Status: field.Status{s}, + EnqueuedAt: enqueued, + UpdatedAt: updated, + } + return cjb, true +} + +func (p *Provider) FindByJobID(jobid string) 
(jb *coordinator.Job, found bool) { + + const selectQuery = ` +SELECT + job.mdgid, + job.sheet_number, + job.sheet_name, + job.queue_id, + job.created as enqueued, + jobstatus.status, + jobstatus.description, + jobstatus.created as updated +FROM jobs AS job +JOIN statuses AS jobstatus ON job.id = jobstatus.job_id +WHERE job.id = $1 +ORDER BY jobstatus.id desc limit 1; + ` + query := selectQuery + if p.QuerySelectJobID != "" { + query = p.QuerySelectJobID + } + id, err := strconv.ParseInt(jobid, 10, 64) + if err != nil { + return nil, false + } + + var ( + mdgid string + sheetNumber int + sheetName string + queueID string + enqueued time.Time + status string + desc string + updated time.Time + ) + row := p.pool.QueryRow(query, id) + if err := row.Scan( + &mdgid, + &sheetNumber, + &sheetName, + &queueID, + &enqueued, + &status, + &desc, + &updated, + ); err != nil { + log.Warnf("got error finding job %v: %v -- \n%v", id, err, selectQuery) + return nil, false + } + s, err := field.NewStatusFor(status, desc) + if err != nil { + log.Warnf("jobid(%v) got bad status from database:%v -- %v", jobid, s, err) + } + return &coordinator.Job{ + JobID: jobid, + QJobID: queueID, + MdgID: mdgid, + MdgIDPart: uint32(sheetNumber), + SheetName: sheetName, + Status: field.Status{s}, + EnqueuedAt: enqueued, + UpdatedAt: updated, + }, true +} + +// Close will close the provider's database connection +func (p *Provider) Close() { p.pool.Close() } + +var pLock sync.RWMutex + +// reference to all instantiated providers +var providers []Provider + +// cleanup will close all database connections and destroy all prviously instantiated Provider instatnces +func cleanup() { + if len(providers) == 0 { + // Nothing to do + return + } + pLock.Lock() + for i := range providers { + providers[i].Close() + } + providers = make([]Provider, 0) + pLock.Unlock() +} diff --git a/atlante/server/coordinator/register.go b/atlante/server/coordinator/register.go new file mode 100644 index 0000000..61c4be4 --- 
/dev/null +++ b/atlante/server/coordinator/register.go @@ -0,0 +1,142 @@ +package coordinator + +import ( + "fmt" + "sort" + "sync" + + "github.com/gdey/errors" + "github.com/go-spatial/tegola/dict" + "github.com/prometheus/common/log" +) + +// ErrProviderTypeExists is returned when a provider is already registered with that name +type ErrProviderTypeExists string + +func (err ErrProviderTypeExists) Error() string { + return "coordinator provider (" + string(err) + ") already exists" +} + +const ( + // ErrNoProvidersRegistered is returned when no coordinators are registered with the system + ErrNoProvidersRegistered = errors.String("no coordinator providers registered") + + // ConfigKeyType is the name for the config key + ConfigKeyType = "type" +) + +// ErrUnknownProvider is returned when a requested queuer is not registered +type ErrUnknownProvider string + +func (err ErrUnknownProvider) Error() string { + return fmt.Sprintf("error unknown queue provider %v", string(err)) +} + +// Config is the interface that is passed to the queue provider to configure them +type Config interface { + dict.Dicter +} + +// InitFunc initilizes a queue provider given a config +// The InitFunc should validate the config and report any errors. 
+// Called by the For function +type InitFunc func(Config) (Provider, error) + +// CleanupFunc is called when the system is shuting down; +// Allows queue provider a way to do cleanup +type CleanupFunc func() + +type funcs struct { + init InitFunc + cleanup CleanupFunc +} + +var providerLock sync.RWMutex +var providers map[string]funcs + +// Register is called by the init functions of each of the providers +func Register(providerType string, init InitFunc, cleanup CleanupFunc) error { + providerLock.Lock() + defer providerLock.Unlock() + if providers == nil { + providers = make(map[string]funcs) + } + + if _, ok := providers[providerType]; ok { + return ErrProviderTypeExists(providerType) + } + providers[providerType] = funcs{ + init: init, + cleanup: cleanup, + } + log.Infof("registered coordinator provider: %v", providerType) + return nil +} + +// Unregister will remove a provider and call it's cleanup function +func Unregister(providerType string) { + providerLock.Lock() + defer providerLock.Unlock() + + p, ok := providers[providerType] + if !ok { + return // nothing to do + } + + p.cleanup() + delete(providers, providerType) +} + +// Registered returns the providers that have been registered +func Registered() []string { + p := make([]string, len(providers)) + i := 0 + providerLock.RLock() + for k := range providers { + p[i] = k + i++ + } + providerLock.RUnlock() + sort.Strings(p) + return p +} + +// For function returns a configured provider given the type and config +func For(providerType string, config Config) (Provider, error) { + providerLock.RLock() + defer providerLock.RUnlock() + + if providers == nil { + return nil, ErrNoProvidersRegistered + } + + p, ok := providers[providerType] + if !ok { + return nil, ErrUnknownProvider(providerType) + } + return p.init(config) +} + +// From is like for but assumes that the config has a ConfigKeyType value informing the type +// of provider being configured +func From(config Config) (Provider, error) { + cType, err 
:= config.String(ConfigKeyType, nil) + if err != nil { + return nil, err + } + return For(cType, config) +} + +// Cleanup should be called when the system is shutting down. This gives weach provider +// a chance to do any needed cleanup. this will unreigster all providers +func Cleanup() { + providerLock.Lock() + for _, p := range providers { + if p.cleanup == nil { + continue + } + p.cleanup() + } + providers = make(map[string]funcs) + providerLock.Unlock() +} diff --git a/atlante/server/server.go b/atlante/server/server.go index bd3703d..1a073ec 100644 --- a/atlante/server/server.go +++ b/atlante/server/server.go @@ -12,6 +12,11 @@ import ( "strings" "time" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/field" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator/null" + + "github.com/go-spatial/maptoolkit/atlante/server/coordinator" + "github.com/go-spatial/maptoolkit/atlante/filestore" "github.com/go-spatial/maptoolkit/atlante/queuer" @@ -38,6 +43,9 @@ const ( // ParamsKeySheetname is the key used for the sheetname ParamsKeySheetname = URLPlaceholder("sheetname") + // ParamsKeyJobID is the key used for the jobid + ParamsKeyJobID = URLPlaceholder("job_id") + // HTTPErrorHeader is the name of the X-header where details of the error // is provided. HTTPErrorHeader = "X-HTTP-Error-Description" @@ -75,19 +83,6 @@ func GenPath(paths ...interface{}) string { } type ( - jobItem struct { - ID int `json:"-"` - JobID string `json:"job_id"` - // QJobID is the job id returned by the queue when - // the item was enqueued - QJobID string `json:"-"` - MdgID string `json:"mdgid"` - MdgIDPart uint32 `json:"sheet_number,omitempty"` - Status string `json:"status,omitempty"` - EnquedAt time.Time `json:"enqued_at,omitempty"` - UpdatedAt time.Time `json:"updated_at,omitempty"` - } - // Server is used to serve up grid information, and generate print jobs Server struct { // HostName is the name of the host to use for construction of URLS. 
@@ -111,6 +106,12 @@ type ( // jobsDB is the database (sqlite) containing the jobs we have sent to be processed // this is for job tracking jobsDB *sql.DB + + // Coordinator is a Coordinator Provider for managing jobs + Coordinator coordinator.Provider + + // DisableNotificationEP will disable the job notification end points from being registered. + DisableNotificationEP bool } ) @@ -154,7 +155,9 @@ func setHeaders(h map[string]string, w http.ResponseWriter) { } func badRequest(w http.ResponseWriter, reasonFmt string, data ...interface{}) { - w.Header().Set(HTTPErrorHeader, fmt.Sprintf(reasonFmt, data...)) + str := fmt.Sprintf(reasonFmt, data...) + log.Infof("got error: %v", str) + w.Header().Set(HTTPErrorHeader, str) w.WriteHeader(http.StatusBadRequest) } @@ -163,7 +166,7 @@ func serverError(w http.ResponseWriter, reasonFmt string, data ...interface{}) { w.WriteHeader(http.StatusInternalServerError) } -func encodeCellAsJSON(w io.Writer, cell *grids.Cell, pdf string, lat, lng *float64, lastGen time.Time) { +func encodeCellAsJSON(w io.Writer, cell *grids.Cell, pdf string, lat, lng *float64, lastGen time.Time, status field.Status) { // Build out the geojson const geoJSONFmt = `{"type":"FeatureCollection","features":[{"type":"Feature","properties":{"objectid":"%v"},"geometry":{"type":"Polygon","coordinates":[[[%v,%v],[%v, %v],[%v, %v],[%v, %v],[%v, %v]]]}}]}` mdgid := cell.GetMdgid() @@ -171,6 +174,7 @@ func encodeCellAsJSON(w io.Writer, cell *grids.Cell, pdf string, lat, lng *float jsonCell := struct { MDGID string `json:"mdgid"` Part *uint32 `json:"sheet_number,omitempty"` + JobStatus field.Status `json:"status"` PDF string `json:"pdf_url"` LastGen string `json:"last_generated,omitempty"` // RFC 3339 format LastEdited string `json:"last_edited,omitempty"` // RFC 3339 format @@ -181,13 +185,17 @@ func encodeCellAsJSON(w io.Writer, cell *grids.Cell, pdf string, lat, lng *float GeoJSON json.RawMessage `json:"geo_json"` }{ MDGID: mdgid.Id, - LastGen: 
lastGen.Format(time.RFC3339), + JobStatus: status, Lat: lat, Lng: lng, PDF: pdf, Series: cell.GetSeries(), SheetName: cell.GetSheet(), } + + if !lastGen.IsZero() { + jsonCell.LastGen = lastGen.Format(time.RFC3339) + } pubdate, err := cell.PublicationDate() if err != nil && !pubdate.IsZero() { jsonCell.LastEdited = pubdate.Format(time.RFC3339) @@ -210,131 +218,10 @@ func encodeCellAsJSON(w io.Writer, cell *grids.Cell, pdf string, lat, lng *float ), ) // Encoding the cell into the json - json.NewEncoder(w).Encode(jsonCell) -} - -// CreateJobDB will create the job tracking database -func CreateJobDB(filename string) (*sql.DB, error) { - database, err := sql.Open("sqlite3", filename) - if err != nil { - return nil, err - } - statement, err := database.Prepare(` - CREATE TABLE IF NOT EXISTS jobs ( - id INTEGER PRIMARY KEY, - job_id TEXT, - mdgid TEXT, - status TEXT, - enqued_at TEXT, - updated_at TEXT, - qjob_id TEXT, - jobobj TEXT - ) -`) + err = json.NewEncoder(w).Encode(jsonCell) if err != nil { - return nil, err + log.Warnf("failed to encode jsonCell: %v", err) } - statement.Exec() - statement, err = database.Prepare(`CREATE UNIQUE INDEX idx_job_id ON jobs (job_id);`) - if err != nil { - return nil, err - } - statement.Exec() - statement, err = database.Prepare(`CREATE UNIQUE INDEX idx_qjob_id ON jobs (qjob_id);`) - if err != nil { - return nil, err - } - statement.Exec() - return database, nil -} - -func (s *Server) findJobItem(mdgidstr string) (jbs []jobItem, err error) { - database := s.jobsDB - if database == nil { - return nil, nil - } - mdgid := grids.NewMDGID(mdgidstr) - const selectSQL = ` - SELECT - id, - job_id, - qjob_id, - status, - enqued_at, - updated_at - FROM - jobs - WHERE - mdgid=? 
- ORDER BY id DESC - ` - rows, err := database.Query(selectSQL, mdgid.AsString()) - if err != nil { - return jbs, err - } - - for rows.Next() { - var enquedat, updatedat string - ji := jobItem{MdgID: mdgid.Id} - if mdgid.Part != 0 { - ji.MdgIDPart = mdgid.Part - } - err = rows.Scan( - &ji.ID, - &ji.JobID, - &ji.QJobID, - &ji.Status, - &enquedat, - &updatedat, - ) - if err != nil { - return []jobItem{}, err - } - ji.EnquedAt, err = time.Parse(time.RFC3339, enquedat) - if err != nil { - log.Warnf("failed to parse enqued at from db: %v for id: %v", enquedat, ji.ID) - } - ji.UpdatedAt, err = time.Parse(time.RFC3339, updatedat) - if err != nil { - log.Warnf("failed to parse updated at from db: %v for id: %v", updatedat, ji.ID) - } - jbs = append(jbs, ji) - } - return jbs, nil -} - -// TODO(gdey): job management -func (s *Server) addJobItem(ji *jobItem) (*jobItem, error) { - if s == nil || ji == nil { - return nil, fmt.Errorf("server or jobItem is nil") - } - database := s.jobsDB - if database == nil { - return nil, fmt.Errorf("jobDB not initilized") - } - const insertSQL = ` - INSERT INTO jobs ( - job_id, - qjob_id, - mdgid, - status, - enqued_at - ) VALUES (?, ?, ?, ?, ?) 
- ` - enqueded := ji.EnquedAt.Format(time.RFC3339) - stm, err := database.Prepare(insertSQL) - if err != nil { - return nil, err - } - mdgid := grids.MDGID{ - Id: ji.MdgID, - Part: ji.MdgIDPart, - } - _, err = stm.Exec(ji.ID, ji.JobID, mdgid.AsString, ji.Status, enqueded) - if err != nil { - return nil, err - } - return ji, nil } // GetHostName returns determines the hostname:port to return based on the following hierarchy @@ -408,8 +295,11 @@ func (s *Server) GridInfoHandler(w http.ResponseWriter, request *http.Request, u // We will fill this out later pdfURL string + // We will get this from the filestore lastGen time.Time + + status field.Status ) sheetName, ok := urlParams[string(ParamsKeySheetname)] @@ -429,6 +319,7 @@ func (s *Server) GridInfoHandler(w http.ResponseWriter, request *http.Request, u // check to see if the mdgid key was given. if mdgidStr, ok := urlParams[string(ParamsKeyMDGID)]; ok { + log.Infof("MDGID route %v", mdgidStr) mdgid = grids.NewMDGID(mdgidStr) cell, err = sheet.CellForMDGID(mdgid) if err != nil { @@ -483,6 +374,15 @@ func (s *Server) GridInfoHandler(w http.ResponseWriter, request *http.Request, u } } + // Ask the coordinator for the status: + if jb, ok := s.Coordinator.FindByJob(&atlante.Job{SheetName: sheetName, Cell: cell}); ok { + status = jb.Status + lastGen = jb.UpdatedAt + if jb.UpdatedAt.IsZero() { + lastGen = jb.EnqueuedAt + } + } + // content type w.Header().Add("Content-Type", "application/json") @@ -491,7 +391,7 @@ func (s *Server) GridInfoHandler(w http.ResponseWriter, request *http.Request, u w.Header().Add("Pragma", "no-cache") w.Header().Add("Expires", "0") - encodeCellAsJSON(w, cell, pdfURL, latp, lngp, lastGen) + encodeCellAsJSON(w, cell, pdfURL, latp, lngp, lastGen, status) } // QueueHandler takes a job from a post and enqueues it on the configured queue @@ -500,9 +400,16 @@ func (s *Server) QueueHandler(w http.ResponseWriter, request *http.Request, urlP // TODO(gdey): this initial version will not do job tracking. 
// meaning every request to the handler will get a job enqued into the // queueing system. + if s.Coordinator == nil { + s.Coordinator = &null.Provider{} + } + + var ji struct { + MdgID string `json:"mdgid"` + MdgIDPart uint32 `json:"sheet_number,omitempty"` + } // Get json body - var ji jobItem bdy, err := ioutil.ReadAll(request.Body) request.Body.Close() if err != nil { @@ -528,10 +435,6 @@ func (s *Server) QueueHandler(w http.ResponseWriter, request *http.Request, urlP sheetName = s.Atlante.NormalizeSheetName(sheetName, false) - // ji is now going to be what get's returned - ji.EnquedAt = time.Now() - ji.JobID = fmt.Sprintf("%v:%v", sheetName, mdgid.AsString()) - sheet, err := s.Atlante.SheetFor(sheetName) if err != nil { badRequest(w, "error getting sheet(%v):%v", sheetName, err) @@ -546,19 +449,48 @@ func (s *Server) QueueHandler(w http.ResponseWriter, request *http.Request, urlP qjob := atlante.Job{ Cell: cell, SheetName: sheetName, - MetaData: map[string]string{ - "job_id": ji.JobID, - }, } - // TODO(gdye):Ignoring the returned job id for now. Will need it when we - // have the job management - _, err = s.Queue.Enqueue(ji.JobID, &qjob) + + // Check the queue to see if there is already a job with these params: + if jb, found := s.Coordinator.FindByJob(&qjob); found { + switch jb.Status.Status.(type) { + default: + // do nothing we should enqueue a + // new job. + case field.Requested, field.Started: + // Job is already there just return + // info about the old job. 
+ err = json.NewEncoder(w).Encode(jb) + if err != nil { + serverError(w, "failed marshal json: %v", err) + } + return + } + } + + jb, err := s.Coordinator.NewJob(&qjob) + if err != nil { + serverError(w, "failed to get new job from coordinator: %v", err) + return + } + // Fill out the Metadata with JobID + qjob.MetaData = map[string]string{ + "job_id": jb.JobID, + } + + qjobid, err := s.Queue.Enqueue(jb.JobID, &qjob) if err != nil { badRequest(w, "failed to queue job: %v", err) return } + jbData, _ := qjob.Base64Marshal() + s.Coordinator.UpdateField(jb, + field.QJobID(qjobid), + field.JobData(jbData), + field.Status{field.Requested{}}, + ) - err = json.NewEncoder(w).Encode(ji) + err = json.NewEncoder(w).Encode(jb) if err != nil { serverError(w, "failed marshal json: %v", err) return @@ -594,6 +526,67 @@ func (s *Server) SheetInfoHandler(w http.ResponseWriter, request *http.Request, } } +func (s *Server) JobInfoHandler(w http.ResponseWriter, request *http.Request, urlParams map[string]string) { + + jobid, ok := urlParams[string(ParamsKeyJobID)] + if !ok { + // We need a sheetnumber. + badRequest(w, "missing job_id") + return + } + job, ok := s.Coordinator.FindByJobID(jobid) + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + + if err := json.NewEncoder(w).Encode(job); err != nil { + serverError(w, "failed to marshal json: %v", err) + } + +} + +func (s *Server) NotificationHandler(w http.ResponseWriter, request *http.Request, urlParams map[string]string) { + + log.Infof("Got a post to Notification: %v", urlParams) + jobid, ok := urlParams[string(ParamsKeyJobID)] + if !ok { + // We need a sheetnumber. 
+ log.Infof("Missing job_id: %v", urlParams) + badRequest(w, "missing job_id") + return + } + + job, ok := s.Coordinator.FindByJobID(jobid) + if !ok { + log.Infof("failed to find job: %v", jobid) + w.WriteHeader(http.StatusNotFound) + return + } + + // Get json body + bdy, err := ioutil.ReadAll(request.Body) + request.Body.Close() + if err != nil { + badRequest(w, "error reading body") + return + } + + var si field.Status + log.Infof("Body: %s", bdy) + err = json.Unmarshal(bdy, &si) + if err != nil { + badRequest(w, "unable to unmarshal json: %v", err) + return + } + log.Infof("Status: %v", si) + + if err := s.Coordinator.UpdateField(job, si); err != nil { + serverError(w, "failed to update job %v: %v", jobid, err) + } + w.WriteHeader(http.StatusNoContent) +} + // RegisterRoutes setup the routes func (s *Server) RegisterRoutes(r *httptreemux.TreeMux) { @@ -604,17 +597,24 @@ func (s *Server) RegisterRoutes(r *httptreemux.TreeMux) { group := r.NewGroup(GenPath("sheets", ParamsKeySheetname)) log.Infof("registering: GET /sheets/:sheetname/info/:lng/:lat") group.GET(GenPath("info", ParamsKeyLng, ParamsKeyLat), s.GridInfoHandler) - log.Infof("registering: GET /sheets/:sheetname/info/:mdgid") + log.Infof("registering: GET /sheets/:sheetname/info/mdgid/:mdgid") group.GET(GenPath("info", "mdgid", ParamsKeyMDGID), s.GridInfoHandler) if s.Queue != nil { log.Infof("registering: POST /sheets/:sheetname/mdgid") group.POST("/mdgid", s.QueueHandler) } -} + jgroup := r.NewGroup(GenPath("jobs", ParamsKeyJobID)) + log.Infof("registering: GET /jobs/:jobid/status") + jgroup.GET("/status", s.JobInfoHandler) + if !s.DisableNotificationEP { + log.Infof("registering: POST /jobs/:jobid/status") + jgroup.POST("/status", s.NotificationHandler) + } +} // corsHanlder is used to respond to all OPTIONS requests for registered routes func corsHandler(w http.ResponseWriter, r *http.Request, params map[string]string) { - setHeaders(map[string]string{},w) + setHeaders(map[string]string{}, w) return } 
diff --git a/atlante/sheet.go b/atlante/sheet.go index 3460409..1171258 100644 --- a/atlante/sheet.go +++ b/atlante/sheet.go @@ -10,6 +10,7 @@ import ( "github.com/go-spatial/maptoolkit/atlante/filestore" "github.com/go-spatial/maptoolkit/atlante/grids" "github.com/go-spatial/maptoolkit/atlante/internal/urlutil" + "github.com/go-spatial/maptoolkit/atlante/notifiers" ) // Sheet describes a map sheet @@ -34,6 +35,8 @@ type Sheet struct { // Description of the sheet Desc string + + Emitter notifiers.Emitter } // NewSheet returns a new sheet diff --git a/cmd/atlante/cmd/server.go b/cmd/atlante/cmd/server.go index 7761442..624fcbf 100644 --- a/cmd/atlante/cmd/server.go +++ b/cmd/atlante/cmd/server.go @@ -5,6 +5,9 @@ import ( "net/http" "net/url" + "github.com/go-spatial/maptoolkit/atlante/server/coordinator" + crdnull "github.com/go-spatial/maptoolkit/atlante/server/coordinator/null" + "github.com/go-spatial/maptoolkit/atlante/queuer" "github.com/prometheus/common/log" @@ -64,11 +67,29 @@ func serverCmdRunE(cmd *cobra.Command, args []string) error { // Need to initialize the server srv := server.Server{ - Hostname: string(conf.Webserver.HostName), - Port: port, - Scheme: string(conf.Webserver.Scheme), - Headers: make(map[string]string), - Atlante: a, + Hostname: string(conf.Webserver.HostName), + Port: port, + Scheme: string(conf.Webserver.Scheme), + Headers: make(map[string]string), + Atlante: a, + Coordinator: coordinator.Provider(crdnull.Provider{}), + DisableNotificationEP: conf.Webserver.DisableNotificationEP, + } + + // Setup Coordinator + if conf.Webserver.Coordinator != nil { + var cType string = crdnull.TYPE + cType, _ = conf.Webserver.Coordinator.String(coordinator.ConfigKeyType, &cType) + srv.Coordinator, err = coordinator.For(cType, coordinator.Config(conf.Webserver.Coordinator)) + if err != nil { + if _, ok := err.(coordinator.ErrUnknownProvider); ok { + log.Infoln("known coordinator providers:") + for _, p := range coordinator.Registered() { + 
log.Infoln("\t", p) + } + } + return err + } } // Now we need to look to see if a queue has been configured diff --git a/cmd/atlante/config/config.go b/cmd/atlante/config/config.go index 73ada59..9e6fe6c 100644 --- a/cmd/atlante/config/config.go +++ b/cmd/atlante/config/config.go @@ -5,11 +5,13 @@ import ( "net/url" "strings" + "github.com/gdey/errors" "github.com/go-spatial/maptoolkit/atlante" "github.com/go-spatial/maptoolkit/atlante/config" "github.com/go-spatial/maptoolkit/atlante/filestore" fsmulti "github.com/go-spatial/maptoolkit/atlante/filestore/multi" "github.com/go-spatial/maptoolkit/atlante/grids" + "github.com/go-spatial/maptoolkit/atlante/notifiers" "github.com/go-spatial/tegola/dict" "github.com/prometheus/common/log" ) @@ -60,6 +62,15 @@ func LoadConfig(conf config.Config, dpi int, overrideDPI bool) (*atlante.Atlante var ok bool var a atlante.Atlante + // Notifier + if conf.Notifier != nil { + note, err := notifiers.From(notifiers.Config(conf.Notifier)) + if err != nil { + return nil, errors.String(fmt.Sprintf("notifier: %v", err)) + } + a.Notifier = note + } + // Loop through providers creating a provider type mapping. 
for i, p := range conf.Providers { // type is required diff --git a/cmd/atlante/coordinator_reg.go b/cmd/atlante/coordinator_reg.go new file mode 100644 index 0000000..a883418 --- /dev/null +++ b/cmd/atlante/coordinator_reg.go @@ -0,0 +1,7 @@ +package main + +import ( + _ "github.com/go-spatial/maptoolkit/atlante/server/coordinator/logger" + _ "github.com/go-spatial/maptoolkit/atlante/server/coordinator/null" + _ "github.com/go-spatial/maptoolkit/atlante/server/coordinator/postgresql" +) diff --git a/cmd/atlante/notifiers_reg.go b/cmd/atlante/notifiers_reg.go new file mode 100644 index 0000000..b6d4e24 --- /dev/null +++ b/cmd/atlante/notifiers_reg.go @@ -0,0 +1,5 @@ +package main + +import ( + _ "github.com/go-spatial/maptoolkit/atlante/notifiers/http" +) diff --git a/vendor/github.com/go-spatial/geom/encoding/wkt/wkt.go b/vendor/github.com/go-spatial/geom/encoding/wkt/wkt.go new file mode 100644 index 0000000..686a5e0 --- /dev/null +++ b/vendor/github.com/go-spatial/geom/encoding/wkt/wkt.go @@ -0,0 +1,224 @@ +package wkt + +import ( + "fmt" + "reflect" + "strings" + + "github.com/go-spatial/geom" +) + +func isNil(a interface{}) bool { + defer func() { recover() }() + return a == nil || reflect.ValueOf(a).IsNil() +} + +func isMultiLineStringerEmpty(ml geom.MultiLineStringer) bool { + if isNil(ml) || len(ml.LineStrings()) == 0 { + return true + } + lns := ml.LineStrings() + // It's not nil, and there are several lines. + // We need to go through all the lines and make sure that at least one of them has a non-zero length. + for i := range lns { + if len(lns[i]) != 0 { + return false + } + } + return true +} + +func isPolygonerEmpty(p geom.Polygoner) bool { + if isNil(p) || len(p.LinearRings()) == 0 { + return true + } + lns := p.LinearRings() + // It's not nil, and there are several lines. + // We need to go through all the lines and make sure that at least one of them has a non-zero length. 
+ for i := range lns { + if len(lns[i]) != 0 { + return false + } + } + return true +} + +func isMultiPolygonerEmpty(mp geom.MultiPolygoner) bool { + if isNil(mp) || len(mp.Polygons()) == 0 { + return true + } + plys := mp.Polygons() + for i := range plys { + for j := range plys[i] { + if len(plys[i][j]) != 0 { + return false + } + } + } + return true +} + +func isCollectionerEmpty(col geom.Collectioner) bool { + if isNil(col) || len(col.Geometries()) == 0 { + return true + } + geos := col.Geometries() + for i := range geos { + switch g := geos[i].(type) { + case geom.Pointer: + if !isNil(g) { + return false + } + case geom.MultiPointer: + if !(isNil(g) || len(g.Points()) == 0) { + return false + } + case geom.LineStringer: + if !(isNil(g) || len(g.Verticies()) == 0) { + return false + } + case geom.MultiLineStringer: + if !isMultiLineStringerEmpty(g) { + return false + } + case geom.Polygoner: + if !isPolygonerEmpty(g) { + return false + } + case geom.MultiPolygoner: + if !isMultiPolygonerEmpty(g) { + return false + } + case geom.Collectioner: + if !isCollectionerEmpty(g) { + return false + } + } + } + return true +} + +/* +This purpose of this file is to house the wkt functions. These functions are +use to take a tagola.Geometry and convert it to a wkt string. It will, also, +contain functions to parse a wkt string into a wkb.Geometry. 
+*/ + +func _encode(geo geom.Geometry) string { + + switch g := geo.(type) { + + case geom.Pointer: + xy := g.XY() + return fmt.Sprintf("%v %v", xy[0], xy[1]) + + case geom.MultiPointer: + var points []string + for _, p := range g.Points() { + points = append(points, _encode(geom.Point(p))) + } + return "(" + strings.Join(points, ",") + ")" + + case geom.LineStringer: + var points []string + for _, p := range g.Verticies() { + points = append(points, _encode(geom.Point(p))) + } + return "(" + strings.Join(points, ",") + ")" + + case geom.MultiLineStringer: + var lines []string + for _, l := range g.LineStrings() { + if len(l) == 0 { + continue + } + lines = append(lines, _encode(geom.LineString(l))) + } + return "(" + strings.Join(lines, ",") + ")" + + case geom.Polygoner: + var rings []string + for _, l := range g.LinearRings() { + if len(l) == 0 { + continue + } + rings = append(rings, _encode(geom.LineString(l))) + } + return "(" + strings.Join(rings, ",") + ")" + + case geom.MultiPolygoner: + var polygons []string + for _, p := range g.Polygons() { + if len(p) == 0 { + continue + } + polygons = append(polygons, _encode(geom.Polygon(p))) + } + return "(" + strings.Join(polygons, ",") + ")" + + } + panic(fmt.Sprintf("Don't know the geometry type! %+v", geo)) +} + +//WKT returns a WKT representation of the Geometry if possible. +// the Error will be non-nil if geometry is unknown. 
+func Encode(geo geom.Geometry) (string, error) { + switch g := geo.(type) { + default: + return "", geom.ErrUnknownGeometry{geo} + case geom.Pointer: + // POINT( 10 10) + if isNil(g) { + return "POINT EMPTY", nil + } + return "POINT (" + _encode(geo) + ")", nil + + case geom.MultiPointer: + if isNil(g) || len(g.Points()) == 0 { + return "MULTIPOINT EMPTY", nil + } + return "MULTIPOINT " + _encode(geo), nil + + case geom.LineStringer: + if isNil(g) || len(g.Verticies()) == 0 { + return "LINESTRING EMPTY", nil + } + return "LINESTRING " + _encode(geo), nil + + case geom.MultiLineStringer: + if isMultiLineStringerEmpty(g) { + return "MULTILINESTRING EMPTY", nil + } + return "MULTILINESTRING " + _encode(geo), nil + + case geom.Polygoner: + if isPolygonerEmpty(g) { + return "POLYGON EMPTY", nil + } + return "POLYGON " + _encode(geo), nil + + case geom.MultiPolygoner: + if isMultiPolygonerEmpty(g) { + return "MULTIPOLYGON EMPTY", nil + } + return "MULTIPOLYGON " + _encode(geo), nil + + case geom.Collectioner: + if isCollectionerEmpty(g) { + return "GEOMETRYCOLLECTION EMPTY", nil + } + var geometries []string + for _, sg := range g.Geometries() { + s, err := Encode(sg) + if err != nil { + return "", err + } + geometries = append(geometries, s) + } + return "GEOMETRYCOLLECTION (" + strings.Join(geometries, ",") + ")", nil + } +} + +func Decode(text string) (geo geom.Geometry, err error) { + return nil, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index ee7bb7c..512d011 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -11,6 +11,7 @@ github.com/arolek/p github.com/aws/aws-sdk-go/aws github.com/aws/aws-sdk-go/aws/credentials github.com/aws/aws-sdk-go/aws/session +github.com/aws/aws-sdk-go/service/s3 github.com/aws/aws-sdk-go/service/s3/s3manager github.com/aws/aws-sdk-go/service/batch github.com/aws/aws-sdk-go/aws/awserr @@ -26,26 +27,25 @@ github.com/aws/aws-sdk-go/aws/csm github.com/aws/aws-sdk-go/aws/defaults github.com/aws/aws-sdk-go/aws/request 
github.com/aws/aws-sdk-go/aws/awsutil -github.com/aws/aws-sdk-go/service/s3 -github.com/aws/aws-sdk-go/service/s3/s3iface github.com/aws/aws-sdk-go/aws/client/metadata github.com/aws/aws-sdk-go/aws/signer/v4 +github.com/aws/aws-sdk-go/internal/s3err github.com/aws/aws-sdk-go/private/protocol +github.com/aws/aws-sdk-go/private/protocol/eventstream +github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi +github.com/aws/aws-sdk-go/private/protocol/rest +github.com/aws/aws-sdk-go/private/protocol/restxml +github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil +github.com/aws/aws-sdk-go/service/s3/s3iface github.com/aws/aws-sdk-go/private/protocol/restjson github.com/aws/aws-sdk-go/internal/sdkrand github.com/aws/aws-sdk-go/service/sts github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds github.com/aws/aws-sdk-go/aws/credentials/endpointcreds github.com/aws/aws-sdk-go/aws/ec2metadata -github.com/aws/aws-sdk-go/internal/s3err -github.com/aws/aws-sdk-go/private/protocol/eventstream -github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi -github.com/aws/aws-sdk-go/private/protocol/rest -github.com/aws/aws-sdk-go/private/protocol/restxml -github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil +github.com/aws/aws-sdk-go/private/protocol/query github.com/aws/aws-sdk-go/private/protocol/json/jsonutil github.com/aws/aws-sdk-go/private/protocol/jsonrpc -github.com/aws/aws-sdk-go/private/protocol/query github.com/aws/aws-sdk-go/internal/sdkuri github.com/aws/aws-sdk-go/private/protocol/query/queryutil # github.com/dimfeld/httptreemux v5.0.1+incompatible @@ -55,6 +55,7 @@ github.com/gdey/errors # github.com/go-spatial/geom v0.0.0-20190319224731-c03a62f3a74a github.com/go-spatial/geom github.com/go-spatial/geom/spherical +github.com/go-spatial/geom/encoding/wkt github.com/go-spatial/geom/slippy github.com/go-spatial/geom/cmp # github.com/go-spatial/tegola v0.9.0