ParallelTable Musings #185
base: master
Conversation
…need some investigation.
table.close()

class ParallelTable(Table):
Suggested change:
class ParallelTable(Table):  →  class ParallelTable(metaclass=...)
As I read the `ParallelTable` class, it is predicated on proxying the encapsulated `Table` objects, as opposed to overriding inherited methods of the `Table` class. Therefore it's unnecessary to inherit from `Table`, as proxied table objects are created in `_get_table`. This way, it's also possible to use a metaclass without getting tangled up with a boost-python subclass.
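A minimal sketch of how that might look, assuming a hypothetical metaclass (here called `ParallelMeta`) and the `_get_table` helper mentioned above opening one table per thread; the wrapped method names are illustrative, not an exhaustive set:

```python
import threading
import pyrap.tables as pt

# Illustrative subset of read methods to forward; not exhaustive.
GETTERS = ("getcol", "getcolnp", "getcell", "getvarcol")


class ParallelMeta(type):
    def __new__(mcs, name, bases, namespace):
        for meth in GETTERS:
            namespace[meth] = mcs._make_forwarder(meth)
        return super().__new__(mcs, name, bases, namespace)

    @staticmethod
    def _make_forwarder(meth):
        def forwarder(self, *args, **kwargs):
            # Delegate to a table owned by the calling thread.
            return getattr(self._get_table(), meth)(*args, **kwargs)
        forwarder.__name__ = meth
        return forwarder


class ParallelTable(metaclass=ParallelMeta):
    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs
        self._local = threading.local()

    def _get_table(self):
        # Lazily open one pyrap table per thread.
        if not hasattr(self._local, "table"):
            self._local.table = pt.table(*self._args, **self._kwargs)
        return self._local.table
```

Because nothing inherits from the boost-python `table` class here, the metaclass only has to manage plain Python attributes.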
I am trying to grasp this, but I am still struggling. The problem I see is that failing to inherit from `pyrap.tables.table` means that the ParallelTable object will not in fact appear to be a table, i.e. `self._table_future = table = ex.impl.submit(factory, *args, **kwargs)` will not be a future pointing at a table. What I could see working is defining a `ParallelTableProxy` which simply inherits from `TableProxy` but defines its own metaclass which modifies the behaviour of get* operations. Currently, `ParallelTable` is itself a table (i.e. you can do something like `pt.taql(query, ParallelTable)`) in addition to simply rerouting get* operations through a cache. In other words, there will always be one extra copy open - the "root" table which gets used for unmodified methods. I will take a swing at the `ParallelTableProxy` idea.
Would you mind clarifying what the problem is with this approach? Currently, all the `ParallelTable` does is override some inherited methods of `pyrap.tables.table` prior (!!) to being embedded in a `TableProxy`. This just means that the proxy proxies these special methods, rather than those on the base class. This yields a really simple solution as subsequent operations proceed as normal. The `ParallelTable` is a table, supports all `pyrap.tables.table` methods, and will have all the relevant multiton patterns applied inside the `TableProxy`. I have tried creating a `ParallelTableProxy`, but that becomes difficult as one needs to access the cache inside get* methods.
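For reference, a rough sketch of the subclass-and-cache behaviour described here, assuming a per-thread cache keyed on `threading.get_ident()` (the actual PR code differs in detail and wraps more get* methods):

```python
import threading
import pyrap.tables as pt


class ParallelTable(pt.table):
    """Sketch: a table that reroutes get* calls through a per-thread cache."""

    def __init__(self, *args, **kwargs):
        self._cached_tables = {}
        self._args = args
        self._kwargs = kwargs
        super().__init__(*args, **kwargs)  # the "root" table

    def _get_table(self):
        # Reuse (or lazily open) a table private to the calling thread.
        tid = threading.get_ident()
        try:
            return self._cached_tables[tid]
        except KeyError:
            table = self._cached_tables[tid] = pt.table(*self._args, **self._kwargs)
            return table

    def getcol(self, *args, **kwargs):
        return self._get_table().getcol(*args, **kwargs)

    def getcolnp(self, *args, **kwargs):
        return self._get_table().getcolnp(*args, **kwargs)

    # All other methods fall through to the root table opened above.
```

Because the overrides exist before the object is embedded in the `TableProxy`, the proxy picks them up automatically.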
I have implemented a `ParallelTableProxy`. At present it segfaults, but I know why. The problem stems from the fact that `getter_wrapper` in `reads.py` calls methods directly on the underlying table object, rather than via the `TableProxy`. This is problematic as `TableProxy.method` may not be the same as `pyrap.tables.table.method`. This is precisely where my current segfaults come from, as `getter_wrapper` calls the non-threadsafe get* functions on the underlying table.
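To illustrate the distinction (this is not dask-ms's actual `getter_wrapper`, and the `_table` attribute is assumed purely for the example):

```python
# Schematic of the two call paths, assuming the proxy keeps its raw pyrap
# table in a hypothetical `_table` attribute.

def getcol_via_raw_table(proxy, column, startrow, nrow):
    # Reaches past the proxy to the underlying table, bypassing any
    # thread-safe get* overrides defined on a ParallelTableProxy.
    return proxy._table.getcol(column, startrow=startrow, nrow=nrow)


def getcol_via_proxy(proxy, column, startrow, nrow):
    # Routes through the proxy, so TableProxy.method (which may differ from
    # pyrap.tables.table.method) is the one that actually runs.
    return proxy.getcol(column, startrow=startrow, nrow=nrow)
```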
self._cached_tables = {}
self._table_path = args[0]  # TODO: This should be checked.

super().__init__(*args, **kwargs)
Just a note that I'm still trying to ensure minimum thread safety is achieved on the table system itself.

After spending some time yesterday digging through the table system code, I have decided the best way forward is to serialize access to table and column pointers by making TableProxy a threadsafe object, locking on the internal Table::Table object. There are not only bucket caches, but also column caches and table caches, and making the entire C++ table system threadsafe is a massive undertaking for which I do not have time, given much higher priorities in porting MeqTrees and supporting eMeerKAT.

This will not be optimal in terms of processing when using a single Table object (inside TableProxy.h), since it will give lockup issues similar to relying on the GIL, but at bare minimum it will make the codebase safe to access from multiple threads through TableProxy.h/cc.
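Sketched in Python purely to illustrate the intent (the real change would live in casacore's C++ TableProxy), serialising access behind a single lock would look roughly like this:

```python
# Illustration only: serialise every call on a shared table behind one lock,
# so concurrent threads cannot interleave on the internal caches.
import threading


class LockedTable:
    def __init__(self, table):
        self._table = table
        self._lock = threading.Lock()

    def __getattr__(self, name):
        attr = getattr(self._table, name)
        if not callable(attr):
            return attr

        def serialised(*args, **kwargs):
            with self._lock:  # one caller at a time touches the table
                return attr(*args, **kwargs)

        return serialised
```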
From discussion with @JSKenyon, the correct way to use CC is then:

Correct:
```python
import numpy as np
import threading
import pyrap.tables as pt
import os
import concurrent.futures as cf
import multiprocessing
import time
import sys


def threaded_getcolnp(ms_path, colname, startrow, nrow):
    tid = threading.get_ident()
    # Each thread opens its own table instance.
    with pt.table(ms_path, lockoptions="user", ack=False) as ms:
        ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
        ref_dims = ref_row.shape[1:]
        ref_dtype = ref_row.dtype
        out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.
        ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)
    return  # We don't want to profile the cost of returning the result.


if __name__ == "__main__":
    ms_path = sys.argv[1]
    no_times = int(sys.argv[2])

    column = "DATA"
    nchunk = 40  # Partition the read into this many chunks.
    nworker = 4  # Number of threads/processes.

    table = pt.table(ms_path, ack=False)
    total_nrow = table.nrows()
    table.close()

    nrow = int(np.ceil(total_nrow / nchunk))  # nrow per chunk.
    starts = np.repeat(np.arange(0, total_nrow, nrow), no_times)
    np.random.shuffle(starts)

    t0 = time.time()

    with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
        futures = [
            tpe.submit(threaded_getcolnp, ms_path, column, row, nrow)
            for row in starts
        ]

    print(
        f"Reading MS using threads and pid multiton TableCache: "
        f"{time.time() - t0:.3f}s"
    )
```
Wrong:
```python
import numpy as np
import threading
import pyrap.tables as pt
import os
import concurrent.futures as cf
import multiprocessing
import time
import sys


def threaded_getcolnp(ms, colname, startrow, nrow):
    tid = threading.get_ident()
    # The open table object is shared between threads -- this is the mistake.
    ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
    ref_dims = ref_row.shape[1:]
    ref_dtype = ref_row.dtype
    out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.
    ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)
    return  # We don't want to profile the cost of returning the result.


if __name__ == "__main__":
    ms_path = sys.argv[1]
    no_times = int(sys.argv[2])

    column = "DATA"
    nchunk = 40  # Partition the read into this many chunks.
    nworker = 4  # Number of threads/processes.

    table = pt.table(ms_path, ack=False)
    total_nrow = table.nrows()
    table.close()

    nrow = int(np.ceil(total_nrow / nchunk))  # nrow per chunk.
    starts = np.repeat(np.arange(0, total_nrow, nrow), no_times)
    np.random.shuffle(starts)

    # A single open table is handed to every worker thread.
    with pt.table(ms_path, lockoptions="user", ack=False) as ms:
        t0 = time.time()

        with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
            futures = [
                tpe.submit(threaded_getcolnp, ms, column, row, nrow)
                for row in starts
            ]

        print(
            f"Reading MS using threads and pid multiton TableCache: "
            f"{time.time() - t0:.3f}s"
        )
```
I know this requires substantial changes to the way dask-ms works, but in my view file-descriptor-like objects should never be shared between threads, and one should rely on POSIX to ensure that locking is done correctly at the file system layer.

The minimum workaround in the meantime would perhaps be to pass a proxy object around, like Simon showed, which contains not a file/table descriptor but only the args and kwargs needed for a thread to open read-only, read, and close as needed. This has some overheads, but on large enough chunk sizes those overheads should be amortized.
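A hedged sketch of that workaround, with a hypothetical `TableDescriptor` standing in for the proxy object: it holds only the arguments needed to open the table, and each call opens, reads and closes entirely within the calling thread:

```python
# Hypothetical sketch: the shared object carries open-arguments, never an
# open table, so every thread does its own open/read/close.
import pyrap.tables as pt


class TableDescriptor:
    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs

    def getcol(self, column, startrow=0, nrow=-1):
        # Open inside the calling thread, read, then close on exit.
        with pt.table(*self._args, **self._kwargs) as table:
            return table.getcol(column, startrow=startrow, nrow=nrow)
```

Worker threads would each receive the descriptor and pay the open/close cost once per chunk, which should amortize for large enough chunks.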
I am also wary of rushing to a solution on the casacore end @bennahugo. Ger may have a different solution in mind. I am just experimenting on the dask-ms end and trying to get something working. This is all moot if we just end up with a different scenario in which everything locks up.
Sure, I'm just noting down our discussion -- it won't help to put the cart before the horse here, but it would be good to have a draft solution for this. Thanks for the work on the dask-ms front.
- Tests added / passed

  If the pep8 tests fail, the quickest way to correct this is to run `autopep8` and then `flake8` and `pycodestyle` to fix the remaining issues.

- Fully documented, including `HISTORY.rst` for all changes and one of the `docs/*-api.rst` files for new API.

  To build the docs locally:
This PR is a WIP demonstrating a possible approach for parallel reads from threads. This approach relies on casacore/casacore#1167, which allows me to avoid using soft links. Instead, the changes in that PR mean that when a table is opened from multiple threads, it does not share its underlying plain table object.

The approach that I am attempting here is almost certainly imperfect, but it is very simple. It defines a `ParallelTable` class which inherits from `pyrap.tables.table`. This, unfortunately, introduces some limitations as the base class is defined in C++. That said, doing this allows us to create a `ParallelTable` object which masquerades as a normal table - the only difference is that when a read method is called, it first checks whether the thread has an open instance of the table. If not, the table is opened in the thread and added to the cache. I make use of `weakref` to ensure that all tables are closed when the `ParallelTable` object is GCed.

The changes in this PR seem to work, although some tests are broken - I suspect this may have to do with subtables, but I have yet to investigate. Note that there is plenty of ugly debugging code in the PR; I will remove it if this coalesces into a stable approach.
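The `weakref` mention above might translate into something like the following sketch, assuming the subclass-and-cache approach and noting that `weakref.finalize` must not capture `self`, only the cache it needs to close:

```python
# Sketch of the weakref idea: close every cached per-thread table when the
# ParallelTable instance is garbage collected.
import weakref
import pyrap.tables as pt


def _close_cached_tables(cached_tables):
    for table in cached_tables.values():
        table.close()


class ParallelTable(pt.table):
    def __init__(self, *args, **kwargs):
        self._cached_tables = {}
        super().__init__(*args, **kwargs)
        # The finalizer holds a reference to the cache only, not to self,
        # otherwise the object could never be collected.
        weakref.finalize(self, _close_cached_tables, self._cached_tables)
```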
One important thing to note is the fact that the `cf.ThreadPoolExecutor` has been dummied out with a `DummyThreadPoolExecutor` and `DummyFuture`. This seems to work for a simple read case, though further testing is needed. This would be a nice simplification, as it suggests that we could get away without internal threadpools. That said, the changes in the PR also work with the internal threadpools, with the caveat that those threadpools need more than one thread (as otherwise we serialise).
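For context, a dummied-out executor along these lines might look as follows; this is a sketch of the idea, not necessarily the `DummyThreadPoolExecutor`/`DummyFuture` in the PR:

```python
class DummyFuture:
    """Future-like shim holding an already-computed result or exception."""

    def __init__(self, result=None, exception=None):
        self._result = result
        self._exception = exception

    def result(self, timeout=None):
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self, timeout=None):
        return self._exception

    def done(self):
        return True


class DummyThreadPoolExecutor:
    """Stand-in for cf.ThreadPoolExecutor that runs submit() synchronously."""

    def __init__(self, max_workers=None):
        pass

    def submit(self, fn, *args, **kwargs):
        try:
            return DummyFuture(result=fn(*args, **kwargs))
        except Exception as exc:
            return DummyFuture(exception=exc)

    def shutdown(self, wait=True):
        pass

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.shutdown()
        return False
```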
Finally, one thing to note is that using the `processes` scheduler does not function optimally for both this PR and `master`. Both will repeatedly open tables for reasons I don't fully understand. I suspect that the caching mechanism on the `TableProxy` doesn't function as expected in this specific case. What is particularly confusing is that it does seem to operate correctly in the distributed case using a `LocalCluster` with multiple workers.