final code changes

This commit is contained in:
Madhu
2024-12-21 11:54:53 +05:30
parent 42cee9621a
commit abc0c10e55

View File

@@ -5,6 +5,7 @@ import json
import requests
import PyPDF2
from datetime import datetime, timedelta
import pytz
import streamlit as st
from phi.agent import Agent
@@ -13,33 +14,14 @@ from phi.tools.email import EmailTools
from phi.tools.zoom import ZoomTool
from phi.utils.log import logger
from dotenv import load_dotenv
from streamlit_pdf_viewer import pdf_viewer
load_dotenv()
# Constants
FROM_EMAIL = "ryomensukuna64@gmail.com"
ACCOUNT_ID = os.getenv("ZOOM_ACCOUNT_ID")
CLIENT_ID = os.getenv("ZOOM_CLIENT_ID")
CLIENT_SECRET = os.getenv("ZOOM_CLIENT_SECRET")
class CustomZoomTool(ZoomTool):
def __init__(
self,
account_id: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
name: str = "zoom_tool",
):
super().__init__(
account_id=account_id,
client_id=client_id,
client_secret=client_secret,
name=name
)
def __init__(self, *, account_id: Optional[str] = None, client_id: Optional[str] = None, client_secret: Optional[str] = None, name: str = "zoom_tool"):
super().__init__(account_id=account_id, client_id=client_id, client_secret=client_secret, name=name)
self.token_url = "https://zoom.us/oauth/token"
self.access_token = None
self.token_expires_at = 0
@@ -47,16 +29,12 @@ class CustomZoomTool(ZoomTool):
def get_access_token(self) -> str:
if self.access_token and time.time() < self.token_expires_at:
return str(self.access_token)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {"grant_type": "account_credentials", "account_id": self.account_id}
try:
response = requests.post(
self.token_url,
headers=headers,
data=data,
auth=(self.client_id, self.client_secret) # Use basic auth
)
response = requests.post(self.token_url, headers=headers, data=data, auth=(self.client_id, self.client_secret))
response.raise_for_status()
token_info = response.json()
@@ -64,7 +42,6 @@ class CustomZoomTool(ZoomTool):
expires_in = token_info["expires_in"]
self.token_expires_at = time.time() + expires_in - 60
# Update this line to use the helper method
self._set_parent_token(str(self.access_token))
return str(self.access_token)
@@ -114,11 +91,9 @@ ROLE_REQUIREMENTS: Dict[str, str] = {
def init_session_state() -> None:
"""Initialize only necessary session state variables."""
defaults = {
'candidate_email': "",
'openai_api_key': "",
'resume_text': "",
'analysis_complete': False,
'is_selected': False
'candidate_email': "", 'openai_api_key': "", 'resume_text': "", 'analysis_complete': False,
'is_selected': False, 'zoom_account_id': "", 'zoom_client_id': "", 'zoom_client_secret': "",
'email_sender': "", 'email_passkey': "", 'company_name': ""
}
for key, value in defaults.items():
if key not in st.session_state:
@@ -155,9 +130,9 @@ def create_email_agent() -> Agent:
),
tools=[EmailTools(
receiver_email=st.session_state.candidate_email,
sender_email=FROM_EMAIL,
sender_name="AI Recruitment Team",
sender_passkey=os.getenv("EMAIL_PASSKEY")
sender_email=st.session_state.email_sender,
sender_name=st.session_state.company_name,
sender_passkey=st.session_state.email_passkey
)],
description="You are a professional recruitment coordinator handling email communications.",
instructions=[
@@ -166,7 +141,7 @@ def create_email_agent() -> Agent:
"Maintain a friendly yet professional tone",
"Always end emails with exactly: 'best,\nthe ai recruiting team'",
"Never include the sender's or receiver's name in the signature",
"The name of the company is 'AI Recruiting Team'"
f"The name of the company is '{st.session_state.company_name}'"
],
markdown=True,
show_tool_calls=True
@@ -174,8 +149,11 @@ def create_email_agent() -> Agent:
def create_scheduler_agent() -> Agent:
zoom_tools = CustomZoomTool(account_id=ACCOUNT_ID, client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
zoom_tools = CustomZoomTool(
account_id=st.session_state.zoom_account_id,
client_id=st.session_state.zoom_client_id,
client_secret=st.session_state.zoom_client_secret
)
return Agent(
name="Interview Scheduler",
@@ -216,45 +194,37 @@ def analyze_resume(
analyzer: Agent
) -> Tuple[bool, str]:
try:
response = analyzer.run(f"""
Please analyze this resume against the following requirements and provide your response in valid JSON format:
Role Requirements:
{ROLE_REQUIREMENTS[role]}
Resume Text:
{resume_text}
Your response must be a valid JSON object like this:
{{
"selected": true/false,
"feedback": "Detailed feedback explaining the decision",
"matching_skills": ["skill1", "skill2"],
"missing_skills": ["skill3", "skill4"],
"experience_level": "junior/mid/senior"
}}
Evaluation criteria:
1. Match at least 70% of required skills
2. Consider both theoretical knowledge and practical experience
3. Value project experience and real-world applications
4. Consider transferable skills from similar technologies
5. Look for evidence of continuous learning and adaptability
Important: Return ONLY the JSON object without any markdown formatting or backticks.
""")
# Extract the assistant's message content
assistant_message = None
for message in response.messages:
if message.role == 'assistant':
assistant_message = message.content
break
response = analyzer.run(
f"""Please analyze this resume against the following requirements and provide your response in valid JSON format:
Role Requirements:
{ROLE_REQUIREMENTS[role]}
Resume Text:
{resume_text}
Your response must be a valid JSON object like this:
{{
"selected": true/false,
"feedback": "Detailed feedback explaining the decision",
"matching_skills": ["skill1", "skill2"],
"missing_skills": ["skill3", "skill4"],
"experience_level": "junior/mid/senior"
}}
Evaluation criteria:
1. Match at least 70% of required skills
2. Consider both theoretical knowledge and practical experience
3. Value project experience and real-world applications
4. Consider transferable skills from similar technologies
5. Look for evidence of continuous learning and adaptability
Important: Return ONLY the JSON object without any markdown formatting or backticks.
"""
)
assistant_message = next((msg.content for msg in response.messages if msg.role == 'assistant'), None)
if not assistant_message:
raise ValueError("No assistant message found in response.")
result = json.loads(assistant_message.strip())
if not isinstance(result, dict):
raise ValueError("Response is not a JSON object")
if "selected" not in result or "feedback" not in result:
raise ValueError("Missing required fields in response")
if not isinstance(result, dict) or not all(k in result for k in ["selected", "feedback"]):
raise ValueError("Invalid response format")
return result["selected"], result["feedback"]
@@ -301,29 +271,45 @@ def send_rejection_email(email_agent: Agent, to_email: str, role: str, feedback:
def schedule_interview(scheduler: Agent, candidate_email: str, email_agent: Agent, role: str) -> None:
"""
Basic interview scheduler that creates a Zoom meeting and sends email details.
Schedules interviews during business hours (9 AM - 5 PM EST).
Schedule interviews during business hours (9 AM - 5 PM IST).
"""
try:
# Calculate tomorrow at 2 PM EST (instead of UTC)
tomorrow = datetime.now() + timedelta(days=1)
interview_time = tomorrow.replace(hour=14, minute=0, second=0, microsecond=0)
# 1. Schedule the meeting with EST timezone specification
# Get current time in IST
ist_tz = pytz.timezone('Asia/Kolkata')
current_time_ist = datetime.now(ist_tz)
tomorrow_ist = current_time_ist + timedelta(days=1)
interview_time = tomorrow_ist.replace(hour=11, minute=0, second=0, microsecond=0)
formatted_time = interview_time.strftime('%Y-%m-%dT%H:%M:%S')
meeting_response = scheduler.run(
f"Schedule a 60-minute meeting titled '{role} Technical Interview' "
f"for tomorrow at 2 PM EST (Eastern Time) 2024 with attendee {candidate_email}. "
f"Ensure the meeting is between 9 AM - 5 PM EST only."
f"""Schedule a 60-minute technical interview with these specifications:
- Title: '{role} Technical Interview'
- Date: {formatted_time}
- Timezone: IST (India Standard Time)
- Attendee: {candidate_email}
Important Notes:
- The meeting must be between 9 AM - 5 PM IST
- Use IST (UTC+5:30) timezone for all communications
- Include timezone information in the meeting details
"""
)
# 2. Send email notification
email_agent.run(
f"Send an email to {candidate_email} about their scheduled interview for "
f"the {role} position. Include the Zoom meeting link from: {meeting_response}. "
f"Make sure to specify that the time is in EST timezone."
f"""Send an interview confirmation email with these details:
- Role: {role} position
- Meeting Details: {meeting_response}
Important:
- Clearly specify that the time is in IST (India Standard Time)
- Ask the candidate to join 5 minutes early
- Include timezone conversion link if possible
- Ask him to be confident and not so nervous and prepare well for the interview
"""
)
st.success("Interview scheduled successfully!")
st.success("Interview scheduled successfully! Check your email for details.")
except Exception as e:
logger.error(f"Error scheduling interview: {str(e)}")
@@ -331,53 +317,81 @@ def schedule_interview(scheduler: Agent, candidate_email: str, email_agent: Agen
def main() -> None:
"""Main function to run the Streamlit application."""
st.title("AI Recruitment System")
# Initialize session state
init_session_state()
# Sidebar for API key
with st.sidebar:
st.header("Configuration")
api_key = st.text_input(
"Enter your OpenAI API key",
type="password",
value=st.session_state.openai_api_key,
help="Get your API key from platform.openai.com"
)
if api_key:
st.session_state.openai_api_key = api_key
# OpenAI Configuration
st.subheader("OpenAI Settings")
api_key = st.text_input("OpenAI API Key", type="password", value=st.session_state.openai_api_key, help="Get your API key from platform.openai.com")
if api_key: st.session_state.openai_api_key = api_key
st.subheader("Zoom Settings")
zoom_account_id = st.text_input("Zoom Account ID", type="password", value=st.session_state.zoom_account_id)
zoom_client_id = st.text_input("Zoom Client ID", type="password", value=st.session_state.zoom_client_id)
zoom_client_secret = st.text_input("Zoom Client Secret", type="password", value=st.session_state.zoom_client_secret)
st.subheader("Email Settings")
email_sender = st.text_input("Sender Email", value=st.session_state.email_sender, help="Email address to send from")
email_passkey = st.text_input("Email App Password", type="password", value=st.session_state.email_passkey, help="App-specific password for email")
company_name = st.text_input("Company Name", value=st.session_state.company_name, help="Name to use in email communications")
if zoom_account_id: st.session_state.zoom_account_id = zoom_account_id
if zoom_client_id: st.session_state.zoom_client_id = zoom_client_id
if zoom_client_secret: st.session_state.zoom_client_secret = zoom_client_secret
if email_sender: st.session_state.email_sender = email_sender
if email_passkey: st.session_state.email_passkey = email_passkey
if company_name: st.session_state.company_name = company_name
required_configs = {'OpenAI API Key': st.session_state.openai_api_key, 'Zoom Account ID': st.session_state.zoom_account_id,
'Zoom Client ID': st.session_state.zoom_client_id, 'Zoom Client Secret': st.session_state.zoom_client_secret,
'Email Sender': st.session_state.email_sender, 'Email Password': st.session_state.email_passkey,
'Company Name': st.session_state.company_name}
missing_configs = [k for k, v in required_configs.items() if not v]
if missing_configs:
st.warning(f"Please configure the following in the sidebar: {', '.join(missing_configs)}")
return
# Main content
if not st.session_state.openai_api_key:
st.warning("Please enter your OpenAI API key in the sidebar to continue.")
return
# Role selection with requirements display
role = st.selectbox(
"Select the role you're applying for:",
["ai_ml_engineer", "frontend_engineer", "backend_engineer"]
)
role = st.selectbox("Select the role you're applying for:", ["ai_ml_engineer", "frontend_engineer", "backend_engineer"])
with st.expander("View Required Skills", expanded=True): st.markdown(ROLE_REQUIREMENTS[role])
# Display requirements for selected role
with st.expander("View Required Skills", expanded=True):
st.markdown(ROLE_REQUIREMENTS[role])
# Resume upload and processing
resume_file = st.file_uploader("Upload your resume (PDF)", type=["pdf"])
if resume_file and not st.session_state.resume_text:
with st.spinner("Processing your resume..."):
resume_text = extract_text_from_pdf(resume_file)
if resume_text:
st.session_state.resume_text = resume_text
st.success("Resume processed successfully!")
else:
st.error("Could not process the PDF. Please try again.")
if resume_file:
st.subheader("Uploaded Resume")
col1, col2 = st.columns([4, 1])
with col1:
import tempfile, os
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
tmp_file.write(resume_file.read())
tmp_file_path = tmp_file.name
resume_file.seek(0)
try: pdf_viewer(tmp_file_path)
finally: os.unlink(tmp_file_path)
with col2:
st.download_button(label="📥 Download", data=resume_file, file_name=resume_file.name, mime="application/pdf")
if st.button("🗑️ Clear"): st.session_state.resume_text = ""; st.rerun()
# Process the resume text
if not st.session_state.resume_text:
with st.spinner("Processing your resume..."):
resume_text = extract_text_from_pdf(resume_file)
if resume_text:
st.session_state.resume_text = resume_text
st.success("Resume processed successfully!")
else:
st.error("Could not process the PDF. Please try again.")
# Email input with session state
email = st.text_input(
"Your email address",
"Candidate's email address",
value=st.session_state.candidate_email,
key="email_input"
)