Skip to content

Commit

Permalink
Merge branch 'main' into task-client-pubkey
Browse files Browse the repository at this point in the history
  • Loading branch information
im-adithya committed Jul 30, 2024
2 parents c4cae2a + e03b7f1 commit b673146
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
}()
//handle graceful shutdown
<-svc.Ctx.Done()
svc.Logger.Infof("Shutting down echo server...")
svc.Logger.Info("Shutting down echo server...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
e.Shutdown(ctx)
Expand Down
60 changes: 32 additions & 28 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (svc *Service) InfoHandler(c echo.Context) error {
svc.Logger.WithFields(logrus.Fields{
"relay_url": requestData.RelayUrl,
"wallet_pubkey": requestData.WalletPubkey,
}).Info("Subscribing to info event")
}).Debug("Subscribing to info event")

filter := nostr.Filter{
Authors: []string{requestData.WalletPubkey},
Expand All @@ -205,7 +205,7 @@ func (svc *Service) InfoHandler(c echo.Context) error {
svc.Logger.WithFields(logrus.Fields{
"relay_url": requestData.RelayUrl,
"wallet_pubkey": requestData.WalletPubkey,
}).Info("Exiting info subscription without receiving")
}).Error("Exiting info subscription without receiving event")
return c.JSON(http.StatusRequestTimeout, ErrorResponse{
Message: "Request canceled or timed out",
Error: ctx.Err().Error(),
Expand Down Expand Up @@ -261,15 +261,15 @@ func (svc *Service) PublishHandler(c echo.Context) error {
svc.Logger.WithFields(logrus.Fields{
"event_id": requestData.SignedEvent.ID,
"relay_url": requestData.RelayUrl,
}).Info("Publishing event")
}).Debug("Publishing event")

err = relay.Publish(ctx, *requestData.SignedEvent)
if err != nil {
svc.Logger.WithError(err).WithFields(logrus.Fields{
"event_id": requestData.SignedEvent.ID,
"relay_url": requestData.RelayUrl,
}).Error("Failed to publish event")

return c.JSON(http.StatusInternalServerError, ErrorResponse{
Message: "Error publishing the event",
Error: err.Error(),
Expand Down Expand Up @@ -316,7 +316,7 @@ func (svc *Service) NIP47Handler(c echo.Context) error {
"client_pubkey": requestData.SignedEvent.PubKey,
"wallet_pubkey": requestData.WalletPubkey,
"relay_url": requestData.RelayUrl,
}).Info("Processing request event")
}).Debug("Processing request event")

if svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).Find(&RequestEvent{}).RowsAffected != 0 {
svc.Logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -356,7 +356,7 @@ func (svc *Service) NIP47Handler(c echo.Context) error {

ctx, cancel := context.WithTimeout(c.Request().Context(), 90*time.Second)
defer cancel()
go svc.startSubscription(ctx, &subscription, svc.publishEvent, svc.handleResponseEvent)
go svc.startSubscription(ctx, &subscription, svc.publishRequestEvent, svc.handleResponseEvent)

select {
case <-ctx.Done():
Expand Down Expand Up @@ -419,7 +419,7 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
"wallet_pubkey": requestData.WalletPubkey,
"relay_url": requestData.RelayUrl,
"webhook_url": requestData.WebhookUrl,
}).Info("Processing request event")
}).Debug("Processing request event")

if svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).First(&RequestEvent{}).RowsAffected != 0 {
svc.Logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -456,7 +456,7 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second)
defer cancel()

go svc.startSubscription(ctx, &subscription, svc.publishEvent, svc.handleResponseEvent)
go svc.startSubscription(ctx, &subscription, svc.publishRequestEvent, svc.handleResponseEvent)
return c.JSON(http.StatusOK, NIP47Response{
State: WEBHOOK_RECEIVED,
})
Expand Down Expand Up @@ -514,7 +514,7 @@ func (svc *Service) NIP47NotificationHandler(c echo.Context) error {
"wallet_pubkey": requestData.WalletPubkey,
"relay_url": requestData.RelayUrl,
"webhook_url": requestData.WebhookUrl,
}).Info("Subscribing to notifications")
}).Debug("Subscribing to notifications")

