Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process executors #184

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions daskms/experimental/arrow/reads.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import defaultdict
import json
from pathlib import Path
import re
from threading import Lock
import weakref
import warnings
Expand Down Expand Up @@ -54,6 +55,10 @@
_parquet_table_cache = weakref.WeakValueDictionary()


def natural_order(key):
return tuple(int(c) if c.isdigit() else c.lower()
for c in re.split("(\d+)", str(key)))

class ParquetFileProxy(metaclass=Multiton):
def __init__(self, store, key):
self.store = store
Expand Down
103 changes: 86 additions & 17 deletions daskms/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import inspect
from inspect import getattr_static
from threading import Lock
from types import MethodType
from warnings import warn
import weakref

Expand All @@ -28,6 +29,22 @@ def freeze(arg):
return arg


def _warn_if_positional_in_kwargs(cls, kwargs):
signature = inspect.signature(cls.__init__)
positional_in_kwargs = [p.name for p in signature.parameters.values()
if p.kind == p.POSITIONAL_OR_KEYWORD
and p.default == p.empty
and p.name in kwargs]

if positional_in_kwargs:
warn(f"Positional arguments {positional_in_kwargs} were "
f"supplied as keyword arguments to "
f"{cls.__init__}{signature}. "
f"This may create separate Multiton instances "
f"for what is intended to be a unique set of "
f"arguments.")


class Multiton(type):
"""General Multiton metaclass

Expand Down Expand Up @@ -66,21 +83,56 @@ def __init__(self, *args, **kwargs):
self.__lock = Lock()

def __call__(cls, *args, **kwargs):
signature = inspect.signature(cls.__init__)
positional_in_kwargs = [p.name for p in signature.parameters.values()
if p.kind == p.POSITIONAL_OR_KEYWORD
and p.default == p.empty
and p.name in kwargs]

if positional_in_kwargs:
warn(f"Positional arguments {positional_in_kwargs} were "
f"supplied as keyword arguments to "
f"{cls.__init__}{signature}. "
f"This may create separate Multiton instances "
f"for what is intended to be a unique set of "
f"arguments.")

key = freeze(args + (kwargs if kwargs else Multiton.MISSING,))
_warn_if_positional_in_kwargs(cls, kwargs)
key = freeze(args + (kwargs if kwargs else cls.MISSING,))

# Double-checked locking
# https://en.wikipedia.org/wiki/Double-checked_locking
try:
return cls.__cache[key]
except KeyError:
pass

with cls.__lock:
try:
return cls.__cache[key]
except KeyError:
instance = type.__call__(cls, *args, **kwargs)
cls.__cache[key] = instance
return instance


def dummy(self, *args, **kwargs):
pass

class PersistentMultiton(type):
"""Similar to a :class:`Multiton`, but """
MISSING = object()

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__cache = {}
self.__lock = Lock()

def __new__(cls, name, bases, namespace):
return super().__new__(cls, name, bases, {
"__forget_multiton__": dummy,
**namespace,
})

@staticmethod
def forgetter(key):
def __forget_multiton__(self, *args, **kwargs):
try:
del self.__cache[key]
except KeyError:
pass

return __forget_multiton__

def __call__(cls, *args, **kwargs):
_warn_if_positional_in_kwargs(cls, kwargs)
key = freeze(args + (kwargs if kwargs else cls.MISSING,))

# Double-checked locking
# https://en.wikipedia.org/wiki/Double-checked_locking
Expand All @@ -94,6 +146,8 @@ def __call__(cls, *args, **kwargs):
return cls.__cache[key]
except KeyError:
instance = type.__call__(cls, *args, **kwargs)
forget_method = MethodType(cls.forgetter(key), instance)
instance.__forget_multiton__ = forget_method
cls.__cache[key] = instance
return instance

Expand Down Expand Up @@ -359,8 +413,20 @@ def __getattr__(self, name):
raise AttributeError(name) from e

def __setattr__(self, name, value):
obj = self if name in self.__lazy_members__ else self.__lazy_object__
return object.__setattr__(obj, name, value)
# Defer to self
if name in self.__lazy_members__:
return object.__setattr__(self, name, value)

try:
# name might exist on self, e.g. maybe a metaclass
# added a method
getattr_static(self, name)
except AttributeError:
# Nope, defer to the proxy
return object.__setattr__(self.__lazy_object__, name, value)
else:
# Defer to self
return object.__setattr__(self, name, value)

def __delattr__(self, name):
if name in self.__lazy_members__:
Expand Down Expand Up @@ -391,3 +457,6 @@ def __init__(self, value):

See :class:`LazyProxy` and :class:`Multiton` for further details
"""

