Skip to content

Commit

Permalink
Command channel metadata (#261)
Browse files Browse the repository at this point in the history
* feat: integrations run by cmd channel submit event when finished containing exit code

* fix centos5 build

* refactor: var rename

* feat: cmd_stop_mode = process-not-found

* fix: notify 0 exit code

* feat: exit code is -3 on unknown errors

* feat: command channel cmd metadata support

* fix: old import

* fix: old import

* test: event submission
  • Loading branch information
varas authored Dec 15, 2020
1 parent ea1d9e2 commit de6490d
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 41 deletions.
6 changes: 3 additions & 3 deletions internal/agent/cmdchannel/runintegration/runintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan
return
}

cmdChanReq := ctx2.NewCmdChannelRequest(cmdName, args.Hash(), args.IntegrationName, args.IntegrationArgs)
cmdChanReq := ctx2.NewCmdChannelRequest(cmdName, args.Hash(), args.IntegrationName, args.IntegrationArgs, cmd.Metadata)
def.CmdChanReq = &cmdChanReq

definitionQ <- def

ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
ev["cmd_stop_hash"] = args.Hash()
ev := cmdChanReq.Event("cmd-api")

NotifyPlatform(dmEmitter, def, ev)

return
Expand Down
42 changes: 42 additions & 0 deletions internal/agent/cmdchannel/runintegration/runintegration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -53,3 +54,44 @@ func TestHandle_queuesIntegrationToBeRun(t *testing.T) {
assert.Equal(t, "nri-foo", d.Name)
// Definition won't allow assert further
}

func TestHandle_notifiesPlatform(t *testing.T) {
defQueue := make(chan integration.Definition, 1)
il := integration.InstancesLookup{
Legacy: func(_ integration.DefinitionCommandConfig) (integration.Definition, error) {
return integration.Definition{}, nil
},
ByName: func(_ string) (string, error) {
return "/path/to/nri-foo", nil
},
}
em := dm.NewRecordEmitter()
h := NewHandler(defQueue, il, em, l)

cmd := commandapi.Command{
Args: []byte(`{ "integration_name": "nri-foo", "integration_args": ["bar", "baz"] }`),
Metadata: map[string]interface{}{
"meta key": "meta value",
},
}

err := h.Handle(context.Background(), cmd, false)
require.NoError(t, err)

gotFRs := em.Received()
require.Len(t, gotFRs, 1)
require.Len(t, gotFRs[0].Data.DataSets, 1)
gotEvents := gotFRs[0].Data.DataSets[0].Events
require.Len(t, gotEvents, 1)
expectedEvent := protocol.EventData{
"eventType": "InfrastructureEvent",
"category": "notifications",
"summary": "cmd-api",
"cmd_name": "run_integration",
"cmd_hash": "nri-foo#[bar baz]",
"cmd_args_name": "nri-foo",
"cmd_args_args": "[bar baz]",
"cmd_metadata.meta key": "meta value",
}
assert.Equal(t, expectedEvent, gotEvents[0])
}
11 changes: 10 additions & 1 deletion internal/agent/cmdchannel/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,18 @@ func TestSrv_InitialFetch_EnablesRegisterAndHandlesBackoff(t *testing.T) {
assert.True(t, c.RegisterEnabled)
}

