diff --git a/.gitignore b/.gitignore index 72364f9..77be3b9 100644 --- a/.gitignore +++ b/.gitignore @@ -15,7 +15,6 @@ dist/ downloads/ eggs/ .eggs/ -lib/ lib64/ parts/ sdist/ @@ -87,3 +86,8 @@ ENV/ # Rope project settings .ropeproject + +# lambda +.lambda +config +*.zip diff --git a/CommunityGraphTwitterCleanLinks.py b/CommunityGraphTwitterCleanLinks.py new file mode 100644 index 0000000..3d83c90 --- /dev/null +++ b/CommunityGraphTwitterCleanLinks.py @@ -0,0 +1,80 @@ +from urllib import urlencode +from urlparse import urlparse, urlunparse, parse_qs +from neo4j.v1 import GraphDatabase, basic_auth +import os + +import boto3 + +from base64 import b64decode + +def lambda_handler(event, context): + print("Event:", event) + version_updated = "Default (Updating public graph)" + NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', "test") + NEO4J_URL = os.environ.get('NEO4J_URL', "bolt://localhost") + + if event and event.get("resources"): + if "CommunityGraphTwitterCleanLinksPublic" in event["resources"][0]: + version_updated = "Updating public graph" + ENCRYPTED_NEO4J_PASSWORD = os.environ['NEO4J_PASSWORD'] + NEO4J_PASSWORD = boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_NEO4J_PASSWORD))['Plaintext'] + NEO4J_URL = os.environ.get('NEO4J_PUBLIC_URL') + elif "CommunityGraphTwitterCleanLinksPrivate" in event["resources"][0]: + version_updated = "Updating private graph" + ENCRYPTED_NEO4J_PASSWORD = os.environ['NEO4J_PRIVATE_PASSWORD'] + NEO4J_PASSWORD = boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_NEO4J_PASSWORD))['Plaintext'] + NEO4J_URL = os.environ.get('NEO4J_PRIVATE_URL') + + neo4jUrl = NEO4J_URL + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + neo4jPass = NEO4J_PASSWORD + + print(version_updated) + clean_links(neo4jUrl = neo4jUrl, neo4jUser = neo4jUser, neo4jPass = neo4jPass) + + +def clean_links(neo4jUrl, neo4jUser, neo4jPass): + driver = GraphDatabase.driver(neo4jUrl, auth=basic_auth(neo4jUser, neo4jPass)) + + query = "MATCH (l:Link) WHERE NOT EXISTS(l.cleanUrl) RETURN l, ID(l) AS internalId" + + session = driver.session() + result = session.run(query) + + updates = [] + for row in result: + uri = row["l"]["url"] + if uri: + uri = uri.encode('utf-8') + updates.append({"id": row["internalId"], "clean": clean_uri(uri)}) + + print("Updates to apply", updates) + + updateQuery = """\ + UNWIND {updates} AS update + MATCH (l:Link) WHERE ID(l) = update.id + SET l.cleanUrl = update.clean + """ + + update_result = session.run(updateQuery, {"updates": updates}) + + print(update_result) + + session.close() + + +def clean_uri(url): + u = urlparse(url) + query = parse_qs(u.query) + + for param in ["utm_content", "utm_source", "utm_medium", "utm_campaign", "utm_term"]: + query.pop(param, None) + + u = u._replace(query=urlencode(query, True)) + return urlunparse(u) + +if __name__ == "__main__": + neo4jPass = os.environ.get('NEO4J_PASSWORD', "test") + neo4jUrl = os.environ.get('NEO4J_URL', "bolt://localhost") + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + clean_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass) \ No newline at end of file diff --git a/CommunityGraphTwitterHydrateLinks.py b/CommunityGraphTwitterHydrateLinks.py new file mode 100644 index 0000000..0f4dff2 --- /dev/null +++ b/CommunityGraphTwitterHydrateLinks.py @@ -0,0 +1,86 @@ +import socket + +import requests +import boto3 +import os + +from neo4j.v1 import GraphDatabase, basic_auth +from base64 import b64decode +from bs4 import BeautifulSoup, Tag + +def lambda_handler(event, context): + print("Event:", event) + version_updated = "Default (Updating public graph)" + NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', "test") + NEO4J_URL = os.environ.get('NEO4J_URL', "bolt://localhost") + + if event and event.get("resources"): + if "CommunityGraphTwitterHydrateLinksPublic" in event["resources"][0]: + version_updated = "Updating public graph" + ENCRYPTED_NEO4J_PASSWORD = os.environ['NEO4J_PASSWORD'] + NEO4J_PASSWORD = boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_NEO4J_PASSWORD))['Plaintext'] + NEO4J_URL = os.environ.get('NEO4J_PUBLIC_URL') + elif "CommunityGraphTwitterHydrateLinksPrivate" in event["resources"][0]: + version_updated = "Updating private graph" + ENCRYPTED_NEO4J_PASSWORD = os.environ['NEO4J_PRIVATE_PASSWORD'] + NEO4J_PASSWORD = boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_NEO4J_PASSWORD))['Plaintext'] + NEO4J_URL = os.environ.get('NEO4J_PRIVATE_URL') + + neo4jUrl = NEO4J_URL + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + neo4jPass = NEO4J_PASSWORD + + print(version_updated) + hydrate_links(neo4jUrl = neo4jUrl, neo4jUser = neo4jUser, neo4jPass = neo4jPass) + + +def hydrate_links(neo4jUrl, neo4jUser, neo4jPass): + driver = GraphDatabase.driver(neo4jUrl, auth=basic_auth(neo4jUser, neo4jPass)) + session = driver.session() + result = session.run( + "MATCH (link:Link) WHERE not exists(link.title) RETURN id(link) as id, link.url as url ORDER BY ID(link) DESC LIMIT {limit}", + {"limit": 100}) + update = [] + rows = 0 + for record in result: + try: + print(record["url"]) + title = hydrate_url(record["url"]) + rows += 1 + update += [{"id": record["id"], "title": title}] + except socket.gaierror: + print("Failed to resolve {0}. Ignoring for now".format(record["url"])) + except socket.error: + print("Failed to connect to {0}. Ignoring for now".format(record["url"])) + + print("urls", len(update), "records", rows) + result = session.run( + "UNWIND {data} AS row MATCH (link) WHERE id(link) = row.id SET link.title = row.title", + {"data": update}) + print(result.consume().counters) + session.close() + + +def hydrate_url(url): + user_agent = {'User-agent': 'Mozilla/5.0'} + potential_title = [] + try: + if url: + r = requests.get(url, headers=user_agent) + response = r.text + page = BeautifulSoup(response, "html.parser") + potential_title = page.find_all("title") + except requests.exceptions.ConnectionError: + print("Failed to connect: ", url) + + if len(potential_title) == 0: + print("Skipping: ", url) + return "N/A" + else: + return potential_title[0].text + +if __name__ == "__main__": + neo4jPass = os.environ.get('NEO4J_PASSWORD', "test") + neo4jUrl = os.environ.get('NEO4J_URL', "bolt://localhost") + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + hydrate_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass) \ No newline at end of file diff --git a/CommunityGraphTwitterImport.py b/CommunityGraphTwitterImport.py new file mode 100644 index 0000000..30d2606 --- /dev/null +++ b/CommunityGraphTwitterImport.py @@ -0,0 +1,42 @@ +import os + +from lib.utils import import_links, decrypt_value + + +def lambda_handler(event, context): + print("Event:", event) + version_updated = "Default (Updating public graph)" + NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', "test") + NEO4J_URL = os.environ.get('NEO4J_URL', "bolt://localhost") + + TWITTER_BEARER = decrypt_value(os.environ['TWITTER_BEARER']) + + if event and event.get("resources"): + if "CommunityGraphTwitterImportPublic" in event["resources"][0]: + version_updated = "Updating public graph" + NEO4J_PASSWORD = decrypt_value(os.environ['NEO4J_PASSWORD']) + NEO4J_URL = os.environ.get('NEO4J_PUBLIC_URL') + elif "CommunityGraphTwitterImportPrivate" in event["resources"][0]: + version_updated = "Updating private graph" + NEO4J_PASSWORD = decrypt_value(os.environ['NEO4J_PRIVATE_PASSWORD']) + NEO4J_URL = os.environ.get('NEO4J_PRIVATE_URL') + + neo4jPass = NEO4J_PASSWORD + bearerToken = TWITTER_BEARER + neo4jUrl = NEO4J_URL + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + search = os.environ.get("TWITTER_SEARCH") + + print(version_updated) + import_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass, bearerToken=bearerToken, search=search) + + +if __name__ == "__main__": + neo4jPass = os.environ.get('NEO4J_PASSWORD', "test") + bearerToken = os.environ.get('TWITTER_BEARER', "") + neo4jUrl = os.environ.get('NEO4J_URL', "bolt://localhost") + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + search = os.environ.get("TWITTER_SEARCH", + 'neo4j OR "graph database" OR "graph databases" OR graphdb OR graphconnect OR @neoquestions OR @Neo4jDE OR @Neo4jFr OR neotechnology') + + import_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass, bearerToken=bearerToken, search=search) diff --git a/GenerateSummaryPage.py b/GenerateSummaryPage.py new file mode 100644 index 0000000..ce4529e --- /dev/null +++ b/GenerateSummaryPage.py @@ -0,0 +1,138 @@ +import datetime +import sys +import boto +import flask +import time + +from ago import human +from flask import render_template +from neo4j.v1 import GraphDatabase +from datetime import tzinfo, timedelta, datetime + +ZERO = timedelta(0) + +class UTC(tzinfo): + def utcoffset(self, dt): + return ZERO + def tzname(self, dt): + return "UTC" + def dst(self, dt): + return ZERO + +products = { + "neo4j": { + "url": "138.197.15.1", + "user": "all", + "password": "readonly", + "title": "Neo4j", + "summary": "twin4j" + }, + "graphql": { + "url": "107.170.69.23", + "user": "graphql", + "password": "graphql", + "title": "GraphQL", + "summary": "twigraphql" + } +} + + +def github_links(tx): + records = [] + for record in tx.run("""\ + MATCH (n:Repository) WHERE EXISTS(n.created) AND n.updated > timestamp() - 7 * 60 * 60 * 24 * 1000 + WITH n + ORDER BY n.updated desc + MATCH (n)<-[:CREATED]-(user) WHERE NOT (user.name IN ["neo4j", "neo4j-contrib"]) + RETURN n.title, n.url, n.created, n.favorites, n.updated, user.name, n.created_at, n.updated_at + ORDER BY n.updated desc + """): + records.append(record) + return records + + +def twitter_links(tx): + records = [] + for record in tx.run("""\ + WITH ((timestamp() / 1000) - (7 * 24 * 60 * 60)) AS oneWeekAgo + MATCH (l:Link)<--(t:Tweet:Content) + WHERE not(t:Retweet) + WITH oneWeekAgo, l, t + ORDER BY l.cleanUrl, toInteger(t.created) + WITH oneWeekAgo, l.cleanUrl AS url, l.title AS title, collect(t) AS tweets WHERE toInteger(tweets[0].created) is not null AND tweets[0].created > oneWeekAgo AND not url contains "neo4j.com" + RETURN url, title, REDUCE(acc = 0, tweet IN tweets | acc + tweet.favorites + size((tweet)<-[:RETWEETED]-())) AS score, tweets[0].created * 1000 AS dateCreated, [ tweet IN tweets | head([ (tweet)<-[:POSTED]-(user) | user.screen_name]) ] AS users + ORDER BY score DESC + """): + records.append(record) + return records + + +def meetup_events(tx): + records = [] + for record in tx.run("""\ + MATCH (event:Event)<-[:CONTAINED]-(group) + WHERE timestamp() + 7 * 60 * 60 * 24 * 1000 > event.time > timestamp() - 7 * 60 * 60 * 24 * 1000 + RETURN event, group + ORDER BY event.time + """): + records.append(record) + return records + +app = flask.Flask('my app') + + +@app.template_filter('humanise') +def humanise_filter(value): + return human(datetime.fromtimestamp(value / 1000), precision=1) + + +@app.template_filter("shorten") +def shorten_filter(value): + return (value[:75] + '..') if len(value) > 75 else value + + +def generate_page(product): + driver = GraphDatabase.driver("bolt://{0}:7687".format(product["url"]), auth=(product["user"], product["password"])) + with driver.session() as session: + github_records = session.read_transaction(github_links) + twitter_records = session.read_transaction(twitter_links) + meetup_records = session.read_transaction(meetup_events) + + with app.app_context(): + utc = UTC() + time_now = str(datetime.now(utc)) + + rendered = render_template('index.html', + github_records=github_records, + twitter_records=twitter_records, + meetup_records=meetup_records, + title=product["title"], + time_now=time_now) + + local_file_name = "/tmp/{0}.html".format(product["summary"]) + with open(local_file_name, "w") as file: + file.write(rendered.encode('utf-8')) + + s3_connection = boto.connect_s3() + bucket = s3_connection.get_bucket(product["summary"]) + key = boto.s3.key.Key(bucket, "{0}.html".format(product["summary"])) + key.set_contents_from_filename(local_file_name) + + +def lambda_handler(event, context): + print("Event:", event) + + product_name = "neo4j" + if event and event.get("resources"): + if "GraphQLGenerateSummaryPage" in event["resources"][0]: + product_name = "graphql" + if "Neo4jGenerateSummaryPage" in event["resources"][0]: + product_name = "neo4js" + + generate_page(products[product_name]) + + +if __name__ == "__main__": + args = sys.argv[1:] + product_name = args[0] if args[0:] else "neo4j" + generate_page(products[product_name]) diff --git a/GraphQLCommunityGraphTwitterCleanLinks.py b/GraphQLCommunityGraphTwitterCleanLinks.py new file mode 100644 index 0000000..3452288 --- /dev/null +++ b/GraphQLCommunityGraphTwitterCleanLinks.py @@ -0,0 +1,73 @@ +import os +from urllib import urlencode +from urlparse import urlparse, urlunparse, parse_qs + +from neo4j.v1 import GraphDatabase, basic_auth + +from lib.utils import decrypt_value + + +def lambda_handler(event, context): + print("Event:", event) + version_updated = "Default (Updating GraphQL graph)" + NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', "test") + NEO4J_URL = os.environ.get('NEO4J_URL', "bolt://localhost") + + if event and event.get("resources"): + if "GraphQLCommunityGraphTwitterCleanLinks" in event["resources"][0]: + NEO4J_PASSWORD = decrypt_value(os.environ['NEO4J_PASSWORD']) + + neo4jUrl = NEO4J_URL + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + neo4jPass = NEO4J_PASSWORD + + print(version_updated) + clean_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass) + + +def clean_links(neo4jUrl, neo4jUser, neo4jPass): + driver = GraphDatabase.driver(neo4jUrl, auth=basic_auth(neo4jUser, neo4jPass)) + + query = "MATCH (l:Link) WHERE NOT EXISTS(l.cleanUrl) RETURN l, ID(l) AS internalId" + + session = driver.session() + result = session.run(query) + + updates = [] + for row in result: + uri = row["l"]["url"] + if uri: + uri = uri.encode('utf-8') + updates.append({"id": row["internalId"], "clean": clean_uri(uri)}) + + print("Updates to apply", updates) + + updateQuery = """\ + UNWIND {updates} AS update + MATCH (l:Link) WHERE ID(l) = update.id + SET l.cleanUrl = update.clean + """ + + update_result = session.run(updateQuery, {"updates": updates}) + + print(update_result) + + session.close() + + +def clean_uri(url): + u = urlparse(url) + query = parse_qs(u.query) + + for param in ["utm_content", "utm_source", "utm_medium", "utm_campaign", "utm_term"]: + query.pop(param, None) + + u = u._replace(query=urlencode(query, True)) + return urlunparse(u) + + +if __name__ == "__main__": + neo4jPass = os.environ.get('NEO4J_PASSWORD', "test") + neo4jUrl = os.environ.get('NEO4J_URL', "bolt://localhost") + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + clean_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass) diff --git a/GraphQLCommunityGraphTwitterHydrateLinks.py b/GraphQLCommunityGraphTwitterHydrateLinks.py new file mode 100644 index 0000000..94b4fad --- /dev/null +++ b/GraphQLCommunityGraphTwitterHydrateLinks.py @@ -0,0 +1,90 @@ +import os +import socket + +import requests + +from bs4 import BeautifulSoup +from neo4j.v1 import GraphDatabase, basic_auth + +from lib.utils import decrypt_value + + +def lambda_handler(event, context): + print("Event:", event) + version_updated = "Default (Updating GraphQL graph)" + NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', "test") + NEO4J_URL = os.environ.get('NEO4J_URL', "bolt://localhost") + + if event and event.get("resources"): + if "GraphQLCommunityGraphTwitterHydrateLinks" in event["resources"][0]: + version_updated = "Updating public graph" + NEO4J_PASSWORD = decrypt_value(os.environ["NEO4J_PASSWORD"]) + + neo4jUrl = NEO4J_URL + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + neo4jPass = NEO4J_PASSWORD + + print(version_updated) + hydrate_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass) + + +def hydrate_links(neo4jUrl, neo4jUser, neo4jPass): + driver = GraphDatabase.driver(neo4jUrl, auth=basic_auth(neo4jUser, neo4jPass)) + session = driver.session() + result = session.run( + "MATCH (link:Link) WHERE not exists(link.title) RETURN id(link) as id, link.url as url ORDER BY ID(link) DESC LIMIT {limit}", + {"limit": 100}) + + rows = 0 + for record in result: + + try: + print("Processing {0}".format(record["url"])) + title = hydrate_url(record["url"]) + rows += 1 + update_graph(session, {"id": record["id"], "title": title}) + except socket.gaierror: + print("Failed to resolve {0}. Ignoring for now".format(record["url"])) + update_graph(session, {"id": record["id"], "title": "N/A"}) + except socket.error: + print("Failed to connect to {0}. Ignoring for now".format(record["url"])) + update_graph(session, {"id": record["id"], "title": "N/A"}) + + print("records", rows) + + session.close() + + +def update_graph(session, update): + result = session.run( + "WITH {data} AS row MATCH (link) WHERE id(link) = row.id SET link.title = row.title", + {"data": update}) + print(result.consume().counters) + + +def hydrate_url(url): + user_agent = {'User-agent': 'Mozilla/5.0'} + potential_title = [] + try: + if url: + r = requests.get(url, headers=user_agent, timeout=5.0) + response = r.text + page = BeautifulSoup(response, "html.parser") + potential_title = page.find_all("title") + except requests.exceptions.ConnectionError: + print("Failed to connect: ", url) + except requests.exceptions.ReadTimeout: + print("Read timed out: ", url) + + if len(potential_title) == 0: + print("Skipping: ", url) + return "N/A" + else: + return potential_title[0].text + + +if __name__ == "__main__": + neo4jPass = os.environ.get('NEO4J_PASSWORD', "test") + neo4jUrl = os.environ.get('NEO4J_URL', "bolt://localhost") + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + hydrate_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass) diff --git a/GraphQLCommunityGraphTwitterImport.py b/GraphQLCommunityGraphTwitterImport.py new file mode 100644 index 0000000..7251a3e --- /dev/null +++ b/GraphQLCommunityGraphTwitterImport.py @@ -0,0 +1,35 @@ +import os + +from lib.utils import import_links, decrypt_value + + +def lambda_handler(event, context): + print("Event:", event) + version_updated = "Default (Updating GraphQL graph)" + NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', "test") + NEO4J_URL = os.environ.get('NEO4J_URL', "bolt://localhost") + TWITTER_BEARER = os.environ['TWITTER_BEARER'] + + if event and event.get("resources"): + if "GraphQLCommunityGraphTwitterImport" in event["resources"][0]: + NEO4J_PASSWORD = decrypt_value(os.environ['NEO4J_PASSWORD']) + TWITTER_BEARER = decrypt_value(os.environ['TWITTER_BEARER']) + + neo4jPass = NEO4J_PASSWORD + bearerToken = TWITTER_BEARER + neo4jUrl = NEO4J_URL + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + search = os.environ.get("TWITTER_SEARCH") + + print(version_updated) + import_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass, bearerToken=bearerToken, search=search) + +if __name__ == "__main__": + neo4jPass = os.environ.get('NEO4J_PASSWORD', "neo") + bearerToken = os.environ.get('TWITTER_BEARER', "") + neo4jUrl = os.environ.get('NEO4J_URL', "bolt://127.0.0.1") + neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + search = os.environ.get("TWITTER_SEARCH", + 'graphql OR @apollo OR @graphcool OR graphql-js OR @leeb OR #graphql OR @graphql') + + import_links(neo4jUrl=neo4jUrl, neo4jUser=neo4jUser, neo4jPass=neo4jPass, bearerToken=bearerToken, search=search) diff --git a/RemoveQueryParams.py b/RemoveQueryParams.py new file mode 100644 index 0000000..989a9cd --- /dev/null +++ b/RemoveQueryParams.py @@ -0,0 +1,48 @@ +from urllib import urlencode +from urlparse import urlparse, urlunparse, parse_qs +from neo4j.v1 import GraphDatabase, basic_auth +import os + +def clean_uri(url): + u = urlparse(url) + query = parse_qs(u.query) + + for param in ["utm_content", "utm_source", "utm_medium", "utm_campaign", "utm_term"]: + query.pop(param, None) + + u = u._replace(query=urlencode(query, True)) + return urlunparse(u) + + +neo4jPass = os.environ.get('NEO4J_PASSWORD', "test") +neo4jUrl = os.environ.get('NEO4J_URL', "bolt://localhost") +neo4jUser = os.environ.get('NEO4J_USER', "neo4j") + +driver = GraphDatabase.driver(neo4jUrl, auth=basic_auth(neo4jUser, neo4jPass)) + +query = "MATCH (l:Link) WHERE NOT EXISTS(l.cleanUrl) RETURN l, ID(l) AS internalId" + +session = driver.session() +result = session.run(query) + +updates = [] +for row in result: + uri = row["l"]["url"] + if uri: + uri = uri.encode('utf-8') + # print(uri, clean_uri(uri), row["internalId"]) + updates.append({"id": row["internalId"], "clean": clean_uri(uri)}) + +print("Updates to apply", updates) + +updateQuery = """\ +UNWIND {updates} AS update +MATCH (l:Link) WHERE ID(l) = update.id +SET l.cleanUrl = update.clean +""" + +update_result = session.run(updateQuery, {"updates": updates}) + +print(update_result) + +session.close() diff --git a/Twitter Import Neo4j.ipynb b/Twitter.ipynb similarity index 98% rename from Twitter Import Neo4j.ipynb rename to Twitter.ipynb index fc1fb2a..ffef2b2 100644 --- a/Twitter Import Neo4j.ipynb +++ b/Twitter.ipynb @@ -30,7 +30,7 @@ "session.run( \"CREATE CONSTRAINT ON (t:Tweet) ASSERT t.id IS UNIQUE;\")\n", "session.run( \"CREATE CONSTRAINT ON (u:User) ASSERT u.screen_name IS UNIQUE;\")\n", "session.run( \"CREATE INDEX ON :Tag(name);\")\n", - "session.run( \"CREATE INDEX ON :Link(url);\")\n" + "session.run( \"CREATE INDEX ON :Link(url);\")" ] }, { @@ -95,7 +95,7 @@ " MERGE (tweet)-[:RETWEETED]->(retweet_tweet)\n", " SET tweet:Retweet\n", ")\n", - "\"\"\"\n" + "\"\"\"" ] }, { @@ -186,7 +186,7 @@ "language_info": { "codemirror_mode": { "name": "ipython", - "version": 2 + "version": 2.0 }, "file_extension": ".py", "mimetype": "text/x-python", @@ -197,5 +197,5 @@ } }, "nbformat": 4, - "nbformat_minor": 2 -} + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..04c4dbc --- /dev/null +++ b/build.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +lambda_name=${1:-CommunityGraphTwitterImport} +folder=lambda +file=${lambda_name}.zip + +rm -rf $folder $file +mkdir -p $folder +echo $'[install]\nprefix=' > $folder/setup.cfg +for lib in "requests neo4j-driver bs4 ago flask boto"; do + pip2.7 install $lib -t $folder +done +cp ${lambda_name}.py $folder +cp -r lib $folder/ +cp -r templates $folder/ +cd $folder; zip -r ../$file .; cd .. + +aws s3 cp $file s3://devrel-lambda-functions/ \ No newline at end of file diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/utils.py b/lib/utils.py new file mode 100644 index 0000000..768d634 --- /dev/null +++ b/lib/utils.py @@ -0,0 +1,159 @@ +import time +from base64 import b64decode + +import requests +import urllib +import boto3 + +from neo4j.v1 import GraphDatabase, basic_auth + + +def import_links(neo4jUrl, neo4jUser, neo4jPass, bearerToken, search): + if len(bearerToken) == 0: + raise ("No Twitter Bearer token configured") + + driver = GraphDatabase.driver(neo4jUrl, auth=basic_auth(neo4jUser, neo4jPass)) + + session = driver.session() + + # Add uniqueness constraints. + session.run("CREATE CONSTRAINT ON (t:Tweet) ASSERT t.id IS UNIQUE;") + session.run("CREATE CONSTRAINT ON (u:User) ASSERT u.screen_name IS UNIQUE;") + session.run("CREATE INDEX ON :Tag(name);") + session.run("CREATE INDEX ON :Link(url);") + + # Build query. + importQuery = """ + UNWIND {tweets} AS t + + WITH t + ORDER BY t.id + + WITH t, + t.entities AS e, + t.user AS u, + t.retweeted_status AS retweet + + MERGE (tweet:Tweet:Twitter {id:t.id}) + SET tweet:Content, tweet.text = t.text, + tweet.created_at = t.created_at, + tweet.created = apoc.date.parse(t.created_at,'s','E MMM dd HH:mm:ss Z yyyy'), + tweet.favorites = t.favorite_count + + MERGE (user:User {screen_name:u.screen_name}) + SET user.name = u.name, user.id = u.id, + user.location = u.location, + user.followers = u.followers_count, + user.following = u.friends_count, + user.statuses = u.statuses_count, + user.profile_image_url = u.profile_image_url, + user:Twitter + + MERGE (user)-[:POSTED]->(tweet) + + FOREACH (h IN e.hashtags | + MERGE (tag:Tag {name:LOWER(h.text)}) SET tag:Twitter + MERGE (tag)<-[:TAGGED]-(tweet) + ) + + FOREACH (u IN e.urls | + MERGE (url:Link {url:u.expanded_url}) + ON CREATE SET url.short = case when length(u.expanded_url) < 25 then true else null end + SET url:Twitter + MERGE (tweet)-[:LINKED]->(url) + ) + + FOREACH (m IN e.user_mentions | + MERGE (mentioned:User {screen_name:m.screen_name}) + ON CREATE SET mentioned.name = m.name, mentioned.id = m.id + SET mentioned:Twitter + MERGE (tweet)-[:MENTIONED]->(mentioned) + ) + + FOREACH (r IN [r IN [t.in_reply_to_status_id] WHERE r IS NOT NULL] | + MERGE (reply_tweet:Tweet:Twitter {id:r}) + MERGE (tweet)-[:REPLIED_TO]->(reply_tweet) + SET tweet:Reply + ) + + FOREACH (retweet_id IN [x IN [retweet.id] WHERE x IS NOT NULL] | + MERGE (retweet_tweet:Tweet:Twitter {id:retweet_id}) + MERGE (tweet)-[:RETWEETED]->(retweet_tweet) + SET tweet:Retweet + ) + """ + + # todo as params + + # """ + # dumpdevos OR #rejectrex OR #resist OR #nodapl OR #theresistance OR #resistance OR #factsmatter OR #nobannowall OR + # presson OR #notmypresident OR #alternativefacts OR + # maga OR president OR @realdonaldtrump OR @GOP OR @POTUS OR devos OR tillerson OR #scotus""" + + q = urllib.quote_plus(search) + maxPages = 100 + # False for retrieving history, True for catchup forward + catch_up = True + count = 100 + result_type = "recent" + lang = "en" + + since_id = -1 + max_id = -1 + page = 1 + + hasMore = True + while hasMore and page <= maxPages: + if catch_up: + result = session.run("MATCH (t:Tweet:Content) RETURN max(t.id) as sinceId") + for record in result: + print(record) + if record["sinceId"] != None: + since_id = record["sinceId"] + # else: + # result = session.run("MATCH (t:Tweet:Content) RETURN min(t.id) as maxId") + # for record in result: + # if record["maxId"] != None: + # max_id = record["maxId"] + + # Build URL. + apiUrl = "https://api.twitter.com/1.1/search/tweets.json?q=%s&count=%s&result_type=%s&lang=%s" % ( + q, count, result_type, lang) + if since_id != -1: + apiUrl += "&since_id=%s" % (since_id) + if max_id != -1: + apiUrl += "&max_id=%s" % (max_id) + # print(apiUrl) + response = requests.get(apiUrl, + headers={"accept": "application/json", "Authorization": "Bearer " + bearerToken}) + if response.status_code <> 200: + raise (Exception(response.status_code, response.text)) + + json = response.json() + meta = json["search_metadata"] + # print(meta) + if not catch_up and meta.get('next_results', None) != None: + max_id = meta["next_results"].split("=")[1][0:-2] + tweets = json.get("statuses", []) + # print(len(tweets)) + if len(tweets) > 0: + result = session.run(importQuery, {"tweets": tweets}) + print(result.consume().counters) + page = page + 1 + + hasMore = len(tweets) == count + + print("catch_up", catch_up, "more", hasMore, "page", page, "max_id", max_id, "since_id", since_id, "tweets", + len(tweets)) + time.sleep(1) + # if json.get('quota_remaining',0) <= 0: + # time.sleep(10) + if json.get('backoff', None) != None: + print("backoff", json['backoff']) + time.sleep(json['backoff'] + 5) + + session.close() + + +def decrypt_value(encrypted): + return boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted))['Plaintext'] diff --git a/twitter-import.py b/twitter-import.py deleted file mode 100644 index ad86f48..0000000 --- a/twitter-import.py +++ /dev/null @@ -1,129 +0,0 @@ -import os -import time -import requests -import urllib -from neo4j.v1 import GraphDatabase, basic_auth - -neo4jUrl = os.environ.get('NEO4J_URL',"bolt://localhost") -neo4jUser = os.environ.get('NEO4J_USER',"neo4j") -neo4jPass = os.environ.get('NEO4J_PASSWORD',"test") -bearerToken = os.environ.get('TWITTER_BEARER',"") - -if len(bearerToken) == 0 : - raise("No Twitter Bearer token configured") - -driver = GraphDatabase.driver(neo4jUrl, auth=basic_auth(neo4jUser, neo4jPass)) - -session = driver.session() - -# Add uniqueness constraints. -session.run( "CREATE CONSTRAINT ON (t:Tweet) ASSERT t.id IS UNIQUE;") -session.run( "CREATE CONSTRAINT ON (u:User) ASSERT u.screen_name IS UNIQUE;") -session.run( "CREATE CONSTRAINT ON (h:Tag) ASSERT h.name IS UNIQUE;") -session.run( "CREATE CONSTRAINT ON (l:Link) ASSERT l.url IS UNIQUE;") - -# Build query. -importQuery = """ -UNWIND {tweets} AS t - -WITH t -ORDER BY t.id - -WITH t, - t.entities AS e, - t.user AS u, - t.retweeted_status AS retweet - -MERGE (tweet:Tweet {id:t.id}) -SET tweet:Content, tweet.text = t.text, - tweet.created = t.created_at, - tweet.favorites = t.favorite_count - -MERGE (user:User {screen_name:u.screen_name}) -SET user.name = u.name, - user.location = u.location, - user.followers = u.followers_count, - user.following = u.friends_count, - user.statuses = u.statuses_count, - user.profile_image_url = u.profile_image_url - -MERGE (user)-[:POSTED]->(tweet) - -FOREACH (h IN e.hashtags | - MERGE (tag:Tag {name:LOWER(h.text)}) - MERGE (tag)<-[:TAGGED]-(tweet) -) - -FOREACH (u IN e.urls | - MERGE (url:Link {url:u.expanded_url}) - MERGE (tweet)-[:LINKED]->(url) -) - -FOREACH (m IN e.user_mentions | - MERGE (mentioned:User {screen_name:m.screen_name}) - ON CREATE SET mentioned.name = m.name - MERGE (tweet)-[:MENTIONED]->(mentioned) -) - -FOREACH (r IN [r IN [t.in_reply_to_status_id] WHERE r IS NOT NULL] | - MERGE (reply_tweet:Tweet {id:r}) - MERGE (tweet)-[:REPLIED_TO]->(reply_tweet) -) - -FOREACH (retweet_id IN [x IN [retweet.id] WHERE x IS NOT NULL] | - MERGE (retweet_tweet:Tweet {id:retweet_id}) - MERGE (tweet)-[:RETWEETED]->(retweet_tweet) -) -""" - -# todo as params -q = urllib.quote_plus(os.environ.get("TWITTER_SEARCH",'neo4j OR "graph database" OR "graph databases" OR graphdb OR graphconnect OR @neoquestions OR @Neo4jDE OR @Neo4jFr OR neotechnology')) -maxPages = 20 -catch_up = False -count = 100 -result_type = "recent" -lang = "en" - -since_id = -1 -max_id = -1 -page = 1 - -hasMore = True -while hasMore and page <= maxPages: - if catch_up: - result = session.run("MATCH (t:Tweet) RETURN max(t.id) as sinceId") - for record in result: - since_id = record["sinceId"] - else: - result = session.run("MATCH (t:Tweet) RETURN min(t.id) as maxId") - for record in result: - max_id = record["maxId"] - - # Build URL. - apiUrl = "https://api.twitter.com/1.1/search/tweets.json?q=%s&count=%s&result_type=%s&lang=%s" % (q, count, result_type, lang) - if since_id != -1 : - apiUrl += "&since_id=%s" % (since_id) - if max_id != -1 : - apiUrl += "&max_id=%s" % (max_id) - response = requests.get(apiUrl, headers = {"accept":"application/json","Authorization":"Bearer " + bearerToken}) - if response.status_code != 200: - raise("%s : %s" % (response.status_code, response.text)) - - json = response.json() - meta = json["search_metadata"] - # print(meta) - - tweets = json.get("statuses",[]) - print(len(tweets)) - if len(tweets) > 0: - result = session.run(importQuery,{"tweets":tweets}) - print(result.consume().counters) - page = page + 1 - else: - hasMore = False - - print("page {page} max_id {max_id}".format(page=page,max_id=max_id)) - time.sleep(1) - if json.get('backoff',None) != None: - print("backoff",json['backoff']) - time.sleep(json['backoff']+5) \ No newline at end of file