Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 97 additions & 22 deletions __init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@
"""
return self.time_format == 'full'

@staticmethod
def _normalize_phrase(text: str) -> str:
"""Normalize phrases for case-insensitive locale resource matching."""
return text.strip(" \t,;:.!?").casefold()

def _load_locale_phrase_set(self, name: str):
"""Load a locale phrase list once and return a normalized set."""
cache_key = f"{name}.list.normalized"
if cache_key not in self.resources.static:
self.resources.static[cache_key] = {
self._normalize_phrase(phrase)
for phrase in self.resources.load_list_file(name)
if phrase.strip()
}
return self.resources.static[cache_key]

######################################################################
# parsing
def _extract_location(self, utt: str) -> str:
Expand All @@ -155,14 +171,51 @@
pat = pat.strip()
if pat and pat[0] == "#":
continue
res = re.search(pat, utt)
res = re.search(pat, utt, flags=re.IGNORECASE)
if res:
try:
return res.group("Location")
return res.group("Location").strip(" \t,;:.!?")
except IndexError:
pass
return None

def _is_ambiguous_location(self, location_string: str) -> bool:
"""Return True when a locale marks a location name as timezone-ambiguous."""
return (
self._normalize_phrase(location_string)
in self._load_locale_phrase_set("ambiguous_locations")
)

def _sanitize_location(self, location_string: Optional[str]) -> Optional[str]:
"""Discard locale-specific phrases accidentally captured as locations."""
if not location_string:
return None
cleaned = location_string.strip(" \t,;:.!?")
if (
self._normalize_phrase(cleaned)
in self._load_locale_phrase_set("non_location_phrases")
):
return None
return cleaned

def _resolve_location(self,
location_string: Optional[str] = None,
utterance: str = "") -> Optional[str]:
"""Resolve a sanitized location from an explicit slot or from the utterance."""
if location_string:
return self._sanitize_location(location_string)
if utterance:
return self._sanitize_location(self._extract_location(utterance))
return None

def _mentions_current_weekend(self, utterance: str) -> bool:
"""Check whether the active locale explicitly asked for the current weekend."""
current_weekend_phrases = self._load_locale_phrase_set("current_weekend_phrases")
if not current_weekend_phrases:
return False
normalized_utterance = utterance.casefold()
return any(phrase in normalized_utterance for phrase in current_weekend_phrases)

@staticmethod
def _get_timezone_from_builtins(location_string: str) -> Optional[datetime.tzinfo]:
"""Attempt to resolve a timezone from a location name using geocoding.
Expand Down Expand Up @@ -262,6 +315,8 @@
Returns:
datetime.tzinfo: The timezone object if resolved, else None.
"""
if self._is_ambiguous_location(location_string):
return None
timezone = self._get_timezone_from_builtins(location_string)
if not timezone:
timezone = self._get_timezone_from_table(location_string)
Expand Down Expand Up @@ -296,7 +351,7 @@
return dt

def get_spoken_time(self, location: str = None, force_ampm=False,
anchor_date: datetime.datetime = None) -> str:
anchor_date: datetime.datetime = None) -> Optional[str]:
"""Get a human-readable spoken version of the current time.

Args:
Expand All @@ -308,6 +363,8 @@
str: A spoken-friendly representation of the time.
"""
dt = self.get_datetime(location, anchor_date)
if not dt:
return None

# speak AM/PM when talking about somewhere else
say_am_pm = bool(location) or force_ampm
Expand All @@ -320,7 +377,7 @@
return s

def get_display_time(self, location: str = None, force_ampm=False,
anchor_date: datetime.datetime = None) -> str:
anchor_date: datetime.datetime = None) -> Optional[str]:
"""Get a display-friendly version of the current time.

