CI-CD

Argo Workflows - (4) Secret, 반복 및 조건부 실행

Operation CWAL 2021. 7. 10. 21:59

Secret

Template 정의시, 미리 생성해 놓은 K8s Secret 또는 ConfigMap 리소스를 사용할 수 있다. 우선 다음 명령어로 Secret 'test-secret'을 생성한다. 

kubectl -n argo create secret generic test-secret --from-literal test-password="Password123"

그리고 wf-artifact 워크플로우를 아래와 같이 수정하여, secret을 환경변수로 사용하는 'wf-secret-env'를 정의하였다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-secret-env
spec:
  entrypoint: dag-template
  arguments:
    parameters:
    - name: message1
      value: Task 1 is executed
    - name: message2
      value: Task 2 is executed   
  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: message1
      - name: message2
    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message1}}" }]
        template: task-template
      - name: Task2
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message2}}" }]
        template: task-template
      - name: Task3
        dependencies: [Task1]
        template: task-secret-env
        
  - name: task-template
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)
  
  - name: task-secret-env
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        import os
        print(os.environ['TEST_PASSWORD'])
      env:
      - name: TEST_PASSWORD
        valueFrom:
          secretKeyRef:
            name: test-secret
            key: test-password
  • 'task-secret-env' 템플릿: secret 'test-secret'을 환경변수 'TEST_PASSWORD'로 지정하여, 메시지로 출력하는 Script Template이다.
  • K8s의 Pod 정의와 동일한 필드명(ex: valueFrom, secretKeyRef)을 사용하므로, 기존 K8s에 익숙한 사용자는 큰 어려움없이 사용 가능하다.

다음은 Workflow를 실행한 결과이다. Task3에서 Secret 내용을 정상적으로 출력하고 있다.

Volume from Secret 

기존 K8s와 동일하게 Secret을 환경변수 외에도 Volume으로 Mount하는 방식을 사용할 수 있다. 다음은 'wf-secret-env' 템플릿을 수정하여, 파일로 저장된 Secret 내용을 메시지로 출력하는 템플릿 'task-secret-vol'을 정의한 Workflow 예시이다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-secret-volume
spec:
  entrypoint: dag-template
  volumes:
  - name: test-secret-vol
    secret:
      secretName: test-secret
  arguments:
    parameters:
    - name: message1
      value: Task 1 is executed
    - name: message2
      value: Task 2 is executed   
  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: message1
      - name: message2
    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message1}}" }]
        template: task-template
      - name: Task2
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message2}}" }]
        template: task-template
      - name: Task3
        dependencies: [Task1]
        template: task-secret-vol
        
  - name: task-template
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)
  
  - name: task-secret-vol
    container:
      image: python:3.8-slim
      command: [python, -c]
      args: ['with open("/secrets/test-password", "r") as f: lines = f.read(); print(lines)']
      volumeMounts:
      - name: test-secret-vol
        mountPath: "/secrets"

 Container Template 역시 Secret 사용이 가능하며, script를 작성하는 대신 one-liner 코드를 통해 파일 내용을 출력하도록 명령어를 정의하였다. 다음은 Workflow 실행 결과와 실제 Pod에 Mount된 Volume 내용이다.

 

 

Loops

Argo Workflows는 여러개의 Input Parameter를 List 형식으로 묶어, 같은 Template을 반복 호출하는 기능을 제공한다.  아래와 같은 workflow 'wf-loop'를 정의해보자.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-loop
spec:
  entrypoint: dag-template
  volumes:
  - name: test-secret-vol
    secret:
      secretName: test-secret
  arguments:
    parameters:
    - name: message1
      value: Task 1 is executed
    - name: message2
      value: Task 2 is executed   
  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: message1
      - name: message2
    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message1}}" }]
        template: task-template
      - name: Task2
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message2}}" }]
        template: task-template
      - name: Task3
        dependencies: [Task1]
        template: task-template
        arguments:
          parameters:
          - name: text
            value: "{{ item }}"
        withItems:
        - element1
        - element2
        - element3
        
  - name: task-template
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)

