-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathpipeline.py
More file actions
69 lines (49 loc) · 1.88 KB
/
pipeline.py
File metadata and controls
69 lines (49 loc) · 1.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import argparse
import requests
import logging
from datetime import datetime
import apache_beam as beam
def parse_lines(element):
    """Split one CSV text line into a list of stripped field strings.

    Uses the stdlib ``csv`` parser instead of ``str.split`` so quoted fields
    that contain commas (e.g. ``'"a,b",c'``) stay intact. Surrounding
    whitespace is stripped from every field so values such as
    ``" 2021-01-01T00:00:00"`` still parse with ``strptime`` downstream.

    Args:
        element: A single line of text, as emitted by ``ReadFromText``.

    Returns:
        A list of field strings; an empty list for an empty line.
    """
    # Imported locally so the function stays self-contained when Beam
    # pickles it for remote workers.
    import csv

    row = next(csv.reader([element]), [])
    return [field.strip() for field in row]
class CalcVisitDuration(beam.DoFn):
    """Turn a parsed visit row into an ``(ip, duration_seconds)`` pair.

    Expects rows whose first three fields are the visitor IP, the visit start
    timestamp and the visit end timestamp; any further fields are ignored.

    Args:
        dt_format: ``strptime`` format of the start/end timestamps. Defaults
            to ``"%Y-%m-%dT%H:%M:%S"``.
    """

    DEFAULT_DT_FORMAT = "%Y-%m-%dT%H:%M:%S"

    def __init__(self, dt_format=DEFAULT_DT_FORMAT):
        super().__init__()
        self.dt_format = dt_format

    def process(self, element):
        """Yield ``(ip, seconds)`` for one visit row.

        Raises:
            IndexError: if the row has fewer than three fields.
            ValueError: if a timestamp does not match ``dt_format``.
        """
        ip, start_raw, end_raw = element[0], element[1], element[2]
        start_dt = datetime.strptime(start_raw, self.dt_format)
        end_dt = datetime.strptime(end_raw, self.dt_format)
        # Not clamped: a visit whose end precedes its start yields a negative
        # duration. The 2-tuple is Beam's key/value shape.
        yield (ip, (end_dt - start_dt).total_seconds())
class GetIpCountryOrigin(beam.DoFn):
    """Resolve a visitor IP to its country name via the ip-api.com JSON API.

    Consumes rows whose first field is an IP address and yields
    ``(ip, country)`` pairs, the key/value shape ``beam.pvalue.AsDict`` needs.

    Args:
        timeout: Seconds to wait for each HTTP lookup before ``requests``
            raises, so a stalled connection cannot hang a worker forever.
    """

    API_URL = "http://ip-api.com/json/{ip}?fields=country"

    def __init__(self, timeout=10.0):
        super().__init__()
        self.timeout = timeout
        self._session = None

    def setup(self):
        # One HTTP session per DoFn instance, reused across elements. Created
        # here rather than in __init__ so it is not pickled with the DoFn.
        self._session = requests.Session()

    def teardown(self):
        if self._session is not None:
            self._session.close()
            self._session = None

    def process(self, element):
        """Yield ``(ip, country)`` for the IP in ``element[0]``.

        Raises:
            requests.HTTPError: on a non-2xx response (e.g. HTTP 429 when the
                API throttles the client).
            requests.RequestException: on connection errors or timeout.
            ValueError: if the JSON payload carries no ``country``.
        """
        if self._session is None:
            # Allows calling process() outside a runner that invokes setup().
            self.setup()
        ip = element[0]
        response = self._session.get(
            self.API_URL.format(ip=ip), timeout=self.timeout
        )
        # Fail on HTTP errors here instead of later with an opaque
        # JSON decode error or KeyError.
        response.raise_for_status()
        country = response.json().get("country")
        if country is None:
            raise ValueError(
                f"ip-api.com returned no country for {ip!r}: {response.text}"
            )
        yield (ip, country)
def map_country_to_ip(element, ip_map):
    """Re-key an ``(ip, duration)`` pair by the IP's country.

    Despite its name, this maps an IP *to* its country: the output key is
    ``ip_map[ip]`` and the second item of ``element`` is carried over
    unchanged.

    Args:
        element: Sequence whose first item is an IP and second a duration.
        ip_map: Mapping from IP to country name.

    Returns:
        A ``(country, duration)`` 2-tuple, Beam's key/value shape, so it can
        feed a per-key combine such as ``CombinePerKey``.

    Raises:
        KeyError: if the IP is absent from ``ip_map``.
    """
    ip, duration = element[0], element[1]
    return (ip_map[ip], duration)
def run(argv=None):
    """Build and run the average-visit-duration-by-country pipeline.

    Reads CSV rows of ``ip,start,end`` visits from ``--input`` (skipping one
    header line). It computes each visit's duration in seconds and resolves
    each distinct IP to a country. It then writes ``country,mean_seconds``
    lines to files prefixed by ``--output`` with a ``.csv`` suffix.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.
            Arguments other than ``--input``/``--output`` are forwarded to
            Beam as pipeline options (runner, project, ...).
    """
    # Local import keeps the file's module-level import list unchanged.
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input", required=True, help="Input CSV file or glob; first line is a header."
    )
    parser.add_argument(
        "--output", required=True, help="Output file prefix; '.csv' is appended."
    )
    args, beam_args = parser.parse_known_args(argv)

    options = PipelineOptions(beam_args)
    # The DoFns rely on module-level imports (datetime, requests); remote
    # runners only see those if the main session is pickled with the job.
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        lines = (
            p
            | "ReadFile" >> beam.io.ReadFromText(args.input, skip_header_lines=1)
            | "ParseLines" >> beam.Map(parse_lines)
        )
        duration = lines | "CalcVisitDuration" >> beam.ParDo(CalcVisitDuration())
        # Look each IP up once instead of once per row: the ip-api.com free
        # endpoint is rate-limited. The IP is wrapped in a 1-tuple because
        # GetIpCountryOrigin reads it from element[0].
        ip_map = (
            lines
            | "ExtractIp" >> beam.Map(lambda row: (row[0],))
            | "DistinctIp" >> beam.Distinct()
            | "GetIpCountryOrigin" >> beam.ParDo(GetIpCountryOrigin())
        )
        result = (
            duration
            | "MapIpToCountry"
            >> beam.Map(map_country_to_ip, ip_map=beam.pvalue.AsDict(ip_map))
            | "AverageByCountry" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
            | "FormatOutput" >> beam.Map(lambda element: ",".join(map(str, element)))
        )
        result | "WriteOutput" >> beam.io.WriteToText(
            args.output, file_name_suffix=".csv"
        )
if __name__ == "__main__":
    # Script entry point: raise the root logger's threshold to WARNING so
    # INFO/DEBUG records that reach it are dropped, then run the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.WARNING)
    run()