singer-pipeline-wrapper-script

Description

n/a

Variables

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `path` | string | -- | The path of the wrapper script. Required |
| `pipeline_path` | string | -- | The base path of the pipeline. Required |
| `tap_name` | string | -- | The name of the tap. Required |
| `target_name` | string | -- | The name of the target. Required |
| `tap_config` | dict | -- | The tap config. |
| `target_config` | dict | -- | The target config. |

Code

# Argument schema of the frecklet. Each key below is a variable that the
# templates in the 'frecklets' section may reference.
args:
  # Where the generated wrapper script is written (passed on as the 'path'
  # of the shell-script-exists task).
  path:
    doc:
      short_help: The path of the wrapper script.
    type: string
    required: true
  # Directory prefix the generated script uses for state.json, the
  # tap/target config files and the tap/target executables.
  pipeline_path:
    doc:
      short_help: The base path of the pipeline.
    type: string
    required: true
  # Suffix of the tap executable: the script runs 'tap-<tap_name>'.
  tap_name:
    doc:
      short_help: The name of the tap.
    type: string
    required: true
  # Optional. Only its presence is used here: when defined and non-empty,
  # the script passes '--config <pipeline_path>/tap_config.json' to the tap.
  # NOTE(review): 'empty: true' presumably permits an empty dict - confirm
  # against the schema validator's documentation.
  tap_config:
    doc:
      short_help: The tap config.
    type: dict
    required: false
    empty: true
  # Suffix of the target executable: the script runs 'target-<target_name>'.
  target_name:
    doc:
      short_help: The name of the target.
    type: string
    required: true
  # Optional. Only its presence is used here: when defined and non-empty,
  # the script passes '--config <pipeline_path>/target_config.json' to the
  # target.
  target_config:
    doc:
      short_help: The target config.
    type: dict
    required: false
    empty: true
# Single task: write the wrapper script below to the location given by the
# 'path' argument.
frecklets:
- shell-script-exists:
    path: '{{:: path ::}}'
    script_content: |
      #!/usr/bin/env bash
      #
      # Runs the tap piped into the target. Whatever the target prints on
      # stdout is appended to state.json, which is then trimmed to its last
      # line so that the next run can hand it back to the tap via --state.

      pipeline_path="{{:: pipeline_path ::}}"
      state_file="${pipeline_path}/state.json"

      # Collected in an array (not a plain string) so that a path containing
      # whitespace is still passed to the tap as one single argument.
      state_args=()
      if [[ -f "${state_file}" ]]; then
          if [[ -s "${state_file}" ]]; then
              state_args=(--state "${state_file}")
          else
              # delete empty state file
              rm "${state_file}"
          fi
      fi

      # NOTE: the doubled slash in front of the executable names is kept as it
      # was; after a non-empty directory prefix it resolves like a single one.
      "${pipeline_path}//tap-{{:: tap_name ::}}" {%:: if tap_config is defined and tap_config ::%}--config "${pipeline_path}/tap_config.json"{%:: endif ::%} "${state_args[@]}" | "${pipeline_path}//target-{{:: target_name ::}}" {%:: if target_config is defined and target_config ::%}--config "${pipeline_path}/target_config.json"{%:: endif ::%} >> "${state_file}"

      # Keep only the most recent line of the state file.
      if [[ -f "${state_file}" ]]; then
          tail -n 1 "${state_file}" > "${state_file}.tmp" && mv "${state_file}.tmp" "${state_file}"
      fi
frecklecute --community singer-pipeline-wrapper-script --help

Usage: frecklecute singer-pipeline-wrapper-script [OPTIONS]

  n/a

Options:
  --path PATH                    The path of the wrapper script.  [required]
  --pipeline-path PIPELINE_PATH  The base path of the pipeline.  [required]
  --tap-name TAP_NAME            The name of the tap.  [required]
  --target-name TARGET_NAME      The name of the target.  [required]
  --tap-config TAP_CONFIG        The tap config.
  --target-config TARGET_CONFIG  The target config.
  --help                         Show this message and exit.
# -*- coding: utf-8 -*-


#
# module path: pycklets.singer_pipeline_wrapper_script.SingerPipelineWrapperScript
#


from dataclasses import dataclass
from pyckles import AutoPycklet
from typing import *    # noqa

