⛓️ AI Data Pipelines

The AI Data Pipeline converts raw relational data and unstructured business documents into clean, searchable embeddings that supply context for retrieval-augmented generation (RAG).


1. Ingestion Pipeline

The ingestion pipeline handles two distinct streams of data:

Unstructured Data (PDFs, Contracts, Policies)

  1. Extract: Documents uploaded to OCI Object Storage are picked up by a worker.
  2. Clean: Strips formatting noise, parses tables, and isolates key-value lists.
  3. Chunk: Segments text into overlapping chunks (510 tokens, 10% overlap), splitting at semantic boundaries rather than at fixed character lengths.
  4. Embed: Computes vector representations using a locally hosted Cohere embed-multilingual-v3.0 model.
  5. Index: Saves the embeddings to OCI Search with OpenSearch.
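
The Chunk step above can be sketched as follows. This is a minimal illustration, not the pipeline's actual implementation: it assumes sentence ends are an acceptable stand-in for "semantic boundaries" and that whitespace-separated words approximate model tokens. The function names (`count_tokens`, `split_sentences`, `chunk_text`) are hypothetical.

```python
import re


def count_tokens(text: str) -> int:
    # Assumption: whitespace-separated words approximate model tokens.
    # A real pipeline would use the embedding model's own tokenizer.
    return len(text.split())


def split_sentences(text: str) -> list[str]:
    # Assumption: sentence ends serve as the "semantic boundaries".
    return [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]


def chunk_text(text: str, max_tokens: int = 510,
               overlap_ratio: float = 0.10) -> list[str]:
    """Greedily pack sentences into chunks, repeating trailing sentences
    of each chunk (up to the overlap budget) at the start of the next."""
    overlap_budget = int(max_tokens * overlap_ratio)
    chunks: list[str] = []
    current: list[str] = []
    current_tokens = 0

    for sentence in split_sentences(text):
        n = count_tokens(sentence)
        if current and current_tokens + n > max_tokens:
            chunks.append(" ".join(current))
            # Carry over whole trailing sentences that fit the budget.
            carried: list[str] = []
            carried_tokens = 0
            for prev in reversed(current):
                t = count_tokens(prev)
                if carried_tokens + t > overlap_budget:
                    break
                carried.insert(0, prev)
                carried_tokens += t
            current, current_tokens = carried, carried_tokens
        current.append(sentence)
        current_tokens += n

    if current:
        chunks.append(" ".join(current))
    return chunks
```

Because the sketch never splits inside a sentence, a chunk can exceed `max_tokens` when a single sentence is very long, and the overlap may be smaller than 10% when the last sentence of a chunk does not fit the budget. A production chunker would need a fallback for both cases.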

Structured Data (Relational DB Records)

  • Database Sync: When invoices, customer notes, or project tasks are saved, a database event trigger pushes the record to an event broker (Kafka).
  • Document Generation: A consumer parses the transaction data, compiles it into a readable text format (e.g. "Invoice PINV-041 issued by Al-Qahtani Trading to SABIC for amount SAR 15,000 including 15% VAT"), and embeds it for vector search.
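
As a rough sketch of the Document Generation step, the snippet below renders an invoice record into the sentence format quoted above. The `InvoiceRecord` class and its field names are assumptions made for illustration; the real schema and the Kafka consumer wiring are not shown in this document.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class InvoiceRecord:
    # Hypothetical field names; the actual table schema may differ.
    invoice_number: str
    seller_name: str
    buyer_name: str
    total_amount: Decimal
    currency: str
    vat_rate_percent: Decimal


def render_invoice(record: InvoiceRecord) -> str:
    """Compile a structured invoice into one readable sentence
    that can then be passed to the embedding model."""
    return (
        f"Invoice {record.invoice_number} issued by {record.seller_name} "
        f"to {record.buyer_name} for amount "
        f"{record.currency} {record.total_amount:,} "
        f"including {record.vat_rate_percent}% VAT"
    )


example = InvoiceRecord(
    invoice_number="PINV-041",
    seller_name="Al-Qahtani Trading",
    buyer_name="SABIC",
    total_amount=Decimal("15000"),
    currency="SAR",
    vat_rate_percent=Decimal("15"),
)
print(render_invoice(example))
```

Keeping the template in one function per record type makes it straightforward to re-render and re-embed a record whenever the source row changes.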

2. Metadata Labeling & Context Enrichment

To ensure the retriever returns relevant search context, chunks are labeled with metadata tags:

  • tenant_company_id: Restricts query lookup to the user's specific business account.
  • document_type: Categorizes chunks (e.g., invoice, ledger_entry, hr_policy).
  • gregorian_year / gregorian_month: Filters queries targeting specific fiscal periods.
  • created_by_user_id: Tracks the originating owner of the context.
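
One way these tags could be applied at query time is as filters on a k-NN search. The sketch below only builds the request body as a plain dictionary. It assumes the vectors live in a field named `embedding` (hypothetical) and that the index uses a k-NN engine that accepts a `filter` clause inside the `knn` query; verify both against the actual index mapping.

```python
from typing import Any, Optional, Sequence


def build_filtered_knn_query(
    query_vector: Sequence[float],
    tenant_company_id: str,
    k: int = 5,
    document_type: Optional[str] = None,
    gregorian_year: Optional[int] = None,
    gregorian_month: Optional[int] = None,
) -> dict[str, Any]:
    """Build an OpenSearch k-NN request body restricted by metadata tags."""
    # The tenant filter is always applied so results never cross accounts.
    must: list[dict[str, Any]] = [
        {"term": {"tenant_company_id": tenant_company_id}}
    ]
    optional_tags = {
        "document_type": document_type,
        "gregorian_year": gregorian_year,
        "gregorian_month": gregorian_month,
    }
    for field, value in optional_tags.items():
        if value is not None:
            must.append({"term": {field: value}})

    return {
        "size": k,
        "query": {
            "knn": {
                # "embedding" is an assumed vector field name.
                "embedding": {
                    "vector": list(query_vector),
                    "k": k,
                    "filter": {"bool": {"must": must}},
                }
            }
        },
    }
```

Making `tenant_company_id` a required argument, rather than an optional one, means a caller cannot accidentally issue a search that spans tenants.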

3. Real-Time Vector Index Synchronization

To prevent data drift (where RAG queries return outdated account records):

  • Change Data Capture (CDC): Deletes and updates in the primary MariaDB database trigger CDC actions that invalidate or update the corresponding vector records in OpenSearch within 10 seconds.
  • Stale Index Audits: An automated weekly cron job runs a consistency audit that compares MariaDB row counts with vector index document counts per tenant, flagging discrepancies for automatic rebuilding.
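
The comparison at the core of the weekly audit might look like the sketch below. It assumes the per-tenant counts have already been collected from MariaDB and from the vector index into two dictionaries; the function name `find_stale_tenants` and the input shapes are hypothetical, and how the counts are gathered or how a rebuild is triggered is out of scope here.

```python
def find_stale_tenants(
    db_counts: dict[str, int],
    index_counts: dict[str, int],
) -> dict[str, tuple[int, int]]:
    """Return tenants whose index document count differs from the
    database row count, mapped to (expected, actual)."""
    stale: dict[str, tuple[int, int]] = {}
    # Include tenants that appear on only one side of the comparison.
    for tenant in sorted(set(db_counts) | set(index_counts)):
        expected = db_counts.get(tenant, 0)
        actual = index_counts.get(tenant, 0)
        if expected != actual:
            stale[tenant] = (expected, actual)
    return stale


# Example with made-up counts: tenant "b" is missing two vectors,
# and tenant "c" has orphaned vectors with no rows in the database.
flagged = find_stale_tenants(
    {"a": 120, "b": 75},
    {"a": 120, "b": 73, "c": 4},
)
print(flagged)  # {'b': (75, 73), 'c': (0, 4)}
```

Matching counts do not prove that the contents agree, so a stricter audit could compare per-record content hashes instead of counts.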