This commit is contained in:
Santiago Martinez-Avial
2025-12-19 20:13:00 +01:00
commit 97b7a15977
17 changed files with 1913 additions and 0 deletions

16
.env.example Normal file
View File

@@ -0,0 +1,16 @@
# LLM Configuration
# Defaults to OpenRouter if not specified
# Base URL for the LLM provider (default: https://openrouter.ai/api/v1)
HELIA_LLM_BASE_URL=https://openrouter.ai/api/v1
# API Key. Checked in order: HELIA_LLM_API_KEY, OPENROUTER_API_KEY, OPENAI_API_KEY
HELIA_LLM_API_KEY=sk-or-your-api-key-here
# Model identifier (default: google/gemini-3.0-pro-preview)
HELIA_LLM_MODEL=google/gemini-3.0-pro-preview
# Neo4j Configuration
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
daic-woz/

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.13

113
README.md Normal file
View File

@@ -0,0 +1,113 @@
# Helia
Agentic Interview Framework for ingesting, analyzing, and querying transcript data.
## Project Structure
```
src/helia/
├── agent/
│ └── workflow.py # LangGraph agent workflow
├── analysis/
│ └── extractor.py # LLM metadata extraction
├── graph/
│ ├── loader.py # Neo4j data loading
│ └── schema.py # Pydantic graph models
├── ingestion/
│ └── parser.py # Transcript parsing logic
└── main.py # CLI entry point
```
## Data Flow
```mermaid
graph TD
A[Transcript File<br/>TSV/TXT] -->|TranscriptParser| B(Utterance Objects)
B -->|MetadataExtractor<br/>+ OpenAI LLM| C(Enriched UtteranceNodes)
C -->|GraphLoader| D[(Neo4j Database)]
E[User Question] -->|LangGraph Agent| F{Router}
F -->|Graph Tool| D
F -->|Vector Tool| G[(Vector Store)]
D --> H[Context]
G --> H
H -->|Synthesizer| I[Answer]
```
1. **Ingestion**: `TranscriptParser` reads TSV/txt files into `Utterance` objects.
2. **Analysis**: `MetadataExtractor` enriches utterances with sentiment and tone using LLMs.
3. **Graph**: `GraphLoader` pushes nodes and relationships to Neo4j database.
4. **Agent**: ReAct workflow queries graph/vector data to answer user questions.
## Implemented Features
- Parse DAIC-WOZ transcripts and simple text formats.
- Extract metadata (sentiment, tone, speech acts) via OpenAI.
- Load `Utterance` and `Speaker` nodes into Neo4j.
- Run basic LangGraph agent with planner and router.
## Roadmap
- Add robust error handling for LLM API failures.
- Implement real `graph_tool` and `vector_tool` logic.
- Enhance agent planning capabilities.
- Add comprehensive test suite.
## Installation
Install the package using `uv`.
```sh
uv pip install helia
```
## Quick Start
Run the agent directly from the command line.
```sh
export OPENAI_API_KEY=sk-...
export NEO4J_URI=bolt://localhost:7687
export NEO4J_PASSWORD=password
python -m helia.main "How many interruptions occurred?"
```
## Usage
Parse a transcript file programmatically.
```python
from helia.ingestion.parser import TranscriptParser
from pathlib import Path
parser = TranscriptParser()
utterances = parser.parse(Path("transcript.tsv"))
```
Extract metadata from utterances.
```python
from helia.analysis.extractor import MetadataExtractor
extractor = MetadataExtractor()
nodes = extractor.extract(utterances)
```
Load data into Neo4j.
```python
from helia.graph.loader import GraphLoader
loader = GraphLoader()
loader.connect()
loader.load_utterances(nodes)
loader.close()
```
## Contributing
Fork the project and submit a pull request.
## License
This project is available as open source under the terms of the [MIT License](LICENSE).

27
docker-compose.yml Normal file
View File

