chore(deps): update minor updates (#20)
* chore(deps): update minor updates

* ruff: format

* ruff: cleanup

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Jakob van Santen <[email protected]>
renovate[bot] and jvansanten authored Nov 19, 2024
1 parent 0fe6b9e commit 1b2cc3e
Showing 7 changed files with 496 additions and 386 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -26,4 +26,4 @@ jobs:
# renovate: datasource=conda depName=conda-forge/python
python-version: "3.12.4"
# renovate: datasource=pypi depName=ruff
- ruff-version: "0.4.10"
+ ruff-version: "0.7.4"
2 changes: 1 addition & 1 deletion extcats/CatalogQuery.py
@@ -721,7 +721,7 @@ def test_queries(
for pp in tqdm.tqdm(points):
    buff = qfunc(pp[0], pp[1], rs_arcsec, method, **qfunc_args)
    if (
-       (type(buff) == tuple and buff == (None, None))
+       (isinstance(buff, tuple) and buff == (None, None))
        or (buff is None)
        or (not buff)
    ):
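The change above is ruff's E721 fix: comparing types with `==` is an exact-type check, while `isinstance` also accepts subclasses and is the idiomatic spelling. A minimal sketch of the behavioral difference; the `Point` class is made up for illustration:

# E721 in a nutshell: type(x) == tuple rejects subclasses,
# isinstance(x, tuple) accepts them.
class Point(tuple):
    pass

buff = Point((None, None))
print(type(buff) == tuple)      # False: Point is not exactly tuple
print(isinstance(buff, tuple))  # True: a Point is a tuple
print(buff == (None, None))     # True: tuple equality compares contents

For the plain tuples returned by the query functions the two spellings behave identically, so the refactor is safe here.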
80 changes: 45 additions & 35 deletions notebooks/example_ingest_multiproc.ipynb
@@ -30,77 +30,87 @@
"from healpy import ang2pix\n",
"\n",
"import importlib\n",
"\n",
"importlib.reload(CatalogPusher)\n",
"\n",
"# build the pusher object and point it to the raw files.\n",
"ps1p = CatalogPusher.CatalogPusher(\n",
" catalog_name = 'ps1_test', # short name of the catalog\n",
" data_source = '../testdata/PS1DR1_test/', # where to find the data (other options are possible)\n",
" file_type = '*.csv.gz' # filter files (there is col definition file in data_source)\n",
" )\n",
" catalog_name=\"ps1_test\", # short name of the catalog\n",
" data_source=\"../testdata/PS1DR1_test/\", # where to find the data (other options are possible)\n",
" file_type=\"*.csv.gz\", # filter files (there is col definition file in data_source)\n",
")\n",
"\n",
"# define the reader for the raw files (import column names from file.)\n",
"headfile = '../testdata/PS1DR1_test/column_headings.csv'\n",
"with open(headfile, 'r') as header:\n",
" catcols=[c.strip() for c in header.readline().split(',')]\n",
"headfile = \"../testdata/PS1DR1_test/column_headings.csv\"\n",
"with open(headfile, \"r\") as header:\n",
" catcols = [c.strip() for c in header.readline().split(\",\")]\n",
"\n",
"# skimm out some columns\n",
"bad = ['projectionID', 'skyCellID']\n",
"usecols = [c for c in catcols if (not c in bad) or ('gNpt' in c)]\n",
"bad = [\"projectionID\", \"skyCellID\"]\n",
"usecols = [c for c in catcols if (c not in bad) or (\"gNpt\" in c)]\n",
"\n",
"# specify some data types to save up on the storage\n",
"# See https://outerspace.stsci.edu/display/PANSTARRS/PS1+MeanObject+table+fields\n",
"types = {}\n",
"for c in usecols:\n",
" types[c] = np.float16\n",
" if c == 'objID':\n",
" if c == \"objID\":\n",
" types[c] = np.int32\n",
" if 'Flags' in c:\n",
" if \"Flags\" in c:\n",
" types[c] = np.int16\n",
" if ('ra' in c) or ('dec' in c):\n",
" if (\"ra\" in c) or (\"dec\" in c):\n",
" types[c] = np.float32\n",
"\n",
"ps1p.assign_file_reader(\n",
" reader_func = pd.read_csv, # callable to use to read the raw_files. \n",
" read_chunks = True, # weather or not the reader process each file into smaller chunks.\n",
" names=catcols, # All other arguments are passed directly to this function.\n",
" reader_func=pd.read_csv, # callable to use to read the raw_files.\n",
" read_chunks=True, # weather or not the reader process each file into smaller chunks.\n",
" names=catcols, # All other arguments are passed directly to this function.\n",
" usecols=usecols,\n",
" dtype = types,\n",
" na_values = -999,\n",
" dtype=types,\n",
" na_values=-999,\n",
" chunksize=50000,\n",
" engine='c')\n",
" engine=\"c\",\n",
")\n",
"\n",
"# define modifier. This time the healpix grid is finer (an orer 16 corresponds to 3\")\n",
"hp_nside16=2**16\n",
"hp_nside16 = 2**16\n",
"\n",
"\n",
"def ps1_modifier(srcdict):\n",
" srcdict['_id'] = srcdict.pop('objID')\n",
" srcdict['hpxid_16']=int(\n",
" ang2pix(hp_nside16, srcdict['raMean'], srcdict['decMean'], lonlat = True, nest = True))\n",
" srcdict[\"_id\"] = srcdict.pop(\"objID\")\n",
" srcdict[\"hpxid_16\"] = int(\n",
" ang2pix(\n",
" hp_nside16, srcdict[\"raMean\"], srcdict[\"decMean\"], lonlat=True, nest=True\n",
" )\n",
" )\n",
" return srcdict\n",
"\n",
"\n",
"ps1p.assign_dict_modifier(ps1_modifier)\n",
"\n",
"# wrap up the file pushing function so that we can \n",
"# wrap up the file pushing function so that we can\n",
"# use multiprocessing to speed up the catalog ingestion\n",
"\n",
"\n",
"def pushfiles(filerange):\n",
" # push stuff\n",
" ps1p.push_to_db(\n",
" coll_name = 'srcs',\n",
" index_on = ['hpxid_16'],\n",
" filerange = filerange,\n",
" overwrite_coll = False,\n",
" dry = False, \n",
" fillna_val = None)\n",
" coll_name=\"srcs\",\n",
" index_on=[\"hpxid_16\"],\n",
" filerange=filerange,\n",
" overwrite_coll=False,\n",
" dry=False,\n",
" fillna_val=None,\n",
" )\n",
" # add metadata to direct queries\n",
" ps1p.healpix_meta(\n",
" healpix_id_key = 'hpxid_16', \n",
" order = 16, is_indexed = True, nest = True)\n",
" ps1p.healpix_meta(healpix_id_key=\"hpxid_16\", order=16, is_indexed=True, nest=True)\n",
"\n",
"\n",
"# each job will run on a subgroup of all the files\n",
"file_groups = ps1p.file_groups(group_size=1)\n",
"with concurrent.futures.ProcessPoolExecutor(max_workers = 2) as executor:\n",
" executor.map(pushfiles, file_groups) \n",
"print (\"done! Enjoy your PS1_test database.\")"
"with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:\n",
" executor.map(pushfiles, file_groups)\n",
"print(\"done! Enjoy your PS1_test database.\")"
]
},
{
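The core of this cell is `ps1_modifier`, which turns each source's mean coordinates into a HEALPix pixel id that MongoDB can index: at order 16 (NSIDE = 2**16) a pixel is roughly 3.2 arcsec across, matching the "3 inch" comment in the diff. A self-contained sketch of what the modifier computes, assuming healpy and numpy are installed; the coordinates are made-up values:

import numpy as np
from healpy import ang2pix, nside2resol

nside = 2**16  # healpix order 16
# pixel scale in arcsec: ~3.2, i.e. the 3" quoted in the notebook comment
print(np.degrees(nside2resol(nside)) * 3600)

src = {"objID": 12345, "raMean": 149.2, "decMean": 2.05}
src["_id"] = src.pop("objID")  # reuse the PS1 object id as the Mongo _id
src["hpxid_16"] = int(
    ang2pix(nside, src["raMean"], src["decMean"], lonlat=True, nest=True)
)
print(src)  # {'raMean': 149.2, 'decMean': 2.05, '_id': 12345, 'hpxid_16': ...}

The int() cast matters: healpy returns numpy integers, which pymongo will not serialize directly.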
137 changes: 73 additions & 64 deletions notebooks/insert_allWISE.ipynb
@@ -26,7 +26,7 @@
},
{
"cell_type": "code",
- "execution_count": 1,
+ "execution_count": null,
"metadata": {},
"outputs": [
{
@@ -78,27 +78,26 @@
],
"source": [
"import numpy as np\n",
+ "import pandas as pd\n",
"from healpy import ang2pix\n",
"from extcats import CatalogPusher\n",
"\n",
"# build the pusher object and point it to the raw files.\n",
"wisep = CatalogPusher.CatalogPusher(\n",
- " catalog_name = 'wise',\n",
- " data_source = '../testdata/AllWISE/',\n",
- " file_type = \".bz2\")\n",
+ " catalog_name=\"wise\", data_source=\"../testdata/AllWISE/\", file_type=\".bz2\"\n",
+ ")\n",
"\n",
"\n",
"# read column names and types from schema file\n",
"schema_file = \"../testdata/AllWISE/wise-allwise-cat-schema.txt\"\n",
"names, types = [], {}\n",
"with open(schema_file) as schema:\n",
- " for l in schema:\n",
- " if \"#\" in l or (not l.strip()):\n",
+ " for line in schema:\n",
+ " if \"#\" in line or (not line.strip()):\n",
" continue\n",
- " name, dtype = zip(\n",
- " [p.strip() for p in l.strip().split(\" \") if not p in [\"\"]])\n",
+ " name, dtype = zip([p.strip() for p in line.strip().split(\" \") if p not in [\"\"]])\n",
" name, dtype = name[0], dtype[0]\n",
- " #print (name, dtype)\n",
+ " # print (name, dtype)\n",
" names.append(name)\n",
" # convert the data type\n",
" if \"char\" in dtype:\n",
@@ -114,60 +113,65 @@
" elif dtype == \"int8\":\n",
" types[name] = np.int8\n",
" else:\n",
" print(\"unknown data type: %s\"%dtype)\n",
" print(\"unknown data type: %s\" % dtype)\n",
"\n",
"# select the columns you want to use.\n",
"use_cols = []\n",
"select = [\"Basic Position and Identification Information\", \n",
" \"Primary Photometric Information\", \n",
" \"Measurement Quality and Source Reliability Information\",\n",
" \"2MASS PSC Association Information\"]\n",
"select = [\n",
" \"Basic Position and Identification Information\",\n",
" \"Primary Photometric Information\",\n",
" \"Measurement Quality and Source Reliability Information\",\n",
" \"2MASS PSC Association Information\",\n",
"]\n",
"with open(schema_file) as schema:\n",
" blocks = schema.read().split(\"#\")\n",
" for block in blocks:\n",
" if any([k in block for k in select]):\n",
" for l in block.split(\"\\n\")[1:]:\n",
" if \"#\" in l or (not l.strip()):\n",
" for line in block.split(\"\\n\")[1:]:\n",
" if \"#\" in line or (not line.strip()):\n",
" continue\n",
" name, dtype = zip(\n",
" [p.strip() for p in l.strip().split(\" \") if not p in [\"\"]])\n",
" [p.strip() for p in line.strip().split(\" \") if p not in [\"\"]]\n",
" )\n",
" use_cols.append(name[0])\n",
"print(\"we will be using %d columns out of %d\"%(len(use_cols), len(names)))\n",
"print(\"we will be using %d columns out of %d\" % (len(use_cols), len(names)))\n",
"\n",
"# now assign the reader to the catalog pusher object\n",
"import pandas as pd\n",
"\n",
"wisep.assign_file_reader(\n",
" reader_func = pd.read_csv, \n",
" read_chunks = True,\n",
" names = names,\n",
" usecols = lambda x : x in use_cols,\n",
" #dtype = types, #this mess up with NaN values\n",
" chunksize=5000,\n",
" header=None,\n",
" engine='c',\n",
" sep='|',\n",
" na_values = 'nnnn')\n",
" reader_func=pd.read_csv,\n",
" read_chunks=True,\n",
" names=names,\n",
" usecols=lambda x: x in use_cols,\n",
" # dtype = types, #this mess up with NaN values\n",
" chunksize=5000,\n",
" header=None,\n",
" engine=\"c\",\n",
" sep=\"|\",\n",
" na_values=\"nnnn\",\n",
")\n",
"\n",
"\n",
"# define the dictionary modifier that will act on the single entries\n",
"def modifier(srcdict):\n",
" srcdict['hpxid_16'] = int(\n",
" ang2pix(2**16, srcdict['ra'], srcdict['dec'], lonlat = True, nest = True))\n",
" #srcdict['_id'] = srcdict.pop('source_id') doesn't work, seems it is not unique\n",
" srcdict[\"hpxid_16\"] = int(\n",
" ang2pix(2**16, srcdict[\"ra\"], srcdict[\"dec\"], lonlat=True, nest=True)\n",
" )\n",
" # srcdict['_id'] = srcdict.pop('source_id') doesn't work, seems it is not unique\n",
" return srcdict\n",
"\n",
"\n",
"wisep.assign_dict_modifier(modifier)\n",
"\n",
"\n",
"# finally push it in the databse\n",
"wisep.push_to_db(\n",
" coll_name = 'srcs', \n",
" index_on = \"hpxid_16\",\n",
" overwrite_coll = True, \n",
" append_to_coll = False)\n",
" coll_name=\"srcs\", index_on=\"hpxid_16\", overwrite_coll=True, append_to_coll=False\n",
")\n",
"\n",
"\n",
"# if needed print extensive info on database\n",
"#wisep.info()"
"# wisep.info()"
]
},
{
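Two details of this cell are easy to miss. First, the `zip()` one-liner parses a two-column schema row: zipping a single list yields 1-tuples, so unpacking splits the name and type tokens. Second, the `dtype` argument to `read_csv` stays commented out because integer numpy dtypes cannot hold the NaN values this catalog uses for missing data. A small self-contained sketch of the parsing trick, with a made-up schema line and a condensed stand-in for the dtype mapping above:

import numpy as np

line = "w1mpro      smallfloat"  # hypothetical two-column schema row

# zip() over a single list yields 1-tuples: [('w1mpro',), ('smallfloat',)].
# Unpacking therefore lands the two tokens in two separate 1-tuples.
name, dtype = zip([p.strip() for p in line.strip().split(" ") if p not in [""]])
name, dtype = name[0], dtype[0]

# condensed stand-in for the string-to-numpy type mapping in the cell above
np_type = {"smallfloat": np.float16, "int8": np.int8}.get(dtype, str)
print(name, np_type)  # w1mpro <class 'numpy.float16'>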
@@ -210,53 +214,57 @@
}
],
"source": [
- "# now test the database for query performances. We use \n",
+ "# now test the database for query performances. We use\n",
"# a sample of randomly distributed points on a sphere\n",
- "# as targets. \n",
+ "# as targets.\n",
"\n",
"# define the funtion to test coordinate based queries:\n",
- "from healpy import ang2pix, get_all_neighbours\n",
+ "from healpy import get_all_neighbours\n",
"from astropy.table import Table\n",
"from astropy.coordinates import SkyCoord\n",
"\n",
- "return_fields = ['designation', 'ra', 'dec']\n",
+ "return_fields = [\"designation\", \"ra\", \"dec\"]\n",
"project = {}\n",
- "for field in return_fields: project[field] = 1\n",
- "print (project)\n",
+ "for field in return_fields:\n",
+ " project[field] = 1\n",
+ "print(project)\n",
"\n",
+ "\n",
+ "hp_order, rs_arcsec = 16, 30.0\n",
+ "\n",
+ "\n",
- "hp_order, rs_arcsec = 16, 30.\n",
"def test_query(ra, dec, coll):\n",
" \"\"\"query collection for points within rs of target ra, dec.\n",
" The results as returned as an astropy Table.\"\"\"\n",
- " \n",
- " # find the index of the target pixel and its neighbours \n",
- " target_pix = int( ang2pix(2**hp_order, ra, dec, nest = True, lonlat = True) )\n",
- " neighbs = get_all_neighbours(2**hp_order, ra, dec, nest = True, lonlat = True)\n",
+ "\n",
+ " # find the index of the target pixel and its neighbours\n",
+ " target_pix = int(ang2pix(2**hp_order, ra, dec, nest=True, lonlat=True))\n",
+ " neighbs = get_all_neighbours(2**hp_order, ra, dec, nest=True, lonlat=True)\n",
"\n",
" # remove non-existing neigbours (in case of E/W/N/S) and add center pixel\n",
" pix_group = [int(pix_id) for pix_id in neighbs if pix_id != -1] + [target_pix]\n",
- " \n",
+ "\n",
" # query the database for sources in these pixels\n",
- " qfilter = { 'hpxid_%d'%hp_order: { '$in': pix_group } }\n",
+ " qfilter = {\"hpxid_%d\" % hp_order: {\"$in\": pix_group}}\n",
" qresults = [o for o in coll.find(qfilter)]\n",
- " if len(qresults)==0:\n",
+ " if len(qresults) == 0:\n",
" return None\n",
- " \n",
+ "\n",
" # then use astropy to find the closest match\n",
" tab = Table(qresults)\n",
- " target = SkyCoord(ra, dec, unit = 'deg')\n",
- " matches_pos = SkyCoord(tab['ra'], tab['dec'], unit = 'deg')\n",
+ " target = SkyCoord(ra, dec, unit=\"deg\")\n",
+ " matches_pos = SkyCoord(tab[\"ra\"], tab[\"dec\"], unit=\"deg\")\n",
" d2t = target.separation(matches_pos).arcsecond\n",
" match_id = np.argmin(d2t)\n",
"\n",
" # if it's too far away don't use it\n",
- " if d2t[match_id]>rs_arcsec:\n",
+ " if d2t[match_id] > rs_arcsec:\n",
" return None\n",
" return tab[match_id]\n",
"\n",
+ "\n",
"# run the test\n",
- "wisep.run_test(test_query, npoints = 10000)\n"
+ "wisep.run_test(test_query, npoints=10000)"
]
},
{
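The query strategy in `test_query` is worth isolating: instead of a geometric $geoWithin search, it looks up the target's order-16 pixel plus its eight neighbours on the indexed `hpxid_16` key, then refines with astropy separations. A condensed, self-contained version of the pixel-group step; `coll` would be a pymongo collection populated as in the ingestion cell:

from healpy import ang2pix, get_all_neighbours

def pixel_group(ra_deg, dec_deg, hp_order=16):
    """Target healpix pixel plus its 8 neighbours, for an indexed $in query."""
    nside = 2**hp_order
    target = int(ang2pix(nside, ra_deg, dec_deg, nest=True, lonlat=True))
    neighbours = get_all_neighbours(nside, ra_deg, dec_deg, nest=True, lonlat=True)
    # get_all_neighbours marks non-existing neighbours with -1
    return [int(p) for p in neighbours if p != -1] + [target]

# qfilter = {"hpxid_16": {"$in": pixel_group(ra, dec)}}
# candidates = list(coll.find(qfilter))

One caveat: a 3x3 block of order-16 pixels spans only about 10 arcsec, so with rs_arcsec = 30.0 the pixel pre-filter can miss matches in the outer part of the search radius; the astropy distance cut only ever sees candidates from the central ~5 arcsec.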
@@ -274,7 +282,7 @@
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": null,
"metadata": {},
"outputs": [
{
@@ -290,13 +298,14 @@
}
],
"source": [
- "mqp.healpix_meta(healpix_id_key = 'hpxid_16', order = 16, is_indexed = True, nest = True)\n",
- "mqp.coord_meta(ra = 'ra', dec = 'dec')\n",
- "mqp.science_meta(\n",
- " contact = 'C. Norris', \n",
- " email = '[email protected]', \n",
- " description = 'allWISE infrared catalog',\n",
- " reference = 'http://wise2.ipac.caltech.edu/docs/release/allwise/')"
+ "wisep.healpix_meta(healpix_id_key=\"hpxid_16\", order=16, is_indexed=True, nest=True)\n",
+ "wisep.coord_meta(ra=\"ra\", dec=\"dec\")\n",
+ "wisep.science_meta(\n",
+ " contact=\"C. Norris\",\n",
+ " email=\"[email protected]\",\n",
+ " description=\"allWISE infrared catalog\",\n",
+ " reference=\"http://wise2.ipac.caltech.edu/docs/release/allwise/\",\n",
+ ")"
]
},
{
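Besides the ruff reformat, this hunk fixes a stale variable: the metadata calls previously targeted `mqp`, which is not defined in this notebook and would have raised a NameError; they now use the `wisep` pusher built above. Once healpix_meta and coord_meta are stored, the catalog becomes queryable through extcats' CatalogQuery companion class. A rough usage sketch; the argument names are assumed from the query helper exercised in CatalogQuery.py's test_queries, and a local MongoDB instance holding the 'wise' database is presumed:

from extcats import CatalogQuery

# hypothetical cone search against the freshly ingested catalog
wise_query = CatalogQuery.CatalogQuery("wise", ra_key="ra", dec_key="dec")
out, dist = wise_query.findclosest(ra=149.2, dec=2.05, rs_arcsec=10.0, method="healpix")
if out is not None:
    print(out["designation"], dist)
else:
    print("no counterpart within 10 arcsec")

The (None, None) return for empty matches is the same sentinel the isinstance fix in CatalogQuery.py tests for.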
