excel_translator/translator_app.py

417 lines
16 KiB
Python

import streamlit as st
import requests
import pandas as pd
import time
import os
from datetime import datetime
import json
import base64
# Configuration
API_URL = "http://localhost:8000" # Change if your API is hosted elsewhere
# Page configuration
st.set_page_config(
page_title="Excel Translator",
page_icon="📊",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS for better styling
st.markdown("""
<style>
.main-header {
font-size: 2.5rem !important;
font-weight: 700 !important;
color: #1E88E5 !important;
}
.sub-header {
font-size: 1.5rem !important;
font-weight: 600 !important;
color: #0277BD !important;
margin-bottom: 1rem !important;
}
.status-completed {
color: #4CAF50 !important;
font-weight: bold !important;
}
.status-processing {
color: #FF9800 !important;
font-weight: bold !important;
}
.status-failed {
color: #F44336 !important;
font-weight: bold !important;
}
.status-pending {
color: #9E9E9E !important;
font-weight: bold !important;
}
.divider {
margin-top: 1.5rem !important;
margin-bottom: 1.5rem !important;
border-bottom: 1px solid #E0E0E0;
}
.card {
background-color: #F5F5F5;
border-radius: 10px;
padding: 20px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
.download-btn {
background-color: #4CAF50;
color: white;
padding: 8px 16px;
text-align: center;
text-decoration: none;
display: inline-block;
font-size: 16px;
border-radius: 4px;
cursor: pointer;
margin: 10px 0;
}
.stProgress > div > div > div > div {
background-color: #1E88E5;
}
</style>
""", unsafe_allow_html=True)
# Helper Functions for API
def upload_file(file):
"""Upload file to the API"""
files = {"file": file}
response = requests.post(f"{API_URL}/upload/", files=files)
if response.status_code == 200:
return response.json()
else:
st.error(f"Error uploading file: {response.text}")
return None
def start_translation(file_id, target_language, translation_method, llm_model):
"""Start translation job via API"""
payload = {
"file_id": file_id,
"target_language": target_language,
"translation_method": translation_method,
"llm_model": llm_model
}
response = requests.post(f"{API_URL}/translate/", json=payload)
if response.status_code == 200:
return response.json()
else:
st.error(f"Error starting translation: {response.text}")
return None
def get_job_status(job_id):
"""Get status of a translation job"""
response = requests.get(f"{API_URL}/jobs/{job_id}")
if response.status_code == 200:
return response.json()
else:
st.error(f"Error getting job status: {response.text}")
return None
def get_all_jobs():
"""Get all translation jobs"""
response = requests.get(f"{API_URL}/jobs/")
if response.status_code == 200:
return response.json()
else:
st.error(f"Error getting jobs: {response.text}")
return []
def download_file(job_id):
"""Download file from the API and return as bytes"""
response = requests.get(f"{API_URL}/download/{job_id}", stream=True)
if response.status_code == 200:
return response.content
else:
st.error(f"Error downloading file: {response.text}")
return None
def get_download_button(job_id, filename):
"""Create a download button for the translated file"""
file_content = download_file(job_id)
if file_content:
b64 = base64.b64encode(file_content).decode()
dl_link = f"""
<a href="data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{b64}"
download="{filename}" class="download-btn">
📥 Download Translated File
</a>
"""
return dl_link
return None
# Initialize session state for storing jobs
if 'jobs' not in st.session_state:
st.session_state.jobs = []
if 'job_details' not in st.session_state:
st.session_state.job_details = {}
if 'refresh_counter' not in st.session_state:
st.session_state.refresh_counter = 0
if 'current_job_id' not in st.session_state:
st.session_state.current_job_id = None
def refresh_jobs():
"""Force refresh job list"""
st.session_state.refresh_counter += 1
st.session_state.jobs = get_all_jobs()
# Application Header
st.markdown('<p class="main-header">Excel Translator</p>', unsafe_allow_html=True)
st.markdown("Easily translate your Excel files while preserving all formatting")
# Create tabs for different sections
tab1, tab2 = st.tabs(["Translate New File", "Translation History"])
with tab1:
col1, col2 = st.columns([3, 2])
with col1:
st.markdown('<div class="card">', unsafe_allow_html=True)
st.markdown('<p class="sub-header">Upload File</p>', unsafe_allow_html=True)
uploaded_file = st.file_uploader("Choose an Excel file (.xlsx, .xls)", type=["xlsx", "xls"])
if uploaded_file is not None:
# Show file info
file_details = {
"Filename": uploaded_file.name,
"File size": f"{round(uploaded_file.size / 1024, 2)} KB"
}
st.write("File Information:")
for key, value in file_details.items():
st.write(f"- {key}: {value}")
st.markdown('</div>', unsafe_allow_html=True)
with col2:
st.markdown('<div class="card">', unsafe_allow_html=True)
st.markdown('<p class="sub-header">Translation Options</p>', unsafe_allow_html=True)
language_options = {
"English": "en",
"French": "fr",
"German": "de",
"Spanish": "es",
"Italian": "it",
"Portuguese": "pt",
"Dutch": "nl",
"Persian" : "fa",
"Russian": "ru",
"Chinese (Simplified)": "zh-CN",
"Japanese": "ja",
"Korean": "ko",
"Arabic": "ar"
}
language_name = st.selectbox("Target Language", list(language_options.keys()))
target_language = language_options[language_name]
translation_method = st.radio(
"Translation Method",
["google", "llm"],
format_func=lambda x: "Google Translate" if x == "google" else "LLM (AI Model)"
)
llm_model = "llama3.1:8b"
if translation_method == "llm":
llm_model = st.selectbox(
"Select LLM Model",
["llama3.1:8b", "llama3.1:70b", "mistral:7b"]
)
st.markdown('</div>', unsafe_allow_html=True)
# Submit button
st.markdown('<div class="divider"></div>', unsafe_allow_html=True)
col1, col2, col3 = st.columns([2, 3, 2])
with col2:
submit_button = st.button("Start Translation", type="primary", use_container_width=True)
# Handle file submission
if submit_button and uploaded_file is not None:
with st.spinner("Uploading file..."):
upload_result = upload_file(uploaded_file)
if upload_result and 'file_id' in upload_result:
st.success(f"File uploaded successfully: {upload_result['file_name']}")
with st.spinner("Starting translation job..."):
job_result = start_translation(
upload_result['file_id'],
target_language,
translation_method,
llm_model
)
if job_result and 'job_id' in job_result:
st.success(f"Translation job started! Job ID: {job_result['job_id']}")
st.session_state.job_details[job_result['job_id']] = job_result
st.session_state.current_job_id = job_result['job_id']
st.session_state.jobs = get_all_jobs()
# Create a progress indicator that will auto-refresh
st.info("Your translation is processing. You can track progress below or in the Translation History tab.")
# Add progress tracker for this job
progress_placeholder = st.empty()
status_placeholder = st.empty()
download_placeholder = st.empty()
job_id = job_result['job_id']
# Initial progress at 0
progress_bar = progress_placeholder.progress(0)
status_placeholder.text("Starting translation...")
# Auto-refresh for 10 minutes max (600 seconds)
max_wait = 600
start_time = time.time()
completed = False
while time.time() - start_time < max_wait and not completed:
# Get current job status
current_status = get_job_status(job_id)
if current_status:
status = current_status.get('status')
# Update status message
if status == "pending":
status_placeholder.text("Waiting in queue...")
progress_value = 0.1
elif status == "processing":
status_placeholder.text("Processing translation...")
progress_value = 0.5
elif status == "completed":
status_placeholder.text("Translation completed!")
progress_value = 1.0
completed = True
elif status == "failed":
status_placeholder.text(f"Translation failed: {current_status.get('error', 'Unknown error')}")
progress_value = 1.0
completed = True
else:
status_placeholder.text(f"Status: {status}")
progress_value = 0.3
# Update progress bar
progress_bar.progress(progress_value)
# Show download button when completed
if status == "completed":
file_name = current_status.get('file_name', 'translated_file.xlsx')
download_button = get_download_button(job_id, file_name)
if download_button:
download_placeholder.markdown(download_button, unsafe_allow_html=True)
# Wait before next check
if not completed:
time.sleep(3)
if not completed:
status_placeholder.text("Still processing. Please check the Translation History tab for updates.")
elif submit_button:
st.error("Please upload an Excel file first.")
with tab2:
# Refresh button for jobs
col1, col2 = st.columns([6, 1])
with col2:
st.button("Refresh", on_click=refresh_jobs)
# Get the latest jobs data when refresh counter changes
if st.session_state.refresh_counter or not st.session_state.jobs:
st.session_state.jobs = get_all_jobs()
if st.session_state.jobs and len(st.session_state.jobs) > 0:
st.markdown('<p class="sub-header">Translation Jobs</p>', unsafe_allow_html=True)
# Sort jobs by created_at, latest first
jobs = sorted(st.session_state.jobs, key=lambda x: x.get('created_at', ''), reverse=True)
for job in jobs:
job_id = job.get('job_id')
status = job.get('status', 'unknown')
# Create a card for each job
st.markdown('<div class="card">', unsafe_allow_html=True)
col1, col2, col3 = st.columns([2, 2, 1])
with col1:
st.markdown(f"**File:** {job.get('file_name')}")
st.markdown(f"**Target Language:** {job.get('target_language')}")
st.markdown(f"**Method:** {job.get('translation_method')}")
with col2:
status_class = f"status-{status}"
st.markdown(f"**Status:** <span class='{status_class}'>{status.upper()}</span>", unsafe_allow_html=True)
created_time = job.get('created_at', '')
if created_time:
try:
dt = datetime.fromisoformat(created_time)
st.markdown(f"**Created:** {dt.strftime('%Y-%m-%d %H:%M:%S')}")
except:
st.markdown(f"**Created:** {created_time}")
if status == "completed" and job.get('completed_at'):
try:
completed_dt = datetime.fromisoformat(job.get('completed_at'))
st.markdown(f"**Completed:** {completed_dt.strftime('%Y-%m-%d %H:%M:%S')}")
except:
st.markdown(f"**Completed:** {job.get('completed_at')}")
with col3:
# Show progress bar based on status
if status == "pending":
st.progress(0.1)
elif status == "processing":
st.progress(0.5)
st.text("Processing...")
elif status == "completed":
st.progress(1.0)
# Create download button
file_name = job.get('file_name', 'translated_file.xlsx')
download_button = get_download_button(job_id, file_name)
if download_button:
st.markdown(download_button, unsafe_allow_html=True)
elif status == "failed":
st.progress(1.0)
st.error(f"Error: {job.get('error', 'Unknown error')}")
st.markdown('</div>', unsafe_allow_html=True)
else:
st.info("No translation jobs found. Start a new translation to see it here.")
# Auto-refresh functionality
if st.session_state.jobs:
st.markdown("""
<script>
var timeout = setTimeout(function() {
window.location.reload();
}, 30000); // Refresh every 30 seconds
</script>
""", unsafe_allow_html=True)
# Footer
st.markdown('<div class="divider"></div>', unsafe_allow_html=True)
st.markdown(
"""
<div style="text-align: center; color: gray; font-size: 0.8rem;">
Excel Translator App | Powered by FastAPI & Streamlit
</div>
""",
unsafe_allow_html=True
)