@@ -0,0 +1,27 @@
services:
neo4j:
image: neo4j:5
container_name: helia-neo4j
ports:
- "7474:7474" # Neo4j Browser / HTTP
- "7687:7687" # Bolt
environment:
# Matches defaults in `src/helia/graph/loader.py`
- NEO4J_AUTH=neo4j/password
volumes:
- neo4j_data:/data
- neo4j_logs:/logs
qdrant:
image: qdrant/qdrant:latest
container_name: helia-qdrant
ports:
- "6333:6333" # HTTP
- "6334:6334" # gRPC
volumes:
- qdrant_storage:/qdrant/storage
volumes:
neo4j_data:
neo4j_logs:
qdrant_storage:

62
pyproject.toml Normal file
View File

@@ -0,0 +1,62 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "helia"
version = "0.1.0"
description = "Agentic Interview Analysis Framework"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"langchain>=0.1.0",
"langchain-openai>=0.1.0",
"langgraph",
"neo4j",
"qdrant-client",
"pydantic",
"openai",
"pydantic-settings>=2.12.0",
]
[tool.hatch.build.targets.wheel]
packages = ["src/helia"]
[dependency-groups]
dev = [
"ruff>=0.14.7",
"pyrefly>=0.43.1",
]
[tool.ruff]
line-length = 100
target-version = "py314"
[tool.ruff.lint]
extend-select = [
"F", # Pyflakes rules
"W", # PyCodeStyle warnings
"E", # PyCodeStyle errors
"I", # Sort imports properly
"UP", # Warn if certain things can changed due to newer Python versions
"C4", # Catch incorrect use of comprehensions, dict, list, etc
"FA", # Enforce from __future__ import annotations
"ISC", # Good use of string concatenation
"ICN", # Use common import conventions
"RET", # Good return practices
"SIM", # Common simplification rules
"TID", # Some good import practices
"TC", # Enforce importing certain types in a TYPE_CHECKING block
"PTH", # Use pathlib instead of os.path
"TD", # Be diligent with TODO comments
"NPY", # Numpy-specific rules
"COM", # enforce trailing comma rules
"DTZ", # require strict timezone manipulation with datetime
"FBT", # detect boolean traps
"N", # enforce naming conventions, e.g. ClassName vs function_name
]
ignore = ["E501", "COM812", "TD003"]
[tool.pyrefly]
search-path = ["src"]
project-includes = ["**/*.py*", "**/*.ipynb"]

0
src/helia/__init__.py Normal file
View File

108
src/helia/agent/workflow.py Normal file
View File

@@ -0,0 +1,108 @@
from __future__ import annotations
from typing import Any
from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict
from helia.llm.client import get_openai_client
from helia.llm.settings import settings
class AgentState(TypedDict):
question: str
plan: list[str]
context: list[str]
answer: str
critique: str | None
def planner_node(state: AgentState) -> dict[str, Any]:
plan: list[str] = ["Understand question", "Retrieve info", "Synthesize answer"]
return {"plan": plan}
def router_node(state: AgentState) -> str:
question = state["question"].lower()
if "how many" in question or "when" in question:
return "graph_tool"
return "vector_tool"
def graph_tool_node(state: AgentState) -> dict[str, Any]:
context = [*state["context"]]
context.append("Graph data: Interruption count = 5")
return {"context": context}
def vector_tool_node(state: AgentState) -> dict[str, Any]:
context = [*state["context"]]
context.append("Vector data: Discussed salary at 10:00")
return {"context": context}
def synthesizer_node(state: AgentState) -> dict[str, Any]:
context_text = "\n".join(state["context"])
question = state["question"]
prompt = f"""
Answer the user's question based on the provided context.
Context:
{context_text}
Question: {question}
Answer:
"""
try:
client = get_openai_client()
response = client.chat.completions.create(
model=settings.model,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt},
],
)
answer = response.choices[0].message.content or "No answer generated."
except Exception as e:
answer = f"Error generating answer: {e}. Fallback: Based on context: {context_text}, here is the answer."
return {"answer": answer}
def reflector_node(state: AgentState) -> dict[str, Any]:
return {"critique": "Answer appears sufficient."}
workflow: Any = StateGraph(AgentState)
workflow.add_node("planner", planner_node)
workflow.add_node("graph_tool", graph_tool_node)
workflow.add_node("vector_tool", vector_tool_node)
workflow.add_node("synthesizer", synthesizer_node)
workflow.add_node("reflector", reflector_node)
workflow.set_entry_point("planner")
workflow.add_conditional_edges(
"planner", router_node, {"graph_tool": "graph_tool", "vector_tool": "vector_tool"}
)
workflow.add_edge("graph_tool", "synthesizer")
workflow.add_edge("vector_tool", "synthesizer")
workflow.add_edge("synthesizer", "reflector")
workflow.add_edge("reflector", END)
def run_agent(question: str) -> dict[str, Any]:
app = workflow.compile()
inputs: AgentState = {
"question": question,
"plan": [],
"context": [],
"answer": "",
"critique": None,
}
return app.invoke(inputs)

