forked from bensonk/pine-siskin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreaming_twitter.py
99 lines (90 loc) · 3.38 KB
/
streaming_twitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/python
import time, socket, json
import oauth2 as oauth
import urllib2 as urllib
from urllib import urlencode
from config import config
from models import *
socket._fileobject.default_bufsize = 0
update_endpoint = 'https://twitter.com/statuses/update.json'
class TwitterClient(object):
def __init__(self):
self.consumer = oauth.Consumer(key=config['consumer']['key'], secret=config['consumer']['secret'])
self.token = oauth.Token(config['auth']['oauth_token'], config['auth']['oauth_token_secret'])
self.tweets = []
self.friends = []
self.client = oauth.Client(self.consumer, self.token)
def handle_init(self, friend_json):
try:
self.friends.extend(json.loads(friend_json)['friends'])
except:
print "Error fetching initial friends list"
def tweet(self, text, msg_dict={}):
if len(text) > 140:
raise ValueError('Tweet was too long')
msg_dict['status'] = text
message_body = urlencode(msg_dict)
resp, content = self.client.request(update_endpoint, 'POST', message_body, include_body_hash=False)
print 'Response: {}'.format(resp)
print 'Response content: {}'.format(content)
def handle_message(self, tweet_json, callback):
try:
message = json.loads(tweet_json)
if 'event' in message:
event = Event(message)
callback(event)
elif 'delete' in message:
deletion = Deletion(message)
callback(deletion)
elif 'retweeted_status' in message:
tweet = Tweet(message['retweeted_status'])
self.tweets.append(tweet)
callback(tweet)
else:
tweet = Tweet(message)
self.tweets.append(tweet)
callback(tweet)
except ValueError as e:
# This is to handle empty messages and unexpected characters more
# gracefully. We might want to have some sort of logging facility here.
pass
def watch(self, url, callback):
error_count = 0
while True:
try:
last_connection = time.time()
tweet_stream = self.get(url)
self.handle_init(tweet_stream.readline())
for line in tweet_stream:
self.handle_message(line, callback)
except IOError as error:
error_count += 1
print "Connection failed with error '{}'. Error count is {}".format(error, error_count)
delay = time.time() - last_connection
if delay < 120:
# Mandated exponential backoff
time.sleep(2 ** error_count)
else:
# If we've successfully kept a connection open for at least 2
# minutes, we can reset our error count.
error_count = 0
def get(self, url):
# Set parameters
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
# Build and sign request
oauth_req = oauth.Request(method="GET", url=url, parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
oauth_req.sign_request(signature_method, self.consumer, self.token, include_body_hash=False)
# Make a urllib request object and open the connection
headers = oauth_req.to_header()
headers['User-agent'] = "Pine Siskin/0.1"
headers['Accept-Encoding'] = "*/*"
req = urllib.Request(url=url, headers=headers)
connection = urllib.urlopen(req)
return connection