Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,18 @@ res = search.run(
)
```

#### Monitor User Mentions

```python
from twitter.search import Search

email, username, password = ..., ..., ...
search = Search(email, username, password, save=True, debug=1)
user_mentions = search.get_user_mentions(x_handle="MY_X_HANDLE", search_count=30)
for tweet in user_mentions:
pritn(tweet)
```

**Search Operators Reference**

https://developer.twitter.com/en/docs/twitter-api/v1/rules-and-filtering/search-operators
Expand Down
24 changes: 22 additions & 2 deletions twitter/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,16 +796,36 @@ def clear_draft_tweets(self) -> None:
if _id != user_id:
self.gql('POST', Operation.DeleteDraftTweet, {'draft_tweet_id': _id})

def notifications(self, params: dict = None) -> dict:
def notifications(self, params: dict = None, type: str = "all") -> dict:
"""
type: all, verified, mentions
"""
r = self.session.get(
f'{self.v2_api}/notifications/all.json',
f'{self.v2_api}/notifications/{type}.json',
headers=get_headers(self.session),
params=params or live_notification_params
)
if self.debug:
log(self.logger, self.debug, r)
return r.json()

# def get_new_user_mentions_outdated(self, x_handle: str, last_mention_id: int = 0) -> dict:
# """
# Monitor user mentions via 'mentions' type notifications.
# """
# last_id = last_mention_id
# notifications = self.notifications(type='mentions')
# mentions = notifications.get('globalObjects', {}).get('tweets', {})
# new_mentions = []
# for tweet_id, tweet in mentions.items():
# if last_id and int(tweet_id) <= int(last_id):
# continue
# if f'@{x_handle.lower()}' in tweet.get('full_text', '').lower():
# new_mentions.append((tweet_id, tweet))
# # Sort by tweet_id ascending (oldest first)
# new_mentions.sort(key=lambda x: int(x[0]))
# return new_mentions

def recommendations(self, params: dict = None) -> dict:
r = self.session.get(
f'{self.v1_api}/users/recommendations.json',
Expand Down
31 changes: 31 additions & 0 deletions twitter/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,34 @@ def save_cookies(self, fname: str = None):
""" Save cookies to file """
cookies = self.session.cookies
Path(f'{fname or cookies.get("username")}.cookies').write_bytes(orjson.dumps(dict(cookies)))

def get_user_mentions(
self,
x_handle: str,
search_count: int = 20
) -> list:
"""
Fetches user mentions via search method.

Parameters
----------
x_handle : :class:`str`
The target user handle (eg: 5mknc5, elonmusk).
search_count : :class:`int`, default=20
The number of latest tweets to retrieve in each request.

Returns
-------
A list of tweets mentions `x_handle`.
"""
result = self.run(
limit=search_count,
retries=5,
queries=[
{
'category': 'Latest',
'query': f'@{x_handle}',
},
]
)
return result