"""
CSV Bank Statement Parser โ Full Pipeline
Architecture: 5-stage pipeline, each stage is independently testable.
Stage 1: FileReader โ decode bytes, detect encoding
Stage 2: FormatDetector โ detect delimiter, find header row, map columns
Stage 3: RowFilter โ skip blanks, metadata rows, summary rows
Stage 4: RowParser โ parse each row (date, amount, description, type)
Stage 5: PostProcessor โ validate balance, compute stats, build result
Design principles:
- Never crash on bad data; collect warnings and continue
- Always preserve raw values for debugging
- Amount is always positive; direction is explicit ('C' / 'D')
- All warnings attached to the ParsedBankStatement result
"""
import csv
import io
import re
from decimal import Decimal
import chardet
from app.core.exceptions import (
CSVEncodingError,
CSVMissingRequiredColumnsError,
CSVNoDataRowsError,
CSVParseError,
)
from app.core.logging import get_logger
from app.utils.amount_parser import ParsedAmount, parse_amount, parse_split_amounts
from app.utils.date_parser import ParsedDate, infer_date_format_hint, parse_date
from app.models.enums import TransactionType
from app.parsers.base import BaseParser
from app.parsers.schemas import ParsedTransaction, ColumnMapping, ParsedBankStatement
from app.parsers.constants import (
CSV_PARSER_VERSION as PARSER_VERSION,
CSV_MIN_DATA_ROWS as MIN_DATA_ROWS,
CSV_SKIP_ROW_PATTERNS as SKIP_ROW_PATTERNS,
CSV_TYPE_KEYWORD_MAP as TYPE_KEYWORD_MAP,
)
logger = get_logger(__name__)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Stage 1: FileReader โ decode bytes, detect encoding
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
[docs]
class FileReader:
"""
Reads raw bytes โ decoded text string.
Handles: UTF-8, UTF-8-BOM, Latin-1, Windows-1252, UTF-16.
"""
SUPPORTED_ENCODINGS = [
"utf-8-sig", # UTF-8 with BOM (Excel exports) โ try first
"utf-8",
"latin-1",
"windows-1252",
"utf-16",
]
[docs]
def read(self, content: bytes) -> tuple[str, str]:
"""
Decode bytes to string.
Returns:
(decoded_text, detected_encoding)
Raises:
CSVEncodingError: If all known encodings fail.
"""
# โโ Use chardet for detection โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
detected = chardet.detect(content)
chardet_encoding = detected.get("encoding") or "utf-8"
chardet_confidence = detected.get("confidence", 0)
logger.debug(
"Encoding detection",
chardet_encoding=chardet_encoding,
confidence=chardet_confidence,
)
# โโ Build candidate list (chardet first, then fallbacks) โโโโโโโโโโโโโ
candidates = [chardet_encoding] + [
e for e in self.SUPPORTED_ENCODINGS if e.lower() != chardet_encoding.lower()
]
last_error: Exception | None = None
for encoding in candidates:
try:
text = content.decode(encoding)
# Check for replacement characters (sign of wrong encoding)
replacement_count = text.count("\ufffd")
if replacement_count > 10:
logger.warning(
"High replacement char count โ encoding may be wrong",
encoding=encoding,
replacement_count=replacement_count,
)
continue
logger.info("Decoded CSV", encoding=encoding)
return text, encoding
except (UnicodeDecodeError, LookupError) as e:
last_error = e
continue
# โโ Last resort: decode with errors='replace' โโโโโโโโโโโโโโโโโโโโโโโโ
logger.warning("All encodings failed; falling back to UTF-8 with replacement chars")
try:
text = content.decode("utf-8", errors="replace")
return text, "utf-8 (forced, may contain errors)"
except Exception:
raise CSVEncodingError(
f"Cannot decode file content. Tried: {candidates}. Last error: {last_error}"
)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Stage 2: FormatDetector โ delimiter, header row, column mapping
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Column name synonym maps โ order matters (more specific first)
# Covers: Standard, HDFC, SBI, ICICI, European/German, and common variants
DATE_COLUMNS = [
# Standard / generic
"transaction date", "txn date", "trans date", "date",
"posting date", "post date", "entry date", "booking date",
# German / European
"datum", "buchungsdatum", "buchungstag", "wertstellungsdatum",
# HDFC / Indian bank specific
"value dt", "tran date", "val date",
]
VALUE_DATE_COLUMNS = [
"value date", "settlement date", "effective date",
"value dt", "wertstellung", "val date",
]
DESCRIPTION_COLUMNS = [
"transaction details", "transaction description", "narration",
"description", "particulars", "details", "remarks", "memo",
"reference details", "trans description",
# German
"buchungstext", "verwendungszweck", "betreff",
# HDFC / SBI
"tran particular", "transaction particulars",
]
REFERENCE_COLUMNS = [
"reference number", "ref number", "ref no", "reference",
"cheque no", "cheque number", "check no", "transaction id",
"txn id", "trans id", "utr", "utr number",
# HDFC: Chq./Ref.No.
"chq./ref.no.", "chq/ref no", "chq.ref.no", "cheque ref no",
]
AMOUNT_COLUMNS = [
"transaction amount", "txn amount", "trans amount",
"amount (inr)", "amount (usd)", "amount",
# German
"betrag", "umsatz",
]
DEBIT_COLUMNS = [
"debit amount", "withdrawal amount", "withdrawal (dr)",
"debit", "dr", "withdrawal", "dr amount", "debit(dr)",
# HDFC format
"withdrawal amt (dr)", "debit amt", "dr amt",
# SBI / other Indian banks
"withdrawals", "debit (dr)",
]
CREDIT_COLUMNS = [
"credit amount", "deposit amount", "deposit (cr)",
"credit", "cr", "deposit", "cr amount", "credit(cr)",
# HDFC format
"deposit amt (cr)", "credit amt", "cr amt",
# SBI / other
"deposits", "credit (cr)",
]
BALANCE_COLUMNS = [
"running balance", "available balance", "closing balance",
"balance", "bal", "closing bal",
# German
"kontostand", "saldo", "endbestand",
# HDFC
"closing balance",
]
CURRENCY_COLUMNS = [
"currency", "ccy", "curr",
# German
"wรคhrung", "wahrung",
]
TYPE_COLUMNS = ["transaction type", "txn type", "type", "trans type"]
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Stage 3: RowFilter โ skip blanks, metadata, and summary rows
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
[docs]
class RowFilter:
"""Filters out non-data rows before parsing."""
def __init__(self, expected_col_count: int):
self.expected_col_count = expected_col_count
self.skipped: list[dict] = []
[docs]
def should_skip(self, row: list[str], row_index: int) -> bool:
"""
Returns True if this row should be skipped.
Appends to self.skipped for audit purposes.
"""
reason = self._get_skip_reason(row, row_index)
if reason:
self.skipped.append({"row_index": row_index, "reason": reason, "content": row[:3]})
return True
return False
def _get_skip_reason(self, row: list[str], row_index: int) -> str | None:
# โโ Blank row โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
if not row or all(cell.strip() == "" for cell in row):
return "blank_row"
# โโ Too few columns (malformed) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
if len(row) < max(2, self.expected_col_count - 2):
return f"too_few_columns ({len(row)} vs expected ~{self.expected_col_count})"
# โโ Summary / metadata row โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
first_cell = row[0].strip()
if SKIP_ROW_PATTERNS.match(first_cell):
return f"summary_row ({first_cell!r})"
# โโ Repeated header row (continuation pages) โโโโโโโโโโโโโโโโโโโโโโโโโ
date_signals = {"date", "txn date", "transaction date", "value date"}
if first_cell.strip().lower() in date_signals:
return "repeated_header_row"
return None
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Stage 4: RowParser โ parse individual rows
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
[docs]
class RowParser:
"""Parses a single CSV data row into a ParsedTransaction."""
def __init__(self, mapping: ColumnMapping, dayfirst: bool = True):
self.mapping = mapping
self.dayfirst = dayfirst
[docs]
def parse(self, row: list[str], row_index: int) -> ParsedTransaction | None:
"""
Parse one CSV row.
Returns:
ParsedTransaction on success.
None if the row cannot be parsed at all (e.g., date missing).
"""
warnings: list[str] = []
m = self.mapping
def get(col_index: int | None) -> str:
if col_index is None or col_index >= len(row):
return ""
return row[col_index].strip()
# โโ Date โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
raw_date = get(m.date)
try:
parsed_date: ParsedDate | None = parse_date(raw_date, dayfirst=self.dayfirst)
except ValueError as e:
logger.warning("Date parse failure", row_index=row_index, raw=raw_date, error=str(e))
return None # Cannot continue without a date
if parsed_date is None:
return None # Missing date โ skip row
if parsed_date.warning:
warnings.append(f"date: {parsed_date.warning}")
# โโ Value date (optional) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
value_date_result: ParsedDate | None = None
if m.value_date is not None:
raw_value_date = get(m.value_date)
if raw_value_date:
try:
value_date_result = parse_date(raw_value_date, dayfirst=self.dayfirst)
except ValueError:
warnings.append("value_date: Could not parse, ignored")
# โโ Amount โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
parsed_amount: ParsedAmount | None = None
if m.debit is not None or m.credit is not None:
# Split debit/credit columns
raw_debit = get(m.debit) if m.debit is not None else None
raw_credit = get(m.credit) if m.credit is not None else None
try:
parsed_amount = parse_split_amounts(raw_debit, raw_credit)
except ValueError as e:
warnings.append(f"amount: Split parse failed โ {e}")
elif m.amount is not None:
# Single amount column
raw_amount = get(m.amount)
try:
parsed_amount = parse_amount(raw_amount)
except ValueError as e:
warnings.append(f"amount: {e}")
if parsed_amount is None:
warnings.append("amount: Missing or null โ row retained without amount")
# Store as 0 with warning (don't skip โ date may still be useful)
amount_value = Decimal("0")
direction = "D"
else:
amount_value = parsed_amount.value
direction = parsed_amount.direction
if parsed_amount.warning:
warnings.append(f"amount: {parsed_amount.warning}")
if parsed_amount.is_inferred:
warnings.append("amount: Direction inferred (no explicit CR/DR marker)")
# โโ Balance โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
balance_after: Decimal | None = None
if m.balance is not None:
raw_balance = get(m.balance)
if raw_balance:
try:
parsed_bal = parse_amount(raw_balance, default_direction="C")
balance_after = parsed_bal.value if parsed_bal else None
except ValueError:
warnings.append("balance: Could not parse running balance")
# โโ Description โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
raw_description = get(m.description)
description = _clean_description(raw_description)
# โโ Reference โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
reference = get(m.reference) or None
# โโ Currency โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
currency = get(m.currency).upper() if m.currency is not None and get(m.currency) else None
# โโ Transaction type โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
explicit_type_raw = get(m.transaction_type)
transaction_type = _infer_transaction_type(
explicit_raw=explicit_type_raw,
description=description,
direction=direction,
)
return ParsedTransaction(
row_index=row_index,
transaction_date=parsed_date.value,
value_date=value_date_result.value if value_date_result else None,
description=description,
raw_description=raw_description if raw_description != description else None,
reference_number=reference,
transaction_type=transaction_type,
amount=amount_value,
direction=direction,
balance_after=balance_after,
currency=currency,
parse_warnings=warnings,
)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Stage 5: PostProcessor โ validate, compute stats, build result
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
[docs]
class PostProcessor:
"""
Final validation and result assembly.
- Infers statement date range from transactions
- Detects dominant currency
- Validates running balance continuity
- Computes statistics
"""
[docs]
def process(
self,
transactions: list[ParsedTransaction],
raw_headers: dict,
encoding: str,
delimiter: str,
column_mapping: ColumnMapping,
rows_skipped: int,
format_warnings: list[dict],
) -> ParsedBankStatement:
warnings = list(format_warnings)
if not transactions:
raise CSVNoDataRowsError(
"No valid transaction rows found after parsing. "
"Check that the file contains data rows below the header."
)
# โโ Infer date range โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
dates = [t.transaction_date for t in transactions]
statement_from = min(dates)
statement_to = max(dates)
# โโ Dominant currency โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
currencies = [t.currency for t in transactions if t.currency]
dominant_currency = _most_common(currencies) if currencies else None
if len(set(currencies)) > 1:
warnings.append({
"field": "currency",
"message": f"Multiple currencies detected: {set(currencies)}. "
f"Dominant: {dominant_currency}. "
"Per-row currency stored on each transaction.",
})
# โโ Opening / closing balance from first/last transactions โโโโโโโโโโโโ
opening_balance = transactions[0].balance_after # approximate
closing_balance = transactions[-1].balance_after
# โโ Balance continuity check โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
if all(t.balance_after is not None for t in transactions):
warnings.extend(self._check_balance_continuity(transactions))
# โโ Row-level warnings โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
for tx in transactions:
if tx.parse_warnings:
warnings.append({
"field": f"row_{tx.row_index}",
"message": "; ".join(tx.parse_warnings),
})
return ParsedBankStatement(
bank_name=None, # Not typically in transaction CSV
account_number=None, # Would come from metadata rows
account_holder=None,
currency=dominant_currency,
statement_from=statement_from,
statement_to=statement_to,
opening_balance=opening_balance,
closing_balance=closing_balance,
transactions=transactions,
detected_encoding=encoding,
detected_delimiter=delimiter,
detected_format=_detect_bank_format(raw_headers),
raw_headers=raw_headers,
column_mapping=column_mapping,
total_rows_parsed=len(transactions),
total_rows_skipped=rows_skipped,
warnings=warnings,
)
def _check_balance_continuity(
self, transactions: list[ParsedTransaction]
) -> list[dict]:
"""Detect gaps in running balance (signs of missing rows)."""
issues = []
for i in range(1, len(transactions)):
prev = transactions[i - 1]
curr = transactions[i]
if prev.balance_after is None or curr.balance_after is None:
continue
expected = (
prev.balance_after + curr.amount
if curr.direction == "C"
else prev.balance_after - curr.amount
)
diff = abs(expected - curr.balance_after)
if diff > Decimal("0.10"): # Allow 10p rounding tolerance
issues.append({
"field": f"balance_row_{curr.row_index}",
"message": (
f"Balance discontinuity at row {curr.row_index}: "
f"expected ~{expected:.2f}, got {curr.balance_after:.2f}. "
f"Diff: {diff:.2f}. Possible missing rows."
),
})
return issues
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Main Orchestrator
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
[docs]
class CSVParser(BaseParser[ParsedBankStatement]):
"""
Orchestrates the 5-stage CSV parsing pipeline.
Entry point for all CSV bank statement parsing.
"""
def __init__(self, max_rows: int = 100_000):
self.max_rows = max_rows
self._reader = FileReader()
self._format_detector = FormatDetector()
self._post_processor = PostProcessor()
[docs]
def parse(self, content: bytes) -> ParsedBankStatement:
"""
Full pipeline: bytes โ ParsedBankStatement.
Args:
content: Raw file bytes.
Returns:
ParsedBankStatement with transactions and quality metadata.
Raises:
CSVEncodingError: Cannot decode file.
CSVMissingRequiredColumnsError: Mandatory columns absent.
CSVNoDataRowsError: No valid data rows after filtering.
CSVParseError: Unrecoverable structural error.
"""
logger.info("CSV parsing started", content_size=len(content))
# โโ Stage 1: Decode โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
text, encoding = self._reader.read(content)
# โโ Stage 2: Format Detection โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
delimiter, all_rows = self._format_detector.detect(text)
if not all_rows:
raise CSVParseError("File is empty or contains no parseable rows.")
header_row_idx = self._format_detector.find_header_row(all_rows)
header_row = all_rows[header_row_idx]
data_rows = all_rows[header_row_idx + 1 :]
column_mapping = self._format_detector.map_columns(header_row)
self._format_detector.validate_mapping(column_mapping, header_row)
raw_headers = {i: h.strip() for i, h in enumerate(header_row)}
# โโ Infer dayfirst from sample dates โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
if column_mapping.date is not None:
sample_dates = [
row[column_mapping.date]
for row in data_rows[:20]
if len(row) > column_mapping.date
]
dayfirst = infer_date_format_hint(sample_dates)
else:
dayfirst = True
# โโ Stage 3: Row Filtering โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
row_filter = RowFilter(expected_col_count=len(header_row))
row_parser = RowParser(mapping=column_mapping, dayfirst=dayfirst)
transactions: list[ParsedTransaction] = []
format_warnings: list[dict] = []
# Enforce max row limit
if len(data_rows) > self.max_rows:
format_warnings.append({
"field": "row_count",
"message": (
f"File has {len(data_rows)} rows; truncated to {self.max_rows}. "
"Consider splitting large files."
),
})
data_rows = data_rows[: self.max_rows]
# โโ Stage 4: Row Parsing โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
for csv_row_index, row in enumerate(data_rows, start=header_row_idx + 2):
if row_filter.should_skip(row, csv_row_index):
continue
try:
parsed_tx = row_parser.parse(row, csv_row_index)
except Exception as e:
logger.warning(
"Unexpected row parse error",
row_index=csv_row_index,
error=str(e),
)
row_filter.skipped.append({
"row_index": csv_row_index,
"reason": f"parse_error: {e}",
"content": row[:3],
})
continue
if parsed_tx is not None:
transactions.append(parsed_tx)
rows_skipped = len(row_filter.skipped)
logger.info(
"Row parsing complete",
parsed=len(transactions),
skipped=rows_skipped,
)
# โโ Stage 5: Post-processing โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
result = self._post_processor.process(
transactions=transactions,
raw_headers=raw_headers,
encoding=encoding,
delimiter=delimiter,
column_mapping=column_mapping,
rows_skipped=rows_skipped,
format_warnings=format_warnings,
)
logger.info(
"CSV parsing complete",
transactions=result.total_rows_parsed,
warnings=len(result.warnings),
currency=result.currency,
from_date=str(result.statement_from),
to_date=str(result.statement_to),
)
return result
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Helper functions
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
def _clean_description(raw: str) -> str | None:
"""Normalize description text: collapse whitespace, strip control chars."""
if not raw or not raw.strip():
return None
cleaned = re.sub(r"[\x00-\x1f\x7f]", " ", raw) # Strip control chars
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned or None
def _infer_transaction_type(
explicit_raw: str,
description: str | None,
direction: str,
) -> TransactionType:
"""
Infer transaction type from explicit column value or description keywords.
Falls back to CREDIT/DEBIT based on direction if no keywords match.
"""
if explicit_raw:
normalized = explicit_raw.strip().lower()
type_map = {
"credit": TransactionType.CREDIT,
"cr": TransactionType.CREDIT,
"debit": TransactionType.DEBIT,
"dr": TransactionType.DEBIT,
"transfer": TransactionType.TRANSFER,
"fee": TransactionType.FEE,
"interest": TransactionType.INTEREST,
}
if normalized in type_map:
return type_map[normalized]
if description:
for pattern, tx_type in TYPE_KEYWORD_MAP:
if pattern.search(description):
return tx_type
# Fall back to direction-based type
return TransactionType.CREDIT if direction == "C" else TransactionType.DEBIT
def _most_common(items: list[str]) -> str:
"""Return the most frequently occurring item."""
return max(set(items), key=items.count)
def _detect_bank_format(raw_headers: dict) -> str:
"""
Attempt to identify the bank/format from header patterns.
Returns a format identifier string.
TODO: Add more bank-specific format fingerprints as needed.
"""
headers_lower = {v.lower() for v in raw_headers.values()}
if "narration" in headers_lower and "chq./ref.no." in headers_lower:
return "HDFC_STANDARD"
if "transaction remarks" in headers_lower:
return "ICICI_STANDARD"
if "particulars" in headers_lower and "cheque no" in headers_lower:
return "SBI_STANDARD"
if "memo" in headers_lower and "amount" in headers_lower:
return "CHASE_US"
if "description" in headers_lower and "debit" in headers_lower and "credit" in headers_lower:
return "GENERIC_SPLIT"
if "description" in headers_lower and "amount" in headers_lower:
return "GENERIC_SINGLE"
return "UNKNOWN"