View File

@@ -0,0 +1,92 @@
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any
from helia.graph.schema import UtteranceNode
from helia.llm.client import get_openai_client
from helia.llm.settings import settings
if TYPE_CHECKING:
from helia.ingestion.parser import Utterance
class MetadataExtractor:
def __init__(self):
self.llm = get_openai_client()
def extract(self, utterances: list[Utterance]) -> list[UtteranceNode]:
nodes: list[UtteranceNode] = []
window_size = 3
for i, utt in enumerate(utterances):
if i > 0:
prev_utt = utterances[i - 1]
if (
utt.start_time is not None
and prev_utt.end_time is not None
and utt.start_time < prev_utt.end_time
):
utt.metadata["is_interrupted"] = True
prev_utt.metadata["was_interrupted_by"] = utt.id
start_idx = max(0, i - window_size + 1)
context_window = utterances[start_idx : i + 1]
metadata = self._analyze_with_llm(utt, context_window)
utt.metadata.update(metadata)
node = UtteranceNode(
id=utt.id,
speaker_id=utt.speaker,
text=utt.text,
start_time=utt.start_time if utt.start_time is not None else 0.0,
end_time=utt.end_time if utt.end_time is not None else 0.0,
sentiment=metadata.get("sentiment"),
tone=metadata.get("tone"),
speech_act=metadata.get("speech_act"),
)
nodes.append(node)
return nodes
def _analyze_with_llm(self, target_utt: Utterance, context: list[Utterance]) -> dict[str, Any]:
"""
Constructs the prompt and calls the LLM.
"""
context_text = "\n".join([f"{u.speaker}: {u.text}" for u in context])
prompt = f"""
Analyze the last utterance in this conversation context:
CONTEXT:
{context_text}
Analyze the LAST utterance (by {target_utt.speaker}) for:
1. Sentiment (Positive, Negative, Neutral)
2. Tone (e.g., Confident, Hesitant, Aggressive, Polite, etc.)
3. Speech Act (e.g., Question, Statement, Agreement, Disagreement, etc.)
Return ONLY valid JSON with keys: "sentiment", "tone", "speech_act".
"""
try:
response = self.llm.chat.completions.create(
model=settings.model,
messages=[
{
"role": "system",
"content": "You are an expert linguistic analyst. Output JSON only.",
},
{"role": "user", "content": prompt},
],
response_format={"type": "json_object"},
)
content = response.choices[0].message.content
if content:
return json.loads(content)
except Exception as e:
print(f"LLM analysis failed: {e}")
# Fallback if LLM fails
return {"sentiment": "Neutral", "tone": "Confident", "speech_act": "Statement"}

95
src/helia/graph/loader.py Normal file
View File

