Skip to content

Commit

Permalink
debug(agent): add error messages on send failures
Browse files Browse the repository at this point in the history
  • Loading branch information
hspedro committed Jan 17, 2025
1 parent 5ae042b commit 0f28ff2
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
25 changes: 21 additions & 4 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,20 +264,31 @@ func (a *agentImpl) packetEncodeMessage(m *message.Message) ([]byte, error) {
func (a *agentImpl) send(pendingMsg pendingMessage) (err error) {
defer func() {
if panicErr := recover(); panicErr != nil {
brokenPipeErr := errors.NewError(constants.ErrBrokenPipe, errors.ErrClientClosedRequest)
err = fmt.Errorf("%w: %v", brokenPipeErr, panicErr)
err = errors.NewError(
fmt.Errorf("%s: %s", constants.ErrBrokenPipe.Error(), panicErr),
errors.ErrClientClosedRequest,
)
logger.Log.Error("agent send panicked: ", err)
}
}()
a.reportChannelSize()

m, err := a.getMessageFromPendingMessage(pendingMsg)
if err != nil {
logger.Log.Errorf(
"agent send failed when getting pending msg. route: %s, type: %s, err: %s",
pendingMsg.route, &pendingMsg.typ, err,
)
return err
}

// packet encode
p, err := a.packetEncodeMessage(m)
if err != nil {
logger.Log.Errorf(
"agent send failed when encoding the msg. route: %s, type: %s, err: %s",
pendingMsg.route, &pendingMsg.typ, err,
)
return err
}

Expand Down Expand Up @@ -522,10 +533,16 @@ func (a *agentImpl) write() {
if writeErr != nil {
if e.Is(writeErr, os.ErrDeadlineExceeded) {
// Log the timeout error but continue processing
logger.Log.Warnf("Context deadline exceeded for write in conn %s: %s (ctx=%v)", writeErr.Error(), ctx)
logger.Log.Warnf(
"Context deadline exceeded for write in conn (%s) | session (%s): %s",
a.conn.RemoteAddr(), a.Session.UID(), writeErr.Error(),
)
} else {
err = errors.NewError(writeErr, errors.ErrClosedRequest)
logger.Log.Errorf("Failed to write in conn: %s (ctx=%v), agent will close", writeErr.Error(), ctx)
logger.Log.Errorf(
"Failed to write in conn (%s) | session (%s): %s, agent will close",
a.conn.RemoteAddr(), a.Session.UID(), writeErr.Error(),
)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
// close agent if low-level conn broke during write
return
Expand Down
20 changes: 14 additions & 6 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ func TestAgentSend(t *testing.T) {
err error
}{
{"success", nil},
{"failure", e.NewError(constants.ErrBrokenPipe, e.ErrClientClosedRequest)},
{"failure", e.NewError(
fmt.Errorf("%s: send on closed channel", constants.ErrBrokenPipe.Error()),
e.ErrClientClosedRequest,
)},
}

for _, table := range tables {
Expand Down Expand Up @@ -363,7 +366,10 @@ func TestAgentPush(t *testing.T) {
err error
}{
{"success_raw", []byte("ok"), nil},
{"failure", []byte("ok"), e.NewError(constants.ErrBrokenPipe, e.ErrClientClosedRequest)},
{"failure", []byte("ok"), e.NewError(
fmt.Errorf("%s: send on closed channel", constants.ErrBrokenPipe.Error()),
e.ErrClientClosedRequest,
)},
}

for _, table := range tables {
Expand Down Expand Up @@ -490,8 +496,10 @@ func TestAgentResponseMID(t *testing.T) {
{"success_raw_msg_err", uint(rand.Int()), []byte("ok"), true, nil},
{"success_struct", uint(rand.Int()), &someStruct{A: "ok"}, false, nil},
{"failure_empty_mid", 0, []byte("ok"), false, constants.ErrSessionOnNotify},
{"failure_send", uint(rand.Int()), []byte("ok"), false,
e.NewError(constants.ErrBrokenPipe, e.ErrClientClosedRequest)},
{"failure_send", uint(rand.Int()), []byte("ok"), false, e.NewError(
fmt.Errorf("%s: send on closed channel", constants.ErrBrokenPipe.Error()),
e.ErrClientClosedRequest,
)},
}

for _, table := range tables {
Expand Down Expand Up @@ -1267,7 +1275,7 @@ func TestAgentWriteChSendWriteError(t *testing.T) {
mockMetricsReporter.EXPECT().ReportGauge(metrics.ConnectedClients, gomock.Any(), gomock.Any())
mockMetricsReporter.EXPECT().ReportSummary(metrics.ResponseTime, errorTags, gomock.Any())

mockConn.EXPECT().RemoteAddr().Return(&mockAddr{}).Times(3)
mockConn.EXPECT().RemoteAddr().Return(&mockAddr{}).Times(4)
mockConn.EXPECT().Close().Do(func() {
wg.Done()
})
Expand Down Expand Up @@ -1311,7 +1319,7 @@ func TestAgentWriteChSendWriteTimeout(t *testing.T) {

mockMetricsReporter.EXPECT().ReportSummary(metrics.ResponseTime, gomock.Any(), gomock.Any()).Times(2)

mockConn.EXPECT().RemoteAddr().Return(&mockAddr{}).Times(4)
mockConn.EXPECT().RemoteAddr().Return(&mockAddr{}).Times(5)
mockConn.EXPECT().SetWriteDeadline(gomock.Any()).Return(nil).Times(2)
mockConn.EXPECT().Write(expectedFirstPacket).Do(func(b []byte) {
time.Sleep(writeTimeout * 2)
Expand Down

0 comments on commit 0f28ff2

Please sign in to comment.