327 lines
7.9 KiB
Python
327 lines
7.9 KiB
Python
"""
|
|
Sentiment Analysis Service
|
|
|
|
This module provides services for batch processing of tweets and posts,
|
|
storing sentiment scores in the database, and calculating aggregated metrics.
|
|
"""
|
|
|
|
from typing import List, Dict, Optional
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.ml.sentiment_analyzer import (
|
|
analyze_sentiment,
|
|
analyze_sentiment_batch,
|
|
calculate_aggregated_metrics
|
|
)
|
|
from app.models.sentiment_score import SentimentScore
|
|
from app.models.tweet import Tweet
|
|
from app.models.reddit_post import RedditPost
|
|
|
|
|
|
def process_tweet_sentiment(
    db: Session,
    tweet_id: str,
    text: str
) -> SentimentScore:
    """
    Analyze sentiment for a single tweet and store in database.

    Args:
        db: Database session
        tweet_id: Tweet identifier, stored as the record's entity_id
        text: Tweet text to analyze

    Returns:
        Created SentimentScore record, refreshed after the commit

    Raises:
        Exception: Any error raised while persisting the record is re-raised
            unchanged after the session has been rolled back.
    """
    # Analyze sentiment
    sentiment_result = analyze_sentiment(text)

    # Create database record
    sentiment_score = SentimentScore(
        entity_id=tweet_id,
        entity_type='tweet',
        score=sentiment_result['compound'],
        sentiment_type=sentiment_result['sentiment'],
        positive=sentiment_result['positive'],
        negative=sentiment_result['negative'],
        neutral=sentiment_result['neutral']
    )

    try:
        db.add(sentiment_score)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session in a failed transaction;
        # roll back so the caller's session stays usable, then propagate.
        db.rollback()
        raise

    # Reload the row so database-generated values are populated
    db.refresh(sentiment_score)

    return sentiment_score
|
|
|
|
|
|
def process_tweet_batch(
    db: Session,
    tweets: List[Tweet]
) -> List[SentimentScore]:
    """
    Analyze sentiment for a batch of tweets and store in database.

    Args:
        db: Database session
        tweets: List of Tweet models to analyze

    Returns:
        List of created SentimentScore records (empty when `tweets` is empty)

    Raises:
        Exception: Any error raised while persisting the records is re-raised
            unchanged after the session has been rolled back.
    """
    if not tweets:
        return []

    # Analyze all texts in one batch call
    sentiment_results = analyze_sentiment_batch(
        [tweet.text for tweet in tweets]
    )

    # Create database records, pairing each tweet with its result by position
    sentiment_scores = [
        SentimentScore(
            entity_id=tweet.tweet_id,
            entity_type='tweet',
            score=result['compound'],
            sentiment_type=result['sentiment'],
            positive=result['positive'],
            negative=result['negative'],
            neutral=result['neutral']
        )
        for tweet, result in zip(tweets, sentiment_results)
    ]

    # Batch insert
    try:
        db.add_all(sentiment_scores)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session in a failed transaction;
        # roll back so the caller's session stays usable, then propagate.
        db.rollback()
        raise

    # Refresh to get IDs
    for score in sentiment_scores:
        db.refresh(score)

    return sentiment_scores
|
|
|
|
|
|
def process_reddit_post_sentiment(
    db: Session,
    post_id: str,
    text: str
) -> SentimentScore:
    """
    Analyze sentiment for a single Reddit post and store in database.

    Args:
        db: Database session
        post_id: Reddit post identifier, stored as the record's entity_id
        text: Post text to analyze

    Returns:
        Created SentimentScore record, refreshed after the commit

    Raises:
        Exception: Any error raised while persisting the record is re-raised
            unchanged after the session has been rolled back.
    """
    # Analyze sentiment
    sentiment_result = analyze_sentiment(text)

    # Create database record
    sentiment_score = SentimentScore(
        entity_id=post_id,
        entity_type='reddit_post',
        score=sentiment_result['compound'],
        sentiment_type=sentiment_result['sentiment'],
        positive=sentiment_result['positive'],
        negative=sentiment_result['negative'],
        neutral=sentiment_result['neutral']
    )

    try:
        db.add(sentiment_score)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session in a failed transaction;
        # roll back so the caller's session stays usable, then propagate.
        db.rollback()
        raise

    # Reload the row so database-generated values are populated
    db.refresh(sentiment_score)

    return sentiment_score
