⛓️ AI Data Pipelines
The AI Data Pipeline manages the conversion of raw relational data and unstructured business documents into clean, searchable context embeddings for retrieval-augmented generation (RAG).
1. Ingestion Pipeline
The ingestion pipeline handles two distinct streams of data:
Unstructured Data (PDFs, Contracts, Policies)
- Extract: A worker picks up each document uploaded to OCI Object Storage and extracts its text.
- Clean: Strips formatting noise, parses tables, and isolates key-value lists.
- Chunk: Segments text into overlapping chunks of up to 510 tokens with 10% overlap, splitting at semantic boundaries (such as sentence or paragraph breaks) rather than at fixed character lengths.
- Embed: Computes vector representations using a local Cohere embed-multilingual-v3 model.
- Index: Saves the embeddings to OCI Search with OpenSearch.
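The chunking step can be sketched as packing whole sentences into a token budget. This is a minimal sketch, not the pipeline's actual chunker. It assumes whitespace-separated words approximate model tokens (a real deployment would count with the embedding model's own tokenizer), and it treats sentence ends as the semantic boundary. Trailing sentences worth up to 10% of the budget are carried into the next chunk to produce the overlap.

```python
import re
from typing import List


def count_tokens(text: str) -> int:
    # Assumption: whitespace tokens approximate model tokens.
    return len(text.split())


def split_sentences(text: str) -> List[str]:
    # Naive sentence splitter: break after ., ! or ? followed by whitespace.
    parts = re.split(r"(?<=[.!?])\s+", text.strip())
    return [p for p in parts if p]


def chunk_text(text: str, max_tokens: int = 510, overlap_ratio: float = 0.10) -> List[str]:
    """Pack whole sentences into chunks of at most max_tokens tokens, carrying
    roughly overlap_ratio * max_tokens tokens of trailing sentences forward."""
    overlap_budget = int(max_tokens * overlap_ratio)
    chunks: List[str] = []
    current: List[str] = []
    current_tokens = 0
    for sentence in split_sentences(text):
        n = count_tokens(sentence)
        if current and current_tokens + n > max_tokens:
            chunks.append(" ".join(current))
            # Collect trailing sentences that fit in the overlap budget.
            carried: List[str] = []
            carried_tokens = 0
            for prev in reversed(current):
                t = count_tokens(prev)
                if carried_tokens + t > overlap_budget:
                    break
                carried.insert(0, prev)
                carried_tokens += t
            # Drop the overlap if it would push the next chunk over budget.
            if carried_tokens + n > max_tokens:
                carried, carried_tokens = [], 0
            current, current_tokens = carried, carried_tokens
        current.append(sentence)
        current_tokens += n
    if current:
        chunks.append(" ".join(current))
    return chunks
```

A single sentence longer than `max_tokens` still becomes its own oversized chunk; a production splitter would need a fallback (for example, splitting on clauses) for that case.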
Structured Data (Relational DB Records)
- Database Sync: When invoices, customer notes, or project tasks are saved, a database event trigger pushes the record to an event broker (Kafka).
- Document Generation: A consumer parses the transaction data, compiles it into a readable text format (e.g. "Invoice PINV-041 issued by Al-Qahtani Trading to SABIC for amount SAR 15,000 including 15% VAT"), and embeds it for vector search.
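The document-generation consumer can be sketched as a function that parses one broker message and renders it as a sentence like the invoice example above. The JSON message shape and the `InvoiceEvent` field names are hypothetical, since the event schema is not specified, and the Kafka consumer loop itself is omitted.

```python
import json
from dataclasses import dataclass


@dataclass
class InvoiceEvent:
    # Hypothetical field names; the real event schema is not documented here.
    invoice_no: str
    issuer: str
    customer: str
    amount: float
    currency: str
    vat_rate: float  # fraction, e.g. 0.15 for 15% VAT


def parse_event(payload: bytes) -> InvoiceEvent:
    # Assumes the message value is a UTF-8 JSON object with exactly the fields above.
    data = json.loads(payload.decode("utf-8"))
    return InvoiceEvent(**data)


def render_invoice(ev: InvoiceEvent) -> str:
    # Render the record as a natural-language sentence ready for embedding.
    return (
        f"Invoice {ev.invoice_no} issued by {ev.issuer} to {ev.customer} "
        f"for amount {ev.currency} {ev.amount:,.0f} including {ev.vat_rate:.0%} VAT"
    )
```

Feeding the example record (`PINV-041`, `SAR`, amount `15000`, `vat_rate` `0.15`) through `render_invoice` reproduces the sentence quoted in the Document Generation step.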
2. Metadata Labeling & Context Enrichment
To ensure the retriever returns relevant search context, chunks are labeled with metadata tags:
- tenant_company_id: Restricts query lookup to the user's specific business account.
- document_type: Categorizes chunks (e.g., invoice, ledger_entry, hr_policy).
- gregorian_year / gregorian_month: Filters queries targeting specific fiscal periods.
- created_by_user_id: Tracks the originating owner of the context.
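These metadata tags can be enforced at query time as filters. The following is a minimal sketch of building an OpenSearch k-NN query with them. It assumes a `knn_vector` field named `embedding` (hypothetical) and an engine that supports the `filter` parameter inside the `knn` clause (efficient k-NN filtering, available for the Lucene and Faiss engines in recent OpenSearch versions). The tenant filter is always added, so a query cannot read another account's chunks.

```python
from typing import Any, Dict, List, Optional


def build_knn_query(
    query_vector: List[float],
    tenant_company_id: str,
    k: int = 5,
    document_type: Optional[str] = None,
    gregorian_year: Optional[int] = None,
    gregorian_month: Optional[int] = None,
) -> Dict[str, Any]:
    # Tenant isolation is mandatory; every other filter is optional.
    filters: List[Dict[str, Any]] = [{"term": {"tenant_company_id": tenant_company_id}}]
    if document_type is not None:
        filters.append({"term": {"document_type": document_type}})
    if gregorian_year is not None:
        filters.append({"term": {"gregorian_year": gregorian_year}})
    if gregorian_month is not None:
        filters.append({"term": {"gregorian_month": gregorian_month}})
    return {
        "size": k,
        "query": {
            "knn": {
                "embedding": {  # hypothetical vector field name
                    "vector": query_vector,
                    "k": k,
                    "filter": {"bool": {"filter": filters}},
                }
            }
        },
    }
```

The resulting dict can be passed as the `body` argument of opensearch-py's `client.search(index=..., body=...)`.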
3. Real-Time Vector Index Synchronization
To prevent data drift (where RAG queries return outdated account records):
- Change Data Capture (CDC): Updates and deletes in the primary MariaDB database are captured by CDC and propagated to OpenSearch, where the corresponding vector records are updated or invalidated within 10 seconds.
- Stale Index Audits: A weekly cron job runs a per-tenant checksum audit that compares MariaDB row counts against the number of documents in the vector index and flags any discrepancy for automatic rebuilding.
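Both synchronization mechanisms can be sketched as pure functions. The sketch assumes Debezium-style change events (`op` of `c`, `u`, `d`, or `r`, with `before`/`after` row images) and a primary-key column named `id`; neither the CDC tool nor the event format is specified in the source. Creates and updates are re-indexed under the same `_id`, which overwrites the stale document, and deletes become `delete` actions in the OpenSearch `_bulk` NDJSON format. The hypothetical `to_document` hook marks where the real pipeline would re-render and re-embed the row. The audit function compares per-tenant counts only; a content checksum would slot into the same comparison.

```python
import json
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def cdc_to_bulk_lines(
    event: Dict[str, Any],
    index: str,
    to_document: Optional[Callable[[Row], Row]] = None,
) -> List[str]:
    """Translate one Debezium-style change event into OpenSearch _bulk NDJSON lines.

    Assumes event = {"op": "c" | "u" | "d" | "r", "before": row | None, "after": row | None}
    and a primary-key column named "id" (hypothetical).
    """
    op = event["op"]
    if op == "d":
        doc_id = str(event["before"]["id"])
        # A delete action removes the stale vector record outright.
        return [json.dumps({"delete": {"_index": index, "_id": doc_id}})]
    if op in ("c", "u", "r"):
        row = event["after"]
        # Hypothetical hook: re-render and re-embed the row before indexing.
        doc = to_document(row) if to_document else row
        # Indexing under the same _id overwrites any previous version.
        return [
            json.dumps({"index": {"_index": index, "_id": str(row["id"])}}),
            json.dumps(doc),
        ]
    raise ValueError(f"unsupported CDC op: {op!r}")


def audit_counts(db_counts: Dict[str, int], index_counts: Dict[str, int]) -> List[str]:
    """Return tenant IDs whose MariaDB row count differs from their index document count."""
    tenants = set(db_counts) | set(index_counts)
    return sorted(t for t in tenants if db_counts.get(t, 0) != index_counts.get(t, 0))
```

A `_bulk` request body is these lines joined with `\n`, and OpenSearch requires it to end with a trailing newline.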