@@ -0,0 +1,95 @@
import os
from typing import TYPE_CHECKING
from neo4j import Driver, GraphDatabase
if TYPE_CHECKING:
from helia.graph.schema import UtteranceNode
class GraphLoader:
def __init__(
self, uri: str | None = None, user: str | None = None, password: str | None = None
):
self.uri = uri or os.environ.get("NEO4J_URI", "bolt://localhost:7687")
self.user = user or os.environ.get("NEO4J_USER", "neo4j")
self.password = password or os.environ.get("NEO4J_PASSWORD", "password")
self.driver: Driver | None = None
def connect(self):
driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
driver.verify_connectivity()
self.driver = driver
print(f"Connected to Neo4j at {self.uri}")
def close(self):
if self.driver:
self.driver.close()
def clear_database(self):
"""Clears all nodes and relationships. Use with caution!"""
if not self.driver:
return
with self.driver.session() as session:
session.run("MATCH (n) DETACH DELETE n")
def load_utterances(self, nodes: list[UtteranceNode]):
"""
Loads a list of enriched UtteranceNodes into Neo4j.
Creates Speaker nodes, Utterance nodes, and the NEXT chain.
"""
if not self.driver:
raise RuntimeError("Driver not connected.")
with self.driver.session() as session:
for i, node in enumerate(nodes):
session.run(
"""
MERGE (u:Utterance {id: $id})
SET u.text = $text,
u.start_time = $start_time,
u.end_time = $end_time,
u.sentiment = $sentiment,
u.tone = $tone,
u.speech_act = $speech_act
""",
node.model_dump(),
)
if i > 0:
prev_node = nodes[i - 1]
session.run(
"""
MATCH (prev:Utterance {id: $prev_id})
MATCH (curr:Utterance {id: $curr_id})
MERGE (prev)-[:NEXT]->(curr)
""",
prev_id=prev_node.id,
curr_id=node.id,
)
session.run(
"""
MERGE (s:Speaker {id: $speaker_id})
WITH s
MATCH (u:Utterance {id: $utterance_id})
MERGE (s)-[:SPOKE]->(u)
""",
speaker_id=node.speaker_id,
utterance_id=node.id,
)
def create_interruption(self, interrupter_id: str, interrupted_id: str):
if not self.driver:
return
with self.driver.session() as session:
session.run(
"""
MATCH (a:Utterance {id: $interrupter_id})
MATCH (b:Utterance {id: $interrupted_id})
MERGE (a)-[:INTERRUPTED]->(b)
""",
interrupter_id=interrupter_id,
interrupted_id=interrupted_id,
)

55
src/helia/graph/schema.py Normal file
View File

@@ -0,0 +1,55 @@
from pydantic import BaseModel, Field
class SpeakerNode(BaseModel):
id: str = Field(..., description="Unique identifier for the speaker (e.g., 'speaker_01')")
name: str | None = Field(None, description="Real name if known")
role: str | None = Field(
None, description="Role in the conversation (e.g., 'Interviewer', 'Candidate')"
)
class UtteranceNode(BaseModel):
id: str = Field(..., description="Unique ID for the utterance")
speaker_id: str = Field(..., description="ID of the speaker who said this")
text: str = Field(..., description="The content of the speech")
start_time: float
end_time: float
# Metadata extracted by the agent
sentiment: str | None = Field(None, description="Sentiment: Positive, Negative, Neutral")
tone: str | None = Field(None, description="Tone: Aggressive, Hesitant, Confident")
speech_act: str | None = Field(None, description="Type: Question, Statement, Agreement")
class TopicNode(BaseModel):
name: str = Field(..., description="Topic name (e.g., 'Salary', 'Project X')")
description: str | None = None
class SpokeRel(BaseModel):
"""(Speaker)-[:SPOKE]->(Utterance)"""
speaker_id: str
utterance_id: str
class NextRel(BaseModel):
"""(Utterance A)-[:NEXT]->(Utterance B)"""
from_id: str
to_id: str
time_gap: float = 0.0
class InterruptedRel(BaseModel):
"""(Utterance A)-[:INTERRUPTED]->(Utterance B)"""
interrupter_utterance_id: str
interrupted_utterance_id: str
class MentionsRel(BaseModel):
"""(Utterance)-[:MENTIONS]->(Topic)"""
utterance_id: str
topic_name: str

View File

