from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import StreamingResponse
import io
import json

from app.core.engine.ingest import parse_file, get_column_metadata, dataframe_to_arrow_stream

router = APIRouter(prefix="/upload", tags=["ingestion"])

# Extensions accepted by the ingestion pipeline.
_ALLOWED_EXTENSIONS = ('.xlsx', '.xls', '.csv')


@router.post("")
async def upload_file(file: UploadFile = File(...)):
    """
    Upload an Excel/CSV file and receive an Apache Arrow stream.

    Column metadata (as produced by ``get_column_metadata``) is returned in
    the ``X-Column-Metadata`` response header as a JSON string.

    Raises:
        HTTPException: 400 for an unsupported/missing filename, or for any
            error raised while parsing or converting the file.
    """
    # 1. Validation.
    # `file.filename` may be None on multipart uploads, and the extension
    # check must be case-insensitive (e.g. "DATA.XLSX").
    if not file.filename or not file.filename.lower().endswith(_ALLOWED_EXTENSIONS):
        raise HTTPException(status_code=400, detail="Only .xlsx, .xls and .csv files are supported.")

    # Keep the try body narrow: only the steps that can legitimately fail
    # with a user-attributable (400) error.
    try:
        content = await file.read()

        # 2. Parsing
        df = parse_file(content, file.filename)

        # 3. Metadata Extraction
        metadata = get_column_metadata(df)

        # 4. Conversion to Arrow
        arrow_bytes = dataframe_to_arrow_stream(df)
    except Exception as e:
        # TODO(review): log this properly. Chaining with `from e` preserves
        # the original traceback for server-side debugging while still
        # returning a clean 400 to the client.
        raise HTTPException(status_code=400, detail=f"Error processing file: {str(e)}") from e

    # StreamingResponse carries the binary Arrow payload; metadata travels as
    # a custom header (JSON stringified), which must be explicitly exposed so
    # browser clients can read it under CORS.
    return StreamingResponse(
        io.BytesIO(arrow_bytes),
        media_type="application/vnd.apache.arrow.stream",
        headers={
            "X-Column-Metadata": json.dumps(metadata),
            "Access-Control-Expose-Headers": "X-Column-Metadata"
        }
    )