Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/republish invoices #350

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type (
type Client interface {
SubscribeToLndInvoices(context.Context, IncomingInvoiceHandler) error
StartPublishInvoices(context.Context, SubscribeToInvoicesFunc, EncodeOutgoingInvoiceFunc) error
PublishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeOutgoingInvoiceFunc) error
// Close will close all connections to rabbitmq
Close() error
}
Expand Down Expand Up @@ -274,13 +275,13 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS
case <-ctx.Done():
return context.Canceled
case incomingInvoice := <-in:
err = client.publishToLndhubExchange(ctx, incomingInvoice, payloadFunc)
err = client.PublishToLndhubExchange(ctx, incomingInvoice, payloadFunc)

if err != nil {
captureErr(client.logger, err)
}
case outgoing := <-out:
err = client.publishToLndhubExchange(ctx, outgoing, payloadFunc)
err = client.PublishToLndhubExchange(ctx, outgoing, payloadFunc)

if err != nil {
captureErr(client.logger, err)
Expand All @@ -289,7 +290,7 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS
}
}

func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeOutgoingInvoiceFunc) error {
func (client *DefaultClient) PublishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeOutgoingInvoiceFunc) error {
payload := bufPool.Get().(*bytes.Buffer)
err := payloadFunc(ctx, payload, invoice)
if err != nil {
Expand Down
90 changes: 90 additions & 0 deletions republish_invoices/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/getAlby/lndhub.go/db"
"github.com/getAlby/lndhub.go/db/models"
"github.com/getAlby/lndhub.go/lib"
"github.com/getAlby/lndhub.go/lib/service"
"github.com/getAlby/lndhub.go/rabbitmq"
"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"github.com/sirupsen/logrus"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we use a new logger?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same logger. It's what we use under the hood.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does it work? we initialize one here that we then use I think: https://github.com/getAlby/lndhub.go/blob/main/main.go#L80

)

func main() {

c := &service.Config{}
// Load configruation from environment variables
err := godotenv.Load(".env")
if err != nil {
fmt.Println("Failed to load .env file")
}
logger := lib.Logger(c.LogFilePath)
startDate, endDate, err := loadStartAndEndIdFromEnv()
if err != nil {
logger.Fatalf("Could not load start and end id from env %v", err)
}
err = envconfig.Process("", c)
if err != nil {
logger.Fatalf("Error loading environment variables: %v", err)
}
// Open a DB connection based on the configured DATABASE_URI
dbConn, err := db.Open(c)
if err != nil {
logger.Fatalf("Error initializing db connection: %v", err)
}
rabbitmqClient, err := rabbitmq.Dial(c.RabbitMQUri,
rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange),
rabbitmq.WithLndHubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange),
rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a function like getRabbitmqClient to make sure we have the same client config everwhere?

if err != nil {
logger.Fatal(err)
}

// close the connection gently at the end of the runtime
defer rabbitmqClient.Close()

result := []models.Invoice{}
err = dbConn.NewSelect().Model(&result).Where("settled_at > ?", startDate).Where("settled_at < ?", endDate).Scan(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only publish settled invoices, correct? thus it's ok to check settled there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If settled_at is between those 2 values, then it's settled.

if err != nil {
logger.Fatal(err)
}
logrus.Infof("Found %d invoices", len(result))
svc := &service.LndhubService{
Config: c,
DB: dbConn,
Logger: logger,
RabbitMQClient: rabbitmqClient,
InvoicePubSub: service.NewPubsub(),
}
dryRun := os.Getenv("DRY_RUN") == "true"
errCount := 0
for _, inv := range result {
logger.Infof("Publishing invoice with hash %s", inv.RHash)
if dryRun {
continue
}
err = svc.RabbitMQClient.PublishToLndhubExchange(context.Background(), inv, svc.EncodeInvoiceWithUserLogin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it OK to flood the queue? or do we need some slight delay?

but I guess it should be OK.

if err != nil {
errCount += 1
logger.Error(err)
}
}
logger.Infof("Published %d invoices, # errors %d", len(result), errCount)

}

func loadStartAndEndIdFromEnv() (start, end time.Time, err error) {
start, err = time.Parse(time.RFC3339, os.Getenv("START_DATE"))
if err != nil {
return
}
end, err = time.Parse(time.RFC3339, os.Getenv("END_DATE"))
return
}