Skip to content

Commit 1a7b810

Browse files
authored
Add support for JSON_INPUT in gzipped configMap (#41)
1 parent a581a0a commit 1a7b810

3 files changed

Lines changed: 44 additions & 11 deletions

File tree

src/tesk_core/filer.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import logging
1212
import netrc
1313
import requests
14+
import gzip
1415
from tesk_core.exception import UnknownProtocol, FileProtocolDisabled
1516
import shutil
1617
from glob import glob
@@ -481,7 +482,11 @@ def main():
481482

482483
logging.info('Starting %s filer...', args.transputtype)
483484

484-
data = json.loads(args.data)
485+
if args.data.endswith('.gz'):
486+
with gzip.open(args.data, 'rb') as fh:
487+
data = json.loads(fh.read())
488+
else:
489+
data = json.loads(args.data)
485490

486491
for afile in data[args.transputtype]:
487492
logging.debug('Processing file: %s', afile['path'])
@@ -494,4 +499,4 @@ def main():
494499

495500

496501
if __name__ == "__main__":
497-
sys.exit(main())
502+
sys.exit(main())

src/tesk_core/filer_class.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ def getEnv(self): return self.getContainer(0)['env']
1414
def getImagePullPolicy(self): return self.getContainer(0)['imagePullPolicy']
1515

1616

17-
def __init__(self, name, data, filer_name='eu.gcr.io/tes-wes/filer', filer_version='v0.5', pullPolicyAlways = False):
17+
def __init__(self, name, data, filer_name='eu.gcr.io/tes-wes/filer', filer_version='v0.5', pullPolicyAlways = False, json_pvc=None):
1818
self.name = name
19+
self.json_pvc = json_pvc
1920
self.spec = {
2021
"kind": "Job",
2122
"apiVersion": "batch/v1",
@@ -41,11 +42,22 @@ def __init__(self, name, data, filer_name='eu.gcr.io/tes-wes/filer', filer_versi
4142
}
4243

4344
env = self.getEnv()
44-
env.append({"name": "JSON_INPUT", "value": json.dumps(data)})
45+
if json_pvc is None:
46+
env.append({"name": "JSON_INPUT", "value": json.dumps(data)})
4547
env.append({"name": "HOST_BASE_PATH", "value": path.HOST_BASE_PATH})
4648
env.append(
4749
{"name": "CONTAINER_BASE_PATH", "value": path.CONTAINER_BASE_PATH})
4850

51+
if json_pvc:
52+
self.getVolumeMounts().append({
53+
"name" : 'jsoninput'
54+
, 'mountPath' : '/jsoninput'
55+
})
56+
self.getVolumes().append({
57+
"name" : 'jsoninput'
58+
, "configMap" : { 'name' : json_pvc }
59+
})
60+
4961
if fileEnabled():
5062
self.getVolumeMounts().append({
5163

@@ -152,8 +164,12 @@ def add_netrc_mount(self, netrc_name='netrc'):
152164

153165

154166
def get_spec(self, mode, debug=False):
155-
self.spec['spec']['template']['spec']['containers'][0]['args'] = [
156-
mode, "$(JSON_INPUT)"]
167+
if self.json_pvc is None:
168+
self.spec['spec']['template']['spec']['containers'][0]['args'] = [
169+
mode, "$(JSON_INPUT)"]
170+
else:
171+
self.spec['spec']['template']['spec']['containers'][0]['args'] = [
172+
mode, "/jsoninput/JSON_INPUT.gz"]
157173

158174
if debug:
159175
self.spec['spec']['template']['spec']['containers'][0][

src/tesk_core/taskmaster.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import re
77
import sys
88
import logging
9+
import gzip
910
from kubernetes import client, config
1011
from tesk_core.job import Job
1112
from tesk_core.pvc import PVC
@@ -132,13 +133,18 @@ def init_pvc(data, filer):
132133
return pvc
133134

134135

135-
def run_task(data, filer_name, filer_version):
136+
def run_task(data, filer_name, filer_version, have_json_pvc):
136137
task_name = data['executors'][0]['metadata']['labels']['taskmaster-name']
137138
pvc = None
138139

140+
if have_json_pvc:
141+
json_pvc = task_name
142+
else:
143+
json_pvc = None
144+
139145
if data['volumes'] or data['inputs'] or data['outputs']:
140146

141-
filer = Filer(task_name + '-filer', data, filer_name, filer_version, args.pull_policy_always)
147+
filer = Filer(task_name + '-filer', data, filer_name, filer_version, args.pull_policy_always, json_pvc)
142148

143149
if os.environ.get('TESK_FTP_USERNAME') is not None:
144150
filer.set_ftp(
@@ -249,6 +255,7 @@ def newLogger(loglevel):
249255

250256

251257
def main():
258+
have_json_pvc = False
252259

253260
parser = newParser()
254261
global args
@@ -271,8 +278,13 @@ def main():
271278
elif args.file == '-':
272279
data = json.load(sys.stdin)
273280
else:
274-
with open(args.file) as fh:
275-
data = json.load(fh)
281+
if args.file.endswith('.gz'):
282+
with gzip.open(args.file, 'rb') as fh:
283+
data = json.loads(fh.read())
284+
have_json_pvc = True
285+
else:
286+
with open(args.file) as fh:
287+
data = json.load(fh)
276288

277289
# Load kubernetes config file
278290
if args.localKubeConfig:
@@ -287,7 +299,7 @@ def main():
287299
if check_cancelled():
288300
exit_cancelled('Cancelled during init')
289301

290-
run_task(data, args.filer_name, args.filer_version)
302+
run_task(data, args.filer_name, args.filer_version, have_json_pvc)
291303

292304

293305
def clean_on_interrupt():

0 commit comments

Comments
 (0)