This commit is contained in:
2025-10-25 13:21:06 +02:00
parent eb57506d39
commit 033ffb21f5
8388 changed files with 484789 additions and 16 deletions

View File

@@ -0,0 +1,261 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
#
# This is based on the metadata.tvmaze scrapper by Roman Miroshnychenko aka Roman V.M.
"""Plugin route actions"""
from __future__ import absolute_import, unicode_literals
import sys
import json
import urllib.parse
import xbmcgui
import xbmcplugin
from . import tmdb, data_utils
from .utils import logger, safe_get
try:
from typing import Optional, Text, Union, ByteString # pylint: disable=unused-import
except ImportError:
pass
HANDLE = int(sys.argv[1]) # type: int
def find_show(title, year=None):
# type: (Union[Text, bytes], Optional[Text]) -> None
"""Find a show by title"""
if not isinstance(title, str):
title = title.decode('utf-8')
logger.debug('Searching for TV show {} ({})'.format(title, year))
search_results = tmdb.search_show(title, year)
for search_result in search_results:
show_name = search_result['name']
if safe_get(search_result, 'first_air_date') is not None:
show_name += ' ({})'.format(search_result['first_air_date'][:4])
list_item = xbmcgui.ListItem(show_name, offscreen=True)
show_info = search_result
list_item = data_utils.add_main_show_info(
list_item, show_info, full_info=False)
# Below "url" is some unique ID string (may be an actual URL to a show page)
# that is used to get information about a specific TV show.
xbmcplugin.addDirectoryItem(
HANDLE,
url=str(search_result['id']),
listitem=list_item,
isFolder=True
)
def get_show_id_from_nfo(nfo):
# type: (Text) -> None
"""
Get show ID by NFO file contents
This function is called first instead of find_show
if a NFO file is found in a TV show folder.
:param nfo: the contents of a NFO file
"""
if isinstance(nfo, bytes):
nfo = nfo.decode('utf-8', 'replace')
logger.debug('Parsing NFO file:\n{}'.format(nfo))
parse_result, named_seasons = data_utils.parse_nfo_url(nfo)
if parse_result:
if parse_result.provider == 'tmdb':
show_info = tmdb.load_show_info(
parse_result.show_id, ep_grouping=parse_result.ep_grouping, named_seasons=named_seasons)
else:
show_info = None
if show_info is not None:
list_item = xbmcgui.ListItem(show_info['name'], offscreen=True)
# "url" is some string that unique identifies a show.
# It may be an actual URL of a TV show page.
xbmcplugin.addDirectoryItem(
HANDLE,
url=str(show_info['id']),
listitem=list_item,
isFolder=True
)
def get_show_id(unique_ids):
"""
Get show ID by unique IDs
In case there is a tmdb identifier in the unique IDs, use that.
Else use the find_by_id method to get the show ID by an external ID.
:param unique_ids: dictionary of unique IDs
"""
if unique_ids.get('tmdb'):
return unique_ids['tmdb']
else:
res = tmdb.find_by_id(unique_ids)
if len(res) > 0:
return res[0].get('id')
return None
def get_details(show_id):
# type: (Text) -> None
"""Get details about a specific show"""
logger.debug('Getting details for show id {}'.format(show_id))
show_info = tmdb.load_show_info(show_id)
if show_info is not None:
list_item = xbmcgui.ListItem(show_info['name'], offscreen=True)
list_item = data_utils.add_main_show_info(
list_item, show_info, full_info=True)
xbmcplugin.setResolvedUrl(HANDLE, True, list_item)
else:
xbmcplugin.setResolvedUrl(
HANDLE, False, xbmcgui.ListItem(offscreen=True))
def get_episode_list(show_ids): # pylint: disable=missing-docstring
# type: (Text) -> None
# Kodi has a bug: when a show directory contains an XML NFO file with
# episodeguide URL, that URL is always passed here regardless of
# the actual parsing result in get_show_from_nfo()
# so much of this weird logic is to deal with that
try:
all_ids = json.loads(show_ids)
show_id = all_ids.get('tmdb')
if not show_id:
for key, value in all_ids.items():
show_id = data_utils._convert_ext_id(key, value)
if show_id:
show_id = str(show_id)
break
if not show_id:
show_id = str(show_ids)
except (ValueError, AttributeError):
show_id = str(show_ids)
if show_id.isdigit():
logger.error(
'using deprecated episodeguide format, this show should be refreshed or rescraped')
if not show_id:
raise RuntimeError(
'No TMDb TV show id found in episode guide, this show should be refreshed or rescraped')
elif not show_id.isdigit():
parse_result, named_seasons = data_utils.parse_nfo_url(show_id)
if parse_result:
show_id = parse_result.show_id
else:
raise RuntimeError(
'No TMDb TV show id found in episode guide, this show should be refreshed or rescraped')
logger.debug('Getting episode list for show id {}'.format(show_id))
show_info = tmdb.load_show_info(show_id)
if show_info is not None:
theindex = 0
for episode in show_info['episodes']:
epname = episode.get('name', 'Episode ' +
str(episode['episode_number']))
list_item = xbmcgui.ListItem(epname, offscreen=True)
list_item = data_utils.add_episode_info(
list_item, episode, full_info=False)
encoded_ids = urllib.parse.urlencode(
{'show_id': str(show_info['id']), 'episode_id': str(theindex)}
)
theindex = theindex + 1
# Below "url" is some unique ID string (may be an actual URL to an episode page)
# that allows to retrieve information about a specific episode.
url = urllib.parse.quote(encoded_ids)
xbmcplugin.addDirectoryItem(
HANDLE,
url=url,
listitem=list_item,
isFolder=True
)
else:
logger.error(
'unable to get show information using show id {}'.format(show_id))
logger.error('you may need to refresh the show to get a valid show id')
def get_episode_details(encoded_ids): # pylint: disable=missing-docstring
# type: (Text) -> None
encoded_ids = urllib.parse.unquote(encoded_ids)
decoded_ids = dict(urllib.parse.parse_qsl(encoded_ids))
logger.debug('Getting episode details for {}'.format(decoded_ids))
episode_info = tmdb.load_episode_info(
decoded_ids['show_id'], decoded_ids['episode_id']
)
if episode_info:
list_item = xbmcgui.ListItem(episode_info['name'], offscreen=True)
list_item = data_utils.add_episode_info(
list_item, episode_info, full_info=True)
xbmcplugin.setResolvedUrl(HANDLE, True, list_item)
else:
xbmcplugin.setResolvedUrl(
HANDLE, False, xbmcgui.ListItem(offscreen=True))
def get_artwork(show_id):
# type: (Text) -> None
"""
Get available artwork for a show
:param show_id: default unique ID set by setUniqueIDs() method
"""
if not show_id:
return
logger.debug('Getting artwork for show ID {}'.format(show_id))
show_info = tmdb.load_show_info(show_id)
if show_info is not None:
list_item = xbmcgui.ListItem(show_info['name'], offscreen=True)
list_item = data_utils.set_show_artwork(show_info, list_item)
xbmcplugin.setResolvedUrl(HANDLE, True, list_item)
else:
xbmcplugin.setResolvedUrl(
HANDLE, False, xbmcgui.ListItem(offscreen=True))
def router(paramstring):
# type: (Text) -> None
"""
Route addon calls
:param paramstring: url-encoded query string
:raises RuntimeError: on unknown call action
"""
params = dict(urllib.parse.parse_qsl(paramstring))
logger.debug('Called addon with params: {}'.format(sys.argv))
if params['action'] == 'find':
logger.debug('performing find action')
find_show(params['title'], params.get('year'))
elif params['action'].lower() == 'nfourl':
logger.debug('performing nfourl action')
get_show_id_from_nfo(params['nfo'])
elif params['action'] == 'getdetails':
logger.debug('performing getdetails action')
show_id = params.get('url') or get_show_id(json.loads(params.get('uniqueIDs')))
if show_id:
get_details(show_id)
elif params['action'] == 'getepisodelist':
logger.debug('performing getepisodelist action')
get_episode_list(params['url'])
elif params['action'] == 'getepisodedetails':
logger.debug('performing getepisodedetails action')
get_episode_details(params['url'])
elif params['action'] == 'getartwork':
logger.debug('performing getartwork action')
get_artwork(params.get('id'))
else:
raise RuntimeError('Invalid addon call: {}'.format(sys.argv))
xbmcplugin.endOfDirectory(HANDLE)

