-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_consumer.py
More file actions
60 lines (47 loc) · 1.59 KB
/
Copy pathtest_consumer.py
File metadata and controls
60 lines (47 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# GNU GENERAL PUBLIC LICENSE
# Version 2, June 1991
# Copyright (C) 1989, 1991 Free Software Foundation, Inc., <http://fsf.org/>
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
# Everyone is permitted to copy and distribute verbatim copies
# of this license document, but changing it is not allowed.
import sys
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from query_privateanswer import Query
from query_privateanswer.Query import ttypes
from query_privateanswer.Query.ttypes import QueryVector, Query, SensorType, MatchingType
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
import base64
topic=""
"""
kafka = KafkaClient("localhost:9092")
consumer = SimpleConsumer(kafka, "my-group", topic)
message = consumer.get_message()
print(message)
print(message.message.value)
bytes=base64.b64decode(message.message.value)
transportIn = TTransport.TMemoryBuffer(bytes)
protocolIn = TBinaryProtocol.TBinaryProtocol(transportIn)
queries = Query()
queries.read(protocolIn)
print(queries)
import json
s=json.dumps(queries, default=lambda o: o.__dict__)
print s
"""
import psycopg2
import psycopg2.extras
SERVER_HOST=sys.argv[1]
DBNAME=sys.argv[2]
DBUSER=sys.argv[3]
TABLE=sys.argv[4]
conn = psycopg2.connect("dbname='%s' user='%s' host='%s' " % (DBNAME,DBUSER,SERVER_HOST))
query="select * from %s order by version_number limit 1" % (TABLE)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cur.execute(query)
rows = cur.fetchall()
for row in rows:
print(row)