-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexamples_api_request.py
More file actions
301 lines (234 loc) · 11.4 KB
/
examples_api_request.py
File metadata and controls
301 lines (234 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
### SEMANTIC ENRICHMENT PROCESSOR API USE EXAMPLE ###
#
# Ensure that this script has access to the files of a dataset that can work with the processor.
# This example uses the Rome dataset, which is also present in the official GitHub repository (although it must be decompressed).
#
# Ensure also that this script is accessing the semantic enrichment processor's correct IP -- you can set it up
# in the "url_service" variable below.
#
# The script makes use of the 3 endpoints the semantic enrichment processor API exposes: preprocessing, segmentation, and enrichment.
# To this end, we refer to the "main" function at the end of the script, and to the various functions used by the main, each of which
# shows how to make POST and GET requests (i.e., which parameters and files must be passed) with the endpoints.
import time
import requests
import pandas as pd
import re
from typing import Optional, Tuple
### Semantic enrichment processor root address ###
# url_service = "http://127.0.0.1:8000/semantic/"
# url_service = "http://azureuser@semantic.westeurope.cloudapp.azure.com:8000/semantic/"
url_service = 'https://services.mobidatalab.eu:8443/semantic/'
def test_preprocessing_post(pathfile : str) -> Tuple[int, Optional[str]]:
'''
This function issues a POST request to the preprocessing endpoint of the semantic enrichment processor web API.
'''
print("*** Initiating Preprocessing task! ***")
# Here we set up the endpoint, parameters, and binary file to pass in the POST request.
url = url_service + "Preprocessing/"
parameters = {'max_speed' : 300, 'min_num_samples' : 1500, 'compress_trajectories' : True}
files = {'file_trajectories': ('trajectories.parquet', open(pathfile, 'rb'))}
print(f"Sending POST request with the following input parameters:\n{parameters}\n{files}")
res = requests.post(url, params=parameters, files=files)
# print(res)
# print(res.json()['message'])
# If the request was successful, return the task ID returned by the processor.
# If an error occurred, then return None.
if res.status_code == 200:
return res.status_code, res.json()['message'].split(' ')[1]
else:
return res.status_code, None
def test_preprocessing_get(task_id : str) -> Tuple[int, Optional[str]]:
'''
This function issues a GET request to the preprocessing endpoint of the semantic enrichment processor web API.
'''
# Set up the API endpoint, parameters, and path where to save the file received from the server.
url = url_service + "Preprocessing/"
parameters = {'task_id' : task_id}
res = requests.get(url, params=parameters)
#print(res)
filename = None
if res.status_code == 200 :
filename = "preprocessed_trajectories.parquet"
if "content-disposition" in res.headers.keys():
filename = re.findall("filename=\"(.+)\"", res.headers['content-disposition'])[0]
print(f"Writing received file to: {filename}")
with open(filename, 'wb') as f:
f.write(res.content)
return res.status_code, filename
#test_file = pd.read_parquet(filename)
#print(test_file.info())
def test_segmentation_post(pathfile : str) -> Tuple[int, Optional[str]]:
'''
This function issues a POST request to the segmentation endpoint of the semantic enrichment processor web API.
'''
print("*** Initiating Segmentation task! ***")
url = url_service + "Segmentation/"
params = {'min_duration_stop': 10, 'max_stop_radius': 0.2}
files = {'file_trajectories': ('trajectories.parquet', open(pathfile, 'rb'))}
print(f"Sending POST request with the following input parameters:\n{params}\n{files}")
res = requests.post(url, params=params, files=files)
# print(res)
# print(res.json()['message'])
if res.status_code == 200:
return res.status_code, res.json()['message'].split(' ')[1]
else:
return res.status_code, None
def test_segmentation_get(task_id : str) -> Tuple[int, Optional[str], Optional[str]]:
'''
This function issues a GET request to the segmentation endpoint of the semantic enrichment processor web API.
'''
url = url_service + "Segmentation/"
parameters = {'task_id' : task_id}
res = requests.get(url, params = parameters)
# print(res)
# print(res.status_code)
stops_path = None
moves_path = None
if res.status_code == 200 :
# Translate the received stops and moves from json to dataframes.
stops = pd.DataFrame.from_dict(res.json()['stops'])
moves = pd.DataFrame.from_dict(res.json()['moves'])
#print(stops.info())
#print(moves.info())
# Store the dataframes into parquet files.
stops_path = './stops.parquet'
moves_path = './moves.parquet'
stops.to_parquet(stops_path)
moves.to_parquet(moves_path)
return res.status_code, stops_path, moves_path
def test_enrichment_post(path_trajs : str,
path_moves : str,
path_stops : str,
path_pois : str,
path_social : str,
path_weather : str) -> Tuple[int, Optional[str]]:
'''
This function issues a POST request to the enrichment endpoint of the semantic enrichment processor web API.
'''
print("*** Initiating Enrichment task! ***")
url = url_service + "Enrichment/"
params = \
{
'move_enrichment': True,
'max_dist': 50,
'dbscan_epsilon': 50,
'systematic_threshold': 5
}
files = \
{
'file_trajectories': ('trajectories.parquet', open(path_trajs, 'rb')),
'file_moves': ('moves.parquet', open(path_moves, 'rb')),
'file_stops': ('stops.parquet', open(path_stops, 'rb')),
'file_pois': ('pois.parquet', open(path_pois, 'rb')),
'file_social': ('social.parquet', open(path_social, 'rb')),
'file_weather': ('weather.parquet', open(path_weather, 'rb'))
}
print(f"Sending POST request with the following input parameters:\n{params}\n{files}")
res = requests.post(url, params=params, files=files)
# print(res)
# print(res.json()['message'])
if res.status_code == 200:
return res.status_code, res.json()['message'].split(' ')[1]
else:
return res.status_code, None
def test_enrichment_get(task_id: str) -> Tuple[int, Optional[str]]:
'''
This function issues a GET request to the enrichment endpoint of the semantic enrichment processor web API.
'''
url = url_service + "Enrichment/"
parameters = {'task_id': task_id}
res = requests.get(url, params=parameters)
# print(res)
filename = None
if res.status_code == 200:
filename = "results.ttl"
if "content-disposition" in res.headers.keys():
filename = re.findall("filename=\"(.+)\"", res.headers['content-disposition'])[0]
print(f"Writing received file to: {filename}")
with open(filename, 'wb') as f:
f.write(res.content)
return res.status_code, filename
def main() :
'''
In this main we simulate the chains of requests done to the semantic enrichment processor in order to,
starting from a dataset of trajectories, get an RDF knowledge graph containing a dataset of semantically enriched
trajectories.
'''
# Paths to the files given as input to the various endpoints of the semantic enrichemnt processor.
# NOTE: change the paths if you want to try out different datasets.
traj_path = './datasets/rome/rome.parquet'
pois_path = './datasets/rome/poi/pois.parquet'
social_path = './datasets/rome/tweets/tweets_rome.parquet'
weather_path = './datasets/rome/weather/weather_conditions.parquet'
### Step 1 - preprocessing the trajectory dataset. ###
req_code, task_id = test_preprocessing_post(traj_path)
if req_code == 200:
print(f"Preprocessing POST request task initiation successful! Task ID: {task_id}")
else:
print(f"Preprocessing POST request task initiation not successful (POST return code {req_code}), aborting...")
return
waiting = True
path_preprocessed = None
while waiting :
req_code, path_preprocessed = test_preprocessing_get(task_id)
if req_code == 200:
print(f"Preprocessing task execution successful (task ID {task_id}, file received {path_preprocessed})!")
waiting = False
input("Press Enter to continue...")
print("")
elif req_code == 404 :
print(f"Server is still processing the preprocessing task ID {task_id} (GET return code {req_code})")
time.sleep(5)
else:
print(f"Server failed at processing the preprocessing task ID {task_id} (GET return code {req_code}), aborting...")
return
### Step 2 - segmenting the trajectories in the trajectory dataset. ###
req_code, task_id = test_segmentation_post(path_preprocessed)
if req_code == 200:
print(f"Segmentation POST request task initiation successful! Task ID: {task_id}")
else:
print(f"Segmentation POST request task initiation not successful (POST return code {req_code}), aborting...")
return
waiting = True
stops_path = None
moves_path = None
while waiting :
req_code, stops_path, moves_path = test_segmentation_get(task_id)
if req_code == 200:
print(f"Segmentation task successfully executed (task ID {task_id}, files received: {stops_path}, {moves_path})!")
waiting = False
input("Press Enter to continue...")
print("")
elif req_code == 404:
print(f"Server is still processing the segmentation task ID {task_id} (GET return code {req_code})")
time.sleep(5)
else:
print(f"Server failed at processing the segmentation task ID {task_id} (GET return code {req_code}), aborting...")
return
### Step 3 - Trajectory enrichment. ###
req_code, task_id = test_enrichment_post(path_preprocessed,
moves_path,
stops_path,
pois_path,
social_path,
weather_path)
if req_code == 200:
print(f"Enrichment POST request task initiation successful! Task ID: {task_id}")
else:
print(f"Enrichment POST request task initiation not successful (POST return code {req_code}), aborting...")
return
waiting = True
kg_path = None
while waiting:
req_code, kg_path = test_enrichment_get(task_id)
if req_code == 200:
print(f"Enrichment task successfully executed (task ID {task_id}, KG file received: {kg_path})!")
waiting = False
elif req_code == 404:
print(f"Server is still processing the enrichment task {task_id} (GET return code {req_code})")
time.sleep(5)
else:
print(f"Server failed at processing the enrichment task {task_id} (GET return code {req_code}), aborting...")
return
if __name__ == '__main__':
main()