withItems에 Parameter 값을 List 형식으로 나열하고, 템플릿의 parameter 전달시 "{{ item }}"의 Reserve된 이름을 사용하여 반복문 수행이 가능하다. 다만, Loop 방식으로 실행되는 Template은 서로간의 의존성이 존재하지 않기 때문에, 순차 실행이 아닌 병렬 실행이 이루어진다. 다음은 Workflow 실행 결과로 Task3로부터 생성된 Pod은 'element1', 'element2', 'element3'을 각각 출력한다.

 

Set으로 Loop 수행

위 예시에선 단순 string으로 이루어진 List를 통해 Loop를 구성하였다. 이번엔 보다 현실적으로, 여러개의 Input Parameter를 받는 Template을 정의하고 이를 어떻게 반복 실행하는지에 대해 알아보자. YAML에서 "{hello:world, foo:bark}"와 같은 형식으로 Set 데이터 타입을 지원하는 것은 다들 알고 있을 것이다. 사실 YAML 자체가 Set과 List의 조합으로 이루어진 양식이므로 어찌보면 당연한 이야기이다.

다음과 같이 Set으로 구성한 List로부터 템플릿을 반복 실행하는 Workflow 'wf-loop-sets'를 정의하였다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-loop-sets
spec:
  entrypoint: dag-template
  volumes:
  - name: test-secret-vol
    secret:
      secretName: test-secret
  arguments:
    parameters:
    - name: message1
      value: Task 1 is executed
    - name: message2
      value: Task 2 is executed   
  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: message1
      - name: message2
    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message1}}" }]
        template: task-template
      - name: Task2
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message2}}" }]
        template: task-template
      - name: Task3
        dependencies: [Task1]
        template: task-loop-set-template
        arguments:
          parameters:
          - name: extractor
            value: "{{ item.extractor }}"
          - name: table
            value: "{{ item.table }}"
        withItems:
        - { extractor: 'PythonExtractor', table: 'Table 1'}
        - { extractor: 'PySparkExtractor', table: 'Table 2'}
        - { extractor: 'DaskExtractor', table: 'Table 3'}
        
  - name: task-template
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)
  - name: task-loop-set-template
    inputs:
      parameters:
      - name: extractor
      - name: table
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Applying ", "{{inputs.parameters.extractor}}", " to the table ", "{{inputs.parameters.table}}")

'task-loop-set-template'은 두 개의 Input Parameter 'extractor'와 'table'을 사용하는 Script template이다. 그리고 이를 호출하는 Task3은 Parameter 값을 set으로 전달하는데, 이때 "{{ item.extractor }}"와 같이 Set의 각 필드명을 통해 값을 가져오는 것을 알 수 있다. 다음은 위 Workflow를 실행한 결과이다.

Input Parameter Set List는 Template 내부가 아닌 외부에서도 정의할 수 있으며, 다음은 ".spec.arguments.parameters" 필드에  Set List를 정의한 Workflow 예시이다. 이 경우, Task는 List 이름만 가져와야 하므로 'withItems'이 아닌 'withParam' 필드를 사용한다. 물론 실행 결과는 동일하다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-loop-sets-inputparam
spec:
  entrypoint: dag-template
  volumes:
  - name: test-secret-vol
    secret:
      secretName: test-secret
  arguments:
    parameters:
    - name: message1
      value: Task 1 is executed
    - name: message2
      value: Task 2 is executed   
    - name: ingest-list
      value: |
        [
          { "extractor": "PythonExtractor", "table": "Table 1"},
          { "extractor": "PySparkExtractor", "table": "Table 2"},
          { "extractor": "DaskExtractor", "table": "Table 3"}
        ]
  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: message1
      - name: message2
      - name: ingest-list
    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message1}}" }]
        template: task-template
      - name: Task2
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message2}}" }]
        template: task-template
      - name: Task3
        dependencies: [Task1]
        template: task-loop-set-template
        arguments:
          parameters:
          - name: extractor
            value: "{{ item.extractor }}"
          - name: table
            value: "{{ item.table }}"
        withParam: "{{ inputs.parameters.ingest-list }}"
        
  - name: task-template
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)
  - name: task-loop-set-template
    inputs:
      parameters:
      - name: extractor
      - name: table
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Applying ", "{{inputs.parameters.extractor}}", " to the table ", "{{inputs.parameters.table}}")

 

이전 시간, Pararmeter에 대해서 배웠을 때 다른 Template의 출력 메시지를 Input Parameter로 사용가능하다고 했었다.

 

Argo Workflows - (3) Parameter, Artifact

Parameters Workflow를 정의할 때, template에서 사용할 변수를 정의하고 해당 template 호출시 값을 전달해 줄 수 있는 이를 Parameter라고 한다. Argo Workflows에는 Input과 Output 두가지 타입의 Parameter가..

cwal.tistory.com

마찬가지로 List 형식으로 메시지를 출력하여, 이를 Input Parameter로 사용하여 반복 실행하는 Task를 정의할 수 있으며, 미리 정의된 input parameter가 아닌, Template 실행 결과에 따라 Dynamic하게 실행되는 Loop가 가능하다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-loop-dynamic
spec:
  entrypoint: dag-template
  arguments:
    parameters:
    - name: message1
      value: Task 1 is executed
  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: message1

    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.message1}}" }]
        template: task-template
      - name: Task2
        template: task-generate-list
      - name: Task3
        dependencies: [Task2]
        template: task-loop-set
        arguments:
          parameters: 
          - name: extractor
            value: "{{item.extractor}}"
          - name: table
            value: "{{item.table}}"
        withParam: "{{tasks.Task2.outputs.result}}"

        
  - name: task-template
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)

  - name: task-generate-list
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        import json
        import sys
        list = [("PythonExtractor", "Table 1"), ("PySparkExtractor", "Table 2"), ("DaskExtractor", "Table 3")]
        json.dump([{"extractor": i[0], "table": i[1]} for i in list], sys.stdout)

  - name: task-loop-set
    inputs:
      parameters:
      - name: extractor
      - name: table
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Applying ", "{{inputs.parameters.extractor}}", "to the table ", "{{inputs.parameters.table}}")
  • task-generate-list: stdout에 JSON 양식의 Set List를 출력하는 템플릿으로, 필드명과 값은 이전 예시와 동일하다.
  • Task3: Task2의 stdout 메시지를 가져오기 위해 "{{tasks.Task2.outputs.result}}"을 사용한다. 나머지는 이전 예시와 동일하다.

 

위 Workflow 생성시, Task2는 미리 정의한 Set List를 출력하며, Task3은 이를 Input Paramter로 사용하여 task-loop-set을 실행하는 것을 볼 수 있다.

 

조건부(Conditional) 실행

Workflow 정의시, 다음과 같이 다른 Task의 Output 내용에 따라 특정 Task의 실행 여부를 결정할 수 있다. "when" 필드는 조건문이 참일 경우에만 해당 Task를 실행하도록 동작한다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-conditional
spec:
  entrypoint: dag-template
  arguments:
    parameters:
    - name: messageA
      value: A
    - name: messageB
      value: B

  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: messageA
      - name: messageB

    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.messageA}}" }]
        template: task-decision
      - name: TaskA
        template: task-A
        dependencies: [Task1]
        when: "{{tasks.Task1.outputs.result}} == A"
      - name: TaskB
        template: task-B
        dependencies: [Task1]
        when: "{{tasks.Task1.outputs.result}} == B"
      - name: Task2
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.messageB}}" }]
        template: task-decision
      - name: TaskA2
        template: task-A
        dependencies: [Task2]
        when: "{{tasks.Task2.outputs.result}} == A"
      - name: TaskB2
        template: task-B
        dependencies: [Task2]
        when: "{{tasks.Task2.outputs.result}} == B"
  - name: task-decision
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)

  - name: task-A
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Task A was executed.")

  - name: task-B
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Task B was executed.")
  • Task1: 'messageA'를 task-decision의 Input Parameter로 전달하여, "A"를 출력
  • TaskA: Task1의 출력이 "A"인 경우 실행되며, Task1은 "A"를 출력하므로 실제 실행됨
  • TaskB: Task1의 출력이 "B"인 경우 실행되며, Task1은 "A"를 출력하므로 실제 실행되지 않음 
  • Task2: 'messageB'를 task-decision의 Input Parameter로 전달하여, "B"를 출력
  • TaskA2: Task2의 출력이 "A"인 경우 실행, Task2는 "B"를 출력하므로 실제 실행되지 않음
  • TaskB2: Task2의 출력이 "B"인 경우 실행, Task2는 "B"를 출력하므로 실제 실행됨

 

다음은 Workflow 실행 결과이다. 위 설명과 동일하게 Task가 실행된 것을 확인할 수 있다.

 

Depends

'Depends' 필드를 사용하여 Task의 출력 메시지가 아닌, 성공/실패 등의 Status에 따라 특정 Task의 실행 여부를 결정할 수도 있다. 이때 다른 Task에 의존성이 있지 않은 경우, 'dependencies' 필드는 생략 가능하다. 아래 예시와 같이 'when' 필드와 조합하여, 더욱 정교한 Workflow를 정의할 수 있다.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: wf-depends
spec:
  entrypoint: dag-template
  arguments:
    parameters:
    - name: messageA
      value: A
    - name: messageB
      value: B

  templates:
  - name: dag-template
    inputs:
      parameters:
      - name: messageA
      - name: messageB

    dag:
      tasks:
      - name: Task1
        arguments:
          parameters: [{name: text, value: "{{inputs.parameters.messageA}}" }]
        template: task-decision
      - name: TaskA
        template: task-A
        depends: Task1.Succeeded
        when: "{{tasks.Task1.outputs.result}} == A"
      - name: TaskB
        template: task-B
        depends: Task1.Succeeded
        when: "{{tasks.Task1.outputs.result}} == B"
      - name: TaskC
        template: task-C
        depends: TaskA.Succeeded
      - name: TaskD
        template: task-D
        depends: TaskA.Skipped
      - name: TaskD2
        template: task-D
        depends: TaskB.Succeeded
      - name: TaskC2
        template: task-C
        depends: TaskB.Skipped
      
  - name: task-decision
    inputs:
      parameters:
      - name: text
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        p = "{{inputs.parameters.text}}"
        print(p)

  - name: task-A
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Task A was executed.")

  - name: task-B
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Task B was executed.")

  - name: task-C
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Task B was executed.")
  
  - name: task-D
    script:
      image: python:3.8-slim
      command: [python]
      source: |
        print("Task B was executed.")
  • TaskA: Task1의 상태가 'Succeeded'(성공)이고, 메시지가 'A'인 경우 실행
  • TaskB: Task1의 상태가 'Succeeded'(성공)이고, 메시지가 'B'인 경우 실행
  • TaskC: TaskA가 성공하면 실행
  • TaskC2: TaskB를 생략한 경우 실행
  • TaskD: TaskA를 생략한 경우 실행
  • TaskD2: TaskB가 성공한 경우 실행

실제로 위 Workflow를 실행하면 다음과 같은 결과를 얻을 수 있다.

'depends' 필드에서 사용할 수 있는 Task의 Status는 아래와 같다.

Task 결과 설명
.Succeeded Task 성공
.Failed Task 실패
.Errored Task 에러 발생
.Skipped Task 생략
.Daemoned Daemon 방식 태스크(Pending X)

 

"task-1.Succeeded && taks-2.Failed"와 같이 '&&'(AND)와 '||'(OR), '!'(NOT) 연산자 사용이 가능하며, "task-1"처럼 Task 결과를 생략하고 이름만 존재하는 경우엔 "(task-1.Succeeded || task-1.Skipped || task-1.Daemoned)"와 동일하게 취급된다.

기존 'dependencies' 필드와 완벽하게 호환되며, 예를 들어 아래와 같은 예시는

dependencies: ["task-1", "task-2"]

다음과 같이 'depends' 필드로 변환 가능하다.

depends: "task-1 && task-2"

'CI-CD' 카테고리의 다른 글

Argo CD - ApplicationSet  (0) 2022.07.02
Argo Workflows - (5) Retry, 재귀 호출  (0) 2021.07.12
Argo Workflows - (3) Parameter, Artifact  (0) 2021.07.07
Argo Workflows - (2) Core Concepts  (0) 2021.07.04
Argo Workflows - (1) Introduction  (0) 2021.07.01