Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solved outstanding issues mentioned in snscrape#413 #8

Open
wants to merge 8 commits into
base: more-tg-info
Choose a base branch
from
93 changes: 48 additions & 45 deletions snscrape/modules/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
continue

if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
Expand All @@ -161,48 +161,23 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved

if link.text.startswith('@'):
mentions.append(link.text.strip('@'))
continue

if link.text.startswith('#'):
hashtags.append(link.text.strip('#'))
continue

if 'tgme_widget_message_voice_player' in link.get('class', []):
media.append(_parse_voice_message(link))
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved

if 'tgme_widget_message_video_player' in link.get('class', []):
media.append(_parse_video_message(link))

href = urllib.parse.urljoin(pageUrl, link['href'])
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
outlinks.append(href)

for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
audioUrl = voicePlayer.find('audio')['src']
durationStr = voicePlayer.find('time').text
duration = _durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]

media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))

for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = videoPlayer.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
videoTag = videoPlayer.find('video')
videoUrl = None if videoTag is None else videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = videoPlayer.find('time')
if timeTag is None:
cls = Gif
else:
cls = Video
durationStr = videoPlayer.find('time').text
mKwargs['duration'] = _durationStrToSeconds(durationStr)
media.append(cls(**mKwargs))

linkPreview = None
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
kwargs = {}
Expand All @@ -219,8 +194,6 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
else:
_logger.warning(f'Could not process link preview image on {url}')
linkPreview = LinkPreview(**kwargs)
if kwargs['href'] in outlinks:
outlinks.remove(kwargs['href'])
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved

viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
views = None if viewsSpan is None else _parse_num(viewsSpan.text)
Expand All @@ -240,20 +213,20 @@ def get_items(self):
nextPageUrl = ''
while True:
yield from self._soup_to_items(soup, r.url)
try:
if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1':
dateElt = soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)
if dateElt and 'href' in dateElt.attrs:
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved
urlPieces = dateElt['href'].split('/')
if urlPieces and urlPieces[-1] == '1':
# if message 1 is the first message in the page, terminate scraping
break
except:
pass
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
if not pageLink:
# some pages are missing a "tme_messages_more" tag, causing early termination
if '=' not in nextPageUrl:
nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href']
nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20
nextPageUrl = soup.find('link', attrs = {'rel': 'prev'}, href = True)['href']
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shifting to using the prev tag addresses the duplicates issue caused by media getting a post ID, and also lets us remove the index math. Still calculating the next post index to determine whether it makes sense to fetch another page.

nextPostIndex = int(nextPageUrl.split('=')[-1])
if nextPostIndex > 20:
pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'}
pageLink = {'href': nextPageUrl}
else:
break
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
Expand Down Expand Up @@ -333,4 +306,34 @@ def _telegramResponseOkCallback(r):
if r.status_code == 200:
return (True, None)
return (False, f'{r.status_code=}')


def _parse_voice_message(voicePlayer):
    """Build a VoiceMessage from a voice-player element.

    voicePlayer is expected to expose ``find`` and contain an ``audio``
    element with a ``src`` attribute, a ``time`` element holding the
    duration text, and a ``div`` of class ``bar`` whose ``s`` children each
    carry a ``style`` attribute (presumably a BeautifulSoup tag for the
    ``tgme_widget_message_voice_player`` link -- confirm against the caller).

    Returns a VoiceMessage with the audio URL, the duration as converted by
    _durationStrToSeconds, and the list of bar values parsed as floats.
    """
    src = voicePlayer.find('audio')['src']
    seconds = _durationStrToSeconds(voicePlayer.find('time').text)

    heights = []
    waveform = voicePlayer.find('div', {'class': 'bar'})
    for barTag in waveform.find_all('s'):
        # Keep only what follows the last ':' in the style value, then drop
        # any surrounding ';' and '%' characters before converting.
        _, _, rawValue = barTag['style'].rpartition(':')
        heights.append(float(rawValue.strip(';%')))

    return VoiceMessage(url = src, duration = seconds, bars = heights)

def _parse_video_message(videoPlayer):
    """Build a Video or Gif from a video-player element.

    videoPlayer is expected to expose ``find`` (presumably a BeautifulSoup
    tag for the ``tgme_widget_message_video_player`` link -- confirm against
    the caller).

    The thumbnail URL is taken from the ``style`` attribute of the first
    ``i`` element, and the media URL from the ``src`` of the ``video``
    element. Either is None when the element, the attribute, or a matching
    URL in the style is missing, so one incomplete post does not abort
    parsing.

    Returns a Video (with ``duration``) when a ``time`` element is present,
    otherwise a Gif.
    """
    videoThumbnailUrl = None
    videoUrl = None

    iTag = videoPlayer.find('i')
    if iTag is not None:
        # A missing style attribute or a style without a media URL yields no
        # thumbnail rather than a KeyError/IndexError.
        thumbnailUrls = _STYLE_MEDIA_URL_PATTERN.findall(iTag.get('style', ''))
        if thumbnailUrls:
            videoThumbnailUrl = thumbnailUrls[0]
        # The video element is only looked up when the thumbnail element
        # exists, as in the original branch structure.
        videoTag = videoPlayer.find('video')
        if videoTag is not None:
            videoUrl = videoTag.get('src')

    mKwargs = {
        'thumbnailUrl': videoThumbnailUrl,
        'url': videoUrl,
    }

    timeTag = videoPlayer.find('time')
    if timeTag is None:
        # Example of duration-less GIF: https://t.me/thisisatestchannel19451923/3
        return Gif(**mKwargs)

    # Reuse the tag found above instead of searching the tree a second time.
    mKwargs['duration'] = _durationStrToSeconds(timeTag.text)
    return Video(**mKwargs)