repl.it
@mat1/

twitter bot

Python

No description

fork
loading
Files
  • main.py
  • alive.py
  • setup.py
  • Packager files
  • poetry.lock
  • pyproject.toml
  • requirements.txt
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import setup
import asyncio
from peony import PeonyClient
import repltalk
import motor.motor_asyncio
import alive
import os

loop = asyncio.get_event_loop()

if not os.getenv('consumer_key'):
  print('Go to https://twitter.com/replittalk to see the bot in action :)')
  exit()

consumer_key = os.getenv('consumer_key')
consumer_secret = os.getenv('consumer_secret')
access_token = os.getenv('access_token')
access_secret = os.getenv('access_secret')

dbuser = os.getenv('dbuser')
dbpass = os.getenv('dbpass')

print('started program')


conn = motor.motor_asyncio.AsyncIOMotorClient(
	f'mongodb+srv://{dbuser}:{dbpass}@cluster0-pugod.mongodb.net/test?retryWrites=true&w=majority'
)
db = conn['repl-talk-twitter']
async def get_posts():
	posts = await db.data.find_one({
		'_id': 'posts'
	})
	return set(posts['posts'])

async def add_post(post_id):
	posts = await db.data.update_one({
		'_id': 'posts'
	}, {
		'$push': {'posts': post_id}
	})
checked_posts = loop.run_until_complete(get_posts())
print(checked_posts)
print('gotten already checked posts')

twitter = PeonyClient(
	consumer_key=consumer_key,
	consumer_secret=consumer_secret,
	access_token=access_token,
	access_token_secret=access_secret
)

print('logged in')

talk = repltalk.Client()

async def main():
	print('started checking posts')
	while True:
		try:
			async for post in talk.boards.all.get_posts(limit=10, sort='hot'):
				if post.votes >= 10:
					if post.id not in checked_posts:
						print(post.url, post.votes)
						if post.votes > 15:
							message = f'{post.title} by {post.author.name} is trending with {post.votes} upvotes! {post.url}'
						else:
							message = f'{post.title} by {post.author.name} is trending! {post.url}'
						await twitter.api.statuses.update.post(
							status=message
						)
						checked_posts.add(post.id)
						await add_post(post.id)
						await asyncio.sleep(60)
		except Exception as e:
			print('error', e, type(e))
		await asyncio.sleep(60 * 10)


alive.start_server(loop)
loop.run_until_complete(main())