diff --git a/docs/advanced.md b/docs/advanced.md
index e73c77e2..99d43028 100644
--- a/docs/advanced.md
+++ b/docs/advanced.md
@@ -4,6 +4,7 @@ Advanced topics
 _Tangos_ is a highly flexible, customisable system. Tutorials are available covering the
 following topics:
 
+ - Working with [different database systems](rdbms.md) (e.g. MySQL and PostgreSQL)
  - Writing code to [calculate your own properties](custom_properties.md)
  - [Tracking](tracking.md) groups of particles across timesteps
  - [Parallelisation strategies](mpi.md)
diff --git a/docs/index.md b/docs/index.md
index 227a2995..376e661a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -77,45 +77,6 @@ MySQL / MariaDB below.
 Remember, you will need to set these environment variables *every* time you start a new session
 on your computer prior to booting up the database, either with the webserver or the python
 interface (see below).
 
-Using PostgreSQL, MySQL or MariaDB
-----------------------------------
-
-As stated above, tangos is agnostic to the underlying SQL flavour. It is easiest to get start with
-SQLite which doesn't need any special server. But version 1.5+ should also work well with [MySQL](https://www.mysql.com),
-[MariaDB](https://mariadb.org) and version 1.7+ also with [PostgreSQL](https://www.postgresql.org).
-
-To try this out, if you have [docker](https://docker.com), you can run a test
-MySQL server very easily:
-
-```bash
-docker pull mysql
-docker run -d --name=mysql-server -p3306:3306 -e MYSQL_ROOT_PASSWORD=my_secret_password mysql
-echo "create database database_name;" | docker exec -i mysql-server mysql -pmy_secret_password
-```
-
-Or, just as easily, you can get going with PostgreSQL:
-```bash
-docker pull postgres
-docker run --name tangos-postgres -e POSTGRES_USER=tangos -e POSTGRES_PASSWORD=my_secret_password -e POSTGRES_DB=database_name -p 5432:5432 -d postgres
-```
-
-To be sure that python can connect to MySQL or PostgreSQL, install the appropriate modules:
-```bash
-pip install PyMySQL # for MySQL
-pip install psycopg2-binary # for PostgreSQL
-```
-
-Tangos can now connect to your test MySQL server using the connection:
-```bash
-export TANGOS_DB_CONNECTION=mysql+pymysql://root:my_secret_password@localhost:3306/database_name
-```
-or for PostgreSQL:
-```bash
-export TANGOS_DB_CONNECTION=postgresql+psycopg2://tangos:my_secret_password@localhost/database_name
-```
-
-You can now use all the tangos tools as normal, and they will populate the MySQL/PostgreSQL database
-instead of a SQLite file.
 
 Where next?
diff --git a/docs/rdbms.md b/docs/rdbms.md
new file mode 100644
index 00000000..eb5b9555
--- /dev/null
+++ b/docs/rdbms.md
@@ -0,0 +1,62 @@
+Working with different database systems
+=======================================
+
+Tangos is built on sqlalchemy, so in principle any database system supported by sqlalchemy can be used. However, different database systems have different features and limitations of which it is worth being aware.
+
+The tangos tests are run against SQLite, MySQL and PostgreSQL. Other databases, while supported by sqlalchemy, have not been directly tested. The following sections contain some notes on using each of these systems.
+
+SQLite
+------
+
+SQLite is the default database. It is simple in the sense that it keeps your entire database within a single file, which can easily be transferred to different systems. Additionally, the SQLite driver is included with Python, so it's quick to get started.
+
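+As a minimal sketch (assuming the standard `TANGOS_DB_CONNECTION` environment variable described in the [first steps](index.md) tutorial, with an illustrative path), a plain path is interpreted as a SQLite file:
+
+```bash
+export TANGOS_DB_CONNECTION=/path/to/my_database.db
+```
+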
+There are two major, related drawbacks to SQLite. The first is that the entire database file is locked while any process writes to it, so it copes poorly with parallel writes. The second is that, because there is no server, the database file must be directly accessible from every machine that uses it, which makes it hard to share one live database between multiple users.
+
+PostgreSQL and MySQL
+--------------------
+
+PostgreSQL and MySQL are both server-based systems, and as such take a little more effort to set up and maintain. If one exposes the server to the outside world, there are potential security implications. One can of course run it on a firewalled computer and manage access appropriately, but this takes some expertise of its own (that will not be covered here). The major advantages are that you can host your data in a single location and allow multiple users to connect, and that these systems cope much better with complex parallel writes than SQLite.
+
+For most users, MySQL and PostgreSQL are equally good choices as far as tangos is concerned; use whichever you are more familiar with or find easier to administer.
+
+To try this out, if you have [docker](https://docker.com), you can run a test
+MySQL server very easily:
+
+```bash
+docker pull mysql
+docker run -d --name=mysql-server -p3306:3306 -e MYSQL_ROOT_PASSWORD=my_secret_password mysql
+echo "create database database_name;" | docker exec -i mysql-server mysql -pmy_secret_password
+```
+
+Or, just as easily, you can get going with PostgreSQL:
+```bash
+docker pull postgres
+docker run --name tangos-postgres -e POSTGRES_USER=tangos -e POSTGRES_PASSWORD=my_secret_password -e POSTGRES_DB=database_name -p 5432:5432 -d postgres
+```
+
+To be sure that python can connect to MySQL or PostgreSQL, install the appropriate modules:
+```bash
+pip install PyMySQL # for MySQL
+pip install psycopg2-binary # for PostgreSQL
+```
+
+Tangos can now connect to your test MySQL server using the connection:
+```bash
+export TANGOS_DB_CONNECTION=mysql+pymysql://root:my_secret_password@localhost:3306/database_name
+```
+or for PostgreSQL:
+```bash
+export TANGOS_DB_CONNECTION=postgresql+psycopg2://tangos:my_secret_password@localhost/database_name
+```
+
+You can now use all the tangos tools as normal, and they will populate the MySQL/PostgreSQL database
+instead of a SQLite file.
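+
+As a quick sanity check that the connection string is being picked up (a minimal sketch, assuming tangos is installed in the current Python environment), you can ask tangos to list the simulations in the new, empty database:
+
+```bash
+python -c "import tangos; print(tangos.all_simulations())"
+```
+
+This should print an empty list rather than raising a connection error.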
diff --git a/setup.py b/setup.py
index 168a546c..68c9fb86 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,8 @@
     'hupper',
     'scipy >= 0.14.0',
     'more_itertools >= 8.0.0',
-    'matplotlib >= 3.0.0' # for web interface
+    'matplotlib >= 3.0.0', # for web interface
+    'tqdm >= 4.59.0'
 ]
 
 tests_require = [
diff --git a/tangos/core/halo.py b/tangos/core/halo.py
index 80e1aa75..eddcd73d 100644
--- a/tangos/core/halo.py
+++ b/tangos/core/halo.py
@@ -1,5 +1,5 @@
 import numpy as np
-from sqlalchemy import Column, ForeignKey, Integer, orm, types
+from sqlalchemy import BigInteger, Column, ForeignKey, Integer, orm, types
 from sqlalchemy.orm import Session, backref, relationship
 
 from . import Base, creator, extraction_patterns
@@ -30,9 +30,9 @@ class SimulationObjectBase(Base):
     __tablename__= "halos"
 
     id = Column(Integer, primary_key=True) #the unique ID value of the database object created for this halo
-    halo_number = Column(Integer) #by default this will be the halo's rank in terms of particle count
-    finder_id = Column(UnsignedInteger) #raw halo ID from the halo catalog
-    finder_offset = Column(Integer) #index of halo within halo catalog, primary identifier used when reading catalog/simulation data
+    halo_number = Column(BigInteger) #by default this will be the halo's rank in terms of particle count
+    finder_id = Column(BigInteger) #raw halo ID from the halo catalog
+    finder_offset = Column(BigInteger) #index of halo within halo catalog, primary identifier used when reading catalog/simulation data
     timestep_id = Column(Integer, ForeignKey('timesteps.id'))
     timestep = relationship(TimeStep, backref=backref(
         'objects', order_by=halo_number, cascade_backrefs=False, lazy='dynamic'), cascade='')
diff --git a/tangos/scripts/manager.py b/tangos/scripts/manager.py
index 8fe9db81..bf5e0b90 100755
--- a/tangos/scripts/manager.py
+++ b/tangos/scripts/manager.py
@@ -5,13 +5,15 @@
 from textwrap import dedent
 
 import numpy as np
+import sqlalchemy
+import sqlalchemy.exc
+
 from sqlalchemy import create_engine, text
 from sqlalchemy.orm import sessionmaker
 
 import tangos as db
 from tangos import all_simulations, config, core, parallel_tasks
 from tangos.core import (Base, Creator, HaloLink, HaloProperty, Simulation,
-                         SimulationObjectBase, TimeStep,
+                         SimulationObjectBase, TimeStep, DictionaryItem,
                          get_or_create_dictionary_item)
 from tangos.core.simulation import SimulationProperty
 from tangos.core.tracking import TrackData
@@ -36,116 +38,211 @@ def add_simulation_timesteps(options):
 
 
 def db_import(options):
+    if not options.force:
+        s = core.get_default_session()
+
+        num_sims = s.query(core.Simulation).count()
+
+        if num_sims > 0:
+            print("**** WARNING ****")
+            print("The behaviour of importing has changed. Now when you import into a database, any existing")
+            print("simulations in that database will be deleted. To avoid seeing this message, you can use -f or --force.")
+
+            num_timesteps = s.query(core.TimeStep).count()
+            num_objs = s.query(core.SimulationObjectBase).count()
+
+            print(f"""
+Currently there are
+  {num_sims} simulation(s)
+  {num_timesteps} timestep(s), and
+  {num_objs} object(s)
+that will be deleted if you continue.
+""")
+            if not _get_user_confirmation():
+                print("Aborted")
+                sys.exit(0)
 
-    sims = options.sims
     remote_db = options.file
 
-    global internal_session
-    engine2 = create_engine('sqlite:///' + remote_db, echo=False)
+    if "://" not in remote_db:
+        remote_db = "sqlite:///" + remote_db
+
+    engine2 = create_engine(remote_db, echo=False)
     ext_session = sessionmaker(bind=engine2)()
 
-    _db_import_export(core.get_default_session(), ext_session, *sims)
+    _db_import_export(core.get_default_session(), ext_session)
 
-def db_export(remote_db, *sims):
-    global internal_session
-    engine2 = create_engine('sqlite:///' + remote_db, echo=False)
+def _update_foreign_id(list_of_rows, foreign_id_column: sqlalchemy.Column, mapping=None):
+    # Return a copy of list_of_rows with the foreign-id column replaced by its value
+    # under mapping (or by None if no mapping is given)
+    foreign_id_column_number = foreign_id_column.table.c.keys().index(foreign_id_column.name)
+    new_list_of_rows = []
+    mapped_id = None
+    for row in list_of_rows:
+        if mapping is not None:
+            mapped_id = mapping[row[foreign_id_column_number]]
+        new_row = row[:foreign_id_column_number] + (mapped_id,) + row[foreign_id_column_number+1:]
+        new_list_of_rows.append(new_row)
+    return new_list_of_rows
 
-    int_session = core.get_default_session()
-    ext_session = sessionmaker(bind=engine2)()
+def _copy_table(from_connection, target_connection, orm_class):
+    from sqlalchemy import select, insert, func
+    import tqdm
+
+    table = orm_class.__table__
+    num_rows = from_connection.execute(select(func.count(table.c.id))).scalar()
+
+    CHUNK_SIZE = 10
+    COMMIT_AFTER_CHUNKS = 500
+    num_done = 0
+
+    source_result = from_connection.execute(select(table))
+
+    retries = 0
+
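+    # Rows are copied in INSERT batches of CHUNK_SIZE; a COMMIT is issued every
+    # CHUNK_SIZE * COMMIT_AFTER_CHUNKS rows, so that if the connection drops the
+    # copy can resume from the last commit point rather than restarting the table.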
+    with tqdm.tqdm(total=num_rows, desc=f"Copying {orm_class.__name__}", unit="row", smoothing=0.1) as pbar:
+        while num_done < num_rows:
+            all_rows = source_result.fetchmany(CHUNK_SIZE)
+            all_rows = [tuple(r) for r in all_rows]
+
+            try:
+                target_connection.execute(insert(table).values(all_rows))
+            except sqlalchemy.exc.OperationalError:
+                if retries >= 1:
+                    # If this line is hit, it may reflect a data limit in the server, e.g. max_allowed_packet
+                    # in MySQL. Such limits result in the connection being dropped. In PostgreSQL an error is
+                    # written in the server log, but in MySQL it does not seem to be. Reducing CHUNK_SIZE may
+                    # help, or increasing the limit on the server.
+                    raise
+
+                num_committed = num_done - (num_done % (CHUNK_SIZE * COMMIT_AFTER_CHUNKS))
+                pbar.update(num_committed - num_done)  # negative correction, back to the last commit
+                print(f"Note: lost connection to database after {num_done} rows. Resetting to {num_committed}.")
+                # reset to point of last commit
+                num_done = num_committed
+                target_connection.rollback()
+                # create a new connection from the target connection's engine
+                target_connection = target_connection.engine.connect()
+                source_result = from_connection.execute(select(table).offset(num_committed))
+                retries += 1
+                continue
+
+            num_done += len(all_rows)
+            pbar.update(len(all_rows))
+
+            if num_done % (CHUNK_SIZE * COMMIT_AFTER_CHUNKS) == 0:
+                target_connection.commit()
+                retries = 0
+
+    target_connection.commit()
+
+def _drop_foreign_keys(session):
+    from sqlalchemy.engine import reflection
+    from sqlalchemy import MetaData, Table, ForeignKeyConstraint
+    from sqlalchemy.schema import DropConstraint
+
+    engine = session.get_bind()
 
-    Base.metadata.create_all(engine2)
+    inspector = reflection.Inspector.from_engine(engine)
+    fake_metadata = MetaData()
 
-    _xcurrent_creator = core.creator.get_creator()
+    fake_tables = []
+    all_fks = []
 
-    core.set_default_session(ext_session)
-    creator = Creator()
-    ext_session.add(creator)
-    core.set_creator(creator)
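+    # DropConstraint needs each constraint to be attached to a Table object, so the
+    # constraint names reflected from the live database are wrapped in throwaway
+    # tables under a separate MetaData, leaving the ORM's own metadata untouched.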
+    for table_name in Base.metadata.tables:
+        fks = []
+        for fk in inspector.get_foreign_keys(table_name):
+            if fk['name']:
+                fks.append(ForeignKeyConstraint((), (), name=fk['name']))
+        t = Table(table_name, fake_metadata, *fks)
+        fake_tables.append(t)
+        all_fks.extend(fks)
 
-    _db_import_export(ext_session, int_session, *sims)
+    with engine.begin() as conn:
+        for fkc in all_fks:
+            print(str(DropConstraint(fkc)))
+            conn.execute(DropConstraint(fkc))
 
-    core.set_creator(_xcurrent_creator)
-    core.set_default_session(int_session)
+def _create_foreign_keys(session):
+    from sqlalchemy.schema import AddConstraint
+
+    engine = session.get_bind()
+
+    with engine.begin() as conn:
+        for table in Base.metadata.tables.values():
+            for fk in table.foreign_keys:
+                print(str(AddConstraint(fk.constraint)), end="")
+                try:
+                    conn.execute(AddConstraint(fk.constraint))
+                except sqlalchemy.exc.OperationalError:
+                    print("... FAILED")
+                else:
+                    print("... OK")
+
+
+def _drop_or_create_indexes(connection, mode='drop'):
+    from sqlalchemy.schema import DropIndex, CreateIndex
+    for table in Base.metadata.tables.values():
+        for index in table.indexes:
+            try:
+                if mode == 'drop':
+                    print(str(DropIndex(index)), end="")
+                    sys.stdout.flush()
+                    index.drop(connection)
+                elif mode == 'create':
+                    print(str(CreateIndex(index)), end="")
+                    sys.stdout.flush()
+                    index.create(connection)
+                else:
+                    raise ValueError("mode must be 'drop' or 'create'")
+            except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.ProgrammingError):
+                print("... FAILED")
+                connection.rollback()
+            else:
+                print("... OK")
+                connection.commit()
 
 def _db_import_export(target_session, from_session, *sims):
-    external_id_to_internal_halo = {}
-    translated_halolink_ids = []
-
-    if len(sims)==0:
-        sims = [x.id for x in all_simulations(from_session)]
-
-    for sim in sims:
-        ext_sim = get_simulation(sim, from_session)
-        sim = Simulation(ext_sim.basename)
-        target_session.add(sim)
-        logger.info("Transferring simulation %s", ext_sim)
-
-        halos_this_ts = []
-        for p_ext in ext_sim.properties:
-            dic = get_or_create_dictionary_item(
-                target_session, p_ext.name.text)
-            p = SimulationProperty(sim, dic, p_ext.data)
-            halos_this_ts.append(p)
-
-        for tk_ext in ext_sim.trackers:
-            tk = TrackData(sim, tk_ext.halo_number)
-            tk.particles = tk_ext.particles
-            tk.use_iord = tk_ext.use_iord
-            halos_this_ts.append(tk)
-
-        target_session.add_all(halos_this_ts)
-
-        for ts_ext in ext_sim.timesteps:
-            logger.info("Transferring timestep %s",ts_ext)
-            ts = TimeStep(sim, ts_ext.extension)
-            ts.redshift = ts_ext.redshift
-            ts.time_gyr = ts_ext.time_gyr
-            ts.available = True
-            target_session.add(ts)
-
-            halos_this_ts = []
-
-            logger.info("Transferring objects for %s", ts_ext)
-            for h_ext in ts_ext.objects:
-                h = SimulationObjectBase(ts, h_ext.halo_number, h_ext.finder_id, h_ext.finder_offset, h_ext.NDM,
-                                         h_ext.NStar, h_ext.NGas, h_ext.object_typecode)
-                h.external_id = h_ext.id
-                halos_this_ts.append(h)
-
-            target_session.add_all(halos_this_ts)
-            target_session.commit()
-
-            for h in halos_this_ts:
-                assert h.id is not None and h.id > 0
-                external_id_to_internal_halo[h.external_id] = h
-
-            properties_this_ts = []
-            logger.info("Transferring object properties for %s", ts_ext)
-            for h_ext in ts_ext.objects:
-                h_new = external_id_to_internal_halo[h_ext.id]
-                for p_ext in h_ext.properties:
-                    dic = get_or_create_dictionary_item(
-                        target_session, p_ext.name.text)
-                    dat = p_ext.data_raw
-                    if dat is not None:
-                        p = HaloProperty(h_new, dic, dat)
-                        target_session.add(p)
-
-            target_session.commit()
-
-        for ts_ext in ext_sim.timesteps:
-            logger.info("Transferring halolinks for timestep %s", ts_ext)
-            sys.stdout.flush()
-            _translate_halolinks(
-                target_session, ts_ext.links_from, external_id_to_internal_halo, translated_halolink_ids)
-            _translate_halolinks(
-                target_session, ts_ext.links_to, external_id_to_internal_halo, translated_halolink_ids)
-        target_session.commit()
-
-    logger.info("Done")
+    from sqlalchemy import delete
+
+    target_connection = target_session.connection()
+    from_connection = from_session.connection()
+
+    copy_classes = [Creator, Simulation, TimeStep, SimulationObjectBase, DictionaryItem, SimulationProperty,
+                    HaloLink, HaloProperty]
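+    # copy_classes is ordered parents-first (Creator before Simulation, Simulation
+    # before TimeStep, and so on); the delete pass below iterates in reverse so
+    # that child rows are removed before the rows they reference.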
+
+    print("Dropping foreign key constraints...")
+    _drop_foreign_keys(target_session)
+
+    print("Dropping indexes...")
+    _drop_or_create_indexes(target_connection, mode='drop')
+
+    print("Copying tables...")
+    try:
+        for target in copy_classes[::-1]:
+            target_connection.execute(delete(target))
+
+        for target in copy_classes:
+            _copy_table(from_connection, target_connection, target)
+    finally:
+        # on success everything is already committed inside _copy_table; this just
+        # discards any uncommitted remnant if an error interrupted the copy
+        target_connection.rollback()
+
+    print("Recreating indexes...")
+    _drop_or_create_indexes(target_connection, mode='create')
+
+    print("Recreating foreign keys...")
+    _create_foreign_keys(target_session)
+
@@ -358,6 +455,12 @@ def _erase_run_content(run):
     core.get_default_session().delete(run)
     core.get_default_session().commit()
 
+def _get_user_confirmation():
+    try:
+        return input("Enter 'yes' to continue, or anything else to abort >").lower() == "yes"
+    except EOFError:
+        return False
+
 def rem_run(id, confirm=True):
     run = core.get_default_session().query(Creator).filter_by(id=id).first()
 
@@ -367,10 +470,7 @@ def rem_run(id, confirm=True):
     print("You want to delete everything created by the following run:")
     run.print_info()
 
-    if confirm:
-        print(""">>> type "yes" to continue""")
-
-    if (not confirm) or input(":").lower() == "yes":
+    if (not confirm) or _get_user_confirmation():
         _erase_run_content(run)
         print("OK")
     else:
@@ -533,10 +633,9 @@ def get_argument_parser_and_subparsers():
                                            """
 
     subparse_import = subparse.add_parser("import",
-                                          help="Import one or more simulations from another sqlite file")
-    subparse_import.add_argument("file", type=str, help="The filename of the sqlite file from which to import")
-    subparse_import.add_argument("sims", nargs="*", type=str,
-                                 help="The name of the simulations to import (or import everything if none specified)")
+                                          help="Import from a different database (e.g. useful to load sqlite data onto a server)")
+    subparse_import.add_argument("file", type=str, help="The filename of the sqlite file, or a sqlalchemy URI, from which to import")
+    subparse_import.add_argument("--force", "-f", action="store_true", help="If this flag is present, no confirmation prompts will be issued")
 
     subparse_import.set_defaults(func=db_import)
diff --git a/tests/test_argmax.py b/tests/test_argmax.py
new file mode 100644
index 00000000..f0f80fdf
--- /dev/null
+++ b/tests/test_argmax.py
@@ -0,0 +1,32 @@
+import tangos
+from tangos import testing
+from tangos.util import sql_argmax
+
+
+def test_argmax():
+    testing.init_blank_db_for_testing()
+
+    engine = tangos.core.get_default_engine()
+
+    # define a sqlalchemy table (non-ORM), with columns "id", "value" and "category"
+    from sqlalchemy import Table, Column, Integer, MetaData, insert
+    metadata = MetaData()
+    table = Table('test_table', metadata,
+                  Column('id', Integer, primary_key=True),
+                  Column('value', Integer),
+                  Column('category', Integer))
+    metadata.create_all(engine)
+
+    # populate the table with some data
+    with engine.connect() as c:
+        c.execute(insert(table),
+                  [
+                      {'value': 1, 'category': 1},
+                      {'value': 2, 'category': 1},
+                      {'value': 3, 'category': 1},
+                      {'value': 4, 'category': 2},
+                      {'value': 5, 'category': 2}
+                  ])
+
+        # keep only the maximum-value row within each category
+        sql_argmax.delete_non_maximal_rows(c, table, table.c.value, [table.c.category])
+
+        query = table.select()
+        assert c.execute(query).all() == [(3, 3, 1), (5, 5, 2)]