|
|
|
|
|
|
def process_reddit_post_batch(
    db: Session,
    posts: List[RedditPost]
) -> List[SentimentScore]:
    """
    Analyze sentiment for a batch of Reddit posts and store in database.

    The analyzed text of each post is its title followed by its body text,
    separated by a single space; a missing or empty part is left out.

    Args:
        db: Database session
        posts: List of RedditPost models to analyze

    Returns:
        List of created SentimentScore records (empty when `posts` is empty)

    Raises:
        Exception: Any error raised while persisting the records is re-raised
            unchanged after the session has been rolled back.
    """
    if not posts:
        return []

    # Extract texts (combine title and text if available)
    texts = []
    post_ids = []
    for post in posts:
        # Drop missing/empty parts so a None title is not rendered as the
        # literal word "None" and no stray separator is added.
        parts = [part for part in (post.title, post.text) if part]
        texts.append(" ".join(parts))
        post_ids.append(post.post_id)

    # Analyze in batch
    sentiment_results = analyze_sentiment_batch(texts)

    # Create database records, pairing each post id with its result by position
    sentiment_scores = [
        SentimentScore(
            entity_id=post_id,
            entity_type='reddit_post',
            score=result['compound'],
            sentiment_type=result['sentiment'],
            positive=result['positive'],
            negative=result['negative'],
            neutral=result['neutral']
        )
        for post_id, result in zip(post_ids, sentiment_results)
    ]

    # Batch insert
    try:
        db.add_all(sentiment_scores)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session in a failed transaction;
        # roll back so the caller's session stays usable, then propagate.
        db.rollback()
        raise

    # Refresh to get IDs
    for score in sentiment_scores:
        db.refresh(score)

    return sentiment_scores
|
|
|
|
|
|
def get_sentiment_by_entity(
    db: Session,
    entity_id: str,
    entity_type: str
) -> Optional[SentimentScore]:
    """
    Look up the stored sentiment score of one entity.

    Args:
        db: Database session
        entity_id: Identifier matched against SentimentScore.entity_id
        entity_type: Entity type ('tweet' or 'reddit_post') matched against
            SentimentScore.entity_type

    Returns:
        The first matching SentimentScore, or None when no row matches
    """
    # Both conditions must hold for a row to match
    same_entity = SentimentScore.entity_id == entity_id
    same_type = SentimentScore.entity_type == entity_type

    matches = db.query(SentimentScore).filter(same_entity, same_type)
    return matches.first()
|
|
|
|
|
|
def get_sentiments_by_match(
    db: Session,
    match_id: int
) -> List[SentimentScore]:
    """
    Retrieve the tweet sentiment scores linked to a specific match.

    Scores are joined to tweets on Tweet.tweet_id == SentimentScore.entity_id
    and only rows whose entity_type is 'tweet' are kept, so scores stored for
    other entity types are not part of the result.

    Args:
        db: Database session
        match_id: Match identifier compared against Tweet.match_id

    Returns:
        List of SentimentScore records for the match
    """
    # Link each score to the tweet it was computed for
    on_tweet_id = Tweet.tweet_id == SentimentScore.entity_id
    tweet_scores = db.query(SentimentScore).join(Tweet, on_tweet_id)

    # Keep only tweet-type scores whose tweet belongs to the requested match
    for_match = tweet_scores.filter(
        Tweet.match_id == match_id,
        SentimentScore.entity_type == 'tweet'
    )
    return for_match.all()
|
|
|
|
|
|
def calculate_match_sentiment_metrics(
    db: Session,
    match_id: int
) -> Dict:
    """
    Aggregate the sentiment scores recorded for one match.

    Args:
        db: Database session
        match_id: Match identifier

    Returns:
        Dictionary with aggregated metrics plus a 'match_id' entry. When the
        match has no sentiment scores, every count, ratio and the average
        compound value is zero.
    """
    sentiments = get_sentiments_by_match(db, match_id)

    if sentiments:
        # Reduce each record to the two fields handed to the aggregator
        metrics = calculate_aggregated_metrics([
            {'compound': record.score, 'sentiment': record.sentiment_type}
            for record in sentiments
        ])
        metrics['match_id'] = match_id
        return metrics

    # Nothing stored for this match: report an all-zero summary
    return {
        'match_id': match_id,
        'total_count': 0,
        'positive_count': 0,
        'negative_count': 0,
        'neutral_count': 0,
        'positive_ratio': 0.0,
        'negative_ratio': 0.0,
        'neutral_ratio': 0.0,
        'average_compound': 0.0
    }
|
|
|
|
|
|
def get_global_sentiment_metrics(
    db: Session
) -> Dict:
    """
    Calculate global sentiment metrics across all entities.

    Args:
        db: Database session

    Returns:
        Dictionary with global aggregated metrics. When no sentiment scores
        are stored, every count, ratio and the average compound value is zero.
    """
    # Only the score and sentiment type feed the aggregation, so select just
    # those two columns instead of loading every row as a full ORM entity.
    rows = db.query(
        SentimentScore.score,
        SentimentScore.sentiment_type
    ).all()

    if not rows:
        return {
            'total_count': 0,
            'positive_count': 0,
            'negative_count': 0,
            'neutral_count': 0,
            'positive_ratio': 0.0,
            'negative_ratio': 0.0,
            'neutral_ratio': 0.0,
            'average_compound': 0.0
        }

    # Convert to list of dicts
    sentiment_dicts = [
        {'compound': score, 'sentiment': sentiment_type}
        for score, sentiment_type in rows
    ]

    # Calculate metrics
    return calculate_aggregated_metrics(sentiment_dicts)
|