This repository has been archived by the owner on Nov 8, 2022. It is now read-only.
// Copyright (c) 2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Under the hood, the Kubernetes executor uses three components:
- kubernetesExecutor:
  - talks to the Kubernetes cluster to create a new pod,
  - starts the watcher (when Execute is called),
  - runs in the main goroutine,
  - gives control back to the user by returning the newly created taskHandle,
- watcher:
  - creates the "log copier" by invoking the setupLogs() function,
  - monitors the state of the pod and passes that information to taskHandle,
  - deletes the pod on failure, as part of cleanup, or when asked directly by taskHandle,
- copier:
  - lives in the setupLogs() function,
  - copies logs from the streamed Kubernetes response,
  - closes the logsCopyFinished channel when the logs finish streaming or the stream cannot be created.
A pod moves through the following phases, which map to these handlers:
- Pending: do nothing, just log,
- Running and Ready: calls the whenPodReady() handler,
- Succeeded or Failed: calls the whenPodFinished() handler and, more importantly, the deletePod() action,
- Deleted: calls whenPodDeleted() to signal taskHandle.
Note:
- whenPodFinished always calls setupLogs(): a pod that finished must have been running before, so its logs can still be set up,
- whenPodFinished may be skipped entirely, in which case outputCopied is never closed.
Communication with taskHandle goes through taskWatcher events:
- whenPodReady signals "started" to taskHandle (the Execute method is unblocked),
- whenPodFinished examines the state of the pod and stores its exit code, which can be read with taskHandle.ExitCode(),
- whenPodDeleted signals termination by closing the "stopped" channel; taskHandle.Status() then returns TERMINATED.
The logs are prepared (the "copier" goroutine) by setupLogs(), which is invoked at every opportunity but runs only once.
Every handler (when*) and every action, such as setupLogs() and deletePod(), is protected by sync.Once so that it runs only once.
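The sync.Once guard described above can be sketched as a small standalone program.
This is only an illustration under stated assumptions: the names onceCloser, whenDeleted
and isStopped are hypothetical and do not exist in this package. Closing a channel twice
panics in Go, so wrapping the close in sync.Once makes the handler safe to call from
several code paths.

```go
package main

import (
	"fmt"
	"sync"
)

// onceCloser is a hypothetical stand-in for the watcher: it owns a channel
// that must be closed exactly once, however many times the handler fires.
type onceCloser struct {
	stopped chan struct{}
	once    sync.Once
	calls   int // number of times the guarded body actually ran
}

func newOnceCloser() *onceCloser {
	return &onceCloser{stopped: make(chan struct{})}
}

// whenDeleted may be called repeatedly; only the first call closes the channel.
func (c *onceCloser) whenDeleted() {
	c.once.Do(func() {
		c.calls++
		close(c.stopped)
	})
}

// isStopped reports, without blocking, whether the channel has been closed.
func (c *onceCloser) isStopped() bool {
	select {
	case <-c.stopped:
		return true
	default:
		return false
	}
}

func main() {
	c := newOnceCloser()
	fmt.Println(c.isStopped())
	c.whenDeleted()
	c.whenDeleted() // no-op: without sync.Once this second close would panic
	fmt.Println(c.isStopped(), c.calls)
}
```

The non-blocking select in isStopped is the same pattern the task handle uses to check
for termination without waiting.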
*/
package executor

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/intelsdi-x/swan/pkg/conf"
	"github.com/intelsdi-x/swan/pkg/isolation"
	"github.com/intelsdi-x/swan/pkg/k8sports"
	"github.com/intelsdi-x/swan/pkg/utils/uuid"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/pkg/api"
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

const (
	defaultContainerImage = "intelsdi/swan"
)

var (
	kubeconfigFlag                = conf.NewStringFlag("kubernetes_kubeconfig", "(optional) Absolute path to the kubeconfig file. Overrides pod configuration passed through flags. ", "")
	kubernetesPrivilegedPodsFlag  = conf.NewBoolFlag("kubernetes_privileged_pods", "Kubernetes containers will be run as privileged.", false)
	kubernetesPodLunchTimeoutFlag = conf.NewDurationFlag("kubernetes_pod_launch_timeout", "Kubernetes Pod launch timeout.", 30*time.Second)
	kubernetesContainerImageFlag  = conf.NewStringFlag("kubernetes_container_image", "Name of the container image to be used. It needs to be available locally or downloadable.", defaultContainerImage)
)

// KubernetesConfig describes the necessary information to connect to a Kubernetes cluster.
type KubernetesConfig struct {
	// PodName vs PodNamePrefix:
	// - PodName (deprecated; empty string by default) - when this field is
	// configured, the Kubernetes executor uses it as the Pod name. Kubernetes
	// doesn't support spawning pods with the same name, so this field
	// shouldn't be used.
	// - PodNamePrefix ("swan" by default) - if the PodName field is not
	// configured, this field is used as a prefix for a randomly generated
	// Pod name.
	PodName        string
	PodNamePrefix  string
	NodeName       string
	Address        string
	CPURequest     int64
	CPULimit       int64
	MemoryRequest  int64
	MemoryLimit    int64
	Decorators     isolation.Decorators
	ContainerName  string
	ContainerImage string
	Namespace      string
	Privileged     bool
	HostNetwork    bool
	LaunchTimeout  time.Duration
}

// LaunchTimedOutError is the error type returned when launching a new pod exceeds
// the timeout value defined in KubernetesConfig.LaunchTimeout.
type LaunchTimedOutError struct {
	errorMessage string
}

// Error is the one method needed to implement the error interface. Here, we just
// return the error message.
func (err *LaunchTimedOutError) Error() string {
	return err.errorMessage
}

// DefaultKubernetesConfig returns a KubernetesConfig object with safe defaults.
func DefaultKubernetesConfig() KubernetesConfig {
	return KubernetesConfig{
		NodeName:       "",
		PodName:        "",
		PodNamePrefix:  "swan",
		Address:        "127.0.0.1:8080",
		CPURequest:     0,
		CPULimit:       0,
		MemoryRequest:  0,
		MemoryLimit:    0,
		Decorators:     isolation.Decorators{},
		ContainerName:  "swan",
		ContainerImage: kubernetesContainerImageFlag.Value(),
		Namespace:      v1.NamespaceDefault,
		Privileged:     kubernetesPrivilegedPodsFlag.Value(),
		HostNetwork:    false,
		LaunchTimeout:  kubernetesPodLunchTimeoutFlag.Value(),
	}
}

type k8s struct {
	config    KubernetesConfig
	clientset *kubernetes.Clientset
}

// NewKubernetes returns an executor which lets the user run commands in pods in a
// kubernetes cluster.
func NewKubernetes(config KubernetesConfig) (Executor, error) {
	k8s := &k8s{
		config: config,
	}

	var err error
	kubeConfigPath := kubeconfigFlag.Value()
	if kubeConfigPath == "" {
		k8s.clientset, err = kubernetes.NewForConfig(&rest.Config{
			Host: config.Address,
		})
	} else {
		var kubeconfig *rest.Config
		kubeconfig, err = clientcmd.BuildConfigFromFlags("", kubeConfigPath)
		if err != nil {
			// Do not let the next assignment overwrite this error.
			return nil, errors.Wrapf(err, "can't build kubernetes config from kubeconfig file '%s'", kubeConfigPath)
		}
		k8s.clientset, err = kubernetes.NewForConfig(kubeconfig)
	}
	if err != nil {
		return nil, errors.Wrapf(err, "can't initialize kubernetes clientset for host '%s'", config.Address)
	}
	return k8s, nil
}

// containerResources is a helper that creates ResourceRequirements for the container.
func (k8s *k8s) containerResources() v1.ResourceRequirements {
	// requests
	resourceListRequests := v1.ResourceList{}
	if k8s.config.CPURequest > 0 {
		resourceListRequests[v1.ResourceCPU] = *resource.NewMilliQuantity(k8s.config.CPURequest, resource.DecimalSI)
	}
	if k8s.config.MemoryRequest > 0 {
		resourceListRequests[v1.ResourceMemory] = *resource.NewQuantity(k8s.config.MemoryRequest, resource.DecimalSI)
	}

	// limits
	resourceListLimits := v1.ResourceList{}
	if k8s.config.CPULimit > 0 {
		resourceListLimits[v1.ResourceCPU] = *resource.NewMilliQuantity(k8s.config.CPULimit, resource.DecimalSI)
	}
	if k8s.config.MemoryLimit > 0 {
		resourceListLimits[v1.ResourceMemory] = *resource.NewQuantity(k8s.config.MemoryLimit, resource.DecimalSI)
	}

	return v1.ResourceRequirements{
		Requests: resourceListRequests,
		Limits:   resourceListLimits,
	}
}

// String returns a user-friendly name of the executor.
func (k8s *k8s) String() string {
	return "Kubernetes Executor"
}

// generatePodName generates a Pod name based on the KubernetesConfig struct.
// If KubernetesConfig has a non-empty PodName field, that field is used as
// the Pod name (even if the selected name is already in use, in which case
// spawning the pod should fail). Otherwise the name is PodNamePrefix followed
// by a random suffix.
func (k8s *k8s) generatePodName() string {
	if k8s.config.PodName != "" {
		return k8s.config.PodName
	}
	return fmt.Sprintf("%s-%s", k8s.config.PodNamePrefix, uuid.New()[:8])
}

// newPod is a helper that builds the in-memory structure representing a pod
// before it is sent as a request to the API server. The returned error is
// currently always nil.
func (k8s *k8s) newPod(command string) (*v1.Pod, error) {
	resources := k8s.containerResources()
	podName := k8s.generatePodName()

	var zero int64
	return &v1.Pod{
		TypeMeta: metav1.TypeMeta{},
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: k8s.config.Namespace,
			Labels:    map[string]string{"name": podName},
		},
		Spec: v1.PodSpec{
			NodeName:                      k8s.config.NodeName,
			DNSPolicy:                     "Default",
			RestartPolicy:                 "Never",
			HostNetwork:                   k8s.config.HostNetwork,
			TerminationGracePeriodSeconds: &zero,
			Containers: []v1.Container{
				{
					Name:            k8s.config.ContainerName,
					Image:           k8s.config.ContainerImage,
					Command:         []string{"sh", "-c", command},
					Resources:       resources,
					ImagePullPolicy: v1.PullIfNotPresent, // Default because swan image is not published yet.
					SecurityContext: &v1.SecurityContext{Privileged: &k8s.config.Privileged},
				},
			},
		},
	}, nil
}

// Execute creates a pod and runs the provided command in it. When the command completes, the pod
// is stopped, i.e. the container is not restarted automatically.
func (k8s *k8s) Execute(command string) (TaskHandle, error) {
	podsAPI := k8s.clientset.Pods(k8s.config.Namespace)
	command = k8s.config.Decorators.Decorate(command)

	// This is a workaround for kubernetes #31446.
	// Make sure that at least one line of text is output from the pod, to unblock the .GetLogs() call
	// on the apiserver with a streamed response (when follow=true). See kubernetes #31446 for more details:
	// https://github.com/kubernetes/kubernetes/pull/31446
	wrappedCommand := "echo;" + command

	// See http://kubernetes.io/docs/api-reference/v1/definitions/ for definition of the pod manifest.
	podManifest, err := k8s.newPod(wrappedCommand)
	if err != nil {
		log.Errorf("K8s executor: cannot create pod manifest")
		return nil, errors.Wrapf(err, "cannot create pod manifest")
	}

	scheme := NewRuntimeScheme()
	apiPod := &api.Pod{}
	scheme.Convert(podManifest, apiPod, nil)

	log.Debugf("Starting '%s' pod=%s node=%s QoSclass=%s on kubernetes", command, podManifest.ObjectMeta.Name, podManifest.Spec.NodeName, apiPod.Status.QOSClass)
	pod, err := podsAPI.Create(podManifest)
	if err != nil {
		// Report the generated pod name: config.PodName is deprecated and usually empty.
		log.Errorf("K8s executor: cannot schedule pod %q with namespace %q", podManifest.ObjectMeta.Name, k8s.config.Namespace)
		return nil, errors.Wrapf(err, "cannot schedule pod %q with namespace %q",
			podManifest.ObjectMeta.Name, k8s.config.Namespace)
	}

	// Prepare local files.
	outputDirectory, err := createOutputDirectory(command, "kubernetes")
	if err != nil {
		log.Errorf("Kubernetes Execute: cannot create output directory for command %q: %s", command, err.Error())
		return nil, err
	}
	stdoutFile, stderrFile, err := createExecutorOutputFiles(outputDirectory)
	if err != nil {
		removeDirectory(outputDirectory)
		log.Errorf("Kubernetes Execute: cannot create output files for command %q: %s", command, err.Error())
		return nil, err
	}
	stdoutFileName := stdoutFile.Name()
	stderrFileName := stderrFile.Name()
	stdoutFile.Close()
	stderrFile.Close()

	taskHandle := &k8sTaskHandle{
		podName:         pod.Name,
		command:         command,
		stdoutFilePath:  stdoutFileName,
		stderrFilePath:  stderrFileName,
		started:         make(chan struct{}),
		stopped:         make(chan struct{}),
		requestDelete:   make(chan struct{}, 1),
		exitCodeChannel: make(chan int, 1),
	}

	taskWatcher := &k8sWatcher{
		podsAPI:          podsAPI,
		pod:              pod,
		taskHandle:       taskHandle,
		command:          wrappedCommand,
		stdoutFilePath:   stdoutFileName,
		stderrFilePath:   stderrFileName,
		logsCopyFinished: make(chan struct{}, 1),
		started:          taskHandle.started,
		stopped:          taskHandle.stopped,
		failed:           make(chan struct{}),
		deleted:          make(chan struct{}),
		requestDelete:    taskHandle.requestDelete,
		exitCodeChannel:  taskHandle.exitCodeChannel,
	}

	err = taskWatcher.watch(k8s.config.LaunchTimeout)
	if err != nil {
		removeDirectory(outputDirectory)
		log.Errorf("K8s executor: cannot create task on pod %q", pod.Name)
		return nil, errors.Wrapf(err, "cannot create task on pod %q", pod.Name)
	}

	// Block until the pod is either started or stopped.
	var started bool
	select {
	case <-taskWatcher.started:
		started = true
	case <-taskWatcher.stopped:
	}

	// Best-effort check that the binary started properly.
	select {
	case <-taskWatcher.failed:
		// Pod was started but the shell failed to launch the requested process.
		log.Errorf("K8s executor: task %q on pod %s failed", taskHandle.command, taskHandle.podName)
	case <-taskWatcher.deleted:
		// Pod was deleted before it even failed. This may happen when the image is not available
		// or communication with Docker fails.
		if !started {
			log.Errorf("K8s executor: pod %s was deleted unexpectedly", taskHandle.podName)
		}
	case <-time.After(100 * time.Millisecond):
		return taskHandle, nil
	}

	// The task has been determined to have failed; wait for Kubernetes to officially acknowledge it.
	taskHandle.Wait(0)
	err = checkIfProcessFailedToExecute(command, k8s.String(), taskHandle)
	if err != nil {
		return nil, err
	}
	return taskHandle, nil
}

// k8sTaskHandle implements the TaskHandle interface.
type k8sTaskHandle struct {
	stopped        chan struct{}
	started        chan struct{}
	requestDelete  chan struct{}
	stdoutFilePath string
	stderrFilePath string // Kubernetes does not separate stderr from stdout, so this file gets a copy of the combined output.

	// Use pointer with nil to indicate exitCode wasn't received.
	exitCode        *int
	exitCodeChannel chan int

	podName   string
	podHostIP string

	// Command requested by the user. This is how this TaskHandle presents itself.
	command string
}

// isTerminated reports, without blocking, whether the "stopped" channel has been closed.
func (th *k8sTaskHandle) isTerminated() bool {
	select {
	case <-th.stopped:
		return true
	default:
		return false
	}
}

// Stop will delete the pod and block the caller until done.
func (th *k8sTaskHandle) Stop() error {
	if th.isTerminated() {
		return nil
	}

	log.Debugf("K8s task handle: delete pod %q", th.podName)

	// Ask the "watcher" to do the deletion (the watcher exists until the pod is actually deleted).
	// Ignore if a request was already made.
	select {
	case th.requestDelete <- struct{}{}:
	default:
	}

	log.Debugf("K8s task handle: waiting for pod %q to stop", th.podName)
	<-th.stopped
	log.Debugf("K8s task handle: pod %q stopped", th.podName)
	return nil
}

// Status returns the current task state in terms of RUNNING or TERMINATED.
func (th *k8sTaskHandle) Status() TaskState {
	if th.isTerminated() {
		return TERMINATED
	}
	return RUNNING
}

// ExitCode returns the exit code of the container running in the pod.
func (th *k8sTaskHandle) ExitCode() (int, error) {
	if !th.isTerminated() {
		return 0, errors.New("task is still running")
	}

	select {
	case exitCode, ok := <-th.exitCodeChannel:
		if ok {
			log.Debugf("K8s task handle: received exit code: %#v", exitCode)
			th.exitCode = &exitCode
		} else {
			log.Debug("K8s task handle: 'exitCode' channel is already closed")
		}
	default:
		log.Debug("K8s task handle: no exit code received (channel not closed yet)")
	}

	// Examine the exit code received just now or previously.
	if th.exitCode == nil {
		log.Error("K8s task handle: exit code is unknown")
		return 0, errors.New("exit code unknown")
	}
	return *th.exitCode, nil
}

// Wait blocks until the pod terminates. If a timeout is provided, it returns earlier
// with false when the pod has not terminated before the timeout expires.
func (th *k8sTaskHandle) Wait(timeout time.Duration) (bool, error) {
	if th.isTerminated() {
		return true, nil
	}

	timeoutChannel := getTimeoutChan(timeout)
	select {
	case <-th.stopped:
		return true, nil
	case <-timeoutChannel:
		return false, nil
	}
}

// EraseOutput deletes the directory where the stdout file resides.
func (th *k8sTaskHandle) EraseOutput() error {
	outputDir := filepath.Dir(th.stdoutFilePath)
	return removeDirectory(outputDir)
}

// String returns a user-friendly description of the task.
func (th *k8sTaskHandle) String() string {
	return fmt.Sprintf("Kubernetes pod named %q with command %q on %q", th.podName, th.command, th.Address())
}

// Address returns the host IP where the pod was scheduled.
func (th *k8sTaskHandle) Address() string {
	return th.podHostIP
}

// StdoutFile returns a file handle to the stdout file for the pod.
// NOTE: StdoutFile will block until the stdout file is available.
func (th *k8sTaskHandle) StdoutFile() (*os.File, error) {
	return openFile(th.stdoutFilePath)
}

// StderrFile returns a file handle to the stderr file for the pod.
// NOTE: StderrFile will block until the stderr file is available.
// Kubernetes provides no separate stderr stream, so the stderr file holds
// the same combined output as the stdout file.
func (th *k8sTaskHandle) StderrFile() (*os.File, error) {
	return openFile(th.stderrFilePath)
}

type k8sWatcher struct {
	podsAPI        corev1.PodInterface
	pod            *v1.Pod
	stdoutFilePath string
	stderrFilePath string

	started          chan struct{}
	stopped          chan struct{}
	failed           chan struct{}
	deleted          chan struct{}
	logsCopyFinished chan struct{}
	requestDelete    chan struct{}
	exitCodeChannel  chan int

	taskHandle *k8sTaskHandle
	command    string

	// one time events
	oncePodReady, oncePodFinished, oncePodDeleted sync.Once
	// one time actions
	onceDeletePod, onceSetupLogs sync.Once

	hasBeenRunning bool
}

// watch starts a goroutine that keeps the task handle in sync with the state of the pod in the k8s cluster.
func (kw *k8sWatcher) watch(timeout time.Duration) error {
	selectorRaw := fmt.Sprintf("name=%s", kw.pod.Name)

	// Prepare events watcher.
	watcher, err := kw.podsAPI.Watch(metav1.ListOptions{LabelSelector: selectorRaw})
	if err != nil {
		return errors.Wrapf(err, "cannot create watcher over selector %q", selectorRaw)
	}

	go func() {
		// Release the most recently created watcher when the loop exits.
		defer func() { watcher.Stop() }()

		var timeoutChannel <-chan time.Time
		if timeout != 0 {
			timeoutChannel = time.After(timeout)
		}

		for {
			select {
			case event, ok := <-watcher.ResultChan():
				if !ok {
					// In some cases the sender may close the watcher channel. Usually it is safe to recreate it.
					log.Debugf("Pod %s: watcher event channel was unexpectedly closed. Recreating.", kw.pod.Name)
					newWatcher, err := kw.podsAPI.Watch(metav1.ListOptions{LabelSelector: selectorRaw})
					if err != nil {
						// We cannot recover from here.
						log.Panicf("Pod %s: cannot recreate watcher stream - %q", kw.pod.Name, err)
					}
					watcher = newWatcher
					continue
				}

				// Ignore anything other than pods.
				pod, ok := event.Object.(*v1.Pod)
				if !ok {
					continue
				}
				log.Debugf("K8s task watcher: event type=%v phase=%v", event.Type, pod.Status.Phase)

				switch event.Type {
				case watch.Added, watch.Modified:
					switch pod.Status.Phase {
					case v1.PodPending:
						continue
					case v1.PodRunning:
						if k8sports.IsPodReady(pod) {
							kw.whenPodReady()
						} else {
							log.Debug("K8s task watcher: Running but not ready")
						}
					case v1.PodFailed, v1.PodSucceeded:
						kw.whenPodFinished(pod)
						kw.deletePod()
					case v1.PodUnknown:
						log.Warnf("K8s task watcher: pod %q with command %q is in unknown phase. "+
							"Probably state of the pod could not be obtained, "+
							"typically due to an error in communicating with the host of the pod", pod.Name, kw.command)
					default:
						log.Warnf("K8s task watcher: unhandled pod phase event %q for pod %q", pod.Status.Phase, pod.Name)
					}
				case watch.Deleted:
					kw.whenPodDeleted()
					return // The only place where we leave the loop.
				case watch.Error:
					log.Errorf("K8s task watcher: kubernetes pod error event: %v", event.Object)
				default:
					log.Warnf("K8s task watcher: unhandled event type: %v", event.Type)
				}

			case <-kw.requestDelete:
				kw.deletePod()

			case <-timeoutChannel:
				// If the task has been running, the launch timeout no longer applies.
				if kw.hasBeenRunning {
					continue
				}
				log.Errorf("Kubernetes Executor: pod %s has not been created: timeout after %f seconds.", kw.pod.Name, timeout.Seconds())
				kw.deletePod()
			}
		}
	}()
	return nil
}

// ----------------------------- when pod ready

// whenPodReady marks the pod as running, sets up the logs and closes the
// "started" channel to indicate that the pod has just started.
func (kw *k8sWatcher) whenPodReady() {
	kw.oncePodReady.Do(func() {
		log.Debug("K8s task watcher: Pod ready handler - mark Pod as running.")
		kw.hasBeenRunning = true
		kw.setupLogs()
		log.Debug("K8s task watcher: pod started [started]")
		close(kw.started)
	})
}

// whenPodFinished is the handler that acquires the exit code from the pod and passes it to taskHandle.
// It also calls setupLogs, in case the logs were not set up while the pod was running.
func (kw *k8sWatcher) whenPodFinished(pod *v1.Pod) {
	kw.oncePodFinished.Do(func() {
		if pod.Status.Phase == v1.PodFailed {
			close(kw.failed)
			if pod.Status.Message != "" {
				log.Debugf("K8s task watcher: fail status message: %q", pod.Status.Message)
			}
		}
		kw.setupLogs()
		kw.setExitCode(pod)
		log.Debug("K8s task watcher: pod finished")
	})
}

// setExitCode retrieves the exit code from the pod status and sends it to the task handle.
// Used only by the whenPodFinished handler.
func (kw *k8sWatcher) setExitCode(pod *v1.Pod) {
	// Send exit code and close channel.
	sendExitCode := func(exitCode int) {
		kw.exitCodeChannel <- exitCode
		log.Debug("K8s task watcher: exit code sent [exitCodeChannel]")
		close(kw.exitCodeChannel)
	}

	// Look for an exit status from the container.
	// If more than one container is present, the last takes precedence.
	exitCode := -1
	for _, status := range pod.Status.ContainerStatuses {
		if status.State.Terminated == nil {
			continue
		}
		exitCode = int(status.State.Terminated.ExitCode)
	}

	if pod.Status.Phase == v1.PodFailed {
		// Depending on how the pod was stopped, change the log level and explanation.
		switch exitCode {
		case 128 + 9:
			log.Warnf("K8s task watcher: pod %q exited with code %d (forced to stop with SIGKILL)", pod.Name, exitCode)
		case 128 + 15:
			log.Warnf("K8s task watcher: pod %q exited with code %d (forced to stop with SIGTERM)", pod.Name, exitCode)
		default:
			log.Errorf("K8s task watcher: pod %q failed with exit code %d", pod.Name, exitCode)
		}
	} else {
		log.Debugf("K8s task watcher: exit code retrieved: %d", exitCode)
	}
	sendExitCode(exitCode)
}

// whenPodDeleted is the handler invoked when the watcher receives a "deleted" event. It sets up
// the logs if that has not happened yet and signals to taskHandle that the pod is "stopped".
func (kw *k8sWatcher) whenPodDeleted() {
	kw.oncePodDeleted.Do(func() {
		close(kw.deleted)
		kw.setupLogs()
		log.Debug("K8s task watcher: pod stopped [stopped]")
		// Wait for logs to finish copying before announcing pod termination.
		<-kw.logsCopyFinished
		close(kw.stopped)
		log.Debug("K8s task watcher: 'stopped' channel closed")
	})
}

// deletePod is an action whose body is executed only once; it asks kubernetes to delete the pod.
func (kw *k8sWatcher) deletePod() {
	kw.onceDeletePod.Do(func() {
		// Setting gracePeriodSeconds to zero would erase the pod from the API server without waiting for it to exit.
		// Setting it to 5 seconds leaves the responsibility of deleting the pod to the kubelet.
		gracePeriodSeconds := int64(5)
		log.Debugf("deleting pod %q", kw.pod.Name)
		err := kw.podsAPI.Delete(kw.pod.Name, &metav1.DeleteOptions{
			GracePeriodSeconds: &gracePeriodSeconds,
		})
		if err != nil {
			log.Warnf("unsuccessful attempt to delete pod %q: %s", kw.pod.Name, err.Error())
		}
		log.Debugf("K8s task watcher: delete pod %q", kw.pod.Name)
	})
}

// setupLogs is an action that opens the log stream and starts the goroutine copying it from kubernetes to local files.
func (kw *k8sWatcher) setupLogs() {
	kw.onceSetupLogs.Do(func() {
		log.Debugf("K8s task watcher: setting up logs for pod %q", kw.pod.Name)
		// Wire up logs to task handle stdout.
		logStream, err := kw.podsAPI.GetLogs(kw.pod.Name, &v1.PodLogOptions{Follow: true}).Stream()
		if err != nil {
			log.Warnf("K8s task watcher: cannot create log stream: %s ", err.Error())
			close(kw.logsCopyFinished)
			return
		}
		log.Debugf("K8s task watcher: logs for pod %q set up", kw.pod.Name)

		// Start the "copier" goroutine that copies logs from the API to local files.
		go func() {
			// Deferred calls run in reverse order: files are synced first,
			// then the stream is closed, and only then is completion announced.
			defer close(kw.logsCopyFinished)
			defer logStream.Close()

			stdoutFile, err := os.OpenFile(kw.stdoutFilePath, os.O_WRONLY|os.O_SYNC, outputFilePrivileges)
			if err != nil {
				log.Errorf("K8s copier: cannot open file to copy logs: %s", err.Error())
				return
			}
			defer syncAndClose(stdoutFile)
			log.Debugf("K8s copier: stdout destination file opened: %q", kw.stdoutFilePath)

			stderrFile, err := os.OpenFile(kw.stderrFilePath, os.O_WRONLY|os.O_SYNC, outputFilePrivileges)
			if err != nil {
				log.Errorf("K8s copier: cannot open file to copy logs: %s", err.Error())
				return
			}
			defer syncAndClose(stderrFile)
			log.Debugf("K8s copier: stderr destination file opened: %q", kw.stderrFilePath)

			log.Debug("K8s copier: starting copying pod output")
			_, err = io.Copy(io.MultiWriter(stdoutFile, stderrFile), logStream)
			log.Debug("K8s copier: finished copying pod output")
			if err != nil {
				log.Errorf("K8s copier: failed to copy container log stream to task output: %s", err.Error())
				return
			}
			log.Debugf("K8s copier: log copy and sync done for pod %q", kw.pod.Name)
		}()
	})
}

// NewRuntimeScheme creates an instance of runtime.Scheme and registers the default conversions.
func NewRuntimeScheme() *runtime.Scheme {
	scheme := runtime.NewScheme()
	v1.RegisterConversions(scheme)
	return scheme
}