subscription := Subscription{
RelayUrl: requestData.RelayUrl,
Expand Down Expand Up @@ -631,13 +631,13 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
}).Info("Stopping subscription")
}).Debug("Stopping subscription")

err := svc.stopSubscription(&subscription)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
}).Info("Subscription is stopped already")
}).Debug("Subscription is stopped already")

return c.JSON(http.StatusAlreadyReported, StopSubscriptionResponse{
Message: "Subscription is already closed",
Expand Down Expand Up @@ -686,9 +686,10 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Starting subscription")
}).Debug("Starting subscription")

filter := svc.subscriptionToFilter(subscription)

Expand Down Expand Up @@ -735,9 +736,10 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Started subscription")
}).Debug("Started subscription")

err = svc.processEvents(ctx, subscription, onReceiveEOS, handleEvent)

Expand Down Expand Up @@ -770,13 +772,13 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Stopping subscription")
}).Debug("Stopping subscription")
break
}
}
}

func (svc *Service) publishEvent(ctx context.Context, subscription *Subscription) {
func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subscription) {
walletPubkey := ""
clientPubkey := ""

Expand All @@ -792,18 +794,18 @@ func (svc *Service) publishEvent(ctx context.Context, subscription *Subscription
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"request_event_id": subscription.RequestEvent.ID,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Error("Failed to publish to relay")
sub.Unsub()
} else {
svc.Logger.WithFields(logrus.Fields{
"publish_status": REQUEST_EVENT_PUBLISH_CONFIRMED,
"event_id": subscription.RequestEvent.ID,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
"request_event_id": subscription.RequestEvent.ID,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Published request event successfully")
subscription.RequestEventDB.State = REQUEST_EVENT_PUBLISH_CONFIRMED
}
Expand Down Expand Up @@ -857,7 +859,8 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs
"subscription_id": subscription.ID,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Received event")
"relay_url": subscription.RelayUrl,
}).Info("Received subscribed event")
responseEvent := ResponseEvent{
NostrId: event.ID,
Content: event.Content,
Expand Down Expand Up @@ -888,8 +891,9 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
"subscription_id": subscription.ID,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Received EOS")

"relay_url": subscription.RelayUrl,
}).Debug("Received EOS")

if (onReceiveEOS != nil) {
onReceiveEOS(ctx, subscription)
}
Expand All @@ -903,7 +907,8 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
"subscription_id": subscription.ID,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Relay subscription events channel ended")
"relay_url": subscription.RelayUrl,
}).Debug("Relay subscription events channel ended")
}()

select {
Expand All @@ -920,11 +925,10 @@ func (svc *Service) getRelayConnection(ctx context.Context, customRelayURL strin
if customRelayURL != "" && customRelayURL != svc.Cfg.DefaultRelayURL {
svc.Logger.WithFields(logrus.Fields{
"custom_relay_url": customRelayURL,
}).Infof("Connecting to custom relay")
}).Info("Connecting to custom relay")
relay, err := nostr.RelayConnect(ctx, customRelayURL)
return relay, true, err // true means custom and the relay should be closed
}
svc.Logger.Info("Fetching default relay")
// use mutex otherwise the svc.Relay will be reconnected more than once
svc.relayMutex.Lock()
defer svc.relayMutex.Unlock()
Expand Down Expand Up @@ -966,7 +970,7 @@ func (svc *Service) postEventToWebhook(event *nostr.Event, webhookURL string) {
"event_id": event.ID,
"event_kind": event.Kind,
"webhook_url": webhookURL,
}).Infof("Successfully posted event to webhook")
}).Info("Successfully posted event to webhook")
}

func (svc *Service) subscriptionToFilter(subscription *Subscription) (*nostr.Filter){
Expand Down

0 comments on commit b673146

Please sign in to comment.