Skip to content

Commit

Permalink
feat: reconnection in nip47
Browse files Browse the repository at this point in the history
  • Loading branch information
im-adithya committed May 29, 2024
1 parent dd8305a commit 58c8867
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 173 deletions.
24 changes: 14 additions & 10 deletions internal/nostr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,30 @@ const (
NIP_47_RESPONSE_KIND = 23195

// state of request event
REQUEST_EVENT_PUBLISH_CONFIRMED = "confirmed"
REQUEST_EVENT_PUBLISH_FAILED = "failed"
REQUEST_EVENT_PUBLISH_CONFIRMED = "CONFIRMED"
REQUEST_EVENT_PUBLISH_FAILED = "FAILED"
)

type Subscription struct {
ID uint
RelayUrl string `validate:"required"`
RelayUrl string `validate:"required"`
WebhookUrl string
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *nostr.Event `gorm:"-"`
RequestID uint `gorm:"-"`
Published bool `gorm:"-"`

// TODO: fix an elegant solution to store datatypes
IdsString string
Expand Down Expand Up @@ -118,7 +122,7 @@ func (s *Subscription) AfterFind(tx *gorm.DB) error {

type RequestEvent struct {
ID uint
SubscriptionId uint `validate:"required"`
SubscriptionId *uint `validate:"required"`
NostrId string `validate:"required"`
Content string
State string
Expand All @@ -129,7 +133,7 @@ type RequestEvent struct {
type ResponseEvent struct {
ID uint
RequestId *uint
SubscriptionId uint `validate:"required"`
SubscriptionId *uint `validate:"required"`
NostrId string `validate:"required"`
Content string
RepliedAt time.Time
Expand Down
266 changes: 103 additions & 163 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"database/sql"
"encoding/json"
"fmt"
"http-nostr/migrations"
"net/http"
"os"
Expand Down Expand Up @@ -297,20 +296,20 @@ func (svc *Service) NIP47Handler(c echo.Context) error {
})
}

svc.Logger.WithFields(logrus.Fields{
"eventId": requestData.SignedEvent.ID,
"walletPubkey": requestData.WalletPubkey,
"relayUrl": requestData.RelayUrl,
"webhookUrl": requestData.WebhookUrl,
}).Info("Checking for duplicate request event")

subscription := Subscription{}
requestEvent := RequestEvent{}
findRequestResult := svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).Find(&requestEvent)

if findRequestResult.RowsAffected != 0 {
svc.Logger.WithFields(logrus.Fields{
"eventId": requestData.SignedEvent.ID,
"walletPubkey": requestData.WalletPubkey,
"relayUrl": requestData.RelayUrl,
"webhookUrl": requestData.WebhookUrl,
}).Info("Found duplicate request event")

responseEvent := ResponseEvent{}
findResponseResult := svc.db.Where("request_id = ?", requestEvent.ID).Find(&responseEvent)

if findResponseResult.RowsAffected != 0 {
return c.JSON(http.StatusBadRequest, NIP47Response{
Event: &nostr.Event{
Expand All @@ -321,78 +320,76 @@ func (svc *Service) NIP47Handler(c echo.Context) error {
State: "ALREADY_PUBLISHED",
})
}
} else {
svc.Logger.WithFields(logrus.Fields{
"eventId": requestData.SignedEvent.ID,
"walletPubkey": requestData.WalletPubkey,
"relayUrl": requestData.RelayUrl,
"webhookUrl": requestData.WebhookUrl,
}).Info("Storing request event")

requestEvent = RequestEvent{
NostrId: requestData.SignedEvent.ID,
Content: requestData.SignedEvent.Content,
}

err := svc.db.Create(&requestEvent).Error
if err != nil {
return c.JSON(http.StatusInternalServerError, ErrorResponse{
Message: "Failed to store request event",
Error: err.Error(),
})
}
}

svc.Logger.WithFields(logrus.Fields{
"eventId": requestData.SignedEvent.ID,
"walletPubkey": requestData.WalletPubkey,
"relayUrl": requestData.RelayUrl,
"webhookUrl": requestData.WebhookUrl,
}).Info("Storing request event and subscription")

subscription = Subscription{
RelayUrl: requestData.RelayUrl,
WebhookUrl: requestData.WebhookUrl,
Open: true,
Authors: &[]string{requestData.WalletPubkey},
Kinds: &[]int{NIP_47_RESPONSE_KIND},
Tags: &nostr.TagMap{"e": []string{requestData.SignedEvent.ID}},
Since: time.Now(),
Limit: 1,
}

err := svc.db.Create(&subscription).Error
if err != nil {
return c.JSON(http.StatusInternalServerError, ErrorResponse{
Message: "Failed to store subscription",
Error: err.Error(),
})
}

requestEvent = RequestEvent{
NostrId: requestData.SignedEvent.ID,
Content: requestData.SignedEvent.Content,
SubscriptionId: subscription.ID,
}

err = svc.db.Create(&requestEvent).Error
if err != nil {
return c.JSON(http.StatusInternalServerError, ErrorResponse{
Message: "Failed to store request event",
Error: err.Error(),
})
subscription := Subscription{
RelayUrl: requestData.RelayUrl,
WebhookUrl: requestData.WebhookUrl,
Open: true,
Authors: &[]string{requestData.WalletPubkey},
Kinds: &[]int{NIP_47_RESPONSE_KIND},
Tags: &nostr.TagMap{"e": []string{requestData.SignedEvent.ID}},
Since: time.Now(),
Limit: 1,
RequestEvent: requestData.SignedEvent,
RequestID: requestEvent.ID,
EventChan: make(chan *nostr.Event, 1),
}

if subscription.WebhookUrl != "" {
go func() {
ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second)
defer cancel()
event, _, err := svc.processRequest(ctx, &subscription, &requestEvent, &requestData)
if err != nil {
svc.Logger.WithError(err).Error("Failed to process request for webhook")
// what to pass to the webhook?
return
}
svc.postEventToWebhook(event, requestData.WebhookUrl)
}()
go svc.startSubscription(svc.Ctx, &subscription)
return c.JSON(http.StatusOK, NIP47Response{
State: "WEBHOOK_RECEIVED",
})
}

ctx, cancel := context.WithTimeout(c.Request().Context(), 90*time.Second)
defer cancel()
event, httpStatusCode, err := svc.processRequest(ctx, &subscription, &requestEvent, &requestData)
if err != nil {
return c.JSON(httpStatusCode, ErrorResponse{
Message: "Failed to process nip-47 request",
Error: err.Error(),
go svc.startSubscription(ctx, &subscription)

select {
case <-ctx.Done():
svc.Logger.WithFields(logrus.Fields{
"eventId": requestData.SignedEvent.ID,
"walletPubkey": requestData.WalletPubkey,
"relayUrl": requestData.RelayUrl,
"webhookUrl": requestData.WebhookUrl,
}).Info("Stopped subscription without receiving event")
return c.JSON(http.StatusRequestTimeout, ErrorResponse{
Message: "Request canceled or timed out",
Error: ctx.Err().Error(),
})
case event := <-subscription.EventChan:
svc.Logger.WithFields(logrus.Fields{
"relayUrl": requestData.RelayUrl,
"walletPubkey": requestData.WalletPubkey,
"eventId": event.ID,
}).Info("Received info event")
return c.JSON(http.StatusOK, NIP47Response{
Event: event,
State: "PUBLISHED",
})
}
return c.JSON(http.StatusOK, NIP47Response{
Event: event,
State: "PUBLISHED",
})
}

func (svc *Service) NIP47NotificationHandler(c echo.Context) error {
Expand Down Expand Up @@ -669,6 +666,26 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
"subscriptionId": subscription.ID,
}).Info("Received EOS")
receivedEOS = true

// Publish the NIP47 request once EOS is received
if (!subscription.Published && subscription.RequestEvent != nil) {
err := sub.Relay.Publish(ctx, *subscription.RequestEvent)
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscriptionId": subscription.ID,
"relayUrl": subscription.RelayUrl,
}).Error("Failed to publish to relay")
sub.Unsub()
return
} else {
svc.Logger.WithFields(logrus.Fields{
"status": REQUEST_EVENT_PUBLISH_CONFIRMED,
"eventId": subscription.RequestEvent.ID,
}).Info("Published request event successfully")
subscription.Published = true
}
}
}()

go func(){
Expand All @@ -680,13 +697,28 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
"subscriptionId": subscription.ID,
}).Info("Received event")
responseEvent := ResponseEvent{
SubscriptionId: subscription.ID,
NostrId: event.ID,
Content: event.Content,
NostrId: event.ID,
Content: event.Content,
RepliedAt: event.CreatedAt.Time(),
}
// NIP47 requests don't persist subscriptions in DB
if subscription.RequestEvent != nil {
responseEvent.RequestId = &subscription.RequestID
} else {
responseEvent.SubscriptionId = &subscription.ID
}
svc.db.Save(&responseEvent)
svc.postEventToWebhook(event, subscription.WebhookUrl)
if subscription.WebhookUrl != "" {
svc.postEventToWebhook(event, subscription.WebhookUrl)
} else {
// pass the event to NIP47 handler
subscription.EventChan <- event
}
// NIP47 handler only needs a one-time subscription
if subscription.RequestEvent != nil {
sub.Unsub()
return
}
}
}
}()
Expand Down Expand Up @@ -724,98 +756,6 @@ func (svc *Service) getRelayConnection(ctx context.Context, customRelayURL strin
}
}

func (svc *Service) processRequest(ctx context.Context, subscription *Subscription, requestEvent *RequestEvent, requestData *NIP47Request) (*nostr.Event, int, error) {
publishState := REQUEST_EVENT_PUBLISH_FAILED
defer func() {
subscription.Open = false
requestEvent.State = publishState
svc.db.Save(subscription)
svc.db.Save(requestEvent)
}()
relay, isCustomRelay, err := svc.getRelayConnection(ctx, subscription.RelayUrl)
if err != nil {
return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error connecting to relay: %w", err)
}
if isCustomRelay {
defer relay.Close()
}

since := nostr.Timestamp(subscription.Since.Unix())
filter := nostr.Filter{
Kinds: *subscription.Kinds,
Authors: *subscription.Authors,
Tags: *subscription.Tags,
Since: &since,
Limit: subscription.Limit,
Search: subscription.Search,
}

if subscription.Ids != nil {
filter.IDs = *subscription.Ids
}
if !subscription.Until.IsZero() {
until := nostr.Timestamp(subscription.Until.Unix())
filter.Until = &until
}

svc.Logger.WithFields(logrus.Fields{
"eventId": requestData.SignedEvent.ID,
"walletPubkey": requestData.WalletPubkey,
"relayUrl": requestData.RelayUrl,
"webhookUrl": requestData.WebhookUrl,
}).Info("Subscribing to the relay for response event")

sub, err := relay.Subscribe(ctx, []nostr.Filter{filter})
if err != nil {
return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error subscribing to relay: %w", err)
}

svc.Logger.WithFields(logrus.Fields{
"eventId": requestData.SignedEvent.ID,
"walletPubkey": requestData.WalletPubkey,
"relayUrl": requestData.RelayUrl,
"webhookUrl": requestData.WebhookUrl,
}).Info("Publishing request event to the relay")

err = relay.Publish(ctx, *requestData.SignedEvent)
if err != nil {
return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error publishing request event: %w", err)
}

if err == nil {
publishState = REQUEST_EVENT_PUBLISH_CONFIRMED
svc.Logger.WithFields(logrus.Fields{
"status": publishState,
"eventId": requestEvent.ID,
}).Info("Published request event successfully")
} else {
svc.Logger.WithFields(logrus.Fields{
"status": publishState,
"eventId": requestEvent.ID,
}).Info("Failed to publish request event")
return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error publishing request event: %s", err.Error())
}

select {
case <-ctx.Done():
return &nostr.Event{}, http.StatusRequestTimeout, fmt.Errorf("request canceled or timed out")
case event := <-sub.Events:
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"eventKind": event.Kind,
}).Infof("Successfully received event")
responseEvent := ResponseEvent{
SubscriptionId: subscription.ID,
RequestId: &requestEvent.ID,
NostrId: event.ID,
Content: event.Content,
RepliedAt: event.CreatedAt.Time(),
}
svc.db.Save(&responseEvent)
return event, http.StatusOK, nil
}
}

func (svc *Service) postEventToWebhook(event *nostr.Event, webhookURL string) {
eventData, err := json.Marshal(event)
if err != nil {
Expand Down

0 comments on commit 58c8867

Please sign in to comment.