Initial commit
This commit is contained in:
56
backend/app/core/engine/ingest.py
Normal file
56
backend/app/core/engine/ingest.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import io
|
||||
from typing import Tuple, Dict, Any
|
||||
|
||||
def parse_file(file_content: bytes, filename: str) -> pd.DataFrame:
    """
    Parses the uploaded file (Excel or CSV) into a Pandas DataFrame.

    The format is chosen from the filename extension, compared
    case-insensitively so that names such as "REPORT.CSV" or "Data.XLSX"
    are accepted.

    Args:
        file_content: Raw bytes of the uploaded file.
        filename: Original name of the upload; only its extension is used.

    Returns:
        The parsed DataFrame, with every column label converted to ``str``
        and stripped of leading/trailing whitespace.

    Raises:
        ValueError: If the extension is not .xlsx, .xls or .csv.
    """
    # Uploads frequently carry upper- or mixed-case extensions; normalise the
    # name used for matching but keep the original in the error message.
    normalized_name = filename.lower()
    file_obj = io.BytesIO(file_content)

    if normalized_name.endswith(('.xlsx', '.xls')):
        df = pd.read_excel(file_obj)
    elif normalized_name.endswith('.csv'):
        # Parsed with pandas' default comma delimiter; no delimiter
        # detection is performed here.
        df = pd.read_csv(file_obj)
    else:
        raise ValueError(f"Unsupported file format: {filename}")

    # Basic hygiene: strip whitespace from headers
    df.columns = [str(c).strip() for c in df.columns]

    return df
|
||||
|
||||
def get_column_metadata(df: pd.DataFrame) -> list:
    """
    Returns a list of column metadata (name and inferred type).

    Each entry is a dict with the keys:
        "name": the column label,
        "type": one of "categorical", "date", "boolean" or "numeric",
        "native_type": the string form of the pandas dtype.

    The simplified type is derived from the dtype's string form; any dtype
    that is not recognised as text/categorical, datetime or boolean is
    reported as "numeric".
    """
    metadata = []
    # Walk df.dtypes instead of indexing df[col]: when column labels are
    # duplicated, df[col] is a DataFrame and has no single .dtype.
    for col, col_dtype in df.dtypes.items():
        dtype = str(col_dtype)
        # Simplify types for the frontend
        # "str" also matches "string" / "string[pyarrow]"; a bare "str" is
        # presumably what newer pandas releases report for the default
        # string dtype. "category" covers pandas Categorical columns.
        if "object" in dtype or "str" in dtype or "category" in dtype:
            inferred_type = "categorical"
        elif "datetime" in dtype:
            inferred_type = "date"
        elif "bool" in dtype:
            inferred_type = "boolean"
        else:
            inferred_type = "numeric"

        metadata.append({
            "name": col,
            "type": inferred_type,
            "native_type": dtype
        })
    return metadata
|
||||
|
||||
def dataframe_to_arrow_stream(df: pd.DataFrame) -> bytes:
    """
    Serializes a Pandas DataFrame into Apache Arrow IPC stream bytes.

    The frame is converted to an Arrow table, written through a stream
    writer into an in-memory buffer, and the buffer's contents are returned
    as a Python ``bytes`` object.
    """
    arrow_table = pa.Table.from_pandas(df)
    buffer_sink = pa.BufferOutputStream()

    stream_writer = pa.ipc.new_stream(buffer_sink, arrow_table.schema)
    try:
        stream_writer.write_table(arrow_table)
    finally:
        # Close the writer before reading the buffer back out.
        stream_writer.close()

    ipc_buffer = buffer_sink.getvalue()
    return ipc_buffer.to_pybytes()
|
||||
Reference in New Issue
Block a user