@@ -0,0 +1,66 @@
import csv
from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from pathlib import Path
class Utterance(BaseModel):
"""
Represents a single turn in a conversation.
"""
id: str
speaker: str
text: str
start_time: float | None = None
end_time: float | None = None
metadata: dict = {}
@property
def duration(self) -> float:
if self.start_time is not None and self.end_time is not None:
return self.end_time - self.start_time
return 0.0
class TranscriptParser:
def parse(self, file_path: Path) -> list[Utterance]:
with file_path.open(encoding="utf-8") as f:
lines = f.readlines()
if not lines:
return []
header = lines[0].strip()
if header == "start_time\tstop_time\tspeaker\tvalue":
return self._parse_tsv(lines[1:])
return self._parse_simple(lines)
def _parse_tsv(self, lines: list[str]) -> list[Utterance]:
reader = csv.DictReader(
lines, fieldnames=["start_time", "stop_time", "speaker", "value"], delimiter="\t"
)
return [
Utterance(
id=f"u_{i}",
speaker=row["speaker"],
text=row["value"].strip(),
start_time=float(row["start_time"]),
end_time=float(row["stop_time"]),
)
for i, row in enumerate(reader)
]
def _parse_simple(self, lines: list[str]) -> list[Utterance]:
utterances = []
for i, line in enumerate(lines):
if ":" in line:
speaker, text = line.split(":", 1)
utterances.append(
Utterance(id=f"u_{i}", speaker=speaker.strip(), text=text.strip())
)
return utterances

View File

@@ -0,0 +1,4 @@
from helia.llm.client import get_openai_client
from helia.llm.settings import settings
__all__ = ["get_openai_client", "settings"]

18
src/helia/llm/client.py Normal file
View File

@@ -0,0 +1,18 @@
from openai import OpenAI
from helia.llm.settings import settings
def get_openai_client() -> OpenAI:
"""
Returns an configured OpenAI client based on global settings.
Defaults to OpenRouter base_url if not specified otherwise.
"""
api_key = settings.resolve_api_key()
return OpenAI(
base_url=settings.base_url,
api_key=api_key,
timeout=settings.timeout,
max_retries=settings.max_retries,
)

65
src/helia/llm/settings.py Normal file
View File

@@ -0,0 +1,65 @@
import os
from typing import Final
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class LLMSettings(BaseSettings):
"""
Configuration for LLM clients, defaulting to OpenRouter.
"""
api_key: str | None = Field(
default=None,
description="API key for the LLM provider. Checks HELIA_LLM_API_KEY, OPENROUTER_API_KEY, then OPENAI_API_KEY.",
)
base_url: str = Field(
default="https://openrouter.ai/api/v1",
description="Base URL for the LLM provider. Defaults to OpenRouter.",
)
model: str = Field(
default="google/gemini-3.0-pro-preview",
description="Model identifier to use.",
)
timeout: float = Field(
default=30.0,
description="Request timeout in seconds.",
)
max_retries: int = Field(
default=2,
description="Maximum number of retries for failed requests.",
)
model_config = SettingsConfigDict(
env_prefix="HELIA_LLM_",
case_sensitive=False,
extra="ignore",
)
def resolve_api_key(self) -> str:
"""
Resolves the API key with a fallback strategy:
1. configured api_key (from HELIA_LLM_API_KEY)
2. OPENROUTER_API_KEY env var
3. OPENAI_API_KEY env var
4. Raise ValueError if none found
"""
if self.api_key:
return self.api_key
# Fallback 1: OpenRouter
if key := os.environ.get("OPENROUTER_API_KEY"):
return key
# Fallback 2: OpenAI
if key := os.environ.get("OPENAI_API_KEY"):
return key
raise ValueError(
"No API key found. Please set HELIA_LLM_API_KEY, OPENROUTER_API_KEY, or OPENAI_API_KEY."
)
# Singleton instance for easy import
settings: Final[LLMSettings] = LLMSettings()

21
src/helia/main.py Normal file
View File

@@ -0,0 +1,21 @@
import sys
def main():
from helia.agent.workflow import run_agent
print("Initializing Agentic Interview Framework...")
if len(sys.argv) > 1:
question = " ".join(sys.argv[1:])
else:
question = "How many times did the interviewer interrupt?"
print(f"\nRunning Re-Agent with question: '{question}'\n")
result = run_agent(question)
print(result["answer"])
if __name__ == "__main__":
main()

1158
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff