Hera is a Python framework for constructing and submitting Argo Workflows.

Hera (hera-workflows)

The Argo was constructed by the shipwright Argus, and its crew were specially protected by the goddess Hera.

(https://en.wikipedia.org/wiki/Argo)

License: MIT

Hera is a Python framework for constructing and submitting Argo Workflows. The main goal of Hera is to make Argo Workflows more accessible by abstracting away some setup that is typically necessary for constructing Argo workflows.

Python functions are first class citizens in Hera - they are the atomic units (execution payload) that are submitted for remote execution. The framework makes it easy to wrap execution payloads into Argo Workflow tasks, set dependencies, resources, etc.

You can watch the introductory Hera presentation at the "Argo Workflows and Events Community Meeting 20 Oct 2021" here!

Assumptions

Hera is exclusively dedicated to remote workflow submission and execution. Therefore, it requires an Argo server to be deployed to a Kubernetes cluster. Currently, Hera assumes that the Argo server sits behind an authentication layer that can authenticate workflow submission requests by using the Bearer token on the request. To learn how to deploy Argo to your own Kubernetes cluster you can follow the Argo Workflows guide!
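As a rough illustration of the Bearer-token assumption, the sketch below builds (but does not send) a submission request with the token in the Authorization header, using only the Python standard library. The host and token are the placeholder values used in the example further down, the /api/v1/workflows/<namespace> path is the one visible in the traceback quoted later on this page, and build_submission_request and the request body shape are assumptions made for illustration, not part of Hera.

```python
import json
import urllib.request


def build_submission_request(host: str, token: str, namespace: str, workflow: dict) -> urllib.request.Request:
    """Build (without sending) a POST request that carries a Bearer token."""
    return urllib.request.Request(
        url=f"https://{host}/api/v1/workflows/{namespace}",
        # Assumed body shape: the workflow wrapped in a create request
        data=json.dumps({"workflow": workflow}).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_submission_request(
    "my-argo-domain.com",
    "my-argo-server-token",
    "default",
    {"metadata": {"generateName": "hello-"}},
)
print(request.get_method(), request.full_url)
```

An authentication layer in front of the Argo server would inspect this header before letting the request through.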

Another option for workflow submission without the authentication layer is to port forward to your Argo server deployment and submit workflows to localhost:2746 (2746 is the default port, but you are free to use your own). Please refer to the Argo Workflows documentation for the port forwarding command!
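Before submitting through a port forward, it can help to confirm that something is actually listening on the local port. The helper below is a minimal sketch using only the standard library; is_port_open is a hypothetical name, and the defaults simply mirror the localhost:2746 address mentioned above.

```python
import socket


def is_port_open(host: str = "localhost", port: int = 2746, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False
```

A successful TCP connection only shows that the port forward is up; it says nothing about TLS or authentication on the Argo server itself.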

In the future, these assumptions may be relaxed or extended depending on the direction of the project. Hera is mostly designed for practical data science purposes, which assumes the presence of a DevOps team to set up an Argo server for workflow submission.

Installation

Hera can currently be installed directly from this repository using:

python3 -m pip install git+https://github.com/argoproj-labs/hera-workflows --ignore-installed

Alternatively, you can clone this repository and then run the following to install:

python setup.py install

Contributing

If you plan to submit contributions to Hera you can install Hera in a virtual environment managed by pipenv:

pipenv shell
pipenv sync --dev --pre

Also, see the contributing guide!

Concepts

Currently, Hera is centered around two core concepts. These concepts are also used by Argo, which Hera aims to stay consistent with:

  • Task - the object that holds the Python function for remote execution/the atomic unit of execution;
  • Workflow - the higher level representation of a collection of tasks.
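To make the two concepts concrete, here is a toy model that runs entirely locally. ToyTask and ToyWorkflow are hypothetical stand-ins written for this sketch, not Hera classes; they only show how a task can hold a function plus its arguments, how a workflow can collect tasks, and how dependencies determine execution order. It assumes Python 3.9+ for graphlib.

```python
from graphlib import TopologicalSorter


class ToyTask:
    """Holds a Python function and its keyword arguments (stand-in, not Hera's Task)."""

    def __init__(self, name, func, kwargs=None):
        self.name = name
        self.func = func
        self.kwargs = kwargs or {}
        self.upstream = []

    def next(self, other):
        # Record that `other` depends on this task; return it to allow chaining
        other.upstream.append(self)
        return other


class ToyWorkflow:
    """A named collection of tasks (stand-in, not Hera's Workflow)."""

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def add_tasks(self, *tasks):
        self.tasks.extend(tasks)

    def run_locally(self):
        # Map each task name to the names it depends on, then run in dependency order
        graph = {t.name: {u.name for u in t.upstream} for t in self.tasks}
        by_name = {t.name: t for t in self.tasks}
        order = list(TopologicalSorter(graph).static_order())
        for name in order:
            by_name[name].func(**by_name[name].kwargs)
        return order


messages = []


def say(message):
    messages.append(message)


a, b, c, d = (ToyTask(n, say, {"message": f"This is task {n}!"}) for n in "ABCD")
a.next(b).next(d)
a.next(c).next(d)

w = ToyWorkflow("diamond")
w.add_tasks(a, b, c, d)
order = w.run_locally()
print(order)
```

In Hera the functions are executed remotely by Argo rather than in the local process; the toy version only mirrors the shape of the interface.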

Examples

A very primitive example of submitting a task within a workflow through Hera is:

from hera.v1.task import Task
from hera.v1.workflow import Workflow
from hera.v1.workflow_service import WorkflowService


def say(message: str):
    """
    This can be anything as long as the Docker image satisfies the dependencies. You can import anything Python
    that is in your container, e.g. torch, tensorflow, scipy, biopython, etc. - just provide an image to the task!
    """
    print(message)


ws = WorkflowService('my-argo-domain.com', 'my-argo-server-token')
w = Workflow('my-workflow', ws)
t = Task('say', say, [{'message': 'Hello, world!'}])
w.add_task(t)
w.submit()

See the examples directory for a collection of Argo workflow construction and submission via Hera!

Comparison

There are other libraries currently available for structuring and submitting Argo Workflows:

  • Couler, which aims to provide a unified interface for constructing and managing workflows on different workflow engines;
  • Argo Python DSL, which allows you to programmatically define Argo workflows using Python.

While the aforementioned libraries provide amazing functionality for Argo workflow construction and submission, they require an advanced understanding of Argo concepts. When Dyno Therapeutics started using Argo Workflows, it was challenging to construct and submit experimental machine learning workflows. Scientists and engineers at Dyno Therapeutics spent a lot of time on workflow definition rather than on the implementation of the atomic unit of execution - the Python function - that performed, for instance, model training.

Hera presents a much simpler interface for task and workflow construction, empowering users to focus on their own executable payloads rather than workflow setup. Here's a side by side comparison of Hera, Argo Python DSL, and Couler:

The three examples below appear in this order: Hera, Couler, Argo Python DSL.

from hera.v1.task import Task
from hera.v1.workflow import Workflow
from hera.v1.workflow_service import WorkflowService


def say(message: str):
    print(message)


ws = WorkflowService('my-argo-server.com', 'my-auth-token')
w = Workflow('diamond', ws)
a = Task('A', say, [{'message': 'This is task A!'}])
b = Task('B', say, [{'message': 'This is task B!'}])
c = Task('C', say, [{'message': 'This is task C!'}])
d = Task('D', say, [{'message': 'This is task D!'}])

a.next(b).next(d)  # a >> b >> d
a.next(c).next(d)  # a >> c >> d

w.add_tasks(a, b, c, d)
w.submit()

Couler:

import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def job(name):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[name],
        step_name=name,
    )


def diamond():
    couler.dag(
        [
            [lambda: job(name="A")],
            [lambda: job(name="A"), lambda: job(name="B")],  # A -> B
            [lambda: job(name="A"), lambda: job(name="C")],  # A -> C
            [lambda: job(name="B"), lambda: job(name="D")],  # B -> D
            [lambda: job(name="C"), lambda: job(name="D")],  # C -> D
        ]
    )


diamond()
submitter = ArgoSubmitter()
couler.run(submitter=submitter)

Argo Python DSL:

from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container

Comments
  • CICD Improvements

    About

    this is a work in progress; it aims to check feasibility for further discussion

    Following #93, further improve CI + CD

    Tasks

    • [x] add coverage reports tasks and ci
      • [x] store all py vers X OS coverage files
      • [x] combine all py vers X OS coverage files
      • [x] create html cov
      • [x] store coverage report artifact
    • [x] add optional binary distribution to wheels via setup.py
      • [x] add a none-any build just in case and publish it
      • [x] add sdist build ( .tar.gz ) build for publishing
    • [x] store built wheels by tox
      • reason for publishing to tested wheels
    • [x] build wheels for all distributions ( matrix = py ver + os )
    • [x] build py3-none-any wheel + sdist ( .tar.gz )
    • [x] consolidate hera CICD into a single gh workflow
      • motivation: re-use test / build job as a pre-publishing job
      • motivation: leverage artifacts passing between jobs rather than between different workflows
      • [x] add publish job ( similar to existing one )
      • [x] change event triggers
      • [x] add if: to publish job
      • [x] delete file .github/workflows/hera_build_and_publish.yaml
      • [x] rename file .github/workflows/hera_pr.yaml -> .github/workflows/cicd.yaml ( or .. )
      • [x] fix jobs badge workflow name hera_build_and_publish.yaml -> cicd.yaml ( or .. ) in README.md
    • [x] enable upload to pypi-test for the purpose of validating entire publish job ( see publish job failure )
    • [x] replace upload to pypi-test with upload to pypi in publish job
    • [ ] setup codecov if we want a coverage badge to be introduced in this PR ( or propose a different solution )

    inspiration credit https://github.com/samuelcolvin/pydantic/blob/master/.github/workflows/ci.yml

  • Add context management for workflow submission

    Currently, users have to add tasks to a workflow explicitly. After a task is defined, it is necessary to call workflow.add_tasks(t1, t2, ...); otherwise the tasks are not added to the template of the DAG in the main workflow. To save some effort here, a context manager can be introduced to auto-insert tasks into a workflow and submit the workflow during the exit phase of the context. This would provide two ways of adding tasks to a workflow.

    The current, supported way of adding tasks and submitting a workflow:

    wf = Workflow('test')
    t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo'])
    t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
    t1 >> t2
    wf.add_tasks(t1, t2)
    wf.submit()
    

    Potential future (second) way of adding tasks and submitting a workflow:

    with Workflow('test') as wf:
        (
            Task('t1', image='docker/whalesay', command=['cowsay', 'foo'])
            >> Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
        )
    

    Alternatively:

    with Workflow('test') as wf:
        t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo']) 
        t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
        t1 >> t2
    

    This will require the implementation of a context manager on Workflow and a container for Task, which tasks insert themselves into during __init__:

    class Workflow:
    
        def __enter__(self):
            ...
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            ...
            return self.submit()
    
    class _Container:
        # class-level list that tasks append themselves to
        tasks: list = []
    
        @classmethod
        def add(cls, t: "Task") -> None:
            cls.tasks.append(t)
    
    class Task:
    
        def __init__(self, *args, **kwargs):
            ...
            _Container.add(self)
    

    Questions to consider:

    • Does this solve a real problem or would it cause confusion because of the implicit behavior of task additions?
    • Is this an interface that abides by Hera's principle of simplicity? Is the _Container usage a bit too abstract?
    • Is it safe to assume that exiting a context should indeed submit a workflow? If not, but the design is still desirable, should users be provided with the feature along with a parameter on Workflow such as auto_submit? Does that pollute the interface of Workflow unnecessarily with QOL features rather than focusing on exposing Argo specific parameters?
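To make the proposal easier to discuss, here is a small self-contained sketch of the idea. ToyWorkflow and ToyTask are hypothetical stand-ins rather than Hera's classes, submit() only flips a flag instead of talking to Argo, and the auto_submit parameter is the optional flag raised in the questions above, not an existing feature.

```python
from typing import List, Optional


class ToyWorkflow:
    # The workflow whose `with` block is currently open, if any
    _active: Optional["ToyWorkflow"] = None

    def __init__(self, name: str, auto_submit: bool = True):
        self.name = name
        self.auto_submit = auto_submit
        self.tasks: List["ToyTask"] = []
        self.submitted = False

    def __enter__(self) -> "ToyWorkflow":
        ToyWorkflow._active = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        ToyWorkflow._active = None
        # Submit only when the block finished without an exception
        if exc_type is None and self.auto_submit:
            self.submit()
        return False  # never swallow exceptions raised inside the block

    def submit(self) -> None:
        self.submitted = True


class ToyTask:
    def __init__(self, name: str):
        self.name = name
        self.downstream: List["ToyTask"] = []
        # Register with the open workflow, if there is one
        if ToyWorkflow._active is not None:
            ToyWorkflow._active.tasks.append(self)

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        self.downstream.append(other)
        return other


with ToyWorkflow("test") as wf:
    ToyTask("t1") >> ToyTask("t2")

print([t.name for t in wf.tasks], wf.submitted)
```

Tracking a single active workflow on the class keeps the sketch short, but it does not handle nested or concurrent contexts; a real implementation would need to decide how to treat those.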

    Would love to discuss this @bchalk101 and @bobh66! Any thoughts? Would this be useful to your use case?

  • Multiple Python Versions CI

    Purpose of PR

    related issues and PRs #73 #92 #94

    Provide multiple python versions CI ( CD will be handled in a different PR )

    To achieve this, introduce usage of tox for testing built wheels in "isolated envs" of multiple python versions.

    Notes regarding similar open PRs

    1. The difference between this PR and #92 is that it does not run pipenv in tox, but rather lets tox handle dependencies and venvs
    2. The difference between this PR and #94 is that it keeps pipenv as the project's dev env manager

    Tasks

    • [x] add tox.ini with python{3.7,3.8,3.9,3.10} test envs
    • [x] add build matrix of python versions to ci
      • ref docs: https://docs.github.com/en/actions/using-workflows/advanced-workflow-features#using-a-build-matrix
      • ref proj: https://github.com/spotify/luigi/
      • ref proj: https://github.com/samuelcolvin/pydantic/
    • [x] remove caching of Pipfile.lock
      • Pipfile.lock is a py37 lock file
    • [x] for dev purposes, update Pipfile and Pipfile.lock
      • for ci, tox is used directly as Pipfile.lock is a py37 lock file

    Notes for future changes

    1. Evaluate usage of pipenv in tox
       1.1 See https://pipenv-fork.readthedocs.io/en/latest/advanced.html#tox-automation-project
       1.2 Evaluate tox-pipenv
    2. Evaluate poetry to replace pipenv
       2.1 See #94
       2.2 See related discussion of "who does what and why" regarding tox and poetry: https://github.com/python-poetry/poetry/discussions/4307

    Changes to Pipfile

    To use tox, it was added to the Pipfile as a dev dependency with the following command:

    pipenv install pytest-cov "tox<4.0" tox-wheel --dev
    

    tox-wheel was added as well to make tox build wheels rather than zip, so these can be used for publishing [ more below ].

    After running tox, you'll find the wheels under .tox/dist:

    ❯ ls .tox/dist
    hera_workflows-1.8.0rc7-py3-none-any.whl
    

    pytest-cov was added as well so that coverage reports can be generated, and perhaps a coverage check added later.

    Even though this might belong in a different PR, I think it is relevant here too, as it concerns major changes to how hera is tested. In this context, I've added the relevant pytest options in setup.cfg.

    More on added tox.ini

    To use tox, update / recreate your pipenv virtual env according to the updated Pipfile.

    An initial tox.ini was created with a few tox envs.

    The "main" env - testenv -- provides the first desired functionality discussed in #73 and tests 4 python versions, including 3.10 ( latest ).

    To list all tox envs you can run:

    ❯ tox -a
    python3.7
    python3.8
    python3.9
    python3.10
    lint
    typecheck
    

    And to run a specific one, e.g. lint, run -

    ❯ tox -e lint
    
  • Unable to resolve methods outside script

    Hi there, I'm new to Hera and Argo so bear with me if the question is rookie:

    Here is my code snippet:

    from app.common.image.image_utils import read_image
    from app.common.models.thumbnail_metadata import ThumbnailMetadata
    from app.common.s3_utils import resolve_s3_url, upload
    from app.common.tasks.argo_task import ArgoTask
    from app.services.argo_client import ArgoWorkflowService
    
    class ThumbnailTask(ArgoTask):
        def __init__(self, metadata: ThumbnailMetadata):
            # .... some more code
    
        def create_thumbnails(self, image_url: str, thumbnail_sizes: list[dict]):
            blob = read_image(self.image_url)
    
            # .... some more code
    
        def create(self):
            ArgoWorkflowService.create_task('create_thumbnails', ThumbnailTask.create_thumbnails, [{
                'image_url': self.image_url,
                'thumbnail_sizes': self.thumbnail_sizes
            }])
    
    from typing import Callable
    from hera.retry import Retry
    from hera.task import Task
    from hera.workflow import Workflow
    from hera.workflow_service import WorkflowService
    
    from app.core.config import get_app_settings
    
    class ArgoWorkflowService(object):
        workflow_service = None
        workflow = None
    
        @staticmethod
        def create_task(name: str, func: Callable, func_params: list[dict]):
            # .... some more code
    
            task = Task(name, func, func_params, image='my-repo.dkr.ecr.us-east-2.amazonaws.com/my-app:latest', retry=Retry(duration=3, max_duration=60))
            ArgoWorkflowService.workflow.add_task(task)
            ArgoWorkflowService.workflow.create()
    

    The error I received in Argo is:

    main Traceback (most recent call last):
    main   File "/argo/staging/script", line 5, in <module>
    main     blob = read_image(self.image_url)
    main NameError: name 'read_image' is not defined
    

    The read_image function comes from another package.

    In addition, Argo was unable to import 3rd party Python libraries. I used the same application image, but it still does not work.

    Any help would be appreciated!
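One plausible reading of the traceback, assuming that only the body of the function is shipped to the container as a standalone script (the path /argo/staging/script hints at this), is that module-level imports never reach the script. The sketch below reproduces that effect locally with plain Python: run_as_script is a hypothetical helper that executes a code string in an empty namespace, and sqrt stands in for read_image.

```python
import textwrap


def run_as_script(body: str) -> dict:
    """Execute `body` in a fresh namespace, the way a standalone script would run."""
    namespace: dict = {}
    exec(textwrap.dedent(body), namespace)
    return namespace


# The body relies on a name that was imported at module level somewhere else
broken_body = """
    value = sqrt(16)
"""

# The import travels with the body, so the name is defined when the script runs
fixed_body = """
    from math import sqrt
    value = sqrt(16)
"""

try:
    run_as_script(broken_body)
    broken_error = None
except NameError as exc:
    broken_error = str(exc)

fixed_value = run_as_script(fixed_body)["value"]
print(broken_error)
print(fixed_value)
```

If this reading is right, moving the imports inside the function, as the README docstring suggests ("You can import anything Python that is in your container"), would be the first thing to try. The package being imported still has to be installed in the task's image.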

  • How to create a workflow with 10k tasks

    I'd like to have a workflow with many tasks, but I'm running into the 1.5MB k8s/etcd file limit. The workflow isn't complicated, it's basically 10k very-short bash commands that all run on the same image with the same resources etc.

    I think the solution here is to use a WorkflowTemplate, but I haven't figured out how to use input parameters with hera.

    I have something like this:

    # set up the WorkflowTemplate
    wt = WorkflowTemplate(...)
    
    # is this the right way to pass input in?
    t = Task("cmd", lambda: _, command=["bash", "-c", "{{inputs.parameters.cmd}}"], ...)
    wt.add_task(t)
    wt.create()
    
    # how do I get these commands into the workflow as parameters?
    commands = ["echo foo", "echo bar"]
    
    # create the Workflow
    workflow = Workflow(..., workflow_template_ref=wt.name)
    workflow.create()
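One direction that may help with the manifest size, independent of Hera's API, is Argo's withParam loop: a single DAG task is expanded at run time from a JSON list instead of being written out once per command. The sketch below builds both shapes as plain dictionaries and compares their serialized sizes. The template names, the alpine:3.7 image, and the use of sh are illustrative assumptions, and nothing here is submitted to a cluster.

```python
import json

# Shared template that runs one shell command (image and names are illustrative)
RUN_TEMPLATE = {
    "name": "run",
    "inputs": {"parameters": [{"name": "cmd"}]},
    "container": {
        "image": "alpine:3.7",
        "command": ["sh", "-c", "{{inputs.parameters.cmd}}"],
    },
}


def explicit_manifest(commands):
    """One DAG task per command: the manifest grows with every task."""
    tasks = [
        {
            "name": f"cmd-{i}",
            "template": "run",
            "arguments": {"parameters": [{"name": "cmd", "value": command}]},
        }
        for i, command in enumerate(commands)
    ]
    return {
        "spec": {
            "entrypoint": "main",
            "templates": [{"name": "main", "dag": {"tasks": tasks}}, RUN_TEMPLATE],
        }
    }


def fan_out_manifest(commands):
    """A single DAG task expanded at run time from a JSON list parameter."""
    task = {
        "name": "cmd",
        "template": "run",
        "withParam": "{{workflow.parameters.commands}}",
        "arguments": {"parameters": [{"name": "cmd", "value": "{{item}}"}]},
    }
    return {
        "spec": {
            "entrypoint": "main",
            "arguments": {"parameters": [{"name": "commands", "value": json.dumps(commands)}]},
            "templates": [{"name": "main", "dag": {"tasks": [task]}}, RUN_TEMPLATE],
        }
    }


commands = [f"echo {i}" for i in range(10_000)]
explicit_size = len(json.dumps(explicit_manifest(commands)))
fan_out_size = len(json.dumps(fan_out_manifest(commands)))
print(f"explicit: {explicit_size} bytes, fan-out: {fan_out_size} bytes")
```

This only shrinks the submitted manifest. Whether the result stays under the limit for a given cluster depends on the command lengths and on what Argo stores while the workflow runs, which this sketch cannot show.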
    
  • How to ignore certificate errors?

    When I attempt to port-forward Argo running in GKE, I get a certificate error. I tried the suggestion from the first comment here, but I still get the certificate error.

    C:\Users\ryanr\Code\hera-workflows\examples [main ≡ +0 ~1 -0 !]> python .\diamond.py
    Traceback (most recent call last):
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 706, in urlopen      
        chunked=chunked,
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 382, in _make_request
        self._validate_conn(conn)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 1010, in _validate_conn
        conn.connect()
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connection.py", line 426, in connect
        tls_in_tls=tls_in_tls,
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 450, in ssl_wrap_socket
        sock, context, tls_in_tls, server_hostname=server_hostname
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 493, in _ssl_wrap_socket_impl
        return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 423, in wrap_socket
        session=session
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 870, in _create
        self.do_handshake()
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 1139, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File ".\diamond.py", line 32, in <module>
        w.submit()
      File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow.py", line 129, in submit
        self.service.submit(self.workflow, namespace)
      File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow_service.py", line 48, in submit
        return self.service.create_workflow(namespace, V1alpha1WorkflowCreateRequest(workflow=workflow))
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 62, in create_workflow
        return self.create_workflow_with_http_info(namespace, body, **kwargs)  # noqa: E501
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 162, in create_workflow_with_http_info
        collection_formats=collection_formats)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 369, in call_api
        _preload_content, _request_timeout, _host)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 185, in __call_api
        _request_timeout=_request_timeout)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 413, in request
        body=body)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 271, in POST
        body=body)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 168, in request
        headers=headers)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 79, in request
        method, url, fields=fields, headers=headers, **urlopen_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 170, in request_encode_body
        return self.urlopen(method, url, **extra_kw)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\poolmanager.py", line 375, in urlopen
        response = conn.urlopen(method, u.request_uri, **kw)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 756, in urlopen
        method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\retry.py", line 574, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='localhost', port=2746): Max retries exceeded with url: /api/v1/workflows/default (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)')))
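For reference, this is what disabling certificate verification looks like at the level of Python's standard ssl module. It is a sketch of the general mechanism only: insecure_ssl_context is a hypothetical helper, and whether and how a given Hera version lets you pass such a setting through is not shown here (a later issue on this page passes verify_ssl=False to WorkflowService).

```python
import ssl


def insecure_ssl_context() -> ssl.SSLContext:
    """Return an SSL context that skips certificate and hostname verification."""
    context = ssl.create_default_context()
    # check_hostname must be disabled before verify_mode can be set to CERT_NONE
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context


context = insecure_ssl_context()
print(context.verify_mode)
```

Skipping verification is only reasonable for a local port forward to a cluster you control; anywhere else it removes protection against impersonated servers.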
    
  • Support latest Python version

    Hi! Thank you for developing such a great tool! Currently, Hera supports only Python 3.7. That version is now in security-fix-only status and will reach end of support soon.

    reference: https://www.python.org/downloads/

    So I think it would be better to support Python 3.8+. I'm happy to create a PR to support newer versions of Python, if that is OK.

    Thank you.

  • Bugfix: env value_from_input was ignored

    When an Env is specified via value_from_input, the actual value of value_from_input is ignored and only whether it is None is checked. This change fixes that by using the value. The current buggy behavior has the unfortunate consequence that the env var has to be named the same as the parameter.
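The description can be illustrated with a small before/after pair. These two functions are a hypothetical reconstruction written for this note, not the actual Hera source; they only show the difference between testing value_from_input for None and actually using it to build the parameter reference.

```python
from typing import Optional


def env_value_buggy(env_name: str, value_from_input: Optional[str]) -> Optional[str]:
    """Only checks that value_from_input is set, then references the env var's own name."""
    if value_from_input is not None:
        return f"{{{{inputs.parameters.{env_name}}}}}"
    return None


def env_value_fixed(env_name: str, value_from_input: Optional[str]) -> Optional[str]:
    """References the input parameter that value_from_input actually names."""
    if value_from_input is not None:
        return f"{{{{inputs.parameters.{value_from_input}}}}}"
    return None


print(env_value_buggy("MY_ENV", "my_param"))
print(env_value_fixed("MY_ENV", "my_param"))
```

With the buggy variant the reference resolves only when the env var and the parameter happen to share a name, which matches the consequence described above.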

  • Add task template mechanism

    Description

    This MR provides additional functionality for templating tasks with a class TaskTemplate.

    Example

    task_template = TaskTemplate('myTemplate', say,
        [{
            'message': "default value for param 'message'",
            "other_message": {"default": {"value": {"for": "other_message"}}},
        }],
    )
    a = task_template.task("taskWithDefaultValues")
    b = task_template.task("taskWithNonDefaultValues", [{"other_message": "hello again"}])
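The intended behavior, as far as the example suggests, is that a task created from a template inherits the template's default parameters unless it overrides them. The toy class below sketches that merge under this assumption; ToyTaskTemplate and the returned dictionary shape are hypothetical and are not the TaskTemplate implemented in the MR.

```python
import copy
from typing import Any, Callable, Dict, List, Optional


class ToyTaskTemplate:
    """Stores a function and default parameters; tasks override defaults key by key."""

    def __init__(self, name: str, func: Callable, func_params: List[Dict[str, Any]]):
        self.name = name
        self.func = func
        # Assumption: the first dict holds the defaults, as in the example above
        self.defaults = func_params[0]

    def task(self, name: str, func_params: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        overrides = func_params or [{}]
        # Deep-copy the defaults so tasks never share mutable default values
        merged = [{**copy.deepcopy(self.defaults), **override} for override in overrides]
        return {"name": name, "template": self.name, "func_params": merged}


def say(message: str, other_message: Any) -> None:
    print(message, other_message)


task_template = ToyTaskTemplate(
    "myTemplate",
    say,
    [{"message": "default value for param 'message'", "other_message": {"default": "value"}}],
)
a = task_template.task("taskWithDefaultValues")
b = task_template.task("taskWithNonDefaultValues", [{"other_message": "hello again"}])
print(b["func_params"])
```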
    
    
  • How about supporting onExit?

    Argo Workflows provides a function, onExit, to execute a template after the workflow has finished. https://github.com/argoproj/argo-workflows/blob/master/examples/exit-handlers.yaml

    How about supporting onExit for hera-workflows as well? I have tested the following implementation.

    class WorkflowWithOnExit(Workflow):
        def __init__(self, name: str, service: WorkflowService, **kw):
            super().__init__(name, service, **kw)
    
        def add_tasks_on_exit(self, on_exit_task: "OnExitTask", *ts: Task) -> None:
            template_on_exit_name = self.name + "-onExit"
            setattr(self.spec, "on_exit", template_on_exit_name)
    
            # prepare template for onExit
            dag_template_on_exit = IoArgoprojWorkflowV1alpha1DAGTemplate(tasks=[])
            template_on_exit = IoArgoprojWorkflowV1alpha1Template(
                name=template_on_exit_name,
                steps=[],
                dag=dag_template_on_exit,
                parallelism=self.parallelism,
            )
            self.spec.templates.append(template_on_exit)
    
            # HACK: Not using workflow_editors.add_tasks to add to dag_template_on_exit.tasks
            for t in (on_exit_task,) + ts:
                self.spec.templates.append(t.argo_template)
                if t.resources.volumes:
                    for vol in t.resources.volumes:
                        if isinstance(vol, Volume):
                            self.spec.volume_claim_templates.append(vol.get_claim_spec())
                        else:
                            self.spec.volumes.append(vol.get_volume())
                dag_template_on_exit.tasks.append(t.argo_task)
    
    
    class OnExitTask(Task):
        def when_by_workflow_status(
            self, operator: Operator, workflow_status: WorkflowStatus
        ):
            self.argo_task.when = (
                f"{{{{workflow.status}}}} {operator.value} {workflow_status.value}"
            )
    
    def f_with_error():
        raise RuntimeError("Err!")
    
    
    def exit_handler():
        print("exit_handler")
    
    
    def exit_handler_next():
        print("exit_handler_next")
    
    ws = WorkflowService(
        host=host,
        token=token,
        namespace=namespace,
    )
    
    w = WorkflowWithOnExit(
        "test_exit_handler",
        ws,
        namespace=namespace,
        service_account_name=service_account,
    )
    
    task_exit_handler = OnExitTask("exit_handler", exit_handler, image=image)
    task_exit_handler.when_by_workflow_status(Operator.not_equal, WorkflowStatus.Succeeded)
    task_exit_handler_next = Task("exit_handler_next", exit_handler_next, image=image)
    task_exit_handler >> task_exit_handler_next
    
    task_with_error = Task("task_with_error", f_with_error, image=image)
    
    w.add_tasks(task_with_error)
    w.add_tasks_on_exit(task_exit_handler, task_exit_handler_next)
    
    w.create()
    
    STEP                         TEMPLATE                  PODNAME                       DURATION  MESSAGE
     ✖ test-exit-handler         test-exit-handler
     └─✖ task-with-error         task-with-error           test-exit-handler-153195892   12s       Error (exit code 1)
    
     ✔ test-exit-handler.onExit  test-exit-handler-onExit
     ├─✔ exit-handler            exit-handler              test-exit-handler-2402822171  10s
     └─✔ exit-handler-next       exit-handler-next         test-exit-handler-2592397439  10s
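The when_by_workflow_status method above relies on doubled braces in an f-string to emit Argo's {{workflow.status}} placeholder literally. The standalone sketch below shows the resulting expression; ToyOperator and ToyWorkflowStatus are stand-in enums defined here for illustration, and their values are assumptions rather than Hera's definitions.

```python
from enum import Enum


class ToyOperator(Enum):
    equals = "=="
    not_equal = "!="


class ToyWorkflowStatus(Enum):
    Succeeded = "Succeeded"
    Failed = "Failed"
    Error = "Error"


def when_by_workflow_status(operator: ToyOperator, status: ToyWorkflowStatus) -> str:
    """Build a `when` expression; `{{{{` and `}}}}` render as literal `{{` and `}}`."""
    return f"{{{{workflow.status}}}} {operator.value} {status.value}"


expression = when_by_workflow_status(ToyOperator.not_equal, ToyWorkflowStatus.Succeeded)
print(expression)
```

Argo substitutes the placeholder when the exit handler runs, so the task guarded by this expression would execute only when the workflow did not succeed, as in the run shown above.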
    
  • Support multiple python using tox with Poetry

    fix #73

    What is this PR?

    Related to #93, I implemented support for multiple Python versions using tox. Unlike #93, this PR uses Poetry as the package manager to simplify the configuration of linters, tests and builds.

  • Custom image and inter task communication

    I am unable to load pickle files in a task that were saved by a previous task. My assumption is that it has something to do with the custom image, because the example by hera runs without any problems. Any ideas? Here is my code:

    from hera import Artifact, ImagePullPolicy
    from hera.task import Task
    from hera.workflow import Workflow
    from hera.workflow_service import WorkflowService
    
    
    def task_cleaned_data_housing(data_url):
        import pathlib
        import pickle
    
        import data_housing_pipeline_module
    
        data_url = str(data_url)
    
        training_data = data_housing_pipeline_module.get_cleaned_data_housing(
            data_url
        )
    
        if not pathlib.Path("/tmp").joinpath("data_housing_pipeline").exists():
            pathlib.Path("/tmp").joinpath("data_housing_pipeline").mkdir()
        pickle.dump(
            training_data,
            open("/tmp/data_housing_pipeline/variable_training_data.pickle", "wb"),
        )
    
    
    def task_linear_model_housing():
        import pathlib
        import pickle
    
        import data_housing_pipeline_module
    
        training_data = pickle.load(
            open("/tmp/data_housing_pipeline/variable_training_data.pickle", "rb")
        )
    
        linear_model = data_housing_pipeline_module.get_linear_model_housing(
            training_data
        )
    
        if not pathlib.Path("/tmp").joinpath("data_housing_pipeline").exists():
            pathlib.Path("/tmp").joinpath("data_housing_pipeline").mkdir()
        pickle.dump(
            linear_model,
            open("/tmp/data_housing_pipeline/variable_linear_model.pickle", "wb"),
        )
    
    
    ws = WorkflowService(
        host="https://localhost:2746",
        verify_ssl=False,
        token="None",
        namespace="argo",
    )
    
    with Workflow("data-housing-pipeline", service=ws) as w:
    
        cleaned_data_housing = Task(
            "cleaned-data-housing",
            task_cleaned_data_housing,
            [
                {
                    "data_url": "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/use_cases/predict_house_price/data/ames_train_cleaned.csv",
                }
            ],
            image="argo_pipeline:latest",
            image_pull_policy=ImagePullPolicy.Never,
            outputs=[
                Artifact(
                    "training_data",
                    "/tmp/data_housing_pipeline/variable_training_data.pickle",
                ),
            ],
        )
    
        linear_model_housing = Task(
            "linear-model-housing",
            task_linear_model_housing,
            image="argo_pipeline:latest",
            image_pull_policy=ImagePullPolicy.Never,
            inputs=[cleaned_data_housing.get_artifact("training_data")],
            outputs=[
                Artifact(
                    "linear_model",
                    "/tmp/data_housing_pipeline/variable_linear_model.pickle",
                ),
            ],
        )
    
        cleaned_data_housing >> linear_model_housing
    
    w.create()
    

    The message I get is: Unable to resolve: "tasks.cleaned-data-housing.outputs.artifacts.training_data" image

  • Clearer documentation about using `--auth-mode=server` or `--auth-mode=client`

    The README line pointing at Argo Workflows helps with configuring Hera-Workflows with Argo Workflows; however, it could be clearer that --auth-mode=server or --auth-mode=client must be set in order for Bearer tokens to be used.

    In my particular use case, we started off setting only --auth-mode=sso, since my team was using Couler and all Workflow submissions happened inside the platform where Argo Workflows is deployed. (We are recent adopters of Argo, having recently ditched Kubeflow and its DSL framework because it was more than we needed in our platform.)

  • Fix name validation, add support for generate_name

    While browsing through the code, I spotted an issue in validate_name in that it uses re.match (match the beginning of the string) rather than re.fullmatch (match the whole string). One thing led to another, and I found and fixed a couple more bugs, and added a new parameter for proper validation of metadata.generateName.

    Names in k8s cannot end in a hyphen "-" or a period "."; however, both are valid endings for metadata.generateName. I have extended validate_name with an optional generate_name parameter to support this, synchronized with the identical parameter of workflow.__init__.

    Changes

    • Validate entire name in validate_name, not only the beginning of the string
      • Without this, the invalid name "test#" would unexpectedly pass validation.
    • Fix quoting of "." in pattern used in validate_name
      • Without this, the invalid name r"test\@" would unexpectedly pass validation
    • Add a new generate_name boolean parameter, defaulting to False; if True, a name ending in a single period "." or any number of hyphens "-" passes validation, since alphanumeric characters are expected to be appended in the final name
      • Without this, the valid generate name "test-" would unexpectedly fail validation
    • Strip trailing "-" and "." characters when workflow name is reused for an unnamed dag
      • Without this, the valid generate name "test-" would result in a validation error if the dag is not named explicitly
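
The difference between re.match and re.fullmatch described in this PR can be illustrated with a minimal sketch. The pattern and function body below are assumptions made for illustration; they are not Hera's actual implementation, and only the function name validate_name and the generate_name parameter come from the PR description.

```python
import re

# Hypothetical pattern for illustration only (not Hera's actual pattern):
# lowercase alphanumerics, with "-" and "." allowed only in the middle.
NAME_PATTERN = r"[a-z0-9]([a-z0-9.-]*[a-z0-9])?"


def validate_name(name: str, generate_name: bool = False) -> str:
    """Return `name` if it is valid, otherwise raise ValueError."""
    pattern = NAME_PATTERN
    if generate_name:
        # A generateName prefix may additionally end in one "." or any number of "-".
        pattern += r"(\.|-*)"
    # fullmatch requires the whole string to match; re.match only anchors the start.
    if re.fullmatch(pattern, name) is None:
        raise ValueError(f"invalid name: {name!r}")
    return name


# re.match accepts the invalid name because the prefix "test" matches on its own.
matched_prefix = re.match(NAME_PATTERN, "test#") is not None
matched_whole = re.fullmatch(NAME_PATTERN, "test#") is not None
```

Under these assumptions, validate_name("test-") raises ValueError, while validate_name("test-", generate_name=True) returns the name unchanged.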
  • Multitask Dag

    Hi,

    I was wondering if there is an example of how to create complex workflows.

    I would like to create a workflow similar to this one, but I haven't figured out how to specify the steps when I'm creating the workflow.

    Thanks for the help,

    image
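
The "Custom image and inter task communication" issue on this page chains two tasks with the `>>` operator. Assuming that operator composes the same way for larger graphs (an assumption, not something the source confirms), a diamond-shaped workflow could be declared by repeating the chain. The Node class below is a hypothetical toy stand-in, not Hera's Task class; it only shows how `>>` can record dependencies and how the resulting graph is ordered.

```python
from graphlib import TopologicalSorter


class Node:
    """Toy stand-in for a workflow task; this is NOT Hera's Task class."""

    def __init__(self, name):
        self.name = name
        self.upstream = set()  # names of the nodes this node depends on

    def __rshift__(self, other):
        # `a >> b` records that b depends on a, and returns b so chains work.
        other.upstream.add(self.name)
        return other


a, b, c, d = (Node(n) for n in "abcd")
a >> b >> d  # a -> b -> d
a >> c >> d  # a -> c -> d

# Map each node to its predecessors and compute a valid execution order.
graph = {n.name: n.upstream for n in (a, b, c, d)}
order = list(TopologicalSorter(graph).static_order())
```

In this sketch, "a" always comes first and "d" last, while "b" and "c" have no ordering between them and could run in parallel.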

  • SSL Certificate

    Hello,

    I'm trying to submit this example workflow. I've set hera.set_global_token and hera.set_global_host as specified in the comments. However, I'm getting an error related to SSL certificates. Is there a way to disable certificate verification?

    Thanks for the help,

    Traceback (most recent call last):
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connectionpool.py", line 703, in urlopen
        httplib_response = self._make_request(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connectionpool.py", line 386, in _make_request
        self._validate_conn(conn)
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connectionpool.py", line 1042, in _validate_conn
        conn.connect()
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connection.py", line 414, in connect
        self.sock = ssl_wrap_socket(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
        ssl_sock = _ssl_wrap_socket_impl(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
        return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
      File "/usr/lib/python3.8/ssl.py", line 500, in wrap_socket
        return self.sslsocket_class._create(
      File "/usr/lib/python3.8/ssl.py", line 1040, in _create
        self.do_handshake()
      File "/usr/lib/python3.8/ssl.py", line 1309, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1131)
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "main.py", line 23, in <module>
        w.create()
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/hera/workflow.py", line 312, in create
        resulting_argo_wf = self.service.create_workflow(self.build())
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/hera/workflow_service.py", line 81, in create_workflow
        return WorkflowServiceApi(api_client=self._api_client).create_workflow(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/api_client.py", line 771, in __call__
        return self.callable(self, *args, **kwargs)
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/api/workflow_service_api.py", line 121, in __create_workflow
        return self.call_with_http_info(**kwargs)
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/api_client.py", line 833, in call_with_http_info
        return self.api_client.call_api(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/api_client.py", line 408, in call_api
        return self.__call_api(resource_path, method,
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/api_client.py", line 195, in __call_api
        response_data = self.request(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/api_client.py", line 454, in request
        return self.rest_client.POST(url,
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/rest.py", line 266, in POST
        return self.request("POST", url,
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/argo_workflows/rest.py", line 152, in request
        r = self.pool_manager.request(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/request.py", line 78, in request
        return self.request_encode_body(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/request.py", line 170, in request_encode_body
        return self.urlopen(method, url, **extra_kw)
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/poolmanager.py", line 376, in urlopen
        response = conn.urlopen(method, u.request_uri, **kw)
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connectionpool.py", line 815, in urlopen
        return self.urlopen(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connectionpool.py", line 815, in urlopen
        return self.urlopen(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connectionpool.py", line 815, in urlopen
        return self.urlopen(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/connectionpool.py", line 787, in urlopen
        retries = retries.increment(
      File "/home/alejo/Documents/Nodes/Nodes/lib/python3.8/site-packages/urllib3/util/retry.py", line 592, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='localhost', port=2746): Max retries exceeded with url: //api/v1/workflows/default (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1131)')))
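
The traceback shows the failure originating in the standard-library ssl handshake: the server's certificate chain could not be verified against a trusted issuer, which is common with self-signed certificates. As a rough sketch of what disabling verification means at that level (the helper name make_ssl_context is hypothetical, and this is not Hera's API), consider:

```python
import ssl


def make_ssl_context(verify: bool = True) -> ssl.SSLContext:
    """Build a client-side SSL context, optionally without certificate checks."""
    ctx = ssl.create_default_context()
    if not verify:
        # Hostname checking must be turned off before verify_mode can be CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx


strict = make_ssl_context()
relaxed = make_ssl_context(verify=False)
```

Within Hera itself, the "Custom image and inter task communication" issue on this page passes verify_ssl=False to WorkflowService, which appears to be the corresponding switch. Skipping verification is only reasonable for local development, such as a port-forwarded server on localhost.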
    
  • create workflow failed

    I submit workflows in batches. Apart from the workflow names, all other settings are identical, yet an unexpected error occurs:

    Reason: Internal Server Error HTTP response headers: HTTPHeaderDict({'Date': 'Thu, 08 Dec 2022 12:01:56 GMT', 'Content-Type': 'application/json', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Trailer': 'Grpc-Trailer-Content-Type', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains'}) HTTP response body: {"code":2,"message":"spec.templates[1].name 'search' is not unique"}