Args:
Expand All @@ -332,6 +389,8 @@
str: A string representing the display time.
"""
dt = self.get_datetime(location, anchor_date)
if not dt:
return None
# speak AM/PM when talking about somewhere else
say_am_pm = bool(location) or force_ampm
return nice_time(dt, lang=self.lang,
Expand Down Expand Up @@ -363,30 +422,32 @@

######################################################################
# Time queries / display
def speak_time(self, dialog: str, location: str = None):
def speak_time(self, dialog: str, location: str = None,
anchor_date: datetime.datetime = None):
"""Speak the current time. Optionally at a location
speaks an error if timezone for requested location could not be detected"""
if location:
current_time = self.get_spoken_time(location)
current_time = self.get_spoken_time(location, anchor_date=anchor_date)
if not current_time:
self.speak_dialog("time.tz.not.found", {"location": location})
return
time_string = self.get_display_time(location)
time_string = self.get_display_time(location, anchor_date=anchor_date)
else:
current_time = self.get_spoken_time()
time_string = self.get_display_time()
current_time = self.get_spoken_time(anchor_date=anchor_date)
time_string = self.get_display_time(anchor_date=anchor_date)

# speak it
self.speak_dialog(dialog, {"time": current_time})

# and briefly show the time
self.show_time(time_string)
if time_string:
self.show_time(time_string)

@intent_handler("what.time.is.it.intent")
def handle_query_time(self, message):
"""Handle queries about the current time."""
utt = message.data.get('utterance', "")
location = message.data.get("location") or self._extract_location(utt)
location = self._resolve_location(message.data.get("location"), utt)
# speak it
self.speak_time("time.current", location=location)

Expand All @@ -400,10 +461,10 @@
self.handle_query_time(message)
return

location = message.data.get("location") or self._extract_location(utt)
location = self._resolve_location(message.data.get("location"), utt)

# speak it
self.speak_time("time.future", location=location)
self.speak_time("time.future", location=location, anchor_date=dt)

######################################################################
# Date queries
Expand All @@ -413,12 +474,12 @@
now = self.get_datetime() # session aware
try:
dt, utt = extract_datetime(utt, anchorDate=now, lang=self.lang) or (now, utt)
except Exception as e:

Check failure on line 477 in __init__.py

View workflow job for this annotation

GitHub Actions / lint / lint

ruff (F841)

__init__.py:477:29: F841 Local variable `e` is assigned to but never used help: Remove assignment to unused variable `e`
self.log.exception(f"failed to extract date from '{utt}'")
dt = now

# handle questions ~ "what is the day in sydney"
location_string = message.data.get("location") or self._extract_location(utt)
location_string = self._resolve_location(message.data.get("location"), utt)

if location_string:
dt = self.get_datetime(location_string, anchor_date=dt)
Expand Down Expand Up @@ -470,7 +531,12 @@
Args:
message: The message object triggering the intent.
"""
now = self.get_datetime() # session aware
utt = message.data.get("utterance", "")
location = self._resolve_location(message.data.get("location"), utt)
now = self.get_datetime(location)
if location and not now:
self.speak_dialog("time.tz.not.found", {"location": location})
return
self.speak_dialog("day.current",
{"day": nice_day(now, lang=self.lang)})

Expand Down Expand Up @@ -530,19 +596,28 @@

@intent_handler("date.future.weekend.intent")
def handle_date_future_weekend(self, message):
# Strip year off nice_date as request is inherently close
# Don't pass `now` to `nice_date` as a
# request on Friday will return "tomorrow"
"""
Handles queries about the upcoming weekend's dates.

Determines the dates for the next Saturday and Sunday, formats them for speech, and responds with a dialog containing both dates.
"""
now = self.get_datetime()
dt = extract_datetime('this saturday', anchorDate=now, lang='en-us')[0]
saturday_date = ', '.join(nice_date(dt, lang=self.lang).split(', ')[:2])
dt = extract_datetime('this sunday', anchorDate=now, lang='en-us')[0]
sunday_date = ', '.join(nice_date(dt, lang=self.lang).split(', ')[:2])
utt = message.data.get("utterance", "").lower()
weekday = now.weekday()

# On Saturday/Sunday, default to the upcoming weekend unless the user
# explicitly asked for the current weekend in this locale.
if weekday <= 5:
saturday_dt = now + datetime.timedelta(days=5 - weekday)
else:
saturday_dt = now - datetime.timedelta(days=1)

if weekday >= 5 and not self._mentions_current_weekend(utt):
saturday_dt += datetime.timedelta(days=7)

sunday_dt = saturday_dt + datetime.timedelta(days=1)
saturday_date = ', '.join(nice_date(saturday_dt, lang=self.lang).split(', ')[:2])
sunday_date = ', '.join(nice_date(sunday_dt, lang=self.lang).split(', ')[:2])
self.speak_dialog('date.future.weekend', {
'saturday_date': saturday_date,
'sunday_date': sunday_date
Expand Down
4 changes: 2 additions & 2 deletions test/unittests/test_skill_loading.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import unittest
from os.path import dirname

from mycroft.skills.skill_loader import PluginSkillLoader, SkillLoader
from ovos_plugin_manager.skills import find_skill_plugins
from ovos_utils.messagebus import FakeBus
from skill_ovos_date_time import TimeSkill
from ovos_workshop.skill_launcher import PluginSkillLoader, SkillLoader
from ovos_skill_date_time import TimeSkill


class TestSkillLoading(unittest.TestCase):
Expand Down
Loading