@dataclass
class SingerPipelineWrapperScript(AutoPycklet):
    """Pycklet for the ``singer-pipeline-wrapper-script`` frecklet.

    All fields default to ``None``. After the dataclass-generated
    ``__init__`` has stored them, ``__post_init__`` passes the field names
    to ``AutoPycklet.__init__`` as ``var_names``.

    Args:
        path: The path of the wrapper script.
        pipeline_path: The base path of the pipeline.
        tap_config: The tap config.
        tap_name: The name of the tap.
        target_config: The target config.
        target_name: The name of the target.
    """

    # Not annotated, so it stays a plain class attribute rather than
    # becoming a dataclass field.
    FRECKLET_ID = "singer-pipeline-wrapper-script"

    path: str = None
    pipeline_path: str = None
    tap_config: Dict = None
    tap_name: str = None
    target_config: Dict = None
    target_name: str = None

    def __post_init__(self):
        """Initialise the ``AutoPycklet`` base with this class's variable names."""
        # The generated dataclass __init__ does not call the base
        # initialiser, so it is invoked here instead.
        var_names = [
            "path",
            "pipeline_path",
            "tap_config",
            "tap_name",
            "target_config",
            "target_name",
        ]
        super().__init__(var_names=var_names)


# Module-level alias for the pycklet class defined above.
# NOTE(review): presumably the fixed name a loader looks up - confirm against pyckles.
frecklet_class = SingerPipelineWrapperScript
# -*- coding: utf-8 -*-


#
# module path: pycklets.singer_pipeline_wrapper_script.SingerPipelineWrapperScript
#


from pyckles import AutoPycklet

class SingerPipelineWrapperScript(AutoPycklet):
    """Pycklet for the ``singer-pipeline-wrapper-script`` frecklet.

    Each constructor argument is stored on a private ``_<name>`` attribute
    and exposed through a read/write property of the same name.

    Args:
        path: The path of the wrapper script.
        pipeline_path: The base path of the pipeline.
        tap_config: The tap config.
        tap_name: The name of the tap.
        target_config: The target config.
        target_name: The name of the target.
    """

    FRECKLET_ID = "singer-pipeline-wrapper-script"

    def __init__(
        self,
        path=None,
        pipeline_path=None,
        tap_config=None,
        tap_name=None,
        target_config=None,
        target_name=None
    ):
        """Initialise the base class with the variable names, then store the values."""
        var_names = [
            "path",
            "pipeline_path",
            "tap_config",
            "tap_name",
            "target_config",
            "target_name",
        ]
        super(SingerPipelineWrapperScript, self).__init__(var_names=var_names)

        self._path = path
        self._pipeline_path = pipeline_path
        self._tap_config = tap_config
        self._tap_name = tap_name
        self._target_config = target_config
        self._target_name = target_name

    # -- path ---------------------------------------------------------------
    @property
    def path(self):
        """The path of the wrapper script."""
        return self._path

    @path.setter
    def path(self, value):
        self._path = value

    # -- pipeline_path ------------------------------------------------------
    @property
    def pipeline_path(self):
        """The base path of the pipeline."""
        return self._pipeline_path

    @pipeline_path.setter
    def pipeline_path(self, value):
        self._pipeline_path = value

    # -- tap_config ---------------------------------------------------------
    @property
    def tap_config(self):
        """The tap config."""
        return self._tap_config

    @tap_config.setter
    def tap_config(self, value):
        self._tap_config = value

    # -- tap_name -----------------------------------------------------------
    @property
    def tap_name(self):
        """The name of the tap."""
        return self._tap_name

    @tap_name.setter
    def tap_name(self, value):
        self._tap_name = value

    # -- target_config ------------------------------------------------------
    @property
    def target_config(self):
        """The target config."""
        return self._target_config

    @target_config.setter
    def target_config(self, value):
        self._target_config = value

    # -- target_name --------------------------------------------------------
    @property
    def target_name(self):
        """The name of the target."""
        return self._target_name

    @target_name.setter
    def target_name(self, value):
        self._target_name = value



# Module-level alias for the pycklet class defined above.
# NOTE(review): presumably the fixed name a loader looks up - confirm against pyckles.
frecklet_class = SingerPipelineWrapperScript