import sys
import uuid
import os
import tempfile
import six
import abc
from girder_worker_utils.transform import Transform
TEMP_VOLUME_MOUNT_PREFIX = '/mnt/girder_worker'
def _maybe_transform(obj, *args, **kwargs):
if hasattr(obj, 'transform') and six.callable(obj.transform):
return obj.transform(*args, **kwargs)
return obj
[docs]class HostStdOut(Transform):
"""
Represents the standard output stream on the host machine. Can be used with
:py:class:`girder_worker.docker.transforms.Connect` to write text to stdout.
"""
def transform(self, **kwargs):
from girder_worker.docker.io import (
StdStreamWriter
)
return StdStreamWriter(sys.stdout)
[docs]class HostStdErr(Transform):
"""
Represents the standard error stream on the host machine. Can be used with
:py:class:`girder_worker.docker.transforms.Connect` to write text to stderr.
"""
def transform(self, **kwargs):
from girder_worker.docker.io import (
StdStreamWriter
)
return StdStreamWriter(sys.stderr)
[docs]class ContainerStdOut(Transform):
"""
Represents the standard output stream of the container. Can be used with
:py:class:`girder_worker.docker.transforms.Connect` to redirect the containers
standard output to another stream.
"""
def transform(self, **kwargs):
return self
def open(self):
# noop
pass
[docs]class ContainerStdErr(Transform):
"""
Represents the standard error stream of the container. Can be used with
:py:class:`girder_worker.docker.transforms.Connect` to redirect the containers
standard error to another stream.
"""
def transform(self, **kwargs):
return self
def open(self):
# noop
pass
[docs]class BindMountVolume(Transform):
"""
A volume that will be bind mounted into a docker container.
:param host_path: The path on the host machine.
:type host_path: str
:param container_path: The path in the container this volume will be mounted
at.
:type container_path: str
:param mode: The mounting mode
:type mode: str
"""
def __init__(self, host_path, container_path, mode='rw'):
self._host_path = host_path
self._container_path = container_path
self.mode = mode
def _repr_json_(self):
return {
self.host_path: {
'bind': self.container_path,
'mode': self.mode
}
}
def transform(self, **kwargs):
return self.container_path
@property
def container_path(self):
return self._container_path
@property
def host_path(self):
return self._host_path
class _TemporaryVolumeMetaClass(abc.ABCMeta):
@property
def default(cls):
"""
This returns the default temporary volume that is always mounted into the container.
"""
return _DefaultTemporaryVolume()
class _TemporaryVolumeBase(BindMountVolume):
def __init__(self, *arg, **kwargs):
super(_TemporaryVolumeBase, self).__init__(*arg, **kwargs)
self._transformed = False
def _make_paths(self, host_dir=None, mode=0o755):
if host_dir is not None and not os.path.exists(host_dir):
os.makedirs(host_dir)
# Sometimes we need to explicitly set the mode of the
# directory to 0o777 (e.g. when running the integration
# tests). To do this explicitly (without the user's umask
# getting in the way) we must make a separate call to
# os.chmod.
os.chmod(host_dir, mode)
self._host_path = tempfile.mkdtemp(dir=host_dir)
self._container_path = os.path.join(TEMP_VOLUME_MOUNT_PREFIX, uuid.uuid4().hex)
[docs]@six.add_metaclass(_TemporaryVolumeMetaClass)
class TemporaryVolume(_TemporaryVolumeBase):
"""
This is a class used to represent a temporary directory on the host that will
be mounted into a docker container. girder_worker will automatically attach a default
temporary volume. This can be reference using `TemporaryVolume.default` class attribute.
A temporary volume can also be create in a particular host directory by providing the
`host_dir` param.
:param host_dir: The root directory on the host to use when creating the
the temporary host path.
:type host_dir: str
:param mode: The default mode applied to the temporary volume if it does
not already exist.
:type mode: int
"""
# Note that this mode is explicitly set with os.chmod. What you
# set, is what you get - no os.makedirs umask shenanigans.
def __init__(self, host_dir=None, mode=0o755):
super(TemporaryVolume, self).__init__(None, None)
self.host_dir = host_dir
self._mode = mode
self._instance = None
self._transformed = False
def transform(self, **kwargs):
if not self._transformed:
self._transformed = True
self._make_paths(self.host_dir, mode=self._mode)
return super(TemporaryVolume, self).transform(**kwargs)
class _DefaultTemporaryVolume(TemporaryVolume):
"""
Place holder who delegates implementation to instance provide by transform(...) method
An instance of the class is returned each time `TemporaryVolume.default` is accessed.
When the docker_run task is executed the transform(...) method is call with an instance
containing information about the actual default temporary volume associated with the
task. The place holder then delegates all functionality to this instance.
"""
def transform(self, _default_temp_volume=None, **kwargs):
self._instance = _default_temp_volume
self._transformed = True
return self._instance.transform(**kwargs)
@property
def container_path(self):
return self._instance.container_path
@property
def host_path(self):
return self._instance.host_path
class NamedPipeBase(Transform):
def __init__(self, name, container_path=None, host_path=None, volume=TemporaryVolume.default):
super(NamedPipeBase, self).__init__()
self._container_path = None
self._host_path = None
self._volume = None
if container_path is not None and host_path is not None:
self._container_path = container_path
self._host_path = host_path
else:
self._volume = volume
self.name = name
def transform(self, **kwargs):
if self._volume is not None:
self._volume.transform(**kwargs)
@property
def container_path(self):
"""
The path within the docker container.
"""
if self._container_path is not None:
return os.path.join(self._container_path, self.name)
else:
return os.path.join(self._volume.container_path, self.name)
@property
def host_path(self):
"""
The path on the host machine
"""
if self._host_path is not None:
return os.path.join(self._host_path, self.name)
else:
return os.path.join(self._volume.host_path, self.name)
def cleanup(self, **kwargs):
os.remove(self.host_path)
[docs]class NamedOutputPipe(NamedPipeBase):
"""
A named pipe that can be opened for write within a docker container.
i.e. To stream data out of a container.
:param name: The name of the pipe.
:type name: str
:param container_path: The path in the container.
:type container_path: str
:param host_path: The path on the host machine.
:type host_path: str
:param volume: Alternatively a :py:class:`girder_worker.docker.transforms.BindMountVolume`
instance can be provided. In which can the container_path and host_paths from
the volume will be use when creating the pipe. The default location is
:py:attr:`girder_worker.docker.transforms.TemporaryVolume.default`
"""
def __init__(self, name, container_path=None, host_path=None, volume=TemporaryVolume.default):
super(NamedOutputPipe, self).__init__(name, container_path, host_path, volume)
def transform(self, **kwargs):
from girder_worker.docker.io import (
NamedPipe,
NamedPipeReader
)
super(NamedOutputPipe, self).transform(**kwargs)
pipe = NamedPipe(self.host_path)
return NamedPipeReader(pipe, self.container_path)
[docs]class VolumePath(Transform):
"""
A path on a docker volume. Must be a path relative to the root of the volume.
:param filename: The file name.
:type name: str
:param volume: The volume this file lived on. If no volume is provided then
the file will be on
:py:attr:`girder_worker.docker.transforms.TemporaryVolume.default`
:type volume: :py:class:`girder_worker.docker.transforms.BindMountVolume`
"""
def __init__(self, filename, volume=TemporaryVolume.default):
if os.path.isabs(filename):
raise Exception('VolumePath paths must be relative to a volume (%s).' % filename)
self.filename = filename
self._volume = volume
def transform(self, *pargs, **kwargs):
self._volume.transform(**kwargs)
# If we are being called with arguments, then this is the execution of
# girder_result_hooks, so return the host_path
if len(pargs) > 0:
return os.path.join(self._volume.host_path, self.filename)
else:
return os.path.join(self._volume.container_path, self.filename)
def _repr_model_(self):
return '<%s.%s: "%s">' % (self.__module__, self.__class__.__name__, self.filename)
[docs]class Connect(Transform):
"""
This utility class represents the connection between a
:py:class:`girder_worker.docker.transforms.NamedOutputPipe` or
:py:class:`girder_worker.docker.transforms.NamedInputPipe` and one of the other streaming
transforms. Girder Worker will stream the data to or from the named pipe.
:param input: The input side of the connection
:type input: :py:class:`girder_worker.docker.transforms.NamedOutputPipe` or
:py:class:`girder_worker.docker.transforms.girder.GirderFileIdToStream`
:param output: The output side of the connection
:type output: :py:class:`girder_worker.docker.transforms.NamedInputPipe` or
:py:class:`girder_worker.docker.transforms.ChunkedTransferEncodingStream` or
:py:class:`girder_worker.docker.transforms.HostStdOut` or
:py:class:`girder_worker.docker.transforms.HostStdErr`
"""
def __init__(self, input, output):
super(Connect, self).__init__()
self._input = input
self._output = output
def transform(self, **kwargs):
from girder_worker.docker.io import (
FDWriteStreamConnector,
FDReadStreamConnector,
)
input = _maybe_transform(self._input, **kwargs)
output = _maybe_transform(self._output, **kwargs)
if isinstance(self._output, NamedInputPipe):
return FDWriteStreamConnector(input, output)
elif isinstance(self._input, NamedOutputPipe):
return FDReadStreamConnector(input, output)
else:
raise TypeError('A NamedInputPipe or NamedOutputPipe must be provided.')
def _repr_model_(self):
"""
The method is called before save the argument in the job model.
"""
return str(self)
[docs]class ChunkedTransferEncodingStream(Transform):
"""
A stream transform that allows data to be streamed using HTTP Chunked Transfer Encoding
to a server.
:param url: Destination URL for the stream.
:type url: str
:param headers: HTTP headers to send.
:type header: dict
"""
def __init__(self, url, headers={}, **kwargs):
self.url = url
self.headers = headers
def transform(self, **kwargs):
from girder_worker.docker.io import (
ChunkedTransferEncodingStreamWriter
)
return ChunkedTransferEncodingStreamWriter(self.url, self.headers)