singer-pipeline-wrapper-script
Description
n/a
Variables
Name | Type | Default | Description |
---|---|---|---|
|
string | -- | The path of the wrapper script. Required |
|
string | -- | The base path of the pipeline. Required |
|
string | -- | The name of the tap. Required |
|
string | -- | The name of the target. Required |
|
dict | -- | The tap config. |
|
dict | -- | The target config. |
Code
args: path: doc: short_help: The path of the wrapper script. type: string required: true pipeline_path: doc: short_help: The base path of the pipeline. type: string required: true tap_name: doc: short_help: The name of the tap. type: string required: true tap_config: doc: short_help: The tap config. type: dict required: false empty: true target_name: doc: short_help: The name of the target. type: string required: true target_config: doc: short_help: The target config. type: dict required: false empty: true frecklets: - shell-script-exists: path: '{{:: path ::}}' script_content: | #!/usr/bin/env bash state_arg="" if [[ -f "{{:: pipeline_path ::}}/state.json" ]]; then if [[ -s "{{:: pipeline_path ::}}/state.json" ]]; then state_arg="--state {{:: pipeline_path ::}}/state.json" else # delete empty state file rm "{{:: pipeline_path ::}}/state.json" fi fi {{:: pipeline_path ::}}//tap-{{:: tap_name ::}} {%:: if tap_config is defined and tap_config ::%}--config {{:: pipeline_path ::}}/tap_config.json{%:: endif ::%} ${state_arg} | {{:: pipeline_path ::}}//target-{{:: target_name ::}} {%:: if target_config is defined and target_config ::%}--config {{:: pipeline_path ::}}/target_config.json{%:: endif ::%} >> {{:: pipeline_path ::}}/state.json if [[ -f "{{:: pipeline_path ::}}/state.json" ]]; then tail -1 {{:: pipeline_path ::}}/state.json > {{:: pipeline_path ::}}/state.json.tmp && mv {{:: pipeline_path ::}}/state.json.tmp {{:: pipeline_path ::}}/state.json fi
frecklecute --community singer-pipeline-wrapper-script --help Usage: frecklecute singer-pipeline-wrapper-script [OPTIONS] n/a Options: --path PATH The path of the wrapper script. [required] --pipeline-path PIPELINE_PATH The base path of the pipeline. [required] --tap-name TAP_NAME The name of the tap. [required] --target-name TARGET_NAME The name of the target. [required] --tap-config TAP_CONFIG The tap config. --target-config TARGET_CONFIG The target config. --help Show this message and exit.
# -*- coding: utf-8 -*- # # module path: pycklets.singer_pipeline_wrapper_script.SingerPipelineWrapperScript # from dataclasses import dataclass from pyckles import AutoPycklet from typing import * # noqa @dataclass class SingerPipelineWrapperScript(AutoPycklet): """No documentation available. Args: path: The path of the wrapper script. pipeline_path: The base path of the pipeline. tap_config: The tap config. tap_name: The name of the tap. target_config: The target config. target_name: The name of the target. """ FRECKLET_ID = "singer-pipeline-wrapper-script" path: str = None pipeline_path: str = None tap_config: Dict = None tap_name: str = None target_config: Dict = None target_name: str = None def __post_init__(self): super(SingerPipelineWrapperScript, self).__init__(var_names=["path", "pipeline_path", "tap_config", "tap_name", "target_config", "target_name"]) frecklet_class = SingerPipelineWrapperScript
# -*- coding: utf-8 -*- # # module path: pycklets.singer_pipeline_wrapper_script.SingerPipelineWrapperScript # from pyckles import AutoPycklet class SingerPipelineWrapperScript(AutoPycklet): """No documentation available. Args: path: The path of the wrapper script. pipeline_path: The base path of the pipeline. tap_config: The tap config. tap_name: The name of the tap. target_config: The target config. target_name: The name of the target. """ FRECKLET_ID = "singer-pipeline-wrapper-script" def __init__(self, path=None, pipeline_path=None, tap_config=None, tap_name=None, target_config=None, target_name=None): super(SingerPipelineWrapperScript, self).__init__(var_names=["path", "pipeline_path", "tap_config", "tap_name", "target_config", "target_name"]) self._path = path self._pipeline_path = pipeline_path self._tap_config = tap_config self._tap_name = tap_name self._target_config = target_config self._target_name = target_name @property def path(self): return self._path @path.setter def path(self, path): self._path = path @property def pipeline_path(self): return self._pipeline_path @pipeline_path.setter def pipeline_path(self, pipeline_path): self._pipeline_path = pipeline_path @property def tap_config(self): return self._tap_config @tap_config.setter def tap_config(self, tap_config): self._tap_config = tap_config @property def tap_name(self): return self._tap_name @tap_name.setter def tap_name(self, tap_name): self._tap_name = tap_name @property def target_config(self): return self._target_config @target_config.setter def target_config(self, target_config): self._target_config = target_config @property def target_name(self): return self._target_name @target_name.setter def target_name(self, target_name): self._target_name = target_name frecklet_class = SingerPipelineWrapperScript