This repository has been archived by the owner on Nov 8, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathlocal.go
256 lines (213 loc) · 7.96 KB
/
local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
// Copyright (c) 2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"syscall"
"time"
"github.com/intelsdi-x/swan/pkg/isolation"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// Local is an executor that provides the execution environment on the local
// machine via exec.Command.
// It runs the command as the current user.
type Local struct {
// commandDecorators is applied to every command string (through its
// Decorate method) before the command is handed to the shell in Execute.
commandDecorators isolation.Decorators
}
// NewLocal returns a Local executor that is configured with an empty
// decorator set, i.e. without any isolators.
func NewLocal() Local {
	noDecorators := isolation.Decorators{}
	return NewLocalIsolated(noDecorators)
}
// NewLocalIsolated returns a Local executor whose commands are decorated with
// the given isolators. The variadic arguments are stored as the executor's
// decorator list.
func NewLocalIsolated(deco ...isolation.Decorator) Local {
	var executor Local
	executor.commandDecorators = deco
	return executor
}
// String returns the user-friendly name of this executor.
func (l Local) String() string {
	const executorName = "Local Executor"
	return executorName
}
// Execute runs the given command locally through `sh -c`, after applying the
// executor's decorators to it.
//
// The process is started in its own process group, and its stdout and stderr
// are redirected to files created in a fresh output directory. A background
// goroutine waits for the process, closes the output files and then closes
// the handle's hasProcessExited channel.
//
// The returned TaskHandle is able to stop and monitor the provisioned process.
// An error is returned when the output directory or files cannot be created,
// when the command cannot be started, or when checkIfProcessFailedToExecute
// reports a failure after a short (100ms) grace period.
func (l Local) Execute(command string) (TaskHandle, error) {
	// Decorate once so that the command that is logged is exactly the one
	// that gets executed.
	decoratedCommand := l.commandDecorators.Decorate(command)
	log.Debug("Local Executor: Starting '", decoratedCommand, "' locally ")

	cmd := exec.Command("sh", "-c", decoratedCommand)
	// TODO: delete this as we use PID namespace instead
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	outputDirectory, err := createOutputDirectory(command, "local")
	if err != nil {
		return nil, errors.Wrapf(err, "createOutputDirectory for command %q failed", command)
	}
	stdoutFile, stderrFile, err := createExecutorOutputFiles(outputDirectory)
	if err != nil {
		removeDirectory(outputDirectory)
		return nil, errors.Wrapf(err, "createExecutorOutputFiles for command %q failed", command)
	}

	log.Debug("Local Executor: Created temporary files ",
		"stdout path: ", stdoutFile.Name(), ", stderr path: ", stderrFile.Name())

	cmd.Stdout = stdoutFile
	cmd.Stderr = stderrFile

	err = cmd.Start()
	if err != nil {
		// The goroutine that normally closes the output files is only
		// launched after a successful start, so release them here before the
		// directory is removed; otherwise the descriptors would leak.
		if closeErr := syncAndClose(stdoutFile); closeErr != nil {
			log.Errorf("Cannot syncAndClose stdout file: %s", closeErr.Error())
		}
		if closeErr := syncAndClose(stderrFile); closeErr != nil {
			log.Errorf("Cannot syncAndClose stderrFile file: %s", closeErr.Error())
		}
		removeDirectory(outputDirectory)
		return nil, errors.Wrapf(err, "command %q start failed", command)
	}

	log.Debug("Local Executor: Started with pid ", cmd.Process.Pid)

	// hasProcessExited channel is closed when launched process exits.
	hasProcessExited := make(chan struct{})

	taskHandle := localTaskHandle{
		cmdHandler:       cmd,
		command:          command,
		stdoutFilePath:   stdoutFile.Name(),
		stderrFilePath:   stderrFile.Name(),
		hasProcessExited: hasProcessExited,
	}

	// Wait for local task in go routine.
	go func() {
		defer close(hasProcessExited)

		// Wait for task completion.
		// NOTE: Wait() returns an error. We grab the process state in any case
		// (success or failure) below, so the error object matters less in the
		// status handling for now.
		//
		// Every error in this goroutine is kept in a goroutine-local variable:
		// assigning to the enclosing function's `err` would race with the
		// caller's own use of it after the grace period below.
		if waitErr := cmd.Wait(); waitErr != nil {
			if _, ok := waitErr.(*exec.ExitError); !ok {
				// In case of NON Exit Errors we are not sure if task does
				// terminate so panic.
				// This error happens very rarely and it represents a critical
				// state of the server like volume or HW problems.
				waitErr = errors.Wrap(waitErr, "wait returned with NON exit error")
				log.Panicf("Waiting for local task failed\n%+v", waitErr)
			}
		}

		if closeErr := syncAndClose(stdoutFile); closeErr != nil {
			log.Errorf("Cannot syncAndClose stdout file: %s", closeErr.Error())
		}
		if closeErr := syncAndClose(stderrFile); closeErr != nil {
			log.Errorf("Cannot syncAndClose stderrFile file: %s", closeErr.Error())
		}

		exitCode := taskHandle.cmdHandler.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
		log.Debugf("Local Executor: task %q exited with code %d", command, exitCode)
	}()

	// Best effort potential way to check if binary is started properly.
	// The result of Wait is intentionally ignored: the check below inspects
	// the task handle itself.
	taskHandle.Wait(100 * time.Millisecond)
	err = checkIfProcessFailedToExecute(command, l.String(), &taskHandle)
	if err != nil {
		return nil, err
	}

	log.Debugf("Local Executor: pid %d started successfully", cmd.Process.Pid)
	return &taskHandle, nil
}
// localTaskHandle implements TaskHandle interface for a process started by
// the Local executor.
type localTaskHandle struct {
// cmdHandler is the exec.Cmd of the launched process; it gives access to
// the PID and, once the process has been waited for, to its ProcessState.
cmdHandler *exec.Cmd
// stdoutFilePath and stderrFilePath are the paths of the files that the
// process' stdout and stderr are redirected to.
stdoutFilePath string
stderrFilePath string
// This channel is closed by the waiting goroutine started in Execute once
// the process has exited.
// It is used to signal task termination.
hasProcessExited chan struct{}
// Command requested by user. It is used in log and error messages and in
// the String representation of this TaskHandle.
command string
}
// isTerminated reports, without blocking, whether the hasProcessExited
// channel has been closed. A closed channel means that waiting for the
// process has ended and the task is in terminated state.
// NOTE: If it's true then ProcessState is not nil. ProcessState contains
// information about an exited process available after call to Wait or Run.
func (taskHandle *localTaskHandle) isTerminated() bool {
	terminated := false
	select {
	case <-taskHandle.hasProcessExited:
		// Receiving succeeds only once the channel is closed.
		terminated = true
	default:
		// Channel still open: the task has not terminated yet.
	}
	return terminated
}
// getPid returns the PID of the process held by the underlying exec.Cmd.
func (taskHandle *localTaskHandle) getPid() int {
	process := taskHandle.cmdHandler.Process
	return process.Pid
}
// Stop terminates the local task by sending SIGKILL to its process group and
// then waiting up to killWaitTimeout for the task to be reported as exited.
//
// It returns nil when the task is already terminated or terminates in time.
// It returns an error when the signal cannot be delivered (for a reason other
// than the process group no longer existing) or when the task does not
// terminate within the timeout.
func (taskHandle *localTaskHandle) Stop() error {
	if taskHandle.isTerminated() {
		return nil
	}

	// Sending SIGKILL signal to local task. The negative PID addresses the
	// whole process group (Execute starts the process with Setpgid).
	// TODO: Add PID namespace to handle orphan tasks properly.
	processGroup := -taskHandle.getPid()
	log.Debug("Sending ", syscall.SIGKILL, " to PID ", processGroup)
	err := syscall.Kill(processGroup, syscall.SIGKILL)
	// ESRCH means the process group vanished between the isTerminated check
	// above and the kill: the task has exited on its own, which is not a
	// failure of Stop. Fall through and wait for the exit to be registered.
	if err != nil && err != syscall.ESRCH {
		log.Errorf("Local Stop() of command %q has failed: %s", taskHandle.command, err.Error())
		return errors.Wrapf(err, "Local Stop() of command %q has failed", taskHandle.command)
	}

	// Checking if kill was successful.
	isTerminated, _ := taskHandle.Wait(killWaitTimeout)
	if !isTerminated {
		log.Errorf("Local Stop() of command %q has failed: timeout", taskHandle.command)
		return errors.Errorf("Local Stop() of command %q has failed: timeout", taskHandle.command)
	}

	// No error, task terminated.
	return nil
}
// Status returns the state of the task: TERMINATED once the process has
// exited, RUNNING otherwise.
func (taskHandle *localTaskHandle) Status() TaskState {
	if taskHandle.isTerminated() {
		return TERMINATED
	}
	return RUNNING
}
// ExitCode returns the exit status of the task. If the task is not terminated
// yet it returns -1 together with an error.
func (taskHandle *localTaskHandle) ExitCode() (int, error) {
	if terminated := taskHandle.isTerminated(); !terminated {
		return -1, errors.Errorf("task %q is not terminated", taskHandle.command)
	}

	// ProcessState is populated once the process has been waited for.
	waitStatus := taskHandle.cmdHandler.ProcessState.Sys().(syscall.WaitStatus)
	return waitStatus.ExitStatus(), nil
}
// StdoutFile opens the file that the task's stdout is redirected to and
// returns its handle.
func (taskHandle *localTaskHandle) StdoutFile() (*os.File, error) {
	stdoutPath := taskHandle.stdoutFilePath
	return openFile(stdoutPath)
}
// StderrFile opens the file that the task's stderr is redirected to and
// returns its handle.
func (taskHandle *localTaskHandle) StderrFile() (*os.File, error) {
	stderrPath := taskHandle.stderrFilePath
	return openFile(stderrPath)
}
// EraseOutput removes the directory that contains the task's stdout file.
func (taskHandle *localTaskHandle) EraseOutput() error {
	return removeDirectory(filepath.Dir(taskHandle.stdoutFilePath))
}
// Wait blocks until the command finishes or the channel obtained from
// getTimeoutChan(timeout) fires, whichever happens first.
// It returns true if the task is terminated, false otherwise. The returned
// error is always nil.
func (taskHandle *localTaskHandle) Wait(timeout time.Duration) (bool, error) {
	exited := taskHandle.hasProcessExited

	// Fast path: an already terminated task is reported without creating a
	// timeout channel at all.
	select {
	case <-exited:
		return true, nil
	default:
	}

	expired := getTimeoutChan(timeout)
	select {
	case <-expired:
		// Timeout exceeded: the task did not terminate yet.
		return false, nil
	case <-exited:
		// The channel is closed once the task has terminated.
		return true, nil
	}
}
// String returns a user-friendly description of the task: the word "Local"
// followed by the quoted command requested by the user.
func (taskHandle *localTaskHandle) String() string {
	description := fmt.Sprintf("Local %q", taskHandle.command)
	return description
}
// Address returns the address of the host the task runs on; for a local task
// this is always the IPv4 loopback address.
func (taskHandle *localTaskHandle) Address() string {
	const loopbackAddress = "127.0.0.1"
	return loopbackAddress
}