diff --git a/music_assistant/providers/m3uradio/__init__.py b/music_assistant/providers/m3uradio/__init__.py new file mode 100644 index 0000000000..a435a13989 --- /dev/null +++ b/music_assistant/providers/m3uradio/__init__.py @@ -0,0 +1,291 @@ +"""M3U / IPTV radio music provider for Music Assistant. + +Imports radio stations from a self-hosted M3U (or IPTV) playlist and exposes +them as a native radio source: library sync, browse-by-group, and search. + +Modelled on the built-in `radiobrowser` provider +(music_assistant/providers/radiobrowser/__init__.py). +""" + +from __future__ import annotations + +import hashlib +import re +from collections.abc import AsyncGenerator, Sequence +from typing import TYPE_CHECKING + +from aiohttp import ClientError +from music_assistant_models.config_entries import ConfigEntry +from music_assistant_models.enums import ( + ConfigEntryType, + ContentType, + ImageType, + MediaType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError +from music_assistant_models.media_items import ( + AudioFormat, + BrowseFolder, + MediaItemImage, + MediaItemType, + ProviderMapping, + Radio, + SearchResults, + UniqueList, +) +from music_assistant_models.streamdetails import StreamDetails + +from music_assistant.models.music_provider import MusicProvider + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ConfigValueType, ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + + +CONF_M3U_URL = "m3u_url" +CONF_GROUPS = "groups" + +SUPPORTED_FEATURES = { + ProviderFeature.LIBRARY_RADIOS, + ProviderFeature.BROWSE, + ProviderFeature.SEARCH, +} + +# the display name starts after the first comma that is outside quoted +# attribute values (attributes like group-title may contain commas) +EXTINF_NAME_RE = re.compile(r'#EXTINF:[^,"]*(?:"[^"]*"[^,"]*)*,(?P.*)$') +ATTR_RE = re.compile(r'([\w-]+)="([^"]*)"') + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return M3URadioProvider(mass, manifest, config, SUPPORTED_FEATURES) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """Return config entries to set up this provider.""" + return ( + ConfigEntry( + key=CONF_M3U_URL, + type=ConfigEntryType.STRING, + label="M3U / IPTV playlist URL", + required=True, + description="Direct URL to your M3U playlist.", + ), + ConfigEntry( + key=CONF_GROUPS, + type=ConfigEntryType.STRING, + label="Group filter (optional)", + required=False, + default_value="", + description=( + "Comma-separated group-title values to include. " + "Leave blank to import every entry in the playlist." + ), + ), + ) + + +class M3URadioProvider(MusicProvider): + """Music provider serving radio stations from an M3U playlist.""" + + _stations: dict[str, dict[str, str]] + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + self._stations = {} + await self._refresh_stations() + + async def sync_library(self, media_type: MediaType) -> None: + """Run library sync for this provider.""" + # re-fetch the playlist so scheduled syncs pick up playlist changes + await self._refresh_stations() + await super().sync_library(media_type) + + async def get_library_radios(self) -> AsyncGenerator[Radio]: + """Retrieve all library radio stations from the provider.""" + for st in self._stations.values(): + yield self._parse_radio(st) + + async def get_radio(self, prov_radio_id: str) -> Radio: + """Get radio station details.""" + st = self._stations.get(prov_radio_id) + if not st: + await self._refresh_stations() + st = self._stations.get(prov_radio_id) + if not st: + raise MediaNotFoundError(f"Station {prov_radio_id} not found") + return self._parse_radio(st) + + async def search( + self, search_query: str, media_types: list[MediaType], limit: int = 10 + ) -> SearchResults: + """Perform search on the provider.""" + result = SearchResults() + if MediaType.RADIO not in media_types: + return result + q = search_query.lower() + matches = [ + self._parse_radio(st) for st in self._stations.values() if q in st["name"].lower() + ] + result.radio = matches[:limit] + return result + + async def browse(self, path: str) -> Sequence[MediaItemType | BrowseFolder]: + """Browse this provider's items, grouped by group-title.""" + subpath = "" if "://" not in path else path.split("://", 1)[1] + if not subpath: + groups = sorted({st["group"] for st in self._stations.values() if st["group"]}) + return [ + BrowseFolder( + item_id=_slug(group), + provider=self.domain, + path=f"{path}{_slug(group)}", + name=group, + ) + for group in groups + ] + return [ + self._parse_radio(st) for st in self._stations.values() if _slug(st["group"]) == subpath + ] + + async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: + """Get stream details for a radio station.""" + st = self._stations.get(item_id) + if not st: + raise MediaNotFoundError(f"Station {item_id} not found") + return StreamDetails( + provider=self.domain, + item_id=item_id, + audio_format=AudioFormat(content_type=_guess_content_type(st["url"])), + media_type=MediaType.RADIO, + stream_type=StreamType.HTTP, + path=st["url"], + can_seek=False, + allow_seek=False, + ) + + async def _refresh_stations(self) -> None: + """Fetch the playlist and rebuild the station index.""" + url = self.config.get_value(CONF_M3U_URL) + if not url: + self.logger.warning("No M3U URL configured") + self._stations = {} + return + groups_raw = self.config.get_value(CONF_GROUPS) or "" + group_filter = {g.strip().lower() for g in str(groups_raw).split(",") if g.strip()} + try: + async with self.mass.http_session.get(str(url)) as resp: + resp.raise_for_status() + content = await resp.text() + except (TimeoutError, ClientError) as err: + raise ProviderUnavailableError(f"Failed to fetch M3U playlist: {err}") from err + index: dict[str, dict[str, str]] = {} + for st in parse_m3u(content): + if group_filter and st["group"].lower() not in group_filter: + continue + index[st["id"]] = st + self._stations = index + self.logger.info("Loaded %d stations from M3U", len(index)) + + def _parse_radio(self, st: dict[str, str]) -> Radio: + """Build a Radio media item from a parsed station dict.""" + radio = Radio( + item_id=st["id"], + provider=self.domain, + name=st["name"], + provider_mappings={ + ProviderMapping( + item_id=st["id"], + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + if st.get("logo"): + radio.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=st["logo"], + provider=self.instance_id, + remotely_accessible=True, + ) + ] + ) + return radio + + +def parse_m3u(content: str) -> list[dict[str, str]]: + """Parse #EXTM3U content into a list of station dicts. + + Each dict has: id, name, url, logo, group, tvg_id. + """ + stations: list[dict[str, str]] = [] + name = logo = group = tvg_id = "" + pending = False + for raw in content.splitlines(): + line = raw.strip() + if not line or line.upper().startswith("#EXTM3U"): + continue + if line.startswith("#EXTINF:"): + attrs = dict(ATTR_RE.findall(line)) + logo = attrs.get("tvg-logo", "") + group = attrs.get("group-title", "") + tvg_id = attrs.get("tvg-id", "") + match = EXTINF_NAME_RE.search(line) + name = match.group("name").strip() if match else "" + pending = True + elif line.startswith("#"): + continue + elif pending: + url = line + sid = tvg_id.strip() or _hash_id(name, url) + stations.append( + { + "id": sid, + "name": name or url, + "url": url, + "logo": logo, + "group": group, + "tvg_id": tvg_id, + } + ) + name = logo = group = tvg_id = "" + pending = False + return stations + + +def _hash_id(name: str, url: str) -> str: + return hashlib.sha1(f"{name}|{url}".encode()).hexdigest()[:16] + + +def _slug(text: str) -> str: + return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-") or "ungrouped" + + +def _guess_content_type(url: str) -> ContentType: + lowered = url.lower() + for ext, codec in ( + (".mp3", "mp3"), + (".aac", "aac"), + (".m4a", "aac"), + (".ogg", "ogg"), + (".opus", "ogg"), + (".flac", "flac"), + ): + if ext in lowered: + return ContentType.try_parse(codec) + return ContentType.UNKNOWN diff --git a/music_assistant/providers/m3uradio/icon.svg b/music_assistant/providers/m3uradio/icon.svg new file mode 100644 index 0000000000..7bc4a27248 --- /dev/null +++ b/music_assistant/providers/m3uradio/icon.svg @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/music_assistant/providers/m3uradio/icon_monochrome.svg b/music_assistant/providers/m3uradio/icon_monochrome.svg new file mode 100644 index 0000000000..83460db3bc --- /dev/null +++ b/music_assistant/providers/m3uradio/icon_monochrome.svg @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/music_assistant/providers/m3uradio/manifest.json b/music_assistant/providers/m3uradio/manifest.json new file mode 100644 index 0000000000..ed14eead08 --- /dev/null +++ b/music_assistant/providers/m3uradio/manifest.json @@ -0,0 +1,12 @@ +{ + "type": "music", + "domain": "m3uradio", + "stage": "beta", + "name": "M3U Radio", + "description": "Radio stations from a self-hosted M3U / IPTV playlist, exposed as a native radio source.", + "codeowners": ["@cladkins"], + "requirements": [], + "documentation": "https://music-assistant.io/music-providers/m3uradio/", + "multi_instance": true, + "icon": "radio" +} diff --git a/tests/providers/m3uradio/__init__.py b/tests/providers/m3uradio/__init__.py new file mode 100644 index 0000000000..8c0ae6c0b2 --- /dev/null +++ b/tests/providers/m3uradio/__init__.py @@ -0,0 +1 @@ +"""Tests for the M3U Radio provider.""" diff --git a/tests/providers/m3uradio/test_parser.py b/tests/providers/m3uradio/test_parser.py new file mode 100644 index 0000000000..b11de2831d --- /dev/null +++ b/tests/providers/m3uradio/test_parser.py @@ -0,0 +1,91 @@ +"""Tests for the M3U Radio provider's playlist parser.""" + +from music_assistant_models.enums import ContentType + +from music_assistant.providers.m3uradio import _guess_content_type, parse_m3u + +FULL_PLAYLIST = """#EXTM3U +#EXTINF:-1 tvg-id="st1" tvg-logo="http://logo.example.com/1.png" group-title="Chill",Station One +http://stream.example.com/1.mp3 +#EXTINF:-1 group-title="News",Station Two +http://stream.example.com/2.aac + +# a stray comment line +#EXTINF:-1,Bare Station +http://stream.example.com/3 +""" + + +def test_parse_full_attributes() -> None: + """Test that all EXTINF attributes are extracted.""" + stations = parse_m3u(FULL_PLAYLIST) + assert len(stations) == 3 + first = stations[0] + assert first["id"] == "st1" + assert first["name"] == "Station One" + assert first["url"] == "http://stream.example.com/1.mp3" + assert first["logo"] == "http://logo.example.com/1.png" + assert first["group"] == "Chill" + assert first["tvg_id"] == "st1" + + +def test_hash_id_fallback_is_stable() -> None: + """Test that entries without tvg-id get a stable 16-char hash id.""" + stations = parse_m3u(FULL_PLAYLIST) + second = stations[1] + assert second["tvg_id"] == "" + assert len(second["id"]) == 16 + # same name+url must hash to the same id across parses + assert parse_m3u(FULL_PLAYLIST)[1]["id"] == second["id"] + # a different url must yield a different id + changed = FULL_PLAYLIST.replace("2.aac", "2-changed.aac") + assert parse_m3u(changed)[1]["id"] != second["id"] + + +def test_name_with_comma_inside_quoted_attribute() -> None: + """Test that commas inside quoted attribute values do not corrupt the name.""" + playlist = ( + '#EXTM3U\n#EXTINF:-1 group-title="News, Talk",Station X\nhttp://stream.example.com/x.mp3\n' + ) + stations = parse_m3u(playlist) + assert len(stations) == 1 + assert stations[0]["name"] == "Station X" + assert stations[0]["group"] == "News, Talk" + + +def test_header_comments_and_blank_lines_skipped() -> None: + """Test that the header, comments and blank lines do not produce entries.""" + stations = parse_m3u(FULL_PLAYLIST) + urls = [st["url"] for st in stations] + assert "# a stray comment line" not in urls + assert all(url.startswith("http") for url in urls) + + +def test_name_falls_back_to_url() -> None: + """Test that an entry without a display name uses the url as name.""" + playlist = "#EXTM3U\n#EXTINF:-1,\nhttp://stream.example.com/unnamed.mp3\n" + stations = parse_m3u(playlist) + assert len(stations) == 1 + assert stations[0]["name"] == "http://stream.example.com/unnamed.mp3" + + +def test_url_without_extinf_is_ignored() -> None: + """Test that bare urls without a preceding EXTINF line are ignored.""" + playlist = "#EXTM3U\nhttp://stream.example.com/orphan.mp3\n" + assert parse_m3u(playlist) == [] + + +def test_crlf_line_endings() -> None: + """Test that playlists with CRLF line endings parse correctly.""" + playlist = FULL_PLAYLIST.replace("\n", "\r\n") + stations = parse_m3u(playlist) + assert len(stations) == 3 + assert stations[0]["name"] == "Station One" + + +def test_guess_content_type() -> None: + """Test content type detection from stream urls.""" + assert _guess_content_type("http://x/stream.mp3") == ContentType.MP3 + assert _guess_content_type("http://x/stream.aac") == ContentType.AAC + assert _guess_content_type("http://x/stream.mp3?token=abc") == ContentType.MP3 + assert _guess_content_type("http://x/stream") == ContentType.UNKNOWN