class PersistentLazyProxyMultiton(LazyProxy, metaclass=PersistentMultiton):
pass
24 changes: 17 additions & 7 deletions daskms/table_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,35 @@

class ExecutorMetaClass(type):
""" https://en.wikipedia.org/wiki/Multiton_pattern """
def __call__(cls, key=STANDARD_EXECUTOR):
def __call__(cls, key, typ):
cache_key = (key, typ)

try:
return _executor_cache[key]
return _executor_cache[cache_key]
except KeyError:
pass

with _executor_lock:
try:
return _executor_cache[key]
return _executor_cache[cache_key]
except KeyError:
instance = type.__call__(cls, key)
_executor_cache[key] = instance
instance = type.__call__(cls, key, typ)
_executor_cache[cache_key] = instance
return instance


class Executor(object, metaclass=ExecutorMetaClass):
def __init__(self, key=STANDARD_EXECUTOR):
def __init__(self, key=STANDARD_EXECUTOR, typ="thread"):
# Initialise a single thread
self.impl = impl = cf.ThreadPoolExecutor(1)

if typ == "thread":
impl = cf.ThreadPoolExecutor(1)
elif typ == "process":
impl = cf.ProcessPoolExecutor(1)
else:
raise ValueError(f"Invalid Executor type {typ}")

self.impl = impl
self.key = key

# Register a finaliser shutting down the
Expand Down
42 changes: 13 additions & 29 deletions daskms/table_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
import pyrap.tables as pt
from daskms.table_executor import Executor, STANDARD_EXECUTOR
from daskms.utils import arg_hasher
from daskms.patterns import LazyProxyMultiton, Multiton

log = logging.getLogger(__name__)

_table_cache = weakref.WeakValueDictionary()
_table_lock = Lock()

# CASA Table Locking Modes
NOLOCK = 0
READLOCK = 1
Expand Down Expand Up @@ -139,33 +137,13 @@ def public_method(self, *args, **kwargs):
return public_method


class TableProxyMetaClass(type):
"""
https://en.wikipedia.org/wiki/Multiton_pattern

"""
class TableProxyMetaClass(Multiton):
def __new__(cls, name, bases, dct):
for method, locktype in _proxied_methods:
proxy_method = proxied_method_factory(method, locktype)
dct[method] = proxy_method

return type.__new__(cls, name, bases, dct)

def __call__(cls, *args, **kwargs):
key = arg_hasher((cls,) + args + (kwargs,))

with _table_lock:
try:
return _table_cache[key]
except KeyError:
instance = type.__call__(cls, *args, **kwargs)
_table_cache[key] = instance
return instance


def _map_create_proxy(cls, factory, args, kwargs):
""" Support pickling of kwargs in TableProxy.__reduce__ """
return cls(factory, *args, **kwargs)
return super().__new__(cls, name, bases, dct)


class MismatchedLocks(Exception):
Expand Down Expand Up @@ -304,8 +282,9 @@ def __init__(self, factory, *args, **kwargs):
**kwargs : dict
Keyword arguments passed to factory.
__executor_key__ : str, optional
Executor key. Identifies a unique threadpool
Executor key. Identifies a unique pool
in which table operations will be performed.
__executor_type__: {"thread", "process"}
"""

# Save the arguments as keys for pickling and tokenising
Expand All @@ -325,11 +304,13 @@ def __init__(self, factory, *args, **kwargs):
# TableProxy(*args, ex_key=..., **kwargs)
kwargs = kwargs.copy()
self._ex_key = kwargs.pop("__executor_key__", STANDARD_EXECUTOR)
ex_type = kwargs.pop("__executor_type__", "thread")

# Store a reference to the Executor wrapper class
# so that the Executor is retained while this TableProxy
# still lives
self._ex_wrapper = ex = Executor(key=self._ex_key)
self._ex_wrapper = ex = Executor(key=self._ex_key, typ=ex_type)

self._table_future = table = ex.impl.submit(factory, *args, **kwargs)

weakref.finalize(self, _table_future_finaliser, ex, table,
Expand Down Expand Up @@ -358,10 +339,13 @@ def __init__(self, factory, *args, **kwargs):
def executor_key(self):
return self._ex_key

@classmethod
def from_args(cls, factory, args, kwargs):
return cls(factory, *args, **kwargs)

def __reduce__(self):
""" Defer to _map_create_proxy to support kwarg pickling """
return (_map_create_proxy, (TableProxy, self._factory,
self._args, self._kwargs))
return (self.from_args, (self._factory, self._args, self._kwargs))

def __enter__(self):
return self
Expand Down
Loading