Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

## [10.6.0]
### Added
- Added withdrawals to investment service ingestion

## [10.4.0]
### Changed
- Upgrade `customer-profile-api` version from `1.17.1` to `2.6.0`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class IngestConfigProperties {
private PortfolioConfig portfolio = new PortfolioConfig();
private AllocationConfig allocation = new AllocationConfig();
private DepositConfig deposit = new DepositConfig();
private WithdrawalConfig withdrawal = new WithdrawalConfig();
private AssetConfig asset = new AssetConfig();
private AssessmentConfig assessment = new AssessmentConfig();

Expand Down Expand Up @@ -114,7 +115,33 @@ public static class DepositConfig {
}

// -------------------------------------------------------------------------
// Deposit
// Withdrawal
// -------------------------------------------------------------------------

/**
* Settings that govern the automatic seed withdrawal created for portfolios during ingestion.
*
* <p>Withdrawals are only created when {@code defaultAmount} is greater than zero, or when an
* explicit {@code withdrawalAmount} is set on the {@code InvestmentPortfolio}.
*/
@Data
public static class WithdrawalConfig {

/**
* The payment provider identifier sent with every withdrawal request. Set this to the real
* provider name for non-mock environments.
*/
private String provider = null;

/**
* The monetary amount used as the withdrawal amount when no explicit value is set on the
* portfolio. Defaults to {@code 500.0} (5% of {@code DEFAULT_INIT_CASH}).
*/
private double defaultAmount = DEFAULT_INIT_CASH * 0.05d;
}

// -------------------------------------------------------------------------
// Asset
// -------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class InvestmentArrangement {
private String currency;
private String productPortfolioName;
private BigDecimal initialCash;
private BigDecimal withdrawalAmount;

private UUID investmentProductId;
private List<String> legalEntityIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ public class InvestmentPortfolio {

private PortfolioList portfolio;
private BigDecimal initialCash;
private BigDecimal withdrawalAmount;

public double getInitialCashOrDefault(double defaultAmount) {
return Optional.ofNullable(initialCash).map(BigDecimal::doubleValue)
.orElse(defaultAmount);
}

public double getWithdrawalAmountOrDefault(double defaultAmount) {
return Optional.ofNullable(withdrawalAmount).map(BigDecimal::doubleValue)
.orElse(defaultAmount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public Mono<InvestmentTask> executeTask(InvestmentTask streamTask) {
.flatMap(this::upsertPortfolioTradingAccounts)
.flatMap(this::upsertInvestmentPortfolioDeposits)
.flatMap(this::upsertPortfoliosAllocations)
.flatMap(this::upsertInvestmentPortfolioWithdrawals)
.doOnSuccess(completedTask -> {
streamTask.setState(State.COMPLETED);
log.info("Successfully completed investment saga: taskId={}, taskName={}, state={}",
Expand Down Expand Up @@ -136,6 +137,41 @@ private Mono<InvestmentTask> upsertInvestmentPortfolioDeposits(InvestmentTask in
.map(_ -> investmentTask);
}

/**
* Upserts withdrawals for all investment portfolios in the task.
*
* <ol>
* <li>Iterates over every portfolio tracked in the task</li>
* <li>Calls {@link com.backbase.stream.investment.service.InvestmentPortfolioService#upsertWithdrawals}
* for each — portfolios with a zero or null withdrawal amount are silently skipped</li>
* <li>Per-portfolio errors are caught, logged as warnings, and skipped so that the rest of
* the batch is unaffected</li>
* </ol>
*
* @param investmentTask the task containing the portfolios to process
* @return Mono emitting the unchanged task after all withdrawals have been processed
*/
private Mono<InvestmentTask> upsertInvestmentPortfolioWithdrawals(InvestmentTask investmentTask) {
List<InvestmentPortfolio> portfolios =
Objects.requireNonNullElse(investmentTask.getData().getPortfolios(), List.of());
log.info("Starting upsert of investment portfolio withdrawals: taskId={}, portfolioCount={}",
investmentTask.getId(), portfolios.size());
if (portfolios.isEmpty()) {
log.warn("No portfolios found for withdrawal upsert — skipping: taskId={}", investmentTask.getId());
return Mono.just(investmentTask);
}
return Flux.fromIterable(portfolios)
.flatMap(portfolio -> investmentPortfolioService.upsertWithdrawals(portfolio)
.onErrorResume(throwable -> {
log.warn("Failed to upsert withdrawal for portfolio: portfolioUuid={}, taskId={}",
portfolio.getPortfolio() != null ? portfolio.getPortfolio().getUuid() : "unknown",
investmentTask.getId(), throwable);
return Mono.empty();
}))
.collectList()
.map(_ -> investmentTask);
}

/**
* Rollback is not implemented for investment saga.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import com.backbase.investment.api.service.v1.model.Deposit;
import com.backbase.investment.api.service.v1.model.DepositRequest;
import com.backbase.investment.api.service.v1.model.DepositTypeEnum;
import com.backbase.investment.api.service.v1.model.IntegrationWithdrawalCreate;
import com.backbase.investment.api.service.v1.model.IntegrationWithdrawalCreateRequest;
import com.backbase.investment.api.service.v1.model.IntegrationWithdrawalList;
import com.backbase.investment.api.service.v1.model.IntegrationPortfolioCreateRequest;
import com.backbase.investment.api.service.v1.model.PaginatedPortfolioTradingAccountList;
import com.backbase.investment.api.service.v1.model.PatchedPortfolioUpdateRequest;
Expand All @@ -15,6 +18,7 @@
import com.backbase.investment.api.service.v1.model.PortfolioTradingAccountRequest;
import com.backbase.investment.api.service.v1.model.Status08fEnum;
import com.backbase.investment.api.service.v1.model.StatusA3dEnum;
import com.backbase.investment.api.service.v1.model.StatusDa8Enum;
import com.backbase.stream.configuration.IngestConfigProperties;
import com.backbase.stream.investment.InvestmentArrangement;
import com.backbase.stream.investment.model.InvestmentPortfolio;
Expand Down Expand Up @@ -74,6 +78,7 @@ public Mono<List<InvestmentPortfolio>> upsertPortfolios(List<InvestmentArrangeme
.builder()
.portfolio(p)
.initialCash(arrangement.getInitialCash())
.withdrawalAmount(arrangement.getWithdrawalAmount())
.build())
.doOnSuccess(ip -> log.debug(
"Successfully upserted investment portfolio: portfolioUuid={}, externalId={}, name={}",
Expand Down Expand Up @@ -314,6 +319,111 @@ private Mono<Deposit> createDeposit(PortfolioList portfolio, double defaultAmoun
});
}

/**
* Creates or updates a withdrawal for the given investment portfolio.
*
* <p>Implements the same idempotent upsert pattern as {@link #upsertDeposits}:
* <ol>
* <li>Skips processing entirely when the configured withdrawal amount is {@code <= 0}</li>
* <li>Queries existing withdrawals for the portfolio</li>
* <li>If the sum of existing withdrawals is less than the target amount, creates an
* additional withdrawal for the remaining difference</li>
* <li>If the target has already been met, returns the last existing withdrawal</li>
* <li>If no withdrawals exist, creates one for the full target amount</li>
* <li>On error, returns a synthetic {@link IntegrationWithdrawalCreate} to allow
* downstream allocation steps to proceed</li>
* </ol>
*
* @param investmentPortfolio the portfolio to process (must not be null)
* @return Mono emitting the created or existing withdrawal, or empty if amount is {@code <= 0}
*/
public Mono<IntegrationWithdrawalCreate> upsertWithdrawals(InvestmentPortfolio investmentPortfolio) {
PortfolioList portfolio = investmentPortfolio.getPortfolio();
double withdrawalAmount = investmentPortfolio.getWithdrawalAmountOrDefault(
config.getWithdrawal().getDefaultAmount());

if (withdrawalAmount <= 0) {
log.info("Skipping withdrawal for portfolio: uuid={}, withdrawalAmount={}",
portfolio.getUuid(), withdrawalAmount);
return Mono.empty();
}

log.debug("Listing existing withdrawals for portfolio: uuid={}, targetWithdrawalAmount={}",
portfolio.getUuid(), withdrawalAmount);
return paymentsApi.listWithdrawals(null, null, null, null, null,
null, portfolio.getUuid(), null)
.filter(Objects::nonNull)
// Use flatMap with Mono.justOrEmpty() to safely handle null results without NPE,
// ensuring switchIfEmpty fallback triggers for both null and empty withdrawal lists.
.flatMap(paginatedResult ->
Mono.justOrEmpty(paginatedResult.getResults())
.filter(list -> !list.isEmpty()))
.flatMap(withdrawals -> {
double withdrawn = withdrawals.stream().mapToDouble(IntegrationWithdrawalList::getAmount).sum();
double remaining = withdrawalAmount - withdrawn;
log.debug("Portfolio withdrawal check: uuid={}, existing={}, target={}, remaining={}",
portfolio.getUuid(), withdrawn, withdrawalAmount, remaining);
if (remaining > 0) {
return createWithdrawal(portfolio, remaining);
}
IntegrationWithdrawalList last = withdrawals.getLast();
return Mono.just(new IntegrationWithdrawalCreate()
.portfolio(last.getPortfolio())
.amount(last.getAmount())
.completedAt(last.getCompletedAt()));
})
.switchIfEmpty(Mono.defer(() -> createWithdrawal(portfolio, withdrawalAmount)))
.onErrorResume(ex -> {
if (ex instanceof WebClientResponseException wce) {
log.error("listWithdrawals API call failed for portfolio: uuid={}, status={}, body={}",
portfolio.getUuid(), wce.getStatusCode(), wce.getResponseBodyAsString(), wce);
} else {
log.error("Failed to process withdrawal for portfolio: uuid={}", portfolio.getUuid(), ex);
}
return Mono.just(new IntegrationWithdrawalCreate()
.portfolio(portfolio.getUuid())
.amount(withdrawalAmount)
.completedAt(portfolio.getActivated().plusDays(4)));
});
}

/**
* Creates a new withdrawal for the given portfolio via the payments API.
*
* <p>The withdrawal is created with:
* <ul>
* <li>Status {@code COMPLETED}</li>
* <li>Completion date set to portfolio activation date + 3 days</li>
* <li>Provider from configuration</li>
* </ul>
*
* @param portfolio the portfolio to create the withdrawal for
* @param amount the withdrawal amount
* @return Mono emitting the created withdrawal
*/
@Nonnull
private Mono<IntegrationWithdrawalCreate> createWithdrawal(PortfolioList portfolio, double amount) {
return paymentsApi.createWithdrawal(new IntegrationWithdrawalCreateRequest()
.portfolio(portfolio.getUuid())
.provider(config.getWithdrawal().getProvider())
.status(StatusDa8Enum.COMPLETED)
.completedAt(portfolio.getActivated().plusDays(3))
.amount(amount)
)
.doOnSuccess(withdrawal -> log.info("Created withdrawal {} for portfolio {}",
withdrawal.getUuid(), portfolio.getUuid()))
.doOnError(throwable -> {
if (throwable instanceof WebClientResponseException ex) {
log.warn(
"Portfolio withdrawal create failed: uuid={}, status={}, body={}",
portfolio.getUuid(), ex.getStatusCode(), ex.getResponseBodyAsString());
} else {
log.warn("Portfolio withdrawal create failed: uuid={}",
portfolio.getUuid(), throwable);
}
});
}

/**
* Upserts portfolio trading accounts derived from the provided investment portfolio accounts.
*
Expand Down
Loading
Loading