Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ Tests require PostgreSQL running locally. Start it with `docker compose up -d po

Keep domain test helpers under `openmeter/.../testutils` independent from `app/common`. Build test dependencies from the underlying package constructors (repos, adapters, services, `lockr`) instead of importing the application wiring layer, or unrelated wiring additions can create test-only import cycles.

For usage-based billing lifecycle tests, prefer driving behavior through `charges.Service.Create`, `AdvanceCharges`, and `ApplyPatches` rather than calling lower-level charge adapters directly. To model late-arriving or newly visible usage, use `MockStreamingConnector` events with explicit `StoredAt` values (or `SetSimpleEvents`) so the test exercises the real stored-at cutoff logic in finalization.

For OpenMeter Go tests that touch the database, explicitly set `POSTGRES_HOST=127.0.0.1`. Without it, many suites will skip during setup even if PostgreSQL is running and the repo environment is otherwise loaded correctly.

Use the repo's Nix CI dev shell when `go`, `gofmt`, or other toolchain binaries are missing from the ambient shell. The CI and local-compatible invocation pattern is:
Expand Down Expand Up @@ -180,6 +182,8 @@ All builds use `GO_BUILD_FLAGS=-tags=dynamic`.

See the `/service` skill for service/adapter patterns, constructors, input types, errors, transactions, hooks, logging, multi-tenancy, and DI wiring. See the `/api` skill for HTTP handler patterns and ValidationIssue. See the `/ent` skill for Ent ORM patterns and Postgres type gotchas. See the `/ledger` skill for ledger package architecture, wiring, and testing. See the `/subscription` skill for subscription domain model, sync algorithm, patch system, workflow layer, and addon sub-system. See the `/notification` skill for notification event pipeline, Kafka consumers, Svix webhook delivery, reconciliation loop, and payload versioning.

In `openmeter/billing/charges/.../adapter`, keep Ent access transaction-aware even in shared helper functions. If a helper accepts a raw `*entdb.Client`, still wrap its body with `entutils.TransactingRepo(...)` / `TransactingRepoWithNoValue(...)` so it rebinds to the transaction already carried in `ctx` instead of depending on the caller to pass a tx-specific client.

## Key Dependencies

| Category | Libraries |
Expand Down
81 changes: 74 additions & 7 deletions app/common/charges.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
flatfeeadapter "github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee/adapter"
flatfeelineengine "github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee/lineengine"
flatfeeservice "github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee/service"
"github.com/openmeterio/openmeter/openmeter/billing/charges/lineage"
lineageadapter "github.com/openmeterio/openmeter/openmeter/billing/charges/lineage/adapter"
lineageservice "github.com/openmeterio/openmeter/openmeter/billing/charges/lineage/service"
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
metaadapter "github.com/openmeterio/openmeter/openmeter/billing/charges/meta/adapter"
chargesservice "github.com/openmeterio/openmeter/openmeter/billing/charges/service"
Expand All @@ -26,6 +29,8 @@ import (
"github.com/openmeterio/openmeter/openmeter/ledger"
ledgeraccount "github.com/openmeterio/openmeter/openmeter/ledger/account"
ledgerchargeadapter "github.com/openmeterio/openmeter/openmeter/ledger/chargeadapter"
ledgercollector "github.com/openmeterio/openmeter/openmeter/ledger/collector"
"github.com/openmeterio/openmeter/openmeter/ledger/transactions"
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/pkg/framework/lockr"
Expand All @@ -46,12 +51,30 @@ func NewChargesMetaAdapter(
return metaAdapter, nil
}

func NewChargesCollectorService(
ledgerService ledger.Ledger,
accountResolver ledger.AccountResolver,
accountService ledgeraccount.Service,
) ledgercollector.Service {
return ledgercollector.NewService(ledgercollector.Config{
Ledger: ledgerService,
Dependencies: transactions.ResolverDependencies{
AccountService: accountResolver,
SubAccountService: accountService,
},
})
}

func NewChargesFlatFeeHandler(
ledgerService ledger.Ledger,
accountResolver ledger.AccountResolver,
accountService ledgeraccount.Service,
collectorService ledgercollector.Service,
) flatfee.Handler {
return ledgerchargeadapter.NewFlatFeeHandler(ledgerService, accountResolver, accountService)
return ledgerchargeadapter.NewFlatFeeHandler(ledgerService, transactions.ResolverDependencies{
AccountService: accountResolver,
SubAccountService: accountService,
}, collectorService)
}

func NewChargesCreditPurchaseHandler(
Expand All @@ -62,8 +85,8 @@ func NewChargesCreditPurchaseHandler(
return ledgerchargeadapter.NewCreditPurchaseHandler(ledgerService, accountResolver, accountService)
}

func NewChargesUsageBasedHandler() usagebased.Handler {
return usagebased.UnimplementedHandler{}
func NewChargesUsageBasedHandler(collectorService ledgercollector.Service) usagebased.Handler {
return ledgerchargeadapter.NewUsageBasedHandler(collectorService)
}

func NewChargesFlatFeeAdapter(
Expand All @@ -83,15 +106,43 @@ func NewChargesFlatFeeAdapter(
return flatFeeAdapter, nil
}

func NewChargesLineageAdapter(
db *entdb.Client,
) (lineage.Adapter, error) {
lineageAdapter, err := lineageadapter.New(lineageadapter.Config{
Client: db,
})
if err != nil {
return nil, fmt.Errorf("failed to create charges lineage adapter: %w", err)
}

return lineageAdapter, nil
}

func NewChargesLineageService(
lineageAdapter lineage.Adapter,
) (lineage.Service, error) {
lineageService, err := lineageservice.New(lineageservice.Config{
Adapter: lineageAdapter,
})
if err != nil {
return nil, fmt.Errorf("failed to create charges lineage service: %w", err)
}

return lineageService, nil
}

func NewChargesFlatFeeService(
flatFeeAdapter flatfee.Adapter,
flatFeeHandler flatfee.Handler,
lineageService lineage.Service,
metaAdapter meta.Adapter,
locker *lockr.Locker,
) (flatfee.Service, error) {
flatFeeSvc, err := flatfeeservice.New(flatfeeservice.Config{
Adapter: flatFeeAdapter,
Handler: flatFeeHandler,
Lineage: lineageService,
MetaAdapter: metaAdapter,
Locker: locker,
})
Expand Down Expand Up @@ -122,6 +173,7 @@ func NewChargesUsageBasedAdapter(
func NewChargesUsageBasedService(
usageBasedAdapter usagebased.Adapter,
usageBasedHandler usagebased.Handler,
lineageService lineage.Service,
locker *lockr.Locker,
metaAdapter meta.Adapter,
billingService billing.Service,
Expand All @@ -132,6 +184,7 @@ func NewChargesUsageBasedService(
usageBasedSvc, err := usagebasedservice.New(usagebasedservice.Config{
Adapter: usageBasedAdapter,
Handler: usageBasedHandler,
Lineage: lineageService,
Locker: locker,
MetaAdapter: metaAdapter,
CustomerOverrideService: billingService,
Expand Down Expand Up @@ -166,11 +219,13 @@ func NewChargesCreditPurchaseAdapter(
func NewChargesCreditPurchaseService(
creditPurchaseAdapter creditpurchase.Adapter,
creditPurchaseHandler creditpurchase.Handler,
lineageService lineage.Service,
metaAdapter meta.Adapter,
) (creditpurchase.Service, error) {
creditPurchaseSvc, err := creditpurchaseservice.New(creditpurchaseservice.Config{
Adapter: creditPurchaseAdapter,
Handler: creditPurchaseHandler,
Lineage: lineageService,
MetaAdapter: metaAdapter,
})
if err != nil {
Expand Down Expand Up @@ -242,16 +297,27 @@ func newChargesRegistry(
return nil, err
}

flatFeeHandler := NewChargesFlatFeeHandler(ledgerService, accountResolver, accountService)
lineageAdapter, err := NewChargesLineageAdapter(db)
if err != nil {
return nil, err
}

lineageService, err := NewChargesLineageService(lineageAdapter)
if err != nil {
return nil, err
}

collectorService := NewChargesCollectorService(ledgerService, accountResolver, accountService)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewChargesFlatFeeService, NewChargesUsageBasedService, and NewChargesCreditPurchaseService all require a lineage.Service in their config now (Config.Validate() returns "lineage service cannot be null"), but none of them receive one here. The server will crash on startup.

Fix: create a lineage adapter + service here (same pattern as testutils/service.go:114-126) and pass it to all three service constructors. Same issue in customerbalance.go where the flat-fee and usage-based services are also constructed without lineage.

flatFeeHandler := NewChargesFlatFeeHandler(ledgerService, accountResolver, accountService, collectorService)
creditPurchaseHandler := NewChargesCreditPurchaseHandler(ledgerService, accountResolver, accountService)
usageBasedHandler := NewChargesUsageBasedHandler()
usageBasedHandler := NewChargesUsageBasedHandler(collectorService)

flatFeeAdapter, err := NewChargesFlatFeeAdapter(db, logger, metaAdapter)
if err != nil {
return nil, err
}

flatFeeSvc, err := NewChargesFlatFeeService(flatFeeAdapter, flatFeeHandler, metaAdapter, locker)
flatFeeSvc, err := NewChargesFlatFeeService(flatFeeAdapter, flatFeeHandler, lineageService, metaAdapter, locker)
if err != nil {
return nil, err
}
Expand All @@ -276,6 +342,7 @@ func newChargesRegistry(
usageBasedSvc, err := NewChargesUsageBasedService(
usageBasedAdapter,
usageBasedHandler,
lineageService,
locker,
metaAdapter,
billingService,
Expand All @@ -292,7 +359,7 @@ func newChargesRegistry(
return nil, err
}

creditPurchaseSvc, err := NewChargesCreditPurchaseService(creditPurchaseAdapter, creditPurchaseHandler, metaAdapter)
creditPurchaseSvc, err := NewChargesCreditPurchaseService(creditPurchaseAdapter, creditPurchaseHandler, lineageService, metaAdapter)
if err != nil {
return nil, err
}
Expand Down
32 changes: 30 additions & 2 deletions app/common/customerbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee"
flatfeeadapter "github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee/adapter"
flatfeeservice "github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee/service"
lineageadapter "github.com/openmeterio/openmeter/openmeter/billing/charges/lineage/adapter"
lineageservice "github.com/openmeterio/openmeter/openmeter/billing/charges/lineage/service"
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
metaadapter "github.com/openmeterio/openmeter/openmeter/billing/charges/meta/adapter"
"github.com/openmeterio/openmeter/openmeter/billing/charges/usagebased"
Expand All @@ -22,7 +24,9 @@ import (
"github.com/openmeterio/openmeter/openmeter/ledger"
ledgeraccount "github.com/openmeterio/openmeter/openmeter/ledger/account"
ledgerchargeadapter "github.com/openmeterio/openmeter/openmeter/ledger/chargeadapter"
ledgercollector "github.com/openmeterio/openmeter/openmeter/ledger/collector"
"github.com/openmeterio/openmeter/openmeter/ledger/customerbalance"
"github.com/openmeterio/openmeter/openmeter/ledger/transactions"
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/pkg/framework/lockr"
Expand Down Expand Up @@ -59,6 +63,20 @@ func NewCustomerBalanceService(
return nil, err
}

lineageAdapter, err := lineageadapter.New(lineageadapter.Config{
Client: db,
})
if err != nil {
return nil, err
}

lineageService, err := lineageservice.New(lineageservice.Config{
Adapter: lineageAdapter,
})
if err != nil {
return nil, err
}

searchAdapter, err := chargeadapter.New(chargeadapter.Config{
Client: db,
Logger: logger,
Expand All @@ -76,9 +94,18 @@ func NewCustomerBalanceService(
return nil, err
}

collectorService := ledgercollector.NewService(ledgercollector.Config{
Ledger: historicalLedger,
Dependencies: transactions.ResolverDependencies{
AccountService: accountResolver,
SubAccountService: accountService,
},
})

flatFeeService, err := flatfeeservice.New(flatfeeservice.Config{
Adapter: flatFeeAdapter,
Handler: ledgerchargeadapter.NewFlatFeeHandler(historicalLedger, accountResolver, accountService),
Handler: ledgerchargeadapter.NewFlatFeeHandler(historicalLedger, transactions.ResolverDependencies{AccountService: accountResolver, SubAccountService: accountService}, collectorService),
Lineage: lineageService,
MetaAdapter: metaAdapter,
Locker: locker,
})
Expand All @@ -97,7 +124,8 @@ func NewCustomerBalanceService(

usageService, err := usagebasedservice.New(usagebasedservice.Config{
Adapter: usageAdapter,
Handler: usagebased.UnimplementedHandler{},
Handler: ledgerchargeadapter.NewUsageBasedHandler(collectorService),
Lineage: lineageService,
Locker: locker,
MetaAdapter: metaAdapter,
CustomerOverrideService: billingRegistry.Billing,
Expand Down
41 changes: 30 additions & 11 deletions openmeter/billing/charges/creditpurchase/service/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/openmeterio/openmeter/openmeter/billing/charges/creditpurchase"
"github.com/openmeterio/openmeter/openmeter/billing/charges/lineage"
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
"github.com/openmeterio/openmeter/openmeter/billing/charges/models/ledgertransaction"
"github.com/openmeterio/openmeter/openmeter/billing/charges/models/payment"
Expand All @@ -19,20 +20,38 @@ func (s *service) onExternalCreditPurchase(ctx context.Context, charge creditpur

targetStatus := externalCreditPurchaseSettlement.InitialStatus

// Let's handle the initial state
ledgerTransactionGroupReference, err := s.handler.OnCreditPurchaseInitiated(ctx, charge)
if err != nil {
return creditpurchase.Charge{}, err
}
charge, err = transaction.Run(ctx, s.adapter, func(ctx context.Context) (creditpurchase.Charge, error) {
ledgerTransactionGroupReference, err := s.handler.OnCreditPurchaseInitiated(ctx, charge)
if err != nil {
return creditpurchase.Charge{}, err
}

charge.State.CreditGrantRealization = &ledgertransaction.TimedGroupReference{
GroupReference: ledgerTransactionGroupReference,
Time: clock.Now(),
}
charge.State.CreditGrantRealization = &ledgertransaction.TimedGroupReference{
GroupReference: ledgerTransactionGroupReference,
Time: clock.Now(),
}

if ledgerTransactionGroupReference.TransactionGroupID != "" {
if err := s.lineage.BackfillAdvanceLineageSegments(ctx, lineage.BackfillAdvanceLineageSegmentsInput{
Namespace: charge.Namespace,
CustomerID: charge.Intent.CustomerID,
Currency: charge.Intent.Currency,
Amount: charge.Intent.CreditAmount,
BackingTransactionGroupID: ledgerTransactionGroupReference.TransactionGroupID,
}); err != nil {
return creditpurchase.Charge{}, err
}
}

charge.Status = meta.ChargeStatusActive

charge.Status = meta.ChargeStatusActive
updatedCharge, err := s.adapter.UpdateCharge(ctx, charge)
if err != nil {
return creditpurchase.Charge{}, err
}

charge, err = s.adapter.UpdateCharge(ctx, charge)
return updatedCharge, nil
})
if err != nil {
return creditpurchase.Charge{}, err
}
Expand Down
41 changes: 29 additions & 12 deletions openmeter/billing/charges/creditpurchase/service/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,43 @@ import (

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/billing/charges/creditpurchase"
"github.com/openmeterio/openmeter/openmeter/billing/charges/lineage"
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
"github.com/openmeterio/openmeter/openmeter/billing/charges/models/ledgertransaction"
"github.com/openmeterio/openmeter/openmeter/billing/charges/models/payment"
"github.com/openmeterio/openmeter/pkg/clock"
"github.com/openmeterio/openmeter/pkg/framework/transaction"
)

func (s *service) PostInvoiceDraftCreated(ctx context.Context, charge creditpurchase.Charge, lineWithHeader billing.StandardLineWithInvoiceHeader) error {
ledgerTransactionGroupReference, err := s.handler.OnCreditPurchaseInitiated(ctx, charge)
if err != nil {
return transaction.RunWithNoValue(ctx, s.adapter, func(ctx context.Context) error {
ledgerTransactionGroupReference, err := s.handler.OnCreditPurchaseInitiated(ctx, charge)
if err != nil {
return err
}

charge.State.CreditGrantRealization = &ledgertransaction.TimedGroupReference{
GroupReference: ledgerTransactionGroupReference,
Time: clock.Now(),
}

if ledgerTransactionGroupReference.TransactionGroupID != "" {
if err := s.lineage.BackfillAdvanceLineageSegments(ctx, lineage.BackfillAdvanceLineageSegmentsInput{
Namespace: charge.Namespace,
CustomerID: charge.Intent.CustomerID,
Currency: charge.Intent.Currency,
Amount: charge.Intent.CreditAmount,
BackingTransactionGroupID: ledgerTransactionGroupReference.TransactionGroupID,
}); err != nil {
return err
}
}

charge.Status = meta.ChargeStatusActive

_, err = s.adapter.UpdateCharge(ctx, charge)
return err
}

charge.State.CreditGrantRealization = &ledgertransaction.TimedGroupReference{
GroupReference: ledgerTransactionGroupReference,
Time: clock.Now(),
}
charge.Status = meta.ChargeStatusActive

_, err = s.adapter.UpdateCharge(ctx, charge)
return err
})
}

// PostInvoicePaymentAuthorized is called when the invoice is approved/issued.
Expand Down
Loading
Loading