Files
open-webui/backend/open_webui/utils/calendar.py
Timothy Jaeryang Baek 8d739e2aba feat: calendar
2026-04-19 19:15:05 +09:00

84 lines
2.6 KiB
Python

"""
Calendar utilities.
RRULE expansion reusing the automation infra.
"""
import logging
from datetime import datetime, timedelta
from typing import Optional
from zoneinfo import ZoneInfo
from open_webui.utils.automations import _parse_rule
log = logging.getLogger(__name__)
def expand_recurring_event(
    event_dict: dict,
    range_start_ns: int,
    range_end_ns: int,
    tz: Optional[str] = None,
    max_instances: int = 5000,
) -> list[dict]:
    """Expand a recurring event into individual instances within a date range.

    Takes an event dict (from CalendarEventModel.model_dump()) and produces
    one dict per occurrence, with adjusted start_at / end_at.

    Args:
        event_dict: Event fields. ``rrule`` holds the recurrence rule,
            ``start_at`` and the optional ``end_at`` are epoch nanoseconds,
            and ``id`` is used to build each ``instance_id``.
        range_start_ns: Inclusive lower bound of the window (epoch ns).
        range_end_ns: Exclusive upper bound of the window (epoch ns).
        tz: IANA zone name in which the rule's wall-clock times are
            interpreted. When omitted or not resolvable, naive datetimes are
            converted using the local zone of the running process.
        max_instances: Upper limit on the number of instances returned.

    Returns:
        ``[event_dict]`` unchanged when the event has no ``rrule`` or the rule
        cannot be parsed. Otherwise a list of shallow copies of ``event_dict``
        whose ``start_at`` / ``end_at`` are shifted to each occurrence and
        which carry an extra ``instance_id`` of the form ``<id>_<start_ns>``.

    Raises:
        KeyError: If a recurring event has no ``start_at``, or has no ``id``
            when at least one instance falls inside the window.
    """
    rrule_str = event_dict.get('rrule')
    if not rrule_str:
        return [event_dict]

    # Imported lazily, and only once a rule really has to be expanded, so that
    # non-recurring events never depend on dateutil being importable.
    from dateutil.rrule import rrulestr

    # Resolve the zone once instead of once per occurrence. An unknown zone
    # degrades to the process-local zone (best effort), but is now logged.
    zone = None
    if tz:
        try:
            zone = ZoneInfo(tz)
        except Exception:
            log.warning(
                'Unknown timezone %r for event %s; using server local time',
                tz,
                event_dict.get('id'),
            )

    # Window bounds as naive wall-clock times in the SAME zone that is later
    # used to turn occurrences back into epoch nanoseconds. With zone=None
    # this is the process-local wall clock.
    range_start_dt = datetime.fromtimestamp(
        range_start_ns / 1_000_000_000, tz=zone
    ).replace(tzinfo=None)
    range_end_dt = datetime.fromtimestamp(
        range_end_ns / 1_000_000_000, tz=zone
    ).replace(tzinfo=None)
    # Start one day early so occurrences close to the lower bound are not
    # missed; the exact nanosecond filter below discards the surplus.
    scan_start = range_start_dt - timedelta(days=1)

    try:
        # Parse with dtstart near the range so we never iterate from epoch.
        # NOTE(review): this anchors the rule at scan_start rather than at the
        # event's own start_at, so anything the RRULE leaves unspecified
        # (time of day, weekday, INTERVAL phase, COUNT) is derived from the
        # query window. Confirm stored rules always pin these explicitly.
        rule = rrulestr(rrule_str, dtstart=scan_start, ignoretz=True)
    except Exception:
        log.warning(
            'Failed to parse RRULE for event %s: %s',
            event_dict.get('id'),
            rrule_str,
        )
        return [event_dict]

    original_start_ns = event_dict['start_at']
    original_end_ns = event_dict.get('end_at')
    # Compare against None explicitly: a zero-length event (end == start) has
    # a valid duration of 0 and must keep its end_at on every instance.
    duration_ns = (
        original_end_ns - original_start_ns
        if original_end_ns is not None
        else None
    )

    instances: list[dict] = []
    # Walk the rule once. Calling rule.after() per occurrence restarts the
    # underlying iterator from dtstart every time (quadratic work).
    for occurrence in rule:
        if occurrence < scan_start:
            # Only possible when the rule text carries its own DTSTART.
            continue
        if occurrence >= range_end_dt or len(instances) >= max_instances:
            break

        if zone is not None:
            moment = occurrence.replace(tzinfo=zone)
        else:
            moment = occurrence
        instance_start_ns = int(moment.timestamp() * 1_000_000_000)

        if range_start_ns <= instance_start_ns < range_end_ns:
            instances.append(
                {
                    **event_dict,
                    'start_at': instance_start_ns,
                    'end_at': (
                        instance_start_ns + duration_ns
                        if duration_ns is not None
                        else None
                    ),
                    'instance_id': f'{event_dict["id"]}_{instance_start_ns}',
                }
            )

    return instances
def ns_from_date(year: int, month: int, day: int, tz: Optional[str] = None) -> int:
    """Return epoch nanoseconds for midnight at the start of the given date.

    When *tz* is given, midnight is taken in that IANA zone. Otherwise a naive
    datetime is built and ``datetime.timestamp()`` resolves it using the local
    zone of the running process.
    """
    zone = ZoneInfo(tz) if tz else None
    midnight = datetime(year, month, day, tzinfo=zone)
    return int(midnight.timestamp() * 1_000_000_000)