-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode
More file actions
103 lines (82 loc) · 3.76 KB
/
code
File metadata and controls
103 lines (82 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# To make this whole code works for specific version of dataset, we need to only change the value of selectDataset. and choose the relative clinical dataset name
def return_first_line_header(rdd):
return rdd.first()
selectDataset = 'clinicaltrial_2021'
CPath = 'dbfs:/FileStore/tables/{}.csv'.format(selectDataset)
Mpath = 'dbfs:/FileStore/tables/mesh.csv'
Ppath = 'dbfs:/FileStore/tables/pharma.csv'
Crdd = sc.textFile(CPath)
Mrdd = sc.textFile(Mpath)
Prdd = sc.textFile(Ppath)
cHeader = return_first_line_header(Crdd)
mHeader = return_first_line_header(Mrdd)
pHeader = return_first_line_header(Prdd)
# As the first line of all the RDD is a header, I am skipping the first line to only get the contents of the file.
Crdd = Crdd.filter(lambda x: x!=cHeader)
Mrdd = Mrdd.filter(lambda x: x!=mHeader)
Prdd = Prdd.filter(lambda x: x!=pHeader)
# problem statement 1
Crdd.distinct().count()
Out[2]: 387261
# problem statement 2
Crdd.map(lambda x:x.split('|')[5]).map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b).sortBy(lambda x: [-x[1],x[0]]).take(5)
Out[3]: [('Interventional', 301472),
('Observational', 77540),
('Observational [Patient Registry]', 8180),
('Expanded Access', 69)]
# problem statement 3
Crdd.map(lambda x: x.split('|')[7]).flatMap(lambda x: x.split(',')).map(lambda x: (x,1)).filter(lambda x: x[0]!='').reduceByKey(lambda x,v: x+v).sortBy(lambda x: [-x[1],x[0]]).take(5)
Out[4]: [('Carcinoma', 13389),
('Diabetes Mellitus', 11080),
('Neoplasms', 9371),
('Breast Neoplasms', 8640),
('Syndrome', 8032)]
# problem statement 4
def preProcessMeshRdd(row):
newRow = row.split(',')
f = '{}{}'.format(newRow[0], newRow[1]) if len(newRow) == 3 else newRow[0]
l = newRow[2] if len(newRow) == 3 else newRow[1]
print((f,l))
return (f,l)
CData = Crdd.map(lambda x: x.split('|')[7]).flatMap(lambda x: x.split(',')).map(lambda x: (x,1)).filter(lambda x: len(x[0])>0)
CData.join(Mrdd.map(lambda x: ','.join(preProcessMeshRdd(x))).map(lambda x: (x.split(',')[0],x.split(',')[1].split('.')[0]))).map(lambda x: (x[1][1], x[1][0])).reduceByKey(lambda x,v: x+v).sortBy(lambda x: [-x[1],x[0]]).take(5)
Out[5]: [('C04', 143994),
('C23', 136079),
('C01', 106674),
('C14', 94523),
('C10', 92310)]
# problem statement 5
Pdata = Prdd.map(lambda x: x.replace('"','')).map(lambda x:x.split(',')[1])
Crdd.map(lambda x: x.split('|')[1]).subtract(Pdata).map(lambda x: (x,1)).reduceByKey(lambda x,v: x+v).sortBy(lambda x: [-x[1],x[0]]).take(10)
Out[6]: [('National Cancer Institute (NCI)', 3218),
('M.D. Anderson Cancer Center', 2414),
('Assistance Publique - Hôpitaux de Paris', 2369),
('Mayo Clinic', 2300),
('Merck Sharp & Dohme Corp.', 2243),
('Assiut University', 2154),
('Novartis Pharmaceuticals', 2088),
('Massachusetts General Hospital', 1971),
('Cairo University', 1928),
('Hoffmann-La Roche', 1828)]
# problem statement 6
study_date_to_filter = selectDataset.split('_')[-1]
res = Crdd.map(lambda x: x.split('|')).map(lambda x:(x[2], x[4])).filter(lambda x: x[0]=='Completed').map(lambda x: x[1].split(' ')).filter(lambda x: len(x)==2).filter(lambda x: x[1]==study_date_to_filter).map(lambda x:(x[0],1)).reduceByKey(lambda x,v: x+v).sortBy(lambda x: [-x[1],x[0]]).collect()
def change_months_to_correct_order(results):
end_results = []
order = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
result_dict = dict(results)
for o in order:
new_tup = (o, result_dict.get(o))
if new_tup[1]:end_results.append(new_tup)
return end_results
change_months_to_correct_order(res)
Out[7]: [('Jan', 1131),
('Feb', 934),
('Mar', 1227),
('Apr', 967),
('May', 984),
('Jun', 1094),
('Jul', 819),
('Aug', 700),
('Sep', 528),
('Oct', 187)]