View File

@@ -0,0 +1,81 @@
# coding: utf-8
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Functions to interact with various web site APIs"""
from __future__ import absolute_import, unicode_literals
import json
from urllib.request import Request, urlopen
from urllib.error import URLError
from urllib.parse import urlencode
from pprint import pformat
from .utils import logger
try:
from typing import Text, Optional, Union, List, Dict, Any # pylint: disable=unused-import
InfoType = Dict[Text, Any] # pylint: disable=invalid-name
except ImportError:
pass
HEADERS = {}
def set_headers(headers):
# type: (Dict) -> None
HEADERS.update(headers)
def load_info(url, params=None, default=None, resp_type='json', verboselog=False):
# type: (Text, Dict, Text, Text, bool) -> Optional[Text]
"""
Load info from external api
:param url: API endpoint URL
:param params: URL query params
:default: object to return if there is an error
:resp_type: what to return to the calling function
:return: API response or default on error
"""
if params:
url = url + '?' + urlencode(params)
logger.debug('Calling URL "{}"'.format(url))
if HEADERS:
logger.debug(str(HEADERS))
req = Request(url, headers=HEADERS)
try:
response = urlopen(req)
except URLError as e:
if hasattr(e, 'reason'):
logger.debug(
'failed to reach the remote site\nReason: {}'.format(e.reason))
elif hasattr(e, 'code'):
logger.debug(
'remote site unable to fulfill the request\nError code: {}'.format(e.code))
response = None
if response is None:
resp = default
elif resp_type.lower() == 'json':
try:
resp = json.loads(response.read().decode('utf-8'))
except json.decoder.JSONDecodeError:
logger.debug('remote site sent back bad JSON')
resp = default
else:
resp = response.read().decode('utf-8')
if verboselog:
logger.debug('the api response:\n{}'.format(pformat(resp)))
return resp

View File

@@ -0,0 +1,81 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
#
# This is based on the metadata.tvmaze scrapper by Roman Miroshnychenko aka Roman V.M.
"""Cache-related functionality"""
from __future__ import absolute_import, unicode_literals
import os
import pickle
import xbmc
import xbmcvfs
from .utils import ADDON, logger
try:
from typing import Optional, Text, Dict, Any # pylint: disable=unused-import
except ImportError:
pass
def _get_cache_directory(): # pylint: disable=missing-docstring
# type: () -> Text
temp_dir = xbmcvfs.translatePath('special://temp')
cache_dir = os.path.join(temp_dir, 'scrapers', ADDON.getAddonInfo('id'))
if not xbmcvfs.exists(cache_dir):
xbmcvfs.mkdir(cache_dir)
logger.debug('the cache dir is ' + cache_dir)
return cache_dir
CACHE_DIR = _get_cache_directory() # type: Text
def cache_show_info(show_info):
# type: (Dict[Text, Any]) -> None
"""
Save show_info dict to cache
"""
file_name = str(show_info['id']) + '.pickle'
cache = {
'show_info': show_info
}
with open(os.path.join(CACHE_DIR, file_name), 'wb') as fo:
pickle.dump(cache, fo, protocol=2)
def load_show_info_from_cache(show_id):
# type: (Text) -> Optional[Dict[Text, Any]]
"""
Load show info from a local cache
:param show_id: show ID on TVmaze
:return: show_info dict or None
"""
file_name = str(show_id) + '.pickle'
try:
with open(os.path.join(CACHE_DIR, file_name), 'rb') as fo:
load_kwargs = {}
load_kwargs['encoding'] = 'bytes'
cache = pickle.load(fo, **load_kwargs)
return cache['show_info']
except (IOError, pickle.PickleError) as exc:
logger.debug('Cache message: {} {}'.format(type(exc), exc))
return None

View File

@@ -0,0 +1,463 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
#
# This is based on the metadata.tvmaze scrapper by Roman Miroshnychenko aka Roman V.M.
"""Functions to process data"""
from __future__ import absolute_import, unicode_literals
import re
import json
from xbmc import Actor, VideoStreamDetail
from collections import namedtuple
from .utils import safe_get, logger
from . import settings, api_utils
try:
from typing import Optional, Tuple, Text, Dict, List, Any # pylint: disable=unused-import
from xbmcgui import ListItem # pylint: disable=unused-import
InfoType = Dict[Text, Any] # pylint: disable=invalid-name
except ImportError:
pass
SOURCE_SETTINGS = settings.getSourceSettings()
BASE_URL = 'https://api.themoviedb.org/3/{}'
FIND_URL = BASE_URL.format('find/{}')
TAG_RE = re.compile(r'<[^>]+>')
# Regular expressions are listed in order of priority.
# "TMDB" provider is preferred than other providers (IMDB and TheTVDB),
# because external providers IDs need to be converted to TMDB_ID.
SHOW_ID_REGEXPS = (
r'(themoviedb)\.org/tv/(\d+).*/episode_group/(.*)', # TMDB_http_link
r'(themoviedb)\.org/tv/(\d+)', # TMDB_http_link
r'(themoviedb)\.org/./tv/(\d+)', # TMDB_http_link
r'(tmdb)\.org/./tv/(\d+)', # TMDB_http_link
r'(imdb)\.com/.+/(tt\d+)', # IMDB_http_link
r'(thetvdb)\.com.+&id=(\d+)', # TheTVDB_http_link
r'(thetvdb)\.com/series/(\d+)', # TheTVDB_http_link
r'(thetvdb)\.com/api/.*series/(\d+)', # TheTVDB_http_link
r'(thetvdb)\.com/.*?"id":(\d+)', # TheTVDB_http_link
)
SUPPORTED_ARTWORK_TYPES = {'poster', 'banner'}
IMAGE_SIZES = ('large', 'original', 'medium')
CLEAN_PLOT_REPLACEMENTS = (
('<b>', '[B]'),
('</b>', '[/B]'),
('<i>', '[I]'),
('</i>', '[/I]'),
('</p><p>', '[CR]'),
)
VALIDEXTIDS = ['tmdb_id', 'imdb_id', 'tvdb_id']
UrlParseResult = namedtuple(
'UrlParseResult', ['provider', 'show_id', 'ep_grouping'])
def _clean_plot(plot):
# type: (Text) -> Text
"""Replace HTML tags with Kodi skin tags"""
for repl in CLEAN_PLOT_REPLACEMENTS:
plot = plot.replace(repl[0], repl[1])
plot = TAG_RE.sub('', plot)
return plot
def _set_cast(cast_info, vtag):
# type: (InfoType, ListItem) -> ListItem
"""Save cast info to list item"""
imagerooturl, previewrooturl = settings.loadBaseUrls()
cast = []
for item in cast_info:
actor = {
'name': item['name'],
'role': item.get('character', item.get('character_name', '')),
'order': item['order'],
}
thumb = None
if safe_get(item, 'profile_path') is not None:
thumb = imagerooturl + item['profile_path']
cast.append(Actor(actor['name'], actor['role'], actor['order'], thumb))
vtag.setCast(cast)
def _get_credits(show_info):
# type: (InfoType) -> List[Text]
"""Extract show creator(s) and writer(s) from show info"""
credits = []
for item in show_info.get('created_by', []):
credits.append(item['name'])
for item in show_info.get('credits', {}).get('crew', []):
isWriter = item.get('job', '').lower() == 'writer' or item.get(
'department', '').lower() == 'writing'
if isWriter and item.get('name') not in credits:
credits.append(item['name'])
return credits
def _get_directors(episode_info):
# type: (InfoType) -> List[Text]
"""Extract episode writer(s) from episode info"""
directors_ = []
for item in episode_info.get('credits', {}).get('crew', []):
if item.get('job') == 'Director':
directors_.append(item['name'])
return directors_
def _set_unique_ids(ext_ids, vtag):
# type: (Dict, ListItem) -> ListItem
"""Extract unique ID in various online databases"""
return_ids = {}
for key, value in ext_ids.items():
if key in VALIDEXTIDS and value:
if key == 'tmdb_id':
isTMDB = True
else:
isTMDB = False
shortkey = key[:-3]
str_value = str(value)
vtag.setUniqueID(str_value, type=shortkey, isdefault=isTMDB)
return_ids[shortkey] = str_value
return return_ids
def _set_rating(the_info, vtag):
# type: (InfoType, ListItem) -> None
"""Set show/episode rating"""
first = True
for rating_type in SOURCE_SETTINGS["RATING_TYPES"]:
logger.debug('adding rating type of %s' % rating_type)
rating = float(the_info.get('ratings', {}).get(
rating_type, {}).get('rating', '0'))
votes = int(the_info.get('ratings', {}).get(
rating_type, {}).get('votes', '0'))
logger.debug("adding rating of %s and votes of %s" %
(str(rating), str(votes)))
if rating > 0:
vtag.setRating(rating, votes=votes,
type=rating_type, isdefault=first)
first = False
def _add_season_info(show_info, vtag):
# type: (InfoType, ListItem) -> None
"""Add info for show seasons"""
for season in show_info['seasons']:
logger.debug('adding information for season %s to list item' %
season['season_number'])
vtag.addSeason(season['season_number'],
safe_get(season, 'name', ''))
for image_type, image_list in season.get('images', {}).items():
if image_type == 'posters':
destination = 'poster'
else:
destination = image_type
for image in image_list:
theurl, previewurl = get_image_urls(image)
if theurl:
vtag.addAvailableArtwork(
theurl, arttype=destination, preview=previewurl, season=season['season_number'])
def _get_names(item_list):
# type: (List) -> None
"""Get names from a list of dicts"""
items = []
for item in item_list:
items.append(item['name'])
return items
def get_image_urls(image):
# type: (Dict) -> Tuple[Text, Text]
"""Get image URLs from image information"""
imagerooturl, previewrooturl = settings.loadBaseUrls()
if image.get('file_path', '').endswith('.svg'):
return None, None
if image.get('type') == 'fanarttv':
theurl = image['file_path']
previewurl = theurl.replace(
'.fanart.tv/fanart/', '.fanart.tv/preview/')
else:
theurl = imagerooturl + image['file_path']
previewurl = previewrooturl + image['file_path']
return theurl, previewurl
def set_show_artwork(show_info, list_item):
# type: (InfoType, ListItem) -> ListItem
"""Set available images for a show"""
vtag = list_item.getVideoInfoTag()
for image_type, image_list in show_info.get('images', {}).items():
if image_type == 'backdrops':
fanart_list = []
for image in image_list:
theurl, previewurl = get_image_urls(image)
if (image.get('iso_639_1') != None and image.get('iso_639_1').lower() != 'xx') and SOURCE_SETTINGS["CATLANDSCAPE"] and theurl:
vtag.addAvailableArtwork(
theurl, arttype="landscape", preview=previewurl)
elif theurl:
fanart_list.append({'image': theurl})
if fanart_list:
list_item.setAvailableFanart(fanart_list)
else:
if image_type == 'posters':
destination = 'poster'
elif image_type == 'logos':
destination = 'clearlogo'
else:
destination = image_type
for image in image_list:
theurl, previewurl = get_image_urls(image)
if theurl:
vtag.addAvailableArtwork(
theurl, arttype=destination, preview=previewurl)
return list_item
def add_main_show_info(list_item, show_info, full_info=True):
# type: (ListItem, InfoType, bool) -> ListItem
"""Add main show info to a list item"""
imagerooturl, previewrooturl = settings.loadBaseUrls()
vtag = list_item.getVideoInfoTag()
original_name = show_info.get('original_name')
if SOURCE_SETTINGS["KEEPTITLE"] and original_name:
showname = original_name
else:
showname = show_info['name']
plot = _clean_plot(safe_get(show_info, 'overview', ''))
vtag.setTitle(showname)
vtag.setOriginalTitle(original_name)
vtag.setTvShowTitle(showname)
vtag.setPlot(plot)
vtag.setPlotOutline(plot)
vtag.setMediaType('tvshow')
ext_ids = {'tmdb_id': show_info['id']}
ext_ids.update(show_info.get('external_ids', {}))
epguide_ids = _set_unique_ids(ext_ids, vtag)
vtag.setEpisodeGuide(json.dumps(epguide_ids))
if show_info.get('first_air_date'):
vtag.setYear(int(show_info['first_air_date'][:4]))
vtag.setPremiered(show_info['first_air_date'])
if full_info:
vtag.setTvShowStatus(safe_get(show_info, 'status', ''))
vtag.setGenres(_get_names(show_info.get('genres', [])))
if SOURCE_SETTINGS["SAVETAGS"]:
vtag.setTags(_get_names(show_info.get(
'keywords', {}).get('results', [])))
networks = show_info.get('networks', [])
if networks:
network = networks[0]
country = network.get('origin_country', '')
else:
network = None
country = None
if network and country and SOURCE_SETTINGS["STUDIOCOUNTRY"]:
vtag.setStudios(['{0} ({1})'.format(network['name'], country)])
elif network:
vtag.setStudios([network['name']])
if country:
vtag.setCountries([country])
content_ratings = show_info.get(
'content_ratings', {}).get('results', {})
if content_ratings:
mpaa = ''
mpaa_backup = ''
for content_rating in content_ratings:
iso = content_rating.get('iso_3166_1', '').lower()
if iso == 'us':
mpaa_backup = content_rating.get('rating')
if iso == SOURCE_SETTINGS["CERT_COUNTRY"].lower():
mpaa = content_rating.get('rating', '')
if not mpaa:
mpaa = mpaa_backup
if mpaa:
vtag.setMpaa(SOURCE_SETTINGS["CERT_PREFIX"] + mpaa)
vtag.setWriters(_get_credits(show_info))
if SOURCE_SETTINGS["ENABTRAILER"]:
trailer = _parse_trailer(show_info.get(
'videos', {}).get('results', {}))
if trailer:
vtag.setTrailer(trailer)
list_item = set_show_artwork(show_info, list_item)
_add_season_info(show_info, vtag)
_set_cast(show_info['credits']['cast'], vtag)
_set_rating(show_info, vtag)
else:
image = show_info.get('poster_path', '')
if image and not image.endswith('.svg'):
theurl = imagerooturl + image
previewurl = previewrooturl + image
vtag.addAvailableArtwork(
theurl, arttype='poster', preview=previewurl)
logger.debug('adding tv show information for %s to list item' % showname)
return list_item
def add_episode_info(list_item, episode_info, full_info=True):
# type: (ListItem, InfoType, bool) -> ListItem
"""Add episode info to a list item"""
title = episode_info.get('name', 'Episode ' +
str(episode_info['episode_number']))
vtag = list_item.getVideoInfoTag()
vtag.setTitle(title)
vtag.setSeason(episode_info['season_number'])
vtag.setEpisode(episode_info['episode_number'])
vtag.setMediaType('episode')
if safe_get(episode_info, 'air_date') is not None:
vtag.setFirstAired(episode_info['air_date'])
if full_info:
summary = safe_get(episode_info, 'overview')
if summary is not None:
plot = _clean_plot(summary)
vtag.setPlot(plot)
vtag.setPlotOutline(plot)
if safe_get(episode_info, 'air_date') is not None:
vtag.setPremiered(episode_info['air_date'])
duration = episode_info.get('runtime')
if duration:
videostream = VideoStreamDetail(duration=int(duration)*60)
vtag.addVideoStream(videostream)
_set_cast(
episode_info['season_cast'] + episode_info['credits']['guest_stars'], vtag)
ext_ids = {'tmdb_id': episode_info['id']}
ext_ids.update(episode_info.get('external_ids', {}))
_set_unique_ids(ext_ids, vtag)
_set_rating(episode_info, vtag)
for image in episode_info.get('images', {}).get('stills', []):
theurl, previewurl = get_image_urls(image)
if theurl:
vtag.addAvailableArtwork(
theurl, arttype='thumb', preview=previewurl)
vtag.setWriters(_get_credits(episode_info))
vtag.setDirectors(_get_directors(episode_info))
logger.debug('adding episode information for S%sE%s - %s to list item' %
(episode_info['season_number'], episode_info['episode_number'], title))
return list_item
def parse_nfo_url(nfo):
# type: (Text) -> Optional[UrlParseResult]
"""Extract show ID and named seasons from NFO file contents"""
# work around for xbmcgui.ListItem.addSeason overwriting named seasons from NFO files
ns_regex = r'<namedseason number="(.*)">(.*)</namedseason>'
ns_match = re.findall(ns_regex, nfo, re.I)
sid_match = None
ep_grouping = None
for regexp in SHOW_ID_REGEXPS:
logger.debug('trying regex to match service from parsing nfo:')
logger.debug(regexp)
show_id_match = re.search(regexp, nfo, re.I)
if show_id_match:
logger.debug('match group 1: ' + show_id_match.group(1))
logger.debug('match group 2: ' + show_id_match.group(2))
if show_id_match.group(1) == "themoviedb" or show_id_match.group(1) == "tmdb":
try:
ep_grouping = show_id_match.group(3)
except IndexError:
pass
tmdb_id = show_id_match.group(2)
else:
tmdb_id = _convert_ext_id(
show_id_match.group(1), show_id_match.group(2))
if tmdb_id:
logger.debug('match group 3: ' + str(ep_grouping))
sid_match = UrlParseResult('tmdb', tmdb_id, ep_grouping)
break
return sid_match, ns_match
def _convert_ext_id(ext_provider, ext_id):
# type: (Text, Text) -> Text
"""get a TMDb ID from an external ID"""
providers_dict = {'imdb': 'imdb_id',
'thetvdb': 'tvdb_id',
'tvdb': 'tvdb_id'}
show_url = FIND_URL.format(ext_id)
params = {'api_key': settings.TMDB_CLOWNCAR,
'language': SOURCE_SETTINGS["LANG_DETAILS"]}
provider = providers_dict.get(ext_provider)
if provider:
params['external_source'] = provider
show_info = api_utils.load_info(show_url, params=params)
else:
show_info = None
if show_info:
tv_results = show_info.get('tv_results')
if tv_results:
return tv_results[0].get('id')
return None
def parse_media_id(title):
# type: (Text) -> Dict
"""get the ID from a title and return with the type"""
title = title.lower()
if title.startswith('tt') and title[2:].isdigit():
# IMDB ID works alone because it is clear
return {'type': 'imdb_id', 'title': title}
# IMDB ID with prefix to match
elif title.startswith('imdb/tt') and title[7:].isdigit():
# IMDB ID works alone because it is clear
return {'type': 'imdb_id', 'title': title[5:]}
elif title.startswith('tmdb/') and title[5:].isdigit(): # TMDB ID
return {'type': 'tmdb_id', 'title': title[5:]}
elif title.startswith('tvdb/') and title[5:].isdigit(): # TVDB ID
return {'type': 'tvdb_id', 'title': title[5:]}
return None
def _parse_trailer(results):
# type: (Text) -> Text
"""create a valid Tubed or YouTube plugin trailer URL"""
if results:
if SOURCE_SETTINGS["PLAYERSOPT"] == 'tubed':
addon_player = 'plugin://plugin.video.tubed/?mode=play&video_id='
elif SOURCE_SETTINGS["PLAYERSOPT"] == 'youtube':
addon_player = 'plugin://plugin.video.youtube/play/?video_id='
backup_keys = []
for video_lang in [SOURCE_SETTINGS["LANG_DETAILS"][0:2], 'en']:
for result in results:
if result.get('site') == 'YouTube' and result.get('iso_639_1') == video_lang:
key = result.get('key')
if result.get('type') == 'Trailer':
if _check_youtube(key):
# video is available and is defined as "Trailer" by TMDB. Perfect link!
return addon_player+key
else:
# video is available, but NOT defined as "Trailer" by TMDB. Saving it as backup in case it doesn't find any perfect link.
backup_keys.append(key)
for keybackup in backup_keys:
if _check_youtube(keybackup):
return addon_player+keybackup
return None
def _check_youtube(key):
# type: (Text) -> bool
"""check to see if the YouTube key returns a valid link"""
chk_link = "https://www.youtube.com/watch?v="+key
check = api_utils.load_info(chk_link, resp_type='not_json')
if not check or "Video unavailable" in check: # video not available
return False
return True

View File

@@ -0,0 +1,115 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
#
# This is based on the metadata.tvmaze scrapper by Roman Miroshnychenko aka Roman V.M.
"""
Provides a context manager that writes extended debugging info
in the Kodi log on unhandled exceptions
"""
from __future__ import absolute_import, unicode_literals
import inspect
from contextlib import contextmanager
from platform import uname
from pprint import pformat
import xbmc
from .utils import logger
try:
from typing import Text, Generator, Callable, Dict, Any # pylint: disable=unused-import
except ImportError:
pass
def _format_vars(variables):
# type: (Dict[Text, Any]) -> Text
"""
Format variables dictionary
:param variables: variables dict
:type variables: dict
:return: formatted string with sorted ``var = val`` pairs
:rtype: str
"""
var_list = [(var, val) for var, val in variables.items()
if not (var.startswith('__') or var.endswith('__'))]
var_list.sort(key=lambda i: i[0])
lines = []
for var, val in var_list:
lines.append('{0} = {1}'.format(var, pformat(val)))
return '\n'.join(lines)
@contextmanager
def debug_exception(logger_func=logger.error):
# type: (Callable[[Text], None]) -> Generator[None]
"""
Diagnostic helper context manager
It controls execution within its context and writes extended
diagnostic info to the Kodi log if an unhandled exception
happens within the context. The info includes the following items:
- System info
- Kodi version
- Module path.
- Code fragment where the exception has happened.
- Global variables.
- Local variables.
After logging the diagnostic info the exception is re-raised.
Example::
with debug_exception():
# Some risky code
raise RuntimeError('Fatal error!')
:param logger_func: logger function which must accept a single argument
which is a log message.
"""
try:
yield
except Exception as exc:
frame_info = inspect.trace(5)[-1]
logger_func(
'*** Unhandled exception detected: {} {} ***'.format(type(exc), exc))
logger_func('*** Start diagnostic info ***')
logger_func('System info: {0}'.format(uname()))
logger_func('OS info: {0}'.format(
xbmc.getInfoLabel('System.OSVersionInfo')))
logger_func('Kodi version: {0}'.format(
xbmc.getInfoLabel('System.BuildVersion')))
logger_func('File: {0}'.format(frame_info[1]))
context = ''
if frame_info[4] is not None:
for i, line in enumerate(frame_info[4], frame_info[2] - frame_info[5]):
if i == frame_info[2]:
context += '{0}:>{1}'.format(str(i).rjust(5), line)
else:
context += '{0}: {1}'.format(str(i).rjust(5), line)
logger_func('Code context:\n' + context)
logger_func('Global variables:\n' +
_format_vars(frame_info[0].f_globals))
logger_func('Local variables:\n' +
_format_vars(frame_info[0].f_locals))
logger_func('**** End diagnostic info ****')
raise exc

View File

@@ -0,0 +1,80 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# IMDb ratings based on code in metadata.themoviedb.org.python by Team Kodi
# pylint: disable=missing-docstring
import re
import json
from . import api_utils
from . import settings
try:
from typing import Optional, Tuple, Text, Dict, List, Any # pylint: disable=unused-import
except ImportError:
pass
IMDB_RATINGS_URL = 'https://www.imdb.com/title/{}/'
IMDB_JSON_REGEX = re.compile(
r'<script type="application\/ld\+json">(.*?)<\/script>')
HEADERS = (
('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36'),
('Accept', 'application/json'),
)
def get_details(imdb_id):
# type: (Text) -> Dict
"""get the IMDB ratings details"""
if not imdb_id:
return {}
votes, rating = _get_ratinginfo(imdb_id)
return _assemble_imdb_result(votes, rating)
def _get_ratinginfo(imdb_id):
# type: (Text) -> Tuple[Text, Text]
"""get the IMDB ratings details"""
source_settings = settings.getSourceSettings()
api_utils.set_headers(dict(HEADERS))
response = api_utils.load_info(IMDB_RATINGS_URL.format(
imdb_id), default='', resp_type='text', verboselog=source_settings["VERBOSELOG"])
api_utils.set_headers({})
return _parse_imdb_result(response)
def _assemble_imdb_result(votes, rating):
# type: (Text, Text) -> Dict
"""assemble to IMDB ratings into a Dict"""
result = {}
if votes and rating:
result['ratings'] = {'imdb': {'votes': votes, 'rating': rating}}
return result
def _parse_imdb_result(input_html):
# type: (Text) -> Tuple[Text, Text]
"""parse the IMDB ratings from the JSON in the raw HTML"""
match = re.search(IMDB_JSON_REGEX, input_html)
if not match:
return None, None
imdb_json = json.loads(match.group(1))
imdb_ratings = imdb_json.get("aggregateRating", {})
rating = imdb_ratings.get("ratingValue", None)
votes = imdb_ratings.get("ratingCount", None)
return votes, rating

View File

@@ -0,0 +1,125 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
import json
import sys
import urllib.parse
from .utils import logger
from . import api_utils
from xbmcaddon import Addon
from datetime import datetime, timedelta
TMDB_CLOWNCAR = 'af3a53eb387d57fc935e9128468b1899'
FANARTTV_CLOWNCAR = 'b018086af0e1478479adfc55634db97d'
TRAKT_CLOWNCAR = '90901c6be3b2de5a4fa0edf9ab5c75e9a5a0fef2b4ee7373d8b63dcf61f95697'
MAXIMAGES = 200
FANARTTV_MAPPING = {'showbackground': 'backdrops',
'tvposter': 'posters',
'tvbanner': 'banner',
'hdtvlogo': 'clearlogo',
'clearlogo': 'clearlogo',
'hdclearart': 'clearart',
'clearart': 'clearart',
'tvthumb': 'landscape',
'characterart': 'characterart',
'seasonposter': 'seasonposters',
'seasonbanner': 'seasonbanner',
'seasonthumb': 'seasonlandscape'
}
def _get_date_numeric(datetime_):
return (datetime_ - datetime(1970, 1, 1)).total_seconds()
def _get_configuration():
addon = Addon()
logger.debug('getting configuration details')
return api_utils.load_info('https://api.themoviedb.org/3/configuration', params={'api_key': TMDB_CLOWNCAR}, verboselog=addon.getSettingBool('verboselog'))
def loadBaseUrls():
addon = Addon()
image_root_url = addon.getSettingString('originalUrl')
preview_root_url = addon.getSettingString('previewUrl')
last_updated = addon.getSettingString('lastUpdated')
if not image_root_url or not preview_root_url or not last_updated or \
float(last_updated) < _get_date_numeric(datetime.now() - timedelta(days=30)):
conf = _get_configuration()
if conf:
image_root_url = conf['images']['secure_base_url'] + 'original'
preview_root_url = conf['images']['secure_base_url'] + 'w780'
addon.setSetting('originalUrl', image_root_url)
addon.setSetting('previewUrl', preview_root_url)
addon.setSetting('lastUpdated', str(
_get_date_numeric(datetime.now())))
return image_root_url, preview_root_url
def getSourceSettings():
addon = Addon()
settings = {}
logger.debug('Got settings of: {}'.format(sys.argv[2]))
try:
source_params = dict(urllib.parse.parse_qsl(sys.argv[2]))
except IndexError:
source_params = {}
source_settings = json.loads(source_params.get('pathSettings', '{}'))
settings["KEEPTITLE"] = source_settings.get(
'keeporiginaltitle', addon.getSettingBool('keeporiginaltitle'))
settings["CATLANDSCAPE"] = source_settings.get('cat_landscape', True)
settings["STUDIOCOUNTRY"] = source_settings.get('studio_country', False)
settings["ENABTRAILER"] = source_settings.get(
'enab_trailer', addon.getSettingBool('enab_trailer'))
settings["PLAYERSOPT"] = source_settings.get(
'players_opt', addon.getSettingString('players_opt')).lower()
settings["VERBOSELOG"] = source_settings.get(
'verboselog', addon.getSettingBool('verboselog'))
settings["CERT_COUNTRY"] = source_settings.get(
'tmdbcertcountry', addon.getSettingString('tmdbcertcountry')).lower()
settings["SAVETAGS"] = source_settings.get(
'keywordsastags', addon.getSettingBool('keywordsastags'))
settings["LANG_DETAILS"] = source_settings.get(
'languageDetails', addon.getSettingString('languageDetails'))
if source_settings.get('usedifferentlangforimages', addon.getSettingBool('usedifferentlangforimages')):
settings["LANG_IMAGES"] = source_settings.get(
'languageImages', addon.getSettingString('languageImages'))
else:
settings["LANG_IMAGES"] = settings["LANG_DETAILS"]
if source_settings.get('usecertprefix', addon.getSettingBool('usecertprefix')):
settings["CERT_PREFIX"] = source_settings.get(
'certprefix', addon.getSettingString('certprefix'))
else:
settings["CERT_PREFIX"] = ''
primary_rating = source_settings.get(
'ratings', addon.getSettingString('ratings')).lower()
RATING_TYPES = [primary_rating]
if source_settings.get('imdbanyway', addon.getSettingBool('imdbanyway')) and primary_rating != 'imdb':
RATING_TYPES.append('imdb')
if source_settings.get('traktanyway', addon.getSettingBool('traktanyway')) and primary_rating != 'trakt':
RATING_TYPES.append('trakt')
if source_settings.get('tmdbanyway', addon.getSettingBool('tmdbanyway')) and primary_rating != 'tmdb':
RATING_TYPES.append('tmdb')
settings["RATING_TYPES"] = RATING_TYPES
settings["FANARTTV_ENABLE"] = source_settings.get(
'enable_fanarttv', addon.getSettingBool('enable_fanarttv'))
settings["FANARTTV_CLIENTKEY"] = source_settings.get(
'fanarttv_clientkey', addon.getSettingString('fanarttv_clientkey'))
logger.debug('Sending back settings of: {}'.format(settings))
return settings

View File

@@ -0,0 +1,515 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
"""Functions to interact with TMDb API"""
from __future__ import absolute_import, unicode_literals
import unicodedata
from math import floor
from pprint import pformat
from . import cache, data_utils, api_utils, settings, imdbratings, traktratings
from .utils import logger
try:
from typing import Text, Optional, Union, List, Dict, Any # pylint: disable=unused-import
InfoType = Dict[Text, Any] # pylint: disable=invalid-name
except ImportError:
pass
HEADERS = (
('User-Agent', 'Kodi TV Show scraper by Team Kodi; contact pkscout@kodi.tv'),
('Accept', 'application/json'),
)
TMDB_PARAMS = {'api_key': settings.TMDB_CLOWNCAR}
BASE_URL = 'https://api.themoviedb.org/3/{}'
EPISODE_GROUP_URL = BASE_URL.format('tv/episode_group/{}')
SEARCH_URL = BASE_URL.format('search/tv')
FIND_URL = BASE_URL.format('find/{}')
SHOW_URL = BASE_URL.format('tv/{}')
SEASON_URL = BASE_URL.format('tv/{}/season/{}')
EPISODE_URL = BASE_URL.format('tv/{}/season/{}/episode/{}')
FANARTTV_URL = 'https://webservice.fanart.tv/v3/tv/{}'
FANARTTV_PARAMS = {'api_key': settings.FANARTTV_CLOWNCAR}
def _get_params():
source_settings = settings.getSourceSettings()
params = TMDB_PARAMS.copy()
params['language'] = source_settings["LANG_DETAILS"]
return params
def search_show(title, year=None):
# type: (Text, Text) -> List
"""
Search for a single TV show
:param title: TV show title to search
: param year: the year to search (optional)
:return: a list with found TV shows
"""
source_settings = settings.getSourceSettings()
api_utils.set_headers(dict(HEADERS))
params = _get_params()
results = []
ext_media_id = data_utils.parse_media_id(title)
if ext_media_id:
logger.debug('using %s of %s to find show' %
(ext_media_id['type'], ext_media_id['title']))
if ext_media_id['type'] == 'tmdb_id':
search_url = SHOW_URL.format(ext_media_id['title'])
else:
search_url = FIND_URL.format(ext_media_id['title'])
params['external_source'] = ext_media_id['type']
else:
logger.debug('using title of %s to find show' % title)
search_url = SEARCH_URL
params['query'] = unicodedata.normalize('NFKC', title)
if year:
params['first_air_date_year'] = str(year)
resp = api_utils.load_info(
search_url, params=params, verboselog=source_settings["VERBOSELOG"])
if resp is not None:
if ext_media_id:
if ext_media_id['type'] == 'tmdb_id':
if resp.get('success') == 'false':
results = []
else:
results = [resp]
else:
results = resp.get('tv_results', [])
else:
results = resp.get('results', [])
api_utils.set_headers({})
return results
def find_by_id(unique_ids):
"""
Find a show by external IDs
:param unique_ids: dict of external IDs
:return: a list with found TV shows
"""
supported_ids = ['imdb', 'facebook', 'instagram',
'tvdb', 'tiktok', 'twitter', 'wikidata', 'youtube']
source_settings = settings.getSourceSettings()
api_utils.set_headers(dict(HEADERS))
params = _get_params()
for key, value in unique_ids.items():
if key in supported_ids:
params['external_source'] = key + '_id'
search_url = FIND_URL.format(value)
resp = api_utils.load_info(
search_url, params=params, verboselog=source_settings["VERBOSELOG"])
if resp is not None:
return resp.get('tv_results', [])
api_utils.set_headers()
return []
def load_episode_list(show_info, season_map, ep_grouping):
# type: (InfoType, Dict, Text) -> Optional[InfoType]
"""get the IMDB ratings details"""
"""Load episode list from themoviedb.org API"""
source_settings = settings.getSourceSettings()
api_utils.set_headers(dict(HEADERS))
episode_list = []
if ep_grouping is not None:
logger.debug(
'Getting episodes with episode grouping of ' + ep_grouping)
episode_group_url = EPISODE_GROUP_URL.format(ep_grouping)
custom_order = api_utils.load_info(
episode_group_url, params=TMDB_PARAMS, verboselog=source_settings["VERBOSELOG"])
if custom_order is not None:
show_info['seasons'] = []
for custom_season in custom_order.get('groups', []):
season_episodes = []
try:
current_season = season_map.get(
str(custom_season['episodes'][0]['season_number']), {}).copy()
except IndexError:
continue
current_season['name'] = custom_season['name']
current_season['season_number'] = custom_season['order']
for episode in custom_season['episodes']:
episode['org_seasonnum'] = episode['season_number']
episode['org_epnum'] = episode['episode_number']
episode['season_number'] = custom_season['order']
episode['episode_number'] = episode['order'] + 1
season_episodes.append(episode)
episode_list.append(episode)
current_season['episodes'] = season_episodes
show_info['seasons'].append(current_season)
else:
logger.debug('Getting episodes from standard season list')
show_info['seasons'] = []
for key, value in season_map.items():
show_info['seasons'].append(value)
for season in show_info.get('seasons', []):
for episode in season.get('episodes', []):
episode['org_seasonnum'] = episode['season_number']
episode['org_epnum'] = episode['episode_number']
episode_list.append(episode)
show_info['episodes'] = episode_list
api_utils.set_headers({})
return show_info
def load_show_info(show_id, ep_grouping=None, named_seasons=None):
# type: (Text, Text, Dict) -> Optional[InfoType]
"""
Get full info for a single show
:param show_id: themoviedb.org show ID
:param ep_grouping: the episode group from TMDb
:param named_seasons: the named seasons from the NFO file
:return: show info or None
"""
api_utils.set_headers(dict(HEADERS))
source_settings = settings.getSourceSettings()
if named_seasons == None:
named_seasons = []
show_info = cache.load_show_info_from_cache(show_id)
if show_info is None:
logger.debug('no cache file found, loading from scratch')
show_url = SHOW_URL.format(show_id)
params = _get_params()
params['append_to_response'] = 'credits,content_ratings,external_ids,images,videos,keywords'
params['include_image_language'] = '%s,en,null' % source_settings["LANG_IMAGES"][0:2]
params['include_video_language'] = '%s,en,null' % source_settings["LANG_IMAGES"][0:2]
show_info = api_utils.load_info(
show_url, params=params, verboselog=source_settings["VERBOSELOG"])
if show_info is None:
return None
if show_info['overview'] == '' and source_settings["LANG_DETAILS"] != 'en-US':
params['language'] = 'en-US'
del params['append_to_response']
show_info_backup = api_utils.load_info(
show_url, params=params, verboselog=source_settings["VERBOSELOG"])
if show_info_backup is not None:
show_info['overview'] = show_info_backup.get('overview', '')
params['language'] = source_settings["LANG_DETAILS"]
season_map = {}
params['append_to_response'] = 'credits,images'
for season in show_info.get('seasons', []):
season_url = SEASON_URL.format(
show_id, season.get('season_number', 0))
season_info = api_utils.load_info(
season_url, params=params, default={}, verboselog=source_settings["VERBOSELOG"])
if (season_info.get('overview', '') == '' or season_info.get('name', '').lower().startswith('season')) and source_settings["LANG_DETAILS"] != 'en-US':
params['language'] = 'en-US'
season_info_backup = api_utils.load_info(
season_url, params=params, default={}, verboselog=source_settings["VERBOSELOG"])
params['language'] = source_settings["LANG_DETAILS"]
if season_info.get('overview', '') == '':
season_info['overview'] = season_info_backup.get(
'overview', '')
if season_info.get('name', '').lower().startswith('season'):
season_info['name'] = season_info_backup.get('name', '')
# this is part of a work around for xbmcgui.ListItem.addSeasons() not respecting NFO file information
for named_season in named_seasons:
if str(named_season[0]) == str(season.get('season_number')):
logger.debug('adding season name of %s from named seasons in NFO for season %s' % (
named_season[1], season['season_number']))
season_info['name'] = named_season[1]
break
# end work around
season_info['images'] = _sort_image_types(
season_info.get('images', {}))
season_map[str(season.get('season_number', 0))] = season_info
show_info = load_episode_list(show_info, season_map, ep_grouping)
show_info['ratings'] = load_ratings(show_info)
show_info = load_fanarttv_art(show_info)
show_info['images'] = _sort_image_types(show_info.get('images', {}))
show_info = trim_artwork(show_info)
cast_check = []
cast = []
for season in reversed(show_info.get('seasons', [])):
for cast_member in season.get('credits', {}).get('cast', []):
if cast_member.get('name', '') not in cast_check:
cast.append(cast_member)
cast_check.append(cast_member.get('name', ''))
show_info['credits']['cast'] = cast
logger.debug('saving show info to the cache')
if source_settings["VERBOSELOG"]:
logger.debug(format(pformat(show_info)))
cache.cache_show_info(show_info)
else:
logger.debug('using cached show info')
api_utils.set_headers({})
return show_info
def load_episode_info(show_id, episode_id):
# type: (Text, Text) -> Optional[InfoType]
"""
Load episode info
:param show_id:
:param episode_id:
:return: episode info or None
"""
source_settings = settings.getSourceSettings()
api_utils.set_headers(dict(HEADERS))
show_info = load_show_info(show_id)
if show_info is not None:
try:
episode_info = show_info['episodes'][int(episode_id)]
except KeyError:
return None
# this ensures we are using the season/ep from the episode grouping if provided
ep_url = EPISODE_URL.format(
show_info['id'], episode_info['org_seasonnum'], episode_info['org_epnum'])
params = _get_params()
params['append_to_response'] = 'credits,external_ids,images'
params['include_image_language'] = '%s,en,null' % source_settings["LANG_IMAGES"][0:2]
ep_return = api_utils.load_info(
ep_url, params=params, verboselog=source_settings["VERBOSELOG"])
if ep_return is None:
return None
bad_return_name = False
bad_return_overview = False
check_name = ep_return.get('name')
if check_name == None:
bad_return_name = True
ep_return['name'] = 'Episode ' + \
str(episode_info['episode_number'])
elif check_name.lower().startswith('episode') or check_name == '':
bad_return_name = True
if ep_return.get('overview', '') == '':
bad_return_overview = True
if (bad_return_overview or bad_return_name) and source_settings["LANG_DETAILS"] != 'en-US':
params['language'] = 'en-US'
del params['append_to_response']
ep_return_backup = api_utils.load_info(
ep_url, params=params, verboselog=source_settings["VERBOSELOG"])
if ep_return_backup is not None:
if bad_return_overview:
ep_return['overview'] = ep_return_backup.get(
'overview', '')
if bad_return_name:
ep_return['name'] = ep_return_backup.get(
'name', 'Episode ' + str(episode_info['episode_number']))
ep_return['images'] = _sort_image_types(ep_return.get('images', {}))
ep_return['season_number'] = episode_info['season_number']
ep_return['episode_number'] = episode_info['episode_number']
ep_return['org_seasonnum'] = episode_info['org_seasonnum']
ep_return['org_epnum'] = episode_info['org_epnum']
ep_return['ratings'] = load_ratings(
ep_return, show_imdb_id=show_info.get('external_ids', {}).get('imdb_id'))
for season in show_info.get('seasons', []):
if season.get('season_number') == episode_info['season_number']:
ep_return['season_cast'] = season.get(
'credits', {}).get('cast', [])
break
show_info['episodes'][int(episode_id)] = ep_return
cache.cache_show_info(show_info)
api_utils.set_headers({})
return ep_return
api_utils.set_headers({})
return None
def load_ratings(the_info, show_imdb_id=''):
# type: (InfoType, Text) -> Dict
"""
Load the ratings for the show/episode
:param the_info: show or episode info
:param show_imdb_id: show IMDB
:return: ratings or empty dict
"""
source_settings = settings.getSourceSettings()
ratings = {}
imdb_id = the_info.get('external_ids', {}).get('imdb_id')
for rating_type in source_settings["RATING_TYPES"]:
logger.debug('setting rating using %s' % rating_type)
if rating_type == 'tmdb':
ratings['tmdb'] = {'votes': the_info['vote_count'],
'rating': the_info['vote_average']}
elif rating_type == 'imdb' and imdb_id:
imdb_rating = imdbratings.get_details(imdb_id).get('ratings')
if imdb_rating:
ratings.update(imdb_rating)
elif rating_type == 'trakt':
if show_imdb_id:
season = the_info['org_seasonnum']
episode = the_info['org_epnum']
resp = traktratings.get_details(
show_imdb_id, season=season, episode=episode)
else:
resp = traktratings.get_details(imdb_id)
trakt_rating = resp.get('ratings')
if trakt_rating:
ratings.update(trakt_rating)
logger.debug('returning ratings of\n{}'.format(pformat(ratings)))
return ratings
def load_fanarttv_art(show_info):
# type: (InfoType) -> Optional[InfoType]
"""
Add fanart.tv images for a show
:param show_info: the current show info
:return: show info
"""
source_settings = settings.getSourceSettings()
api_utils.set_headers(dict(HEADERS))
if source_settings["FANARTTV_CLIENTKEY"]:
FANARTTV_PARAMS['client_key'] = source_settings["FANARTTV_CLIENTKEY"]
tvdb_id = show_info.get('external_ids', {}).get('tvdb_id')
if tvdb_id and source_settings["FANARTTV_ENABLE"]:
fanarttv_url = FANARTTV_URL.format(tvdb_id)
artwork = api_utils.load_info(
fanarttv_url, params=FANARTTV_PARAMS, verboselog=source_settings["VERBOSELOG"])
if artwork is None:
return show_info
for fanarttv_type, tmdb_type in settings.FANARTTV_MAPPING.items():
if not show_info['images'].get(tmdb_type) and not tmdb_type.startswith('season'):
show_info['images'][tmdb_type] = []
for item in artwork.get(fanarttv_type, []):
lang = item.get('lang')
if lang == '' or lang == '00':
lang = None
filepath = ''
if lang is None or lang == source_settings["LANG_DETAILS"][0:2] or lang == 'en':
filepath = item.get('url')
if filepath:
if tmdb_type.startswith('season'):
image_type = tmdb_type[6:]
for s in range(len(show_info.get('seasons', []))):
season_num = show_info['seasons'][s]['season_number']
artseason = item.get('season', '')
if not show_info['seasons'][s].get('images'):
show_info['seasons'][s]['images'] = {}
if not show_info['seasons'][s]['images'].get(image_type):
show_info['seasons'][s]['images'][image_type] = []
if artseason == '' or artseason == str(season_num):
show_info['seasons'][s]['images'][image_type].append(
{'file_path': filepath, 'type': 'fanarttv', 'iso_639_1': lang})
else:
show_info['images'][tmdb_type].append(
{'file_path': filepath, 'type': 'fanarttv', 'iso_639_1': lang})
api_utils.set_headers({})
return show_info
def trim_artwork(show_info):
# type: (InfoType) -> Optional[InfoType]
"""
Trim artwork to keep the text blob below 65K characters
:param show_info: the current show info
:return: show info
"""
image_counts = {}
image_total = 0
backdrops_total = 0
for image_type, image_list in show_info.get('images', {}).items():
total = len(image_list)
if image_type == 'backdrops':
backdrops_total = backdrops_total + total
else:
image_counts[image_type] = {'total': total}
image_total = image_total + total
for season in show_info.get('seasons', []):
for image_type, image_list in season.get('images', {}).items():
total = len(image_list)
thetype = '%s_%s' % (str(season['season_number']), image_type)
image_counts[thetype] = {'total': total}
image_total = image_total + total
if image_total <= settings.MAXIMAGES and backdrops_total <= settings.MAXIMAGES:
return show_info
if backdrops_total > settings.MAXIMAGES:
logger.error('there are %s fanart images' % str(backdrops_total))
logger.error('that is more than the max of %s, image results will be trimmed to the max' % str(
settings.MAXIMAGES))
reduce = -1 * (backdrops_total - settings.MAXIMAGES)
del show_info['images']['backdrops'][reduce:]
if image_total > settings.MAXIMAGES:
reduction = (image_total - settings.MAXIMAGES)/image_total
logger.error('there are %s non-fanart images' % str(image_total))
logger.error('that is more than the max of %s, image results will be trimmed by %s' % (
str(settings.MAXIMAGES), str(reduction)))
for key, value in image_counts.items():
image_counts[key]['reduce'] = -1 * \
int(floor(value['total'] * reduction))
logger.debug('%s: %s' % (key, pformat(image_counts[key])))
for image_type, image_list in show_info.get('images', {}).items():
if image_type == 'backdrops':
continue # already handled backdrops above
reduce = image_counts[image_type]['reduce']
if reduce != 0:
del show_info['images'][image_type][reduce:]
for s in range(len(show_info.get('seasons', []))):
for image_type, image_list in show_info['seasons'][s].get('images', {}).items():
thetype = '%s_%s' % (
str(show_info['seasons'][s]['season_number']), image_type)
reduce = image_counts[thetype]['reduce']
if reduce != 0:
del show_info['seasons'][s]['images'][image_type][reduce:]
return show_info
def _sort_image_types(imagelist):
# type: (Dict) -> Dict
"""
sort the images by language
:param imagelist:
:return: imagelist
"""
for image_type, images in imagelist.items():
imagelist[image_type] = _image_sort(images, image_type)
return imagelist
def _image_sort(images, image_type):
# type: (List, Text) -> List
"""
sort the images by language
:param images:
:param image_type:
:return: list of images
"""
source_settings = settings.getSourceSettings()
lang_pref = []
lang_null = []
lang_en = []
firstimage = True
for image in images:
image_lang = image.get('iso_639_1')
if image_lang == source_settings["LANG_DETAILS"][0:2]:
lang_pref.append(image)
elif image_lang == 'en':
lang_en.append(image)
else:
if firstimage:
lang_pref.append(image)
else:
lang_null.append(image)
firstimage = False
if image_type == 'posters':
return lang_pref + lang_en + lang_null
else:
return lang_pref + lang_null + lang_en

View File

@@ -0,0 +1,67 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
"""Functions to interact with Trakt API"""
from __future__ import absolute_import, unicode_literals
from . import api_utils, settings
try:
from typing import Text, Optional, Union, List, Dict, Any # pylint: disable=unused-import
except ImportError:
pass
HEADERS = (
('User-Agent', 'Kodi TV Show scraper by Team Kodi; contact pkscout@kodi.tv'),
('Accept', 'application/json'),
('trakt-api-key', settings.TRAKT_CLOWNCAR),
('trakt-api-version', '2'),
('Content-Type', 'application/json'),
)
SHOW_URL = 'https://api.trakt.tv/shows/{}'
EP_URL = SHOW_URL + '/seasons/{}/episodes/{}/ratings'
def get_details(imdb_id, season=None, episode=None):
# type: (Text, Text, Text) -> Dict
"""
get the Trakt ratings
:param imdb_id:
:param season:
:param episode:
:return: trackt ratings
"""
source_settings = settings.getSourceSettings()
api_utils.set_headers(dict(HEADERS))
result = {}
if season and episode:
url = EP_URL.format(imdb_id, season, episode)
params = None
else:
url = SHOW_URL.format(imdb_id)
params = {'extended': 'full'}
resp = api_utils.load_info(
url, params=params, default={}, verboselog=source_settings["VERBOSELOG"])
rating = resp.get('rating')
votes = resp.get('votes')
if votes and rating:
result['ratings'] = {'trakt': {'votes': votes, 'rating': rating}}
api_utils.set_headers({})
return result

View File

@@ -0,0 +1,72 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2020, Team Kodi
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pylint: disable=missing-docstring
"""Misc utils"""
from __future__ import absolute_import, unicode_literals
import xbmc
from xbmcaddon import Addon
try:
from typing import Text, Optional, Any, Dict # pylint: disable=unused-import
except ImportError:
pass
ADDON_ID = 'metadata.tvshows.themoviedb.org.python'
ADDON = Addon()
class logger:
log_message_prefix = '[{} ({})]: '.format(
ADDON_ID, ADDON.getAddonInfo('version'))
@staticmethod
def log(message, level=xbmc.LOGDEBUG):
# type: (Text, int) -> None
if isinstance(message, bytes):
message = message.decode('utf-8')
message = logger.log_message_prefix + message
xbmc.log(message, level)
@staticmethod
def info(message):
# type: (Text) -> None
logger.log(message, xbmc.LOGINFO)
@staticmethod
def error(message):
# type: (Text) -> None
logger.log(message, xbmc.LOGERROR)
@staticmethod
def debug(message):
# type: (Text) -> None
logger.log(message, xbmc.LOGDEBUG)
def safe_get(dct, key, default=None):
# type: (Dict[Text, Any], Text, Any) -> Any
"""
Get a key from dict
Returns the respective value or default if key is missing or the value is None.
"""
if key in dct and dct[key] is not None:
return dct[key]
return default