Skip to content

Commit f9167c8

Browse files
authored
[fix] Resource upload and add document (#36)
* Fix resource upload function * And some documentation about how to use resource uploading
1 parent 533a06f commit f9167c8

9 files changed

Lines changed: 320 additions & 9 deletions

File tree

docs/source/concept.rst

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,84 @@ decide workflow of task. You could set `workflow` in both normal assign or in co
182182
183183
With both `Workflow`_, `Tasks`_ and `Tasks Dependence`_, we could build a workflow with multiple tasks.
184184
185+
Resource Files
186+
--------------
187+
188+
During workflow running, we may need some resource files to help us run task usually. One of a common situation
189+
is that we already have some executable files locally, and we need to schedule in specific time, or add them
190+
to existing workflow by adding the new tasks. Of cause, we can upload those files to target machine and run them
191+
in :doc:`shell task <tasks/shell>` by reference the absolute path of file. But if we have more than one machine
192+
to run task, we have to upload those files to each of them. And it is not convenient and not flexible, because
193+
we may need to change our resource files sometimes.
194+
195+
The more pydolphinscheduler way is to upload those files together with `workflow`_, and use them in task to run.
196+
For example, you have a bash script named ``echo-ten.sh`` locally, and it contains some code like this:
197+
198+
.. code-block:: bash
199+
200+
#!/bin/env bash
201+
max=10
202+
for ((i=1; i <= $max; ++i)); do
203+
echo "$i"
204+
done
205+
206+
and you want to use it in workflow but do not want to copy-paste it to shell task parameter ``command``. You could
207+
use this mechanism to upload it to resource center when you create workflow
208+
209+
.. code-block:: python
210+
211+
# Read file content
212+
file_name = "echo-ten.sh"
213+
214+
with open(file_name, "r") as f:
215+
content = f.read()
216+
217+
with Workflow(
218+
name="upload_and_run",
219+
resource_list=[
220+
Resource(name=file_name, content=content),
221+
],
222+
) as workflow:
223+
224+
And when we call :code:`workflow.run()` the new file named ``echo-ten.sh`` would be uploaded to dolphinscheduler
225+
resource center.
226+
227+
After that we can use this file in our task by reference it by name, in this case we could use :doc:`shell task <tasks/shell>`
228+
to run it.
229+
230+
.. code-block:: python
231+
232+
# We use `shell` task to run `echo-ten.sh` file
233+
shell_task = Shell(
234+
name="run",
235+
command=f"bash {file_name}",
236+
resource_list=[
237+
file_name
238+
],
239+
)
240+
241+
During workflow running, the file would be downloaded to the task runtime working directory which mean you could
242+
execute them. We execute file by ``bash`` but reference it by name directly.
243+
244+
Please notice that we could also reference the resource file already in dolphinscheduler resource center, which
245+
mean we could use resource center files in task by name without upload it again. And we can upload files to
246+
resource center bare without workflow.
247+
248+
.. code-block:: python
249+
250+
# Upload file to resource center
251+
from pydolphinscheduler.core.resource import Resource
252+
253+
resource = Resource(name="bare-create.py", user_name="<USER-MUST-EXISTS-WITH-TENANT>", content="print('Bareh create resource')")
254+
resource.create_or_update_resource()
255+
256+
After that, we could see new file named ``bare-create.py`` is be created in resource center.
257+
258+
.. note::
259+
260+
Both parameter ``resource_list`` in workflow and task is list of string which mean you could upload and reference
261+
multiple files. For more complex usage, you could read :doc:`howto/multi-resources`.
262+
185263
Authentication Token
186264
--------------------
187265

docs/source/howto/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ Currently, the HOWTOs are:
2828
:maxdepth: 2
2929

3030
remote-submit
31+
multi-resources
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
Upload and Use Multiple Resources
19+
=================================
20+
21+
Resource center help us manager resources in a centralized way, easy to change and distribute them to all the workers.
22+
for more detail you can see :ref:`resources files <concept:resource files>`.
23+
24+
In this section we will show you how to upload and use multiple resources which is more common in production environment
25+
and in the real word.
26+
27+
Overview
28+
--------
29+
30+
.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
31+
:dedent: 0
32+
:start-after: [start workflow]
33+
:end-before: [end workflow]
34+
35+
In this example, we will upload two python files to resource center and use them in one single task, the python
36+
files are named ``dependence.py`` and ``main.py``. And ``main.py`` use ``dependence.py`` as a dependency which
37+
will use a variable ``now`` declared in ``dependence.py``. So in task shell could call :code:`python main.py`
38+
to get all things done.
39+
40+
Upload Resources
41+
----------------
42+
43+
The module ``Resource`` need to be import firstly.
44+
45+
.. code-block:: python
46+
47+
from pydolphinscheduler.core.resource import Resource
48+
49+
Then we need to create two resources object and assign them to ``resource_list`` of the workflow. All content of
50+
resources should assign to ``content`` attribute of the resource object. Please know that we import variable
51+
:code:`now` from ``dependence.py`` in ``main.py``.
52+
53+
.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
54+
:dedent: 0
55+
:start-after: [start create_new_resources]
56+
:end-before: [end create_new_resources]
57+
58+
Use Resources
59+
-------------
60+
61+
Same as :ref:`using single resource <concept:resource files>`, all we need is to assign them to ``resource_list``
62+
attribute of the task and then call the main file to run our task. In this example, we call :code:`python main.py`
63+
which will use ``dependence.py`` as a dependency.
64+
65+
.. literalinclude:: ../../../src/pydolphinscheduler/examples/multi_resources_example.py
66+
:dedent: 0
67+
:start-after: [start use_exists_resources]
68+
:end-before: [end use_exists_resources]
69+
70+
After run the workflow, will execute main.py and print the current datetime. You can see the result like this:
71+
72+
.. code-block:: text
73+
74+
2022-11-29 16:16:51.952742
75+

src/pydolphinscheduler/core/task.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"""DolphinScheduler Task and TaskRelation object."""
1919
import copy
2020
import types
21+
import warnings
2122
from logging import getLogger
2223
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
2324

@@ -190,7 +191,7 @@ def workflow(self, workflow: Optional[Workflow]):
190191
self._workflow = workflow
191192

192193
@property
193-
def resource_list(self) -> List:
194+
def resource_list(self) -> List[Dict[str, Resource]]:
194195
"""Get task define attribute `resource_list`."""
195196
resources = set()
196197
for res in self._resource_list:
@@ -199,17 +200,19 @@ def resource_list(self) -> List:
199200
Resource(name=res, user_name=self.user_name).get_id_from_database()
200201
)
201202
elif type(res) == dict and res.get(ResourceKey.ID) is not None:
202-
logger.warning(
203+
warnings.warn(
203204
"""`resource_list` should be defined using List[str] with resource paths,
204205
the use of ids to define resources will be remove in version 3.2.0.
205-
"""
206+
""",
207+
DeprecationWarning,
208+
stacklevel=2,
206209
)
207210
resources.add(res.get(ResourceKey.ID))
208211
return [{ResourceKey.ID: r} for r in resources]
209212

210213
@property
211214
def user_name(self) -> Optional[str]:
212-
"""Return user name of workflow."""
215+
"""Return username of workflow."""
213216
if self.workflow:
214217
return self.workflow.user.name
215218
else:

src/pydolphinscheduler/core/workflow.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,12 @@ def submit(self) -> int:
419419
self._ensure_side_model_exists()
420420
self._pre_submit_check()
421421

422+
# resource should be created before workflow
423+
if len(self.resource_list) > 0:
424+
for res in self.resource_list:
425+
res.user_name = self._user
426+
res.create_or_update_resource()
427+
422428
self._workflow_code = gateway.create_or_update_workflow(
423429
self._user,
424430
self._project,
@@ -438,10 +444,6 @@ def submit(self) -> int:
438444
json.dumps(self.schedule_json) if self.schedule_json else None,
439445
None,
440446
)
441-
if len(self.resource_list) > 0:
442-
for res in self.resource_list:
443-
res.user_name = self._user
444-
res.create_or_update_resource()
445447
return self._workflow_code
446448

447449
def start(self) -> None:
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
This example show how to upload file to dolphinscheduler resource center and use them in tasks.
20+
21+
When you want to create a new resource file in resource center, you can add them to ``workflow.resource_list``
22+
use the following code:
23+
24+
.. code-block:: python
25+
26+
with Workflow(
27+
name="use_resource_center",
28+
resource_list=[
29+
Resource(name="new-name.py", content="print('hello world from resource center')"),
30+
],
31+
) as workflow:
32+
33+
during the workflow running, the resource file will be created and uploaded to dolphinscheduler resource
34+
center automatically.
35+
36+
If you want to use the resource file in tasks, you can also use ``resource_list`` parameter in task
37+
constructor, just like the following code:
38+
39+
.. code-block:: python
40+
41+
task_use_resource = Shell(
42+
name="run_resource",
43+
command="python new-name.py",
44+
resource_list=[
45+
"new-name.py",
46+
],
47+
)
48+
49+
and the resource file will be downloaded to the task runtime working directory which mean you cna execute
50+
them. In this example we run the file ``new-name.py`` like we execute python script in terminal. And we can
51+
also use the resource file already in dolphinscheduler resource center, not only the new we created in
52+
current workflow.
53+
"""
54+
55+
# [start workflow]
56+
from pydolphinscheduler.core import Workflow
57+
from pydolphinscheduler.core.resource import Resource
58+
from pydolphinscheduler.tasks import Shell
59+
60+
dependence = "dependence.py"
61+
main = "main.py"
62+
63+
with Workflow(
64+
name="multi_resources_example",
65+
tenant="tenant_exists",
66+
# [start create_new_resources]
67+
resource_list=[
68+
Resource(
69+
name=dependence,
70+
content="from datetime import datetime\nnow = datetime.now()",
71+
),
72+
Resource(name=main, content="from dependence import now\nprint(now)"),
73+
],
74+
# [end create_new_resources]
75+
) as pd:
76+
# [start use_exists_resources]
77+
task_use_resource = Shell(
78+
name="use-resource",
79+
command=f"python {main}",
80+
resource_list=[
81+
dependence,
82+
main,
83+
],
84+
)
85+
# [end use_exists_resources]
86+
87+
pd.run()
88+
# [end workflow]

src/pydolphinscheduler/models/tenant.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def create_if_not_exists(
4646
self, queue_name: str, user=configuration.USER_NAME
4747
) -> None:
4848
"""Create Tenant if not exists."""
49-
tenant = gateway.create_tenant(self.name, self.description, queue_name)
49+
tenant = gateway.create_tenant(self.name, queue_name, self.description)
5050
self.tenant_id = tenant.getId()
5151
self.code = tenant.getTenantCode()
5252
# gateway_result_checker(result, None)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Test pydolphinscheduler resources."""
19+
20+
import pytest
21+
from py4j.java_gateway import JavaObject
22+
23+
from pydolphinscheduler.core.resource import Resource
24+
from pydolphinscheduler.models.user import User
25+
from tests.testing.constants import UNIT_TEST_TENANT, UNIT_TEST_USER_NAME
26+
27+
name = "unittest_resource.txt"
28+
content = "unittest_resource_content"
29+
30+
31+
@pytest.fixture(scope="module")
32+
def tmp_user():
33+
"""Get a temporary user."""
34+
user = User(
35+
name=UNIT_TEST_USER_NAME,
36+
password="unittest-password",
37+
email="test-email@abc.com",
38+
phone="17366637777",
39+
tenant=UNIT_TEST_TENANT,
40+
queue="test-queue",
41+
status=1,
42+
)
43+
user.create_if_not_exists()
44+
yield
45+
user.delete()
46+
47+
48+
def test_create_or_update(tmp_user):
49+
"""Test create or update resource to java gateway."""
50+
resource = Resource(name=name, content=content, user_name=UNIT_TEST_USER_NAME)
51+
result = resource.create_or_update_resource()
52+
assert result is not None and isinstance(result, JavaObject)
53+
assert result.getAlias() == name
54+
55+
56+
def test_get_resource_info(tmp_user):
57+
"""Test get resource info from java gateway."""
58+
resource = Resource(name=name, user_name=UNIT_TEST_USER_NAME)
59+
result = resource.get_info_from_database()
60+
assert result is not None and isinstance(result, JavaObject)
61+
assert result.getAlias() == name

tests/testing/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,6 @@
4949

5050
# default token
5151
TOKEN = "jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc"
52+
53+
UNIT_TEST_USER_NAME = "unittest_user"
54+
UNIT_TEST_TENANT = "unittest_tenant"

0 commit comments

Comments
 (0)