API documentation

Overview

The main purpose of Girder Worker is to execute a broad range of tasks. These tasks, along with a set of input bindings and output bindings, are passed to the girder_worker.tasks.run() function, which is responsible for fetching the inputs as necessary, executing the task, and finally populating any output variables and sending them to their destination.

The task, its inputs, and its outputs are each passed into the function as Python dictionaries. In this section, we describe the structure of each of those dictionaries.
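To show how the three dictionaries relate, the sketch below builds a Python-mode task, inline input bindings, and runs them through toy_run. toy_run is a hypothetical, heavily simplified stand-in written for this page, not Girder Worker's implementation: it handles only "python" mode with "inline" bindings, binds each input id as a script variable, and returns each declared output id as a plain value (the real return format may differ).

```python
def toy_run(task, inputs):
    """Hypothetical, simplified stand-in for girder_worker.tasks.run().

    Handles only "python" mode tasks whose inputs use "inline" bindings.
    """
    if task.get("mode") != "python":
        raise NotImplementedError("toy_run only handles python-mode tasks")

    # Bind each declared input id as a variable visible to the script.
    namespace = {}
    for port in task.get("inputs", []):
        binding = inputs[port["id"]]
        if binding.get("mode", "inline") != "inline":
            raise NotImplementedError("toy_run only handles inline bindings")
        namespace[port["id"]] = binding["data"]

    # Run the script; its assignments land in the same namespace.
    exec(task["script"], namespace)

    # Read each declared output id back out of the script's namespace.
    return {port["id"]: namespace[port["id"]] for port in task.get("outputs", [])}


task = {
    "mode": "python",
    "script": "c = a + b",
    "inputs": [{"id": "a"}, {"id": "b"}],
    "outputs": [{"id": "c"}],
}
inputs = {
    "a": {"mode": "inline", "data": 1},
    "b": {"mode": "inline", "data": 2},
}
result = toy_run(task, inputs)  # {'c': 3}
```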

The task specification

The first argument to girder_worker.tasks.run() describes the task to execute, independently of the actual data that it will be executed upon. The most important field of the task is the mode, which describes what type of task it is. The structure for the task dictionary is described below. Uppercase names within angle brackets represent symbols defined in the specification. Optional parts of the specification are surrounded by parentheses to avoid ambiguity with square brackets, which represent lists in Python or arrays in JSON. The Python task also accepts a write_script parameter; when it is set to 1, task scripts are written to disk before they are executed, which makes them easier to follow in interactive debuggers such as pdb.

<TASK> ::= <PYTHON_TASK> | <R_TASK> | <DOCKER_TASK> | <WORKFLOW_TASK>

<PYTHON_TASK> ::= {
    "mode": "python",
    "script": <Python code to run as a string>
    (, "inputs": [<TASK_INPUT> (, <TASK_INPUT>, ...)])
    (, "outputs": [<TASK_OUTPUT> (, <TASK_OUTPUT>, ...)])
    (, "write_script": 1)
}

<R_TASK> ::= {
    "mode": "r",
    "script": <R code to run (as a string)>
    (, "inputs": [<TASK_INPUT> (, <TASK_INPUT>, ...)])
    (, "outputs": [<TASK_OUTPUT> (, <TASK_OUTPUT>, ...)])
}

<DOCKER_TASK> ::= {
    "mode": "docker",
    "docker_image": <Docker image name to run>
    (, "container_args": [<container arguments>])
    (, "entrypoint": <custom override for container entry point>)
    (, "inputs": [<TASK_INPUT> (, <TASK_INPUT>, ...)])
    (, "outputs": [<TASK_OUTPUT> (, <TASK_OUTPUT>, ...)])
    (, "progress_pipe": <set to true to create a channel for progress notifications>)
}

<WORKFLOW_TASK> ::= {
    "mode": "workflow",
    "steps": [<WORKFLOW_STEP> (, <WORKFLOW_STEP>, ...)],
    "connections": [<WORKFLOW_CONNECTION> (, <WORKFLOW_CONNECTION>, ...)]
    (, "inputs": [<TASK_INPUT> (, <TASK_INPUT>, ...)])
    (, "outputs": [<TASK_OUTPUT> (, <TASK_OUTPUT>, ...)])
}

<WORKFLOW_STEP> ::= {
    "name": <step name>,
    "task": <TASK>
}

<WORKFLOW_CONNECTION> ::= {
    ("name": <name of top-level input to bind to>)
    (, "input": <input id to bind to for a step>)
    (, "input_step": <input step name to connect>)
    (, "output_step": <output step name to connect>)
}

The workflow mode allows a directed acyclic graph of tasks to be passed to girder_worker.run(). Each step wraps a complete task, and the connections describe how data flows into and between the steps.
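As a sketch of how steps and connections form a directed acyclic graph, the example below writes out a two-step workflow using only the fields from the grammar above and orders the steps with Kahn's algorithm. The step names, port ids, and the execution_order helper are hypothetical; Girder Worker's own scheduler may order or validate steps differently. Connections carrying a "name" bind a top-level input to a step and add no edge between steps.

```python
from collections import deque


def execution_order(workflow):
    """Order steps so each runs after the steps that feed it (Kahn's algorithm).

    Hypothetical helper: only connections with both "output_step" and
    "input_step" create edges between steps.
    """
    names = [step["name"] for step in workflow["steps"]]
    downstream = {name: [] for name in names}
    indegree = {name: 0 for name in names}
    for conn in workflow["connections"]:
        if "output_step" in conn and "input_step" in conn:
            downstream[conn["output_step"]].append(conn["input_step"])
            indegree[conn["input_step"]] += 1

    ready = deque(name for name in names if indegree[name] == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in downstream[name]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(names):
        raise ValueError("connections contain a cycle; workflows must be acyclic")
    return order


workflow = {
    "mode": "workflow",
    "steps": [
        {"name": "report", "task": {"mode": "python", "script": "print(x)",
                                    "inputs": [{"id": "x"}]}},
        {"name": "double", "task": {"mode": "python", "script": "y = 2 * n",
                                    "inputs": [{"id": "n"}],
                                    "outputs": [{"id": "y"}]}},
    ],
    "connections": [
        # Top-level input "n" feeds the "double" step.
        {"name": "n", "input_step": "double", "input": "n"},
        # The "double" step feeds input "x" of the "report" step.
        {"output_step": "double", "input_step": "report", "input": "x"},
    ],
    "inputs": [{"id": "n"}],
}
order = execution_order(workflow)  # ['double', 'report']
```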

See also: Visualize Facebook data with Girder Worker (in Examples), a full example of how to create workflows in Girder Worker.

Each entry in a task's "inputs" and "outputs" lists has the following structure:

<TASK_INPUT> ::= {
    "id": <string, the variable name>
    (, "default": <default value if none is bound at runtime>)
    (, "target": <INPUT_TARGET_TYPE>)   ; default is "memory"
    (, "filename": <name of file if target="filepath">)
    (, "stream": <set to true to indicate a streaming input>)
}

<INPUT_TARGET_TYPE> ::= "memory" | "filepath"

<TASK_OUTPUT> ::= {
    "id": <string, the variable name>
    (, "target": <INPUT_TARGET_TYPE>)   ; default is "memory"
    (, "stream": <set to true to indicate a streaming output>)
}
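A minimal sketch of applying the grammar's defaults to a single task input or output entry follows. normalize_port is a hypothetical helper, not part of Girder Worker; it only fills in the documented default target of "memory" and rejects targets outside <INPUT_TARGET_TYPE>.

```python
def normalize_port(port):
    """Return a copy of a task input/output entry with grammar defaults applied.

    Hypothetical helper: checks the "id" field and fills in target="memory".
    """
    if not isinstance(port.get("id"), str):
        raise ValueError("each task input/output needs a string 'id'")
    normalized = dict(port)
    normalized.setdefault("target", "memory")
    if normalized["target"] not in ("memory", "filepath"):
        raise ValueError("unknown target %r" % normalized["target"])
    return normalized


in_memory = normalize_port({"id": "table"})
on_disk = normalize_port({"id": "image", "target": "filepath", "filename": "input.png"})
```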

The input specification

The inputs argument to girder_worker.run() specifies the inputs to the task described by the task argument. Specifically, it tells what data should be placed into the task input ports.

<INPUTS> ::= {
    <id> : <INPUT_BINDING>
    (, <id> : <INPUT_BINDING>)
    (, ...)
}

The input spec is a dictionary mapping each id (corresponding to the id key of each task input) to its data binding for this execution.

<INPUT_BINDING> ::= <INPUT_BINDING_HTTP> | <INPUT_BINDING_LOCAL> |
                    <INPUT_BINDING_MONGODB> | <INPUT_BINDING_INLINE>

<INPUT_BINDING_HTTP> ::= {
    "mode": "http",
    "url": <url of data to download>
    (, "params": <dict of URL parameters to encode>)
    (, "headers": <dict of HTTP headers to send when fetching>)
    (, "method": <http method to use, default is "GET">)
    (, "maxSize": <integer, max size of download in bytes>)
}

The http input mode specifies that the data should be fetched over HTTP. Depending on the target field of the corresponding task input specifier, the data will either be passed in memory, or streamed to a file on the local filesystem, and the variable will be set to the path of that file.
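A sketch of how the fields of an http binding could map onto Python's standard urllib follows. build_http_request and fetch_http are hypothetical helpers written for this page, not Girder Worker's fetch code; fetch_http reads the response into memory, aborts once maxSize is exceeded, and does not cover the filepath target.

```python
import urllib.parse
import urllib.request


def build_http_request(binding):
    """Translate an <INPUT_BINDING_HTTP> dict into a urllib Request (hypothetical)."""
    url = binding["url"]
    params = binding.get("params")
    if params:
        # Append the encoded params, respecting any query already in the URL.
        separator = "&" if urllib.parse.urlparse(url).query else "?"
        url += separator + urllib.parse.urlencode(params)
    return urllib.request.Request(
        url,
        headers=binding.get("headers", {}),
        method=binding.get("method", "GET"),
    )


def fetch_http(binding, chunk_size=65536):
    """Download into memory, enforcing the optional maxSize limit (hypothetical)."""
    limit = binding.get("maxSize")
    data = bytearray()
    with urllib.request.urlopen(build_http_request(binding)) as response:
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            data.extend(chunk)
            if limit is not None and len(data) > limit:
                raise ValueError("download exceeds maxSize of %d bytes" % limit)
    return bytes(data)


req = build_http_request({
    "url": "https://example.com/data",
    "params": {"a": 1, "b": "x"},
    "headers": {"Accept": "text/csv"},
})
```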

<INPUT_BINDING_LOCAL> ::= {
    "mode": "local",
    "path": <path on local filesystem to the file>
}

The local input mode denotes that the data exists on the local filesystem. Its contents will be read into memory and the variable will point to those contents.

<INPUT_BINDING_MONGODB> ::= {
    "mode": "mongodb",
    "db": <the database to use>,
    "collection": <the collection to fetch from>
    (, "host": <mongodb host, default is "localhost">)
}

The mongodb input mode specifies that the data should be fetched from a MongoDB collection. This simply binds the entire BSON-encoded collection to the input variable.

<INPUT_BINDING_INLINE> ::= {
    "mode": "inline",
    "data": <data to bind to the variable>
}

The inline input mode simply passes the data directly in the input binding dictionary as the value of the “data” key. Do not use this for any data that could be large.

Note: The mode field is inferred in a few special cases. If there is a url field, the mode is assumed to be "http", and if there is a data field, the mode is assumed to be "inline". For example, the following input specifications are equivalent:

{
    'url': 'https://upload.wikimedia.org/wikipedia/en/2/24/Lenna.png'
}
{
    'mode': 'http',
    'url': 'https://upload.wikimedia.org/wikipedia/en/2/24/Lenna.png'
}

The following two specifications are also equivalent:

{
    'data': 'hello'
}
{
    'mode': 'inline',
    'data': 'hello'
}
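The inference rule in the note above can be sketched as a small function. infer_mode is a hypothetical name; an explicit "mode" always wins, and this sketch assumes "url" is checked before "data" when both are present, which the note does not specify.

```python
def infer_mode(binding):
    """Return the effective mode of an input binding (hypothetical helper)."""
    if "mode" in binding:
        return binding["mode"]
    if "url" in binding:   # assumption: "url" takes precedence over "data"
        return "http"
    if "data" in binding:
        return "inline"
    raise ValueError("binding has no mode and none can be inferred")


lena = {"url": "https://upload.wikimedia.org/wikipedia/en/2/24/Lenna.png"}
mode = infer_mode(lena)  # 'http'
```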

The output specification

The optional outputs argument to girder_worker.run() specifies output variables of the task that should be handled in some way.

<OUTPUTS> ::= {
    <id> : <OUTPUT_BINDING>
    (, <id> : <OUTPUT_BINDING>)
    (, ...)
}

The output spec is a dictionary mapping each id (corresponding to the id key of each task output) to some behavior that should be performed with it. Task outputs that do not have bindings in the output spec simply get their results set in the return value of girder_worker.run().

<OUTPUT_BINDING> ::= <OUTPUT_BINDING_HTTP> | <OUTPUT_BINDING_LOCAL> |
                     <OUTPUT_BINDING_MONGODB>

<OUTPUT_BINDING_HTTP> ::= {
    "mode": "http",
    "url": <url to upload data to>
    (, "headers": <dict of HTTP headers to send with the request>)
    (, "method": <http method to use, default is "POST">)
    (, "params": <dict of HTTP query parameters to send with the request>)
}

<OUTPUT_BINDING_LOCAL> ::= {
    "mode": "local",
    "path": <path to write data on the local filesystem>
}

The local output mode writes the data to the specified path on the local filesystem.

<OUTPUT_BINDING_MONGODB> ::= {
    "mode": "mongodb",
    "db": <mongo database to write to>,
    "collection": <mongo collection to write to>
    (, "host": <mongo host to connect to>)
}

The mongodb output mode attempts to BSON-decode the bound data, and then overwrites any data in the specified collection with the output data.
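The dispatch rules above can be sketched for the simplest case. dispatch_outputs is a hypothetical helper: it writes outputs bound in "local" mode to their path, returns unbound outputs to the caller, and leaves "http" and "mongodb" unimplemented. It assumes string data for local writes.

```python
import os
import tempfile


def dispatch_outputs(results, output_spec):
    """Apply output bindings to task results (hypothetical, local mode only)."""
    returned = {}
    for output_id, value in results.items():
        binding = output_spec.get(output_id)
        if binding is None:
            # Unbound outputs go back to the caller.
            returned[output_id] = value
        elif binding.get("mode") == "local":
            # Assumes the output value is a string.
            with open(binding["path"], "w") as f:
                f.write(value)
        else:
            raise NotImplementedError("mode %r not covered by this sketch" % binding.get("mode"))
    return returned


path = os.path.join(tempfile.mkdtemp(), "summary.txt")
left = dispatch_outputs(
    {"summary": "3 rows", "count": 3},
    {"summary": {"mode": "local", "path": path}},
)  # {'count': 3}
```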

Script execution

class girder_worker.GirderWorkerPluginABC(app, *args, **kwargs)
task_imports()

Formats

class girder_worker.plugins.types.format.Validator
type

The validator type, like string.

format

The validator format, like text.

is_valid()

Return whether the type/format combination is valid.

If format is None, checks for the presence of any valid type/format with the specified type.

Returns: True if type and format are a valid, loaded type/format pair.
girder_worker.plugins.types.format.converter_path(source, target)

Gives the shortest path that should be taken to go from a source type/format to a target type/format.

Raises a NetworkXNoPath exception if it cannot find a path.

Parameters:
  • source – Validator tuple indicating the type/format being converted from.
  • target – Validator tuple indicating the type/format being converted to.
Returns:

An ordered list of the analyses that need to be run to convert from source to target.
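The shortest-path search can be sketched with a plain breadth-first search over a conversion graph. This is a stand-in, not the library's code: shortest_converter_path is hypothetical, returns (from, to) Validator pairs rather than analyses, and raises LookupError instead of NetworkXNoPath. Validator is redefined locally as a namedtuple and the format names are illustrative.

```python
from collections import deque, namedtuple

# Local stand-in for the Validator(type, format) namedtuple.
Validator = namedtuple("Validator", ["type", "format"])


def shortest_converter_path(edges, source, target):
    """BFS for the fewest conversions; edges maps a Validator to its direct targets."""
    previous = {source: None}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        if node == target:
            # Walk predecessors back to the source, then reverse.
            steps = []
            while previous[node] is not None:
                steps.append((previous[node], node))
                node = previous[node]
            return steps[::-1]
        for neighbor in edges.get(node, ()):
            if neighbor not in previous:
                previous[neighbor] = node
                queue.append(neighbor)
    raise LookupError("no conversion path from %s to %s" % (source, target))


csv_v = Validator("table", "csv")
rows_v = Validator("table", "rows")
pandas_v = Validator("table", "pandas")
edges = {csv_v: [rows_v], rows_v: [pandas_v]}
path = shortest_converter_path(edges, csv_v, pandas_v)
```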

girder_worker.plugins.types.format.get_validator_analysis(validator)

Gets a validator’s analysis from the conversion graph.

>>> analysis = get_validator_analysis(Validator('string', 'text'))

Returns an analysis dictionary

>>> type(analysis) == dict
True

Which contains an inputs key

>>> 'inputs' in analysis
True

If the validator doesn’t exist, an exception will be raised

>>> get_validator_analysis(Validator('foo', 'bar'))
Traceback (most recent call last):
...
Exception: No such validator foo/bar
Parameters: validator – A Validator namedtuple
Returns: A dictionary containing the runnable analysis.
girder_worker.plugins.types.format.has_converter(source, target=Validator(type=None, format=None))

Determines if any converters exist from a given type, and optionally format.

Underneath, this just traverses the edges until it finds one which matches the arguments.

Parameters:
  • source – Validator tuple indicating the type/format being converted from.
  • target – Validator tuple indicating the type/format being converted to.
Returns:

True if it can convert from source to target, False otherwise.
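The edge traversal described above can be sketched as follows, treating a None field as a wildcard so that a type alone can be queried. has_converter_sketch and matches are hypothetical names, the edge list is illustrative, and Validator is a local namedtuple stand-in.

```python
from collections import namedtuple

Validator = namedtuple("Validator", ["type", "format"])


def matches(node, pattern):
    """True if every non-None field of pattern equals the node's field."""
    return all(want is None or want == have for want, have in zip(pattern, node))


def has_converter_sketch(edges, source, target=Validator(None, None)):
    """Return True if any (from, to) converter edge matches source and target."""
    return any(matches(frm, source) and matches(to, target) for frm, to in edges)


edges = [(Validator("table", "csv"), Validator("table", "rows"))]
any_table = has_converter_sketch(edges, Validator("table", None))  # True
```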

girder_worker.plugins.types.format.import_converters(search_paths)

Import converters and validators from the specified search paths. These functions are loaded into girder_worker.format.conv_graph with nodes representing validators, and directed edges representing converters.

Any files in a search path matching validate_*.json are loaded as validators. Validators should be fast (ideally O(1)) algorithms for determining if data is of the specified format. These are algorithms that have a single input named "input" and a single output named "output". The input has the type and format to be checked. The output must have type and format "boolean". The script performs the validation and sets the output variable to either true or false.

Any *_to_*.json files are imported as converters. A converter is simply an analysis with one input named "input" and one output named "output". The input and output should have matching type but should be of different formats.

Parameters: search_paths (str or list of str) – A list of search paths relative to the current working directory. Passing a single path as a string also works.
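The file-naming rules above can be sketched with the standard fnmatch module. classify_converter_files is a hypothetical helper that only sorts file names into validators and converters; it does not load or run any analyses, and it looks only at the top level of each search path, which may differ from the real loader.

```python
import fnmatch
import os
import tempfile


def classify_converter_files(search_paths):
    """Split JSON files into validators and converters by name (hypothetical)."""
    if isinstance(search_paths, str):
        search_paths = [search_paths]
    validators, converters = [], []
    for path in search_paths:
        for name in sorted(os.listdir(path)):
            full = os.path.join(path, name)
            if fnmatch.fnmatch(name, "validate_*.json"):
                validators.append(full)
            elif fnmatch.fnmatch(name, "*_to_*.json"):
                converters.append(full)
    return validators, converters


# Demo with throwaway files; "notes.txt" matches neither pattern.
demo_dir = tempfile.mkdtemp()
for name in ("validate_csv.json", "csv_to_rows.json", "notes.txt"):
    open(os.path.join(demo_dir, name), "w").close()
validators, converters = classify_converter_files(demo_dir)
```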
girder_worker.plugins.types.format.import_default_converters()

Import converters from the default search paths. This is called when the girder_worker.format module is first loaded.

girder_worker.plugins.types.format.print_conversion_graph()

Print a graph of supported conversion paths in DOT format to standard output.

girder_worker.plugins.types.format.print_conversion_table()

Print a table of supported conversion paths in CSV format with 'from' and 'to' columns to standard output.
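For reference, the two output formats can be sketched from a list of edges. conversion_dot and conversion_csv are hypothetical helpers that return strings instead of printing, label nodes as "type:format" by assumption, and list direct edges only; the real table may also include multi-step paths.

```python
import csv
import io


def conversion_dot(edges):
    """Render (from, to) pairs as a Graphviz DOT digraph."""
    lines = ["digraph g {"]
    for src, dst in edges:
        lines.append('  "%s" -> "%s";' % (src, dst))
    lines.append("}")
    return "\n".join(lines)


def conversion_csv(edges):
    """Render (from, to) pairs as CSV with a 'from','to' header row."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["from", "to"])
    writer.writerows(edges)
    return buf.getvalue()


edges = [("table:csv", "table:rows")]
dot_text = conversion_dot(edges)
csv_text = conversion_csv(edges)
```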

Pythonic task API

class girder_worker.core.specs.Spec(*args, **kw)

Defines core utility methods that all spec objects have in common.

Supports dict-like initialization.

>>> a = Spec({'a': 1, 'b': {'c': [1, 2, None]}})
>>> b = Spec(a=1, b={'c': [1, 2, None]})
>>> a == b
True

Also supports initialization from json.

>>> c = Spec('{"a": 1, "b": {"c": [1, 2, null]}}')
>>> a == c
True

Multiple initialization methods can be used together; they are applied in order.

>>> Spec('{"a": 0}', {'a': 1}, a=2)
{"a": 2}

Updating and merging specs is always done recursively.

>>> Spec('{"a": {"b": 0}}', a={'c': 1})
{"a": {"b": 0, "c": 1}}

Conflicts are resolved by taking the value with the highest priority (i.e., the one provided later in the constructor).

>>> Spec('{"a": {"b": 0}}', {"a": []})
{"a": []}
>>> Spec('{"a": {"b": 0}}', {"a": []}, a={"c": 1})
{"a": {"c": 1}}
>>> Spec('{"a": []}', {"a": {"b": 0}}, a={"c": 1})
{"a": {"b": 0, "c": 1}}

Serialization is performed as JSON.

>>> str(a)
'{"a": 1, "b": {"c": [1, 2, null]}}'

Strings are assumed to be utf-8 encoded.

>>> str(Spec({u"for\u00eat": u"\ud83c\udf33 \ud83c\udf32 \ud83c\udf34"}))
'{"for\\u00eat": "\\ud83c\\udf33 \\ud83c\\udf32 \\ud83c\\udf34"}'

Methods that mutate the state of the Spec will test if the new state is valid, restoring the original state before raising an exception.

>>> s = Spec({'a': 0})
>>> try:
...     s['a'] = object
... except Exception:
...     pass
... else:
...     assert False
>>> s
{"a": 0}

Spec constructors are idempotent

>>> Spec(a='a') == Spec(Spec(a='a'))
True
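The merge rules in the doctests above can be reproduced with a short recursive function: dicts merge key by key, and any other value (including a list) replaces what was there. merge_spec and build_spec are hypothetical helpers over plain dicts; they skip the validation and serialization that Spec performs.

```python
import copy
import json


def merge_spec(base, update):
    """Recursively merge update into base; non-dict values replace (hypothetical)."""
    for key, value in update.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            merge_spec(base[key], value)
        else:
            base[key] = copy.deepcopy(value)
    return base


def build_spec(*args, **kw):
    """Apply JSON strings or dicts in order, then keyword arguments last."""
    result = {}
    for arg in args + (kw,):
        if isinstance(arg, str):
            arg = json.loads(arg)
        merge_spec(result, arg)
    return result


merged = build_spec('{"a": []}', {"a": {"b": 0}}, a={"c": 1})  # {'a': {'b': 0, 'c': 1}}
```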
class girder_worker.core.specs.Port(*arg, **kw)

A port defines a communication channel between tasks.

Ports enable bidirectional communication between tasks and are responsible for ensuring that connections are compatible. Their primary purpose is to specify what types of data a task can read and write; tasks use this information to determine whether they can be connected. Ports also document a task by describing its inputs and outputs, and they handle fetching data from and pushing data to remote data stores.

>>> spec = {'name': 'a', 'type': 'number', 'format': 'number'}
>>> port = Port(spec)

The port object is serialized as a JSON object.

>>> import json
>>> json.loads(str(port)) == spec
True

It has several properties derived from the spec

>>> port.name == spec['name']
True
>>> port.type == spec['type']
True
>>> port.format == spec['format']
True

It also supports auto converting formats and validation by default

>>> port.auto_convert
True
>>> port.auto_validate
True

Spec properties are automatically validated when setting them

>>> port = Port()
Traceback (most recent call last):
    ...
ValueError: Port specs require a valid name.
>>> port = Port(name="my port", type="python", format="object")
>>> port.format = 'invalid'
Traceback (most recent call last):
    ...
ValueError: Unknown format "python.invalid"

Checking the type is deferred to allow incremental updating

>>> port['type'] = 'image'
>>> port.json()
Traceback (most recent call last):
    ...
ValueError: Unknown format "image.object"
>>> port.format = 'png'
>>> port.json()
'{"type": "image", "name": "my port", "format": "png"}'
>>> port == Port(port)
True
auto_convert

Whether the data format is automatically converted

auto_validate

Whether the data is validated by default

convert(data_spec, format)

Convert to a compatible data format.

Parameters:
  • data_spec (dict) – Data specification
  • format (str) – The target data format
Returns:

dict

>>> spec = {'name': 'a', 'type': 'number', 'format': 'number'}
>>> port = Port(spec)
>>> new_spec = port.convert({'format': 'number', 'data': 1}, 'json')
>>> new_spec['format']
'json'
>>> port.fetch(new_spec)
1
fetch(data_spec)

Return the data described by the given specification.

Parameters: data_spec (dict) – A data specification object
Returns: data
Raises: ValidationError – when the validation check fails
>>> port = Port({'name': 'a', 'type': 'number', 'format': 'number'})
>>> port.fetch({'format': 'number', 'data': -1})
-1
format

The data format of the port

name

The name of the port

push(data_spec)

Write data to a remote destination according to the specification.

Parameters: data_spec (dict) – A data specification object
Returns: dict
>>> port = Port({'name': 'a', 'type': 'number', 'format': 'number'})
>>> port.push({'format': 'json', 'mode': 'inline', 'data': '2'})['data']
2
>>> port.push({'format': 'number', 'mode': 'inline', 'data': 3})['data']
3
type

The data type of the port

validate(data_spec)

Ensure the given data spec is compatible with this port.

Parameters: data_spec (dict) – Data specification
Returns: bool
>>> spec = {'name': 'a', 'type': 'number', 'format': 'number'}
>>> port = Port(spec)
>>> port.validate({'format': 'number', 'data': 1.5})
True
>>> port.validate({'format': 'json', 'data': '1.5'})
True
>>> port.validate({'format': 'number', 'data': '1.5'})
False
>>> port.validate({'format': 'unknown format', 'data': '...'})
False
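As an illustration of per-format validation, the toy below reproduces the four validate() results above for a "number" port. NUMBER_FORMAT_CHECKS, is_json_number, and validate_number are hypothetical; in Girder Worker the real checks come from the loaded validator analyses.

```python
import json
import numbers


def is_json_number(text):
    """True if text is a JSON document encoding a number (not a boolean)."""
    try:
        value = json.loads(text)
    except ValueError:
        return False
    return isinstance(value, (int, float)) and not isinstance(value, bool)


# Hypothetical format checks for a port of type "number".
NUMBER_FORMAT_CHECKS = {
    "number": lambda data: isinstance(data, numbers.Number) and not isinstance(data, bool),
    "json": lambda data: isinstance(data, str) and is_json_number(data),
}


def validate_number(data_spec):
    """Unknown formats fail; known formats delegate to their check."""
    check = NUMBER_FORMAT_CHECKS.get(data_spec.get("format"))
    return bool(check and check(data_spec.get("data")))
```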
class girder_worker.core.specs.PortList(value=None)

A list that only accepts port specs.

This class is extended to behave like a read-only dictionary, where the keys are the port names.

>>> l = PortList()
>>> l.json()
'[]'

Ports can be added as instances or dictionaries

>>> l.append(Port(name='z'))
>>> l.append({'name': 'a', 'type': 'image', 'format': 'png'})

Normal list methods are supported

>>> l[1] = '{"name": "b"}'
>>> l.insert(1, {"name": "c"})
>>> del l[1]
>>> str(l)
'[{"name": "z"}, {"name": "b"}]'

Port lists have keys and values methods like dicts

>>> l.keys()[0]
'z'

Ports can be referenced by either their index in the list or by name

>>> l[0] is l['z']
True
>>> 'z' in l
True

Ports can be modified after they are added

>>> l[0].name = 'y'
>>> l.json()
'[{"name": "y"}, {"name": "b"}]'

Several validation checks are performed

>>> l[0].name = 'b'
Traceback (most recent call last):
    ...
ValueError: Duplicate keys detected
>>> l[0].name = 'a'
>>> l[0].format = 'png'
Traceback (most recent call last):
    ...
ValueError: Unknown format "python.png"
>>> str(l)
'[{"name": "a"}, {"name": "b"}]'
append(value)

Append an item if possible.

check()

Check that the port list is valid.

insert(index, value)

Add an item before the given index if possible.

keys()

Return a list of port names.

values()

Return a list of ports.
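The core PortList idea, a list that can also be indexed by item name and that rejects duplicate names, can be sketched with a small list subclass. NamedList is hypothetical and works on plain dicts; it omits the type/format validation that PortList performs.

```python
class NamedList(list):
    """List of dicts that can also be looked up by each item's "name" (hypothetical)."""

    def __getitem__(self, key):
        if isinstance(key, str):
            for item in self:
                if item["name"] == key:
                    return item
            raise KeyError(key)
        return super().__getitem__(key)

    def __contains__(self, key):
        if isinstance(key, str):
            return any(item["name"] == key for item in self)
        return super().__contains__(key)

    def keys(self):
        return [item["name"] for item in self]

    def check(self):
        names = self.keys()
        if len(names) != len(set(names)):
            raise ValueError("Duplicate keys detected")

    def append(self, value):
        # Add, validate, and roll back on failure so the list stays valid.
        super().append(value)
        try:
            self.check()
        except ValueError:
            self.pop()
            raise


ports = NamedList()
ports.append({"name": "z"})
ports.append({"name": "b"})
```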

exception girder_worker.core.specs.ValidationError(port, data_spec)

An exception type raised when encountering invalid data types.

class girder_worker.core.specs.TaskSpec(*args, **kw)

Defines a pipeline element.

A task is an element of the pipeline responsible for completing an atomic task given one or more inputs and pushing the result to the next task or tasks.

inputs

A list of inputs accepted by the task

mode

The execution mode of the task

outputs

A list of outputs returned by the task

script

A script or function to execute

update(other, **kw)

Extend update to call PortList for input/output properties.

exception girder_worker.core.specs.ReadOnlyAttributeException

Exception thrown when attempting to set a read-only attribute.

exception girder_worker.core.specs.WorkflowException

Exception thrown for issues with workflows.

exception girder_worker.core.specs.DuplicateTaskException

Exception thrown when adding a duplicate task.