func TestSrv_InitialFetch_HandlesRunIntegration(t *testing.T) {
func TestSrv_InitialFetch_HandlesRunIntegrationAndMetadata(t *testing.T) {
serializedCmds := `
{
"return_value": [
{
"name": "run_integration",
"arguments": {
"integration_name": "nri-foo"
},
"metadata": {
"target_pid": 123,
"target_strategy": "<STRATEGY>"
}
}
]
Expand All @@ -164,6 +168,11 @@ func TestSrv_InitialFetch_HandlesRunIntegration(t *testing.T) {

d := <-defQueue
assert.Equal(t, "nri-foo", d.Name)
require.NotNil(t, d.CmdChanReq)
require.Contains(t, d.CmdChanReq.Metadata, "target_pid")
require.Contains(t, d.CmdChanReq.Metadata, "target_strategy")
assert.Equal(t, float64(123), d.CmdChanReq.Metadata["target_pid"])
assert.Equal(t, "<STRATEGY>", d.CmdChanReq.Metadata["target_strategy"])
}

func TestSrv_Run(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions internal/agent/cmdchannel/stopintegration/stopintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ func notifyPlatform(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd co
return err
}

ccReq := ctx.NewCmdChannelRequest(cmdName, cmd.Hash, args.IntegrationName, args.IntegrationArgs)
ccReq := ctx.NewCmdChannelRequest(cmdName, cmd.Hash, args.IntegrationName, args.IntegrationArgs, cmd.Metadata)
def.CmdChanReq = &ccReq
ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
ev["cmd_stop_hash"] = args.Hash()

ev := ccReq.Event("cmd-api")
ev["cmd_stop_mode"] = stopModeUsed

runintegration.NotifyPlatform(dmEmitter, def, ev)

return nil
Expand Down
25 changes: 5 additions & 20 deletions pkg/backend/commandapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

backendhttp "github.com/newrelic/infrastructure-agent/pkg/backend/http"
"github.com/newrelic/infrastructure-agent/pkg/entity"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
)

type Client interface {
Expand All @@ -20,25 +19,11 @@ type Client interface {
}

type Command struct {
ID int `json:"id"`
Hash string `json:"hash"`
Name string `json:"name"`
Args json.RawMessage `json:"arguments"`
}

// Event creates an event from command.
func (c *Command) Event(integrationName string, integrationArgs []string) protocol.EventData {
return protocol.EventData{
"eventType": "InfrastructureEvent",
"category": "notifications",
"summary": "cmd-api",
"cmd_id": c.ID,
"cmd_hash": c.Hash,
"cmd_name": c.Name,
"cmd_args": string(c.Args),
"cmd_args_name": integrationName,
"cmd_args_args": fmt.Sprintf("%+v", integrationArgs),
}
ID int `json:"id"`
Hash string `json:"hash"`
Metadata map[string]interface{} `json:"metadata"`
Name string `json:"name"`
Args json.RawMessage `json:"arguments"`
}

type client struct {
Expand Down
26 changes: 25 additions & 1 deletion pkg/integrations/track/ctx/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,44 @@

package ctx

import (
"fmt"

"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
)

// CmdChannelRequest DTO storing context required to handle actions on integration run exit.
type CmdChannelRequest struct {
CmdChannelCmdName string
CmdChannelCmdHash string
IntegrationName string
IntegrationArgs []string
Metadata map[string]interface{}
}

// NewCmdChannelRequest create new CmdChannelRequest.
func NewCmdChannelRequest(cmdChanCmdName, cmdChanCmdHash, integrationName string, integrationArgs []string) CmdChannelRequest {
func NewCmdChannelRequest(cmdChanCmdName, cmdChanCmdHash, integrationName string, integrationArgs []string, metadata map[string]interface{}) CmdChannelRequest {
return CmdChannelRequest{
CmdChannelCmdName: cmdChanCmdName,
CmdChannelCmdHash: cmdChanCmdHash,
IntegrationName: integrationName,
IntegrationArgs: integrationArgs,
Metadata: metadata,
}
}

func (r *CmdChannelRequest) Event(summary string) protocol.EventData {
ev := protocol.EventData{
"eventType": "InfrastructureEvent",
"category": "notifications",
"summary": summary,
"cmd_name": r.CmdChannelCmdName,
"cmd_hash": r.CmdChannelCmdHash,
"cmd_args_name": r.IntegrationName,
"cmd_args_args": fmt.Sprintf("%+v", r.IntegrationArgs),
}
for k, v := range r.Metadata {
ev["cmd_metadata."+k] = v
}
return ev
}
15 changes: 4 additions & 11 deletions pkg/integrations/track/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package track

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -113,16 +112,10 @@ func (t *Tracker) NotifyExit(hash string, exitCode int) {
return
}

ds := protocol.NewEventDataset(ts, protocol.EventData{
"eventType": "InfrastructureEvent",
"category": "notifications",
"summary": "integration-exited",
"cmd_name": iCtx.def.CmdChanReq.CmdChannelCmdName,
"cmd_hash": iCtx.def.CmdChanReq.CmdChannelCmdHash,
"cmd_args_name": iCtx.def.CmdChanReq.IntegrationName,
"cmd_args_args": fmt.Sprintf("%+v", iCtx.def.CmdChanReq.IntegrationArgs),
"cmd_exit_code": exitCode,
})
ev := iCtx.def.CmdChanReq.Event("integration-exited")
ev["cmd_exit_code"] = exitCode

ds := protocol.NewEventDataset(ts, ev)
data := protocol.NewData("tracker.notifyexit", "1", []protocol.Dataset{ds})
t.eventEmitter.Send(fwrequest.NewFwRequest(iCtx.def, nil, nil, data))
}
27 changes: 25 additions & 2 deletions pkg/integrations/v4/dm/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,33 @@ import (
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
)

// NoopEmitter /dev/null sink.
type NoopEmitter struct{}

func (e *NoopEmitter) Send(_ fwrequest.FwRequest) {}

func NewNoopEmitter() dm.Emitter {
return &NoopEmitter{}
}

func (e *NoopEmitter) Send(_ fwrequest.FwRequest) {}

// RecordEmitter stores all received requests.
type RecordEmitter struct {
received []fwrequest.FwRequest
}

// implementation fulfills the interface.
var _ dm.Emitter = &RecordEmitter{}

func NewRecordEmitter() *RecordEmitter {
return &RecordEmitter{
received: []fwrequest.FwRequest{},
}
}

func (e *RecordEmitter) Send(r fwrequest.FwRequest) {
e.received = append(e.received, r)
}

func (e *RecordEmitter) Received() []fwrequest.FwRequest {
return e.received
}

0 comments on commit de6490d

Please sign in to comment.