[mastodon] oh haruhi what did I NOT do here

+ --force-use-mastodon option
+ logging in to mastodon/pleroma
+ fetching posts via different mastodon/pleroma instances to get follower-only/direct posts
+ fetching peertube videos via pleroma instances to circuvument censorship (?)
merge-requests/5/head
Lauren Liberda 2021-05-03 01:41:05 +02:00
parent 76d4e8de92
commit 5c054ee942
4 changed files with 201 additions and 8 deletions

View File

@ -787,6 +787,10 @@ class HaruhiDL(object):
if not ie_key and force_generic_extractor:
ie_key = 'Generic'
force_use_mastodon = self.params.get('force_use_mastodon')
if not ie_key and force_use_mastodon:
ie_key = 'MastodonSH'
if not ie_key:
ie_key = self.params.get('ie_key')
@ -796,7 +800,7 @@ class HaruhiDL(object):
ies = self._ies
for ie in ies:
if not ie.suitable(url):
if not force_use_mastodon and not ie.suitable(url):
continue
ie = self.get_info_extractor(ie.ie_key())

View File

@ -176,6 +176,8 @@ def _real_main(argv=None):
opts.max_sleep_interval = opts.sleep_interval
if opts.ap_mso and opts.ap_mso not in MSO_INFO:
parser.error('Unsupported TV Provider, use --ap-list-mso to get a list of supported TV Providers')
if opts.force_generic_extractor and opts.force_use_mastodon:
parser.error('force either generic extractor or Mastodon')
def parse_retries(retries):
if retries in ('inf', 'infinite'):
@ -348,6 +350,7 @@ def _real_main(argv=None):
'restrictfilenames': opts.restrictfilenames,
'ignoreerrors': opts.ignoreerrors,
'force_generic_extractor': opts.force_generic_extractor,
'force_use_mastodon': opts.force_use_mastodon,
'ie_key': opts.ie_key,
'ratelimit': opts.ratelimit,
'nooverwrites': opts.nooverwrites,

View File

@ -5,12 +5,24 @@ from .common import SelfhostedInfoExtractor
from ..utils import (
clean_html,
float_or_none,
int_or_none,
str_or_none,
try_get,
unescapeHTML,
ExtractorError,
)
from urllib.parse import (
parse_qs,
urlencode,
urlparse,
)
import json
import re
from .peertube import PeerTubeSHIE
class MastodonSHIE(SelfhostedInfoExtractor):
"""
@ -23,6 +35,7 @@ class MastodonSHIE(SelfhostedInfoExtractor):
"""
IE_NAME = 'mastodon'
_VALID_URL = r'mastodon:(?P<host>[^:]+):(?P<id>.+)'
_NETRC_MACHINE = 'mastodon'
_SH_VALID_URL = r'''(?x)
https?://
(?P<host>[^/\s]+)/
@ -107,20 +120,176 @@ class MastodonSHIE(SelfhostedInfoExtractor):
},
}]
def _determine_instance_software(self, host, webpage=None):
if webpage:
for i, string in enumerate(self._SH_VALID_CONTENT_STRINGS):
if string in webpage:
return ['mastodon', 'mastodon', 'pleroma', 'pleroma', 'pleroma', 'gab'][i]
if any(s in webpage for s in PeerTubeSHIE._SH_VALID_CONTENT_STRINGS):
return 'peertube'
nodeinfo_href = self._download_json(
f'https://{host}/.well-known/nodeinfo', host, 'Downloading instance nodeinfo link')
nodeinfo = self._download_json(
nodeinfo_href['links'][-1]['href'], host, 'Downloading instance nodeinfo')
return nodeinfo['software']['name']
def _login(self):
username, password = self._get_login_info()
if not username:
return False
# very basic regex, but the instance domain (the one where user has an account)
# must be separated from the user login
mobj = re.match(r'^(?P<username>[^@]+(?:@[^@]+)?)@(?P<instance>.+)$', username)
if not mobj:
self.report_warning(
'Invalid login format - must be in format [username or email]@[instance]')
username, instance = mobj.group('username', 'instance')
app_info = self._download_json(
f'https://{instance}/api/v1/apps', None, 'Creating an app', headers={
'Content-Type': 'application/json',
}, data=bytes(json.dumps({
'client_name': 'haruhi-dl',
'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob',
'scopes': 'read',
'website': 'https://haruhi.download',
}).encode('utf-8')))
login_webpage = self._download_webpage(
f'https://{instance}/oauth/authorize', None, 'Downloading login page', query={
'client_id': app_info['client_id'],
'scope': 'read',
'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
'response_type': 'code',
})
oauth_token = None
# this needs to be codebase-specific, as the HTML page differs between codebases
if 'xlink:href="#mastodon-svg-logo-full"' in login_webpage:
# mastodon
if '@' not in username:
self.report_warning(
'Invalid login format - for Mastodon instances e-mail address is required')
login_form = self._hidden_inputs(login_webpage)
login_form['user[email]'] = username
login_form['user[password]'] = password
login_req = self._download_webpage(
f'https://{instance}/auth/sign_in', None, 'Sending login details',
headers={
'Content-Type': 'application/x-www-form-urlencoded',
}, data=bytes(urlencode(login_form).encode('utf-8')))
auth_form = self._hidden_inputs(
self._search_regex(
r'(?s)(<form\b[^>]+>.+?>Authorize</.+?</form>)',
login_req, 'authorization form'))
_, urlh = self._download_webpage_handle(
f'https://{instance}/oauth/authorize', None, 'Confirming authorization',
headers={
'Content-Type': 'application/x-www-form-urlencoded',
}, data=bytes(urlencode(auth_form).encode('utf-8')))
oauth_token = parse_qs(urlparse(urlh.url).query)['code'][0]
elif 'content: "\\fe0e";' in login_webpage:
# pleroma
login_form = self._hidden_inputs(login_webpage)
login_form['authorization[scope][]'] = 'read'
login_form['authorization[name]'] = username
login_form['authorization[password]'] = password
login_req = self._download_webpage(
f'https://{instance}/oauth/authorize', None, 'Sending login details',
headers={
'Content-Type': 'application/x-www-form-urlencoded',
}, data=bytes(urlencode(login_form).encode('utf-8')))
# TODO: 2FA, error handling
oauth_token = self._search_regex(
r'<h2>\s*Token code is\s*<br>\s*([a-zA-Z\d_-]+)\s*</h2>',
login_req, 'oauth token')
else:
raise ExtractorError('Unknown instance type')
actual_token = self._download_json(
f'https://{instance}/oauth/token', None, 'Downloading the actual token',
headers={
'Content-Type': 'application/x-www-form-urlencoded',
}, data=bytes(urlencode({
'client_id': app_info['client_id'],
'client_secret': app_info['client_secret'],
'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
'scope': 'read',
'code': oauth_token,
'grant_type': 'authorization_code',
}).encode('utf-8')))
return {
'instance': instance,
'authorization': f"{actual_token['token_type']} {actual_token['access_token']}",
}
def _selfhosted_extract(self, url, webpage=None):
mobj = re.match(self._VALID_URL, url)
ap_censorship_circuvement = False
if not mobj:
mobj = re.match(self._SH_VALID_URL, url)
if not mobj and self._downloader.params.get('force_use_mastodon'):
mobj = re.match(PeerTubeSHIE._VALID_URL, url)
if mobj:
ap_censorship_circuvement = 'peertube'
if not mobj and self._downloader.params.get('force_use_mastodon'):
mobj = re.match(PeerTubeSHIE._SH_VALID_URL, url)
if mobj:
ap_censorship_circuvement = 'peertube'
if not mobj:
raise ExtractorError('Unrecognized url type')
host, id = mobj.group('host', 'id')
if any(frag in url for frag in ('/objects/', '/activities/')):
if not webpage:
webpage = self._download_webpage(url, '%s:%s' % (host, id), expected_status=302)
real_url = self._og_search_property('url', webpage, default=None)
if real_url:
return self.url_result(real_url, ie='MastodonSH')
login_info = self._login()
metadata = self._download_json('https://%s/api/v1/statuses/%s' % (host, id), '%s:%s' % (host, id))
if login_info and host != login_info['instance']:
wf_url = url
if not url.startswith('http'):
software = ap_censorship_circuvement
if not software:
software = self._determine_instance_software(host, webpage)
url_part = None
if software == 'pleroma':
if '-' in id: # UUID
url_part = 'objects'
else:
url_part = 'notice'
elif software == 'peertube':
url_part = 'videos/watch'
elif software in ('mastodon', 'gab'):
# mastodon and gab social require usernames in the url,
# but we can't determine the username without fetching the post,
# but we can't fetch the post without determining the username...
raise ExtractorError(f'Use the full url with --force-use-mastodon to download from {software}', expected=True)
else:
raise ExtractorError(f'Unknown software: {software}')
wf_url = f'https://{host}/{url_part}/{id}'
search = self._download_json(
f"https://{login_info['instance']}/api/v2/search", '%s:%s' % (host, id),
query={
'q': wf_url,
'type': 'statuses',
'resolve': True,
}, headers={
'Authorization': login_info['authorization'],
})
assert len(search['statuses']) == 1
metadata = search['statuses'][0]
else:
if not login_info and any(frag in url for frag in ('/objects/', '/activities/')):
if not webpage:
webpage = self._download_webpage(url, '%s:%s' % (host, id), expected_status=302)
real_url = self._og_search_property('url', webpage, default=None)
if real_url:
return self.url_result(real_url, ie='MastodonSH')
metadata = self._download_json(
'https://%s/api/v1/statuses/%s' % (host, id), '%s:%s' % (host, id),
headers={
'Authorization': login_info['authorization'],
} if login_info else {})
if not metadata['media_attachments']:
raise ExtractorError('No attached medias')
@ -134,11 +303,20 @@ class MastodonSHIE(SelfhostedInfoExtractor):
'url': str_or_none(media['url']),
'thumbnail': str_or_none(media['preview_url']) if media['type'] == 'video' else None,
'vcodec': 'none' if media['type'] == 'audio' else None,
'duration': float_or_none(try_get(media, lambda x: x['meta']['original']['duration'])),
'width': int_or_none(try_get(media, lambda x: x['meta']['original']['width'])),
'height': int_or_none(try_get(media, lambda x: x['meta']['original']['height'])),
'tbr': int_or_none(try_get(media, lambda x: x['meta']['original']['bitrate'])),
})
if len(entries) == 0:
raise ExtractorError('No audio/video attachments')
title = '%s - %s' % (str_or_none(metadata['account'].get('display_name') or metadata['account']['acct']), clean_html(str_or_none(metadata['content'])))
if ap_censorship_circuvement == 'peertube':
title = unescapeHTML(
self._search_regex(
r'^<p><a href="[^"]+">(.+?)</a></p>',
metadata['content'], 'video title'))
info_dict = {
"id": id,

View File

@ -401,6 +401,13 @@ def parseOpts(overrideArguments=None):
action='store_true', dest='ap_list_mso', default=False,
help='List all supported multiple-system operators')
selfhosted_ie = optparse.OptionGroup(parser, 'Selfhosted IE Options')
selfhosted_ie.add_option(
'--force-use-mastodon',
action='store_true', dest='force_use_mastodon', default=False,
help='Force use the Mastodon extractor (to get follower-only/direct posts, '
'or circuvement PeerTube censorship via Pleroma; both require logging in)')
video_format = optparse.OptionGroup(parser, 'Video Format Options')
video_format.add_option(
'-f', '--format',
@ -904,6 +911,7 @@ def parseOpts(overrideArguments=None):
parser.add_option_group(video_format)
parser.add_option_group(subtitles)
parser.add_option_group(authentication)
parser.add_option_group(selfhosted_ie)
parser.add_option_group(adobe_pass)
parser.add_option_group(postproc)