# app/api/routes/upload.py — file-ingestion endpoint (Excel/CSV → Arrow stream)
import io
import json

from fastapi import APIRouter, File, HTTPException, UploadFile
from fastapi.responses import StreamingResponse

from app.core.engine.ingest import dataframe_to_arrow_stream, get_column_metadata, parse_file
# Router for file-ingestion endpoints; mounted under the /upload prefix.
router = APIRouter(prefix="/upload", tags=["ingestion"])
|
|
|
|
@router.post("")
async def upload_file(file: UploadFile = File(...)):
    """
    Upload an Excel/CSV file and receive an Apache Arrow stream back.

    The parsed table is returned as binary Arrow IPC data in the response
    body; per-column metadata is JSON-encoded into the ``X-Column-Metadata``
    response header (exposed to browser clients via
    ``Access-Control-Expose-Headers``).

    Raises:
        HTTPException: 400 for an unsupported/missing file extension or any
            failure while parsing/converting the file.
    """
    # 1. Validation. `file.filename` is Optional in FastAPI, so guard against
    # None, and lower-case so e.g. ".XLSX" is accepted as well.
    filename = (file.filename or "").lower()
    if not filename.endswith(('.xlsx', '.xls', '.csv')):
        raise HTTPException(status_code=400, detail="Only .xlsx, .xls and .csv files are supported.")

    try:
        content = await file.read()

        # 2. Parsing
        df = parse_file(content, file.filename)

        # 3. Metadata Extraction
        metadata = get_column_metadata(df)

        # 4. Conversion to Arrow
        arrow_bytes = dataframe_to_arrow_stream(df)

        # We use a StreamingResponse to send the binary Arrow data.
        # Metadata is sent as a custom header (JSON stringified).
        return StreamingResponse(
            io.BytesIO(arrow_bytes),
            media_type="application/vnd.apache.arrow.stream",
            headers={
                "X-Column-Metadata": json.dumps(metadata),
                # Without this, browsers hide the custom header from JS callers.
                "Access-Control-Expose-Headers": "X-Column-Metadata",
            },
        )
    except HTTPException:
        # Don't re-wrap deliberate HTTP errors into the generic message below.
        raise
    except Exception as e:
        # In a real app, we'd log this properly.
        # NOTE(review): str(e) may leak internal details to the client — confirm
        # this is acceptable for this API's audience.
        raise HTTPException(status_code=400, detail=f"Error processing file: {str(e)}") from e
    finally:
        # Release the spooled upload temp file regardless of outcome.
        await file.close()