diff --git a/apps/beeswax/src/beeswax/api.py b/apps/beeswax/src/beeswax/api.py index c77184625ef..f437600f83a 100644 --- a/apps/beeswax/src/beeswax/api.py +++ b/apps/beeswax/src/beeswax/api.py @@ -15,17 +15,36 @@ # See the License for the specific language governing permissions and # limitations under the License. -from builtins import zip -import logging -import json import re import sys +import json +import logging +from builtins import zip -from django.urls import reverse from django.http import Http404 +from django.urls import reverse +from django.utils.translation import gettext as _ from django.views.decorators.http import require_POST - from thrift.transport.TTransport import TTransportException + +import beeswax.models +from beeswax.conf import USE_GET_LOG_API +from beeswax.data_export import upload +from beeswax.design import HQLdesign +from beeswax.forms import QueryForm +from beeswax.models import QueryHistory, Session +from beeswax.server import dbms +from beeswax.server.dbms import QueryServerException, QueryServerTimeoutException, SubQueryTable, expand_exception, get_query_server_config +from beeswax.views import ( + _get_query_handle_and_state, + authorized_get_design, + authorized_get_query_history, + make_parameterization_form, + massage_columns_for_json, + parse_out_jobs, + safe_get_design, + save_design, +) from desktop.auth.backend import is_admin from desktop.context_processors import get_app_name from desktop.lib.django_util import JsonResponse @@ -34,29 +53,11 @@ from desktop.lib.i18n import force_unicode from desktop.lib.parameterization import substitute_variables from metastore import parser -from notebook.models import escape_rows, MockedDjangoRequest, make_notebook from metastore.conf import FORCE_HS2_METADATA from metastore.views import _get_db, _get_servername +from notebook.models import MockedDjangoRequest, escape_rows, make_notebook from useradmin.models import User -import beeswax.models -from beeswax.data_export import upload -from beeswax.design import HQLdesign -from beeswax.conf import USE_GET_LOG_API -from beeswax.forms import QueryForm -from beeswax.models import Session, QueryHistory -from beeswax.server import dbms -from beeswax.server.dbms import expand_exception, get_query_server_config, QueryServerException, QueryServerTimeoutException, \ - SubQueryTable -from beeswax.views import authorized_get_design, authorized_get_query_history, make_parameterization_form, \ - safe_get_design, save_design, massage_columns_for_json, _get_query_handle_and_state, parse_out_jobs - -if sys.version_info[0] > 2: - from django.utils.translation import gettext as _ -else: - from django.utils.translation import ugettext as _ - - LOG = logging.getLogger() @@ -87,7 +88,7 @@ def decorator(request, *args, **kwargs): } if re.search('database is locked|Invalid query handle|not JSON serializable', message, re.IGNORECASE): - response['status'] = 2 # Frontend will not display this type of error + response['status'] = 2 # Frontend will not display this type of error LOG.warning('error_handler silencing the exception: %s' % e) return JsonResponse(response) return decorator @@ -131,17 +132,17 @@ def _autocomplete(db, database=None, table=None, column=None, nested=None, query cols_extended = massage_columns_for_json(table.cols) - if table.is_impala_only: # Expand Kudu table information + if table.is_impala_only: # Expand Kudu table information if db.client.query_server['dialect'] != 'impala': query_server = get_query_server_config('impala', connector=cluster) db = 
dbms.get(db.client.user, query_server, cluster=cluster) - col_options = db.get_table_describe(database, table.name) # Expand columns information + col_options = db.get_table_describe(database, table.name) # Expand columns information extra_col_options = dict([(col[0], dict(list(zip(col_options.cols(), col)))) for col in col_options.rows()]) for col_props in cols_extended: col_props.update(extra_col_options.get(col_props['name'], {})) - primary_keys = [col['name'] for col in extra_col_options.values() if col.get('primary_key') == 'true'] # Until IMPALA-8291 + primary_keys = [col['name'] for col in extra_col_options.values() if col.get('primary_key') == 'true'] # Until IMPALA-8291 foreign_keys = [] # Not supported yet else: primary_keys = [pk.name for pk in table.primary_keys] @@ -163,7 +164,7 @@ def _autocomplete(db, database=None, table=None, column=None, nested=None, query response = parse_tree # If column or nested type is scalar/primitive, add sample of values if parser.is_scalar_type(parse_tree['type']): - sample = _get_sample_data(db, database, table, column, cluster=cluster) + sample = _get_sample_data(db, database, table, column, nested, cluster=cluster) if 'rows' in sample: response['sample'] = sample['rows'] else: @@ -279,7 +280,7 @@ def watch_query_refresh_json(request, id): query_history = authorized_get_query_history(request, id, must_exist=True) db = dbms.get(request.user, query_history.get_query_server_config()) - if not request.POST.get('next'): # We need this as multi query would fail as current query is closed + if not request.POST.get('next'): # We need this as multi query would fail as current query is closed handle, state = _get_query_handle_and_state(query_history) query_history.save_state(state) @@ -713,18 +714,18 @@ def clear_history(request): @error_handler -def get_sample_data(request, database, table, column=None): +def get_sample_data(request, database, table, column=None, nested=None): app_name = get_app_name(request) cluster = json.loads(request.POST.get('cluster', '{}')) query_server = get_query_server_config(app_name, connector=cluster) db = dbms.get(request.user, query_server) - response = _get_sample_data(db, database, table, column, cluster=cluster) + response = _get_sample_data(db, database, table, column, nested, cluster=cluster) return JsonResponse(response) -def _get_sample_data(db, database, table, column, is_async=False, cluster=None, operation=None): +def _get_sample_data(db, database, table, column, nested, is_async=False, cluster=None, operation=None): if operation == 'hello': table_obj = None else: @@ -738,7 +739,7 @@ def _get_sample_data(db, database, table, column, is_async=False, cluster=None, response['message'] = _('Not getting sample data as this is a view which can be expensive when run.') return response - sample_data = db.get_sample(database, table_obj, column, generate_sql_only=is_async, operation=operation) + sample_data = db.get_sample(database, table_obj, column, nested, generate_sql_only=is_async, operation=operation) response = {'status': -1} if sample_data: @@ -1010,7 +1011,7 @@ def get_query_form(request): query_form = QueryForm() query_form.bind(request.POST) - query_form.query.fields['database'].choices = databases # Could not do it in the form + query_form.query.fields['database'].choices = databases # Could not do it in the form return query_form @@ -1018,6 +1019,8 @@ def get_query_form(request): """ Utils """ + + def _extract_nested_type(parse_tree, nested_path): nested_tokens = nested_path.strip('/').split('/') diff --git 
a/apps/beeswax/src/beeswax/server/dbms.py b/apps/beeswax/src/beeswax/server/dbms.py index df279adcd96..5a19443822e 100644 --- a/apps/beeswax/src/beeswax/server/dbms.py +++ b/apps/beeswax/src/beeswax/server/dbms.py @@ -15,19 +15,58 @@ # See the License for the specific language governing permissions and # limitations under the License. -from builtins import object -import logging import re import sys -import threading -import time import json +import time +import logging +import threading +from builtins import object from django.core.cache import caches from django.urls import reverse +from django.utils.encoding import force_str +from django.utils.translation import gettext as _ from kazoo.client import KazooClient -from beeswax.models import Compute +from azure.abfs import abfspath +from beeswax.common import apply_natural_sort, is_compute +from beeswax.conf import ( + APPLY_NATURAL_SORT_MAX, + AUTH_PASSWORD, + AUTH_USERNAME, + CACHE_TIMEOUT, + CLOSE_SESSIONS, + HIVE_DISCOVERY_HIVESERVER2_ZNODE, + HIVE_DISCOVERY_HS2, + HIVE_DISCOVERY_LLAP, + HIVE_DISCOVERY_LLAP_HA, + HIVE_DISCOVERY_LLAP_ZNODE, + HIVE_HTTP_THRIFT_PORT, + HIVE_METASTORE_HOST, + HIVE_METASTORE_PORT, + HIVE_SERVER_HOST, + HIVE_SERVER_PORT, + LIST_PARTITIONS_LIMIT, + LLAP_SERVER_HOST, + LLAP_SERVER_PORT, + LLAP_SERVER_THRIFT_PORT, + MAX_NUMBER_OF_SESSIONS, + QUERY_PARTITIONS_LIMIT, + SERVER_CONN_TIMEOUT, + USE_SASL as HIVE_USE_SASL, + ZOOKEEPER_CONN_TIMEOUT, + has_session_pool, +) +from beeswax.design import hql_query +from beeswax.hive_site import ( + get_hiveserver2_kerberos_principal, + hiveserver2_impersonation_enabled, + hiveserver2_thrift_http_path, + hiveserver2_transport_mode, + hiveserver2_use_ssl, +) +from beeswax.models import QUERY_TYPES, Compute, QueryHistory from desktop.conf import CLUSTER_ID, has_connectors from desktop.lib.django_util import format_preserving_redirect from desktop.lib.exceptions_renderable import PopupException @@ -38,30 +77,6 @@ from indexer.file_format import HiveFormat from libzookeeper import conf as libzookeeper_conf -from azure.abfs import abfspath -from beeswax.conf import HIVE_SERVER_HOST, HIVE_SERVER_PORT, HIVE_SERVER_HOST, HIVE_HTTP_THRIFT_PORT, HIVE_METASTORE_HOST, \ - HIVE_METASTORE_PORT, LIST_PARTITIONS_LIMIT, SERVER_CONN_TIMEOUT, ZOOKEEPER_CONN_TIMEOUT, \ - AUTH_USERNAME, AUTH_PASSWORD, APPLY_NATURAL_SORT_MAX, QUERY_PARTITIONS_LIMIT, HIVE_DISCOVERY_HIVESERVER2_ZNODE, \ - HIVE_DISCOVERY_HS2, HIVE_DISCOVERY_LLAP, HIVE_DISCOVERY_LLAP_HA, HIVE_DISCOVERY_LLAP_ZNODE, CACHE_TIMEOUT, \ - LLAP_SERVER_HOST, LLAP_SERVER_PORT, LLAP_SERVER_THRIFT_PORT, USE_SASL as HIVE_USE_SASL, CLOSE_SESSIONS, has_session_pool, \ - MAX_NUMBER_OF_SESSIONS -from beeswax.common import apply_natural_sort, is_compute -from beeswax.design import hql_query -from beeswax.hive_site import hiveserver2_use_ssl, hiveserver2_impersonation_enabled, get_hiveserver2_kerberos_principal, \ - hiveserver2_transport_mode, hiveserver2_thrift_http_path -from beeswax.models import QueryHistory, QUERY_TYPES - - -if sys.version_info[0] > 2: - from django.utils.encoding import force_str -else: - from django.utils.encoding import force_unicode as force_str - -if sys.version_info[0] > 2: - from django.utils.translation import gettext as _ -else: - from django.utils.translation import ugettext as _ - LOG = logging.getLogger() @@ -70,6 +85,7 @@ DBMS_CACHE_LOCK = threading.Lock() cache = caches[CACHES_HIVE_DISCOVERY_KEY] + # Using file cache to make sure eventlet threads are uniform, this cache is persistent on startup # So we clear it to 
make sure the server resets hiveserver2 host. def reset_ha(): @@ -316,7 +332,7 @@ def get_query_server_config_via_connector(connector): connector_name = connector['type'] compute_name = compute['name'] if compute.get('id'): - compute = Compute.objects.get(id=compute['id']).to_dict() #Reload the full compute from db + compute = Compute.objects.get(id=compute['id']).to_dict() # Reload the full compute from db LOG.debug("Query cluster connector %s compute %s" % (connector_name, compute)) if compute['options'].get('has_ssh') == 'true': @@ -384,7 +400,6 @@ def __init__(self, client, server_type): self.server_name = self.client.query_server.get('dialect') if self.client.query_server['server_name'].isdigit() \ else self.client.query_server['server_name'] - @classmethod def to_matching_wildcard(cls, identifier=None): cleaned = "*" @@ -392,7 +407,6 @@ def to_matching_wildcard(cls, identifier=None): cleaned = "*%s*" % identifier.strip().strip("*") return cleaned - def get_databases(self, database_names='*'): if database_names != '*': database_names = self.to_matching_wildcard(database_names) @@ -404,11 +418,9 @@ def get_databases(self, database_names='*'): return databases - def get_database(self, database): return self.client.get_database(database) - def alter_database(self, database, properties): hql = 'ALTER database `%s` SET DBPROPERTIES (' % database hql += ', '.join(["'%s'='%s'" % (k, v) for k, v in list(properties.items())]) @@ -426,7 +438,6 @@ def alter_database(self, database, properties): return self.client.get_database(database) - def get_tables_meta(self, database='default', table_names='*', table_types=None): database = database.lower() # Impala is case sensitive @@ -444,7 +455,6 @@ def get_tables_meta(self, database='default', table_names='*', table_types=None) tables = apply_natural_sort(tables, key='name') return tables - def get_tables(self, database='default', table_names='*', table_types=None): database = database.lower() # Impala is case sensitive @@ -459,7 +469,6 @@ def get_tables(self, database='default', table_names='*', table_types=None): tables = apply_natural_sort(tables) return tables - def _get_tables_via_sparksql(self, database, table_names='*'): hql = "SHOW TABLES IN %s" % database if table_names != '*': @@ -486,7 +495,6 @@ def _get_tables_via_sparksql(self, database, table_names='*'): else: return [] - def get_table(self, database, table_name): try: return self.client.get_table(database, table_name) @@ -505,7 +513,6 @@ def get_table(self, database, table_name): else: raise e - def alter_table(self, database, table_name, new_table_name=None, comment=None, tblproperties=None): table_obj = self.get_table(database, table_name) if table_obj is None: @@ -536,7 +543,6 @@ def alter_table(self, database, table_name, new_table_name=None, comment=None, t return self.client.get_table(database, table_name) - def get_column(self, database, table_name, column_name): table = self.get_table(database, table_name) for col in table.cols: @@ -544,7 +550,6 @@ def get_column(self, database, table_name, column_name): return col return None - def alter_column(self, database, table_name, column_name, new_column_name, column_type, comment=None, partition_spec=None, cascade=False): hql = 'ALTER TABLE `%s`.`%s`' % (database, table_name) @@ -571,27 +576,23 @@ def alter_column(self, database, table_name, column_name, new_column_name, colum return self.get_column(database, table_name, new_column_name) - def execute_query(self, query, design): return self.execute_and_watch(query, design=design) - def 
select_star_from(self, database, table, limit=1000): if table.partition_keys: # Filter on max number of partitions for partitioned tables - hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit + hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit else: hql = "SELECT * FROM `%s`.`%s` LIMIT %d;" % (database, table.name, limit) return self.execute_statement(hql) - def get_select_star_query(self, database, table, limit=1000): if table.partition_keys: # Filter on max number of partitions for partitioned tables - hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit + hql = self._get_sample_partition_query(database, table, limit=limit) # Currently need a limit else: hql = "SELECT * FROM `%s`.`%s` LIMIT %d;" % (database, table.name, limit) return hql - def execute_statement(self, hql): if self.server_name.startswith('impala'): query = hql_query(hql, QUERY_TYPES[1]) @@ -599,7 +600,6 @@ def execute_statement(self, hql): query = hql_query(hql, QUERY_TYPES[0]) return self.execute_and_watch(query) - def fetch(self, query_handle, start_over=False, rows=None): no_start_over_support = [ config_variable @@ -611,15 +611,12 @@ def fetch(self, query_handle, start_over=False, rows=None): return self.client.fetch(query_handle, start_over, rows) - def close_operation(self, query_handle): return self.client.close_operation(query_handle) - def open_session(self, user): return self.client.open_session(user) - def close_session(self, session): resp = self.client.close_session(session) @@ -633,14 +630,12 @@ def close_session(self, session): return session - def cancel_operation(self, query_handle): resp = self.client.cancel_operation(query_handle) if self.client.query_server.get('dialect') == 'impala': resp = self.client.close_operation(query_handle) return resp - def get_sample(self, database, table, column=None, nested=None, limit=100, generate_sql_only=False, operation=None): result = None hql = None @@ -666,15 +661,15 @@ def get_sample(self, database, table, column=None, nested=None, limit=100, gener else: hql = "SELECT * FROM `%s`.`%s` LIMIT %s;" % (database, table.name, limit) else: + select_clause, from_clause = self.get_nested_select(database, table.name, column, nested) if operation == 'distinct': - hql = "SELECT DISTINCT %s FROM `%s`.`%s` LIMIT %s;" % (column, database, table.name, limit) + hql = 'SELECT DISTINCT %s FROM %s LIMIT %s;' % (select_clause, from_clause, limit) elif operation == 'max': - hql = "SELECT max(%s) FROM `%s`.`%s`;" % (column, database, table.name) + hql = 'SELECT max(%s) FROM %s;' % (select_clause, from_clause) elif operation == 'min': - hql = "SELECT min(%s) FROM `%s`.`%s`;" % (column, database, table.name) + hql = 'SELECT min(%s) FROM %s;' % (select_clause, from_clause) else: - hql = "SELECT %s FROM `%s`.`%s` LIMIT %s;" % (column, database, table.name, limit) - # TODO: Add nested select support for HS2 + hql = 'SELECT %s FROM %s LIMIT %s;' % (select_clause, from_clause, limit) if hql: if generate_sql_only: @@ -689,6 +684,31 @@ def get_sample(self, database, table, column=None, nested=None, limit=100, gener return result + def get_nested_select(self, database, table, column, nested): + """ + Given a column or nested type, return the corresponding SELECT and FROM clauses in Hive's nested-type syntax + """ + select_clause = column + from_clause = f"{database}.{table}" + + if nested: + nested_tokens = nested.strip('/').split('/') + for i, token in 
enumerate(nested_tokens): + if token == 'item': + from_clause += f" LATERAL VIEW explode({select_clause}) temp_table AS temp_item{i}" + select_clause = f"temp_item{i}" + elif token == 'key': + from_clause += f" LATERAL VIEW explode({select_clause}) temp_table AS temp_key{i}, temp_value{i}" + select_clause = f"temp_key{i}" + elif token == 'value': + from_clause += f" LATERAL VIEW explode({select_clause}) temp_table AS temp_key{i}, temp_value{i}" + select_clause = f"temp_value{i}" + else: + select_clause += f".{token}" + else: + select_clause = column + + return select_clause, from_clause def _get_sample_partition_query(self, database, table, column='*', limit=100, operation=None): max_parts = QUERY_PARTITIONS_LIMIT.get() @@ -714,7 +734,6 @@ def _get_sample_partition_query(self, database, table, column='*', limit=100, op return prefix + " FROM `%(database)s`.`%(table)s` %(partition_clause)s LIMIT %(limit)s" % \ {'database': database, 'table': table.name, 'partition_clause': partition_clause, 'limit': limit} - def analyze_table(self, database, table): if self.server_name.startswith('impala'): hql = 'COMPUTE STATS `%(database)s`.`%(table)s`' % {'database': database, 'table': table} @@ -730,7 +749,6 @@ def analyze_table(self, database, table): return self.execute_statement(hql) - def analyze_table_columns(self, database, table): if self.server_name.startswith('impala'): hql = 'COMPUTE STATS `%(database)s`.`%(table)s`' % {'database': database, 'table': table} @@ -743,7 +761,6 @@ def analyze_table_columns(self, database, table): return self.execute_statement(hql) - def get_table_stats(self, database, table): stats = [] @@ -763,7 +780,6 @@ def get_table_stats(self, database, table): return stats - def get_table_columns_stats(self, database, table, column): if self.server_name.startswith('impala'): hql = 'SHOW COLUMN STATS `%(database)s`.`%(table)s`' % {'database': database, 'table': table} @@ -779,7 +795,7 @@ def get_table_columns_stats(self, database, table, column): data = list(result.rows()) if self.server_name.startswith('impala'): - if column == -1: # All the columns + if column == -1: # All the columns return [self._extract_impala_column(col) for col in data] else: data = [col for col in data if col[0] == column][0] @@ -823,7 +839,6 @@ def get_table_properties(self, database, table, property_name=None): self.close(handle) return result - def get_table_describe(self, database, table): hql = 'DESCRIBE `%s`.`%s`' % (database, table) @@ -835,7 +850,6 @@ def get_table_describe(self, database, table): self.close(handle) return result - def get_top_terms(self, database, table, column, limit=30, prefix=None): limit = min(limit, 100) prefix_match = '' @@ -852,7 +866,7 @@ def get_top_terms(self, database, table, column, limit=30, prefix=None): } query = hql_query(hql) - handle = self.execute_and_wait(query, timeout_sec=60.0) # Hive is very slow + handle = self.execute_and_wait(query, timeout_sec=60.0) # Hive is very slow if handle: result = self.fetch(handle, rows=limit) @@ -861,7 +875,6 @@ def get_top_terms(self, database, table, column, limit=30, prefix=None): else: return [] - def drop_table(self, database, table): if table.is_view: hql = "DROP VIEW `%s`.`%s`" % (database, table.name,) @@ -870,11 +883,10 @@ def drop_table(self, database, table): return self.execute_statement(hql) - def load_data(self, database, table, form_data, design, generate_ddl_only=False): hql = "LOAD DATA INPATH" source_path = "%(path)s" % form_data - if source_path.lower().startswith("abfs"): #this is to check if its using 
an ABFS path + if source_path.lower().startswith("abfs"): # this is to check if its using an ABFS path source_path = abfspath(source_path) hql += " '%s'" % source_path if form_data['overwrite']: @@ -896,7 +908,6 @@ def load_data(self, database, table, form_data, design, generate_ddl_only=False) return self.execute_query(query, design) - def drop_tables(self, database, tables, design, skip_trash=False, generate_ddl_only=False): hql = [] @@ -918,11 +929,9 @@ def drop_tables(self, database, tables, design, skip_trash=False, generate_ddl_o return self.execute_query(query, design) - def drop_database(self, database): return self.execute_statement("DROP DATABASE `%s`" % database) - def drop_databases(self, databases, design, generate_ddl_only=False): hql = [] @@ -953,7 +962,6 @@ def insert_query_into_directory(self, query_history, target_dir): hql = "INSERT OVERWRITE DIRECTORY '%s' %s" % (target_dir, query) return self.execute_statement(hql) - def create_table_as_a_select(self, request, query_history, target_database, target_table, result_meta): design = query_history.design.get_design() database = design.query['database'] @@ -984,7 +992,7 @@ def create_table_as_a_select(self, request, query_history, target_database, targ if not delim.isdigit(): delim = str(ord(delim)) - hql = ''' + hql = r''' CREATE TABLE `%s` ( %s ) @@ -1016,24 +1024,19 @@ def create_table_as_a_select(self, request, query_history, target_database, targ return query_history - def use(self, database, session=None): query = hql_query('USE `%s`' % database) return self.client.use(query, session=session) - def get_log(self, query_handle, start_over=True): return self.client.get_log(query_handle, start_over) - def get_state(self, handle): return self.client.get_state(handle) - def get_operation_status(self, handle): return self.client.get_operation_status(handle) - def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5): """ Run query and check status until it finishes or timeouts. @@ -1064,7 +1067,6 @@ def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5): raise QueryServerTimeoutException(message=msg) - def execute_next_statement(self, query_history, hql_query): if query_history.is_success() or query_history.is_expired(): # We need to go to the next statement only if the previous one passed @@ -1084,7 +1086,6 @@ def execute_next_statement(self, query_history, hql_query): return self.execute_and_watch(query, query_history=query_history) - def execute_and_watch(self, query, design=None, query_history=None): """ Run query and return a QueryHistory object in order to see its progress on a Web page. 
@@ -1140,24 +1141,20 @@ def execute_and_watch(self, query, design=None, query_history=None): return query_history - def get_results_metadata(self, handle): return self.client.get_results_metadata(handle) - def close(self, handle): return self.client.close(handle) - def get_partitions(self, db_name, table, partition_spec=None, max_parts=None, reverse_sort=True): if max_parts is None or max_parts > LIST_PARTITIONS_LIMIT.get(): max_parts = LIST_PARTITIONS_LIMIT.get() return self.client.get_partitions(db_name, table.name, partition_spec, max_parts=max_parts, reverse_sort=reverse_sort) - def get_partition(self, db_name, table_name, partition_spec, generate_ddl_only=False): - if partition_spec and self.server_name.startswith('impala'): # partition_spec not supported + if partition_spec and self.server_name.startswith('impala'): # partition_spec not supported partition_query = " AND ".join(partition_spec.split(',')) else: table = self.get_table(db_name, table_name) @@ -1176,11 +1173,9 @@ def get_partition(self, db_name, table_name, partition_spec, generate_ddl_only=F else: return self.execute_statement(hql) - def describe_partition(self, db_name, table_name, partition_spec): return self.client.get_table(db_name, table_name, partition_spec=partition_spec) - def drop_partitions(self, db_name, table_name, partition_specs, design=None, generate_ddl_only=False): hql = [] @@ -1198,7 +1193,6 @@ def drop_partitions(self, db_name, table_name, partition_specs, design=None, gen return self.execute_query(query, design) - def get_indexes(self, db_name, table_name): hql = 'SHOW FORMATTED INDEXES ON `%(table)s` IN `%(database)s`' % {'table': table_name, 'database': db_name} @@ -1211,11 +1205,9 @@ def get_indexes(self, db_name, table_name): return result - def get_configuration(self): return self.client.get_configuration() - def get_functions(self, prefix=None, database=None): ''' Not using self.client.get_functions() as pretty limited. More comments there. 
@@ -1242,7 +1234,6 @@ def get_functions(self, prefix=None, database=None): return rows - def get_function(self, name): hql = 'DESCRIBE FUNCTION EXTENDED `%(name)s`' % { 'name': name, @@ -1257,7 +1248,6 @@ def get_function(self, name): return rows - def get_query_metadata(self, query): hql = 'SELECT * FROM ( %(query)s ) t LIMIT 0' % {'query': query.strip(';')} @@ -1270,11 +1260,9 @@ def get_query_metadata(self, query): return result - def explain(self, query): return self.client.explain(query) - def get_primary_keys(self, database_name, table_name, catalog_name=None): return self.client.get_primary_keys( @@ -1283,7 +1271,6 @@ def get_primary_keys(self, database_name, table_name, catalog_name=None): catalog_name=catalog_name ) - def get_foreign_keys(self, parent_catalog_name=None, parent_database_name=None, parent_table_name=None, foreign_catalog_name=None, foreign_database_name=None, foreign_table_name=None): @@ -1296,11 +1283,9 @@ def get_foreign_keys(self, parent_catalog_name=None, parent_database_name=None, foreign_table_name=foreign_table_name ) - def get_status(self): return self.client.getStatus() - def get_default_configuration(self, include_hadoop): return self.client.get_default_configuration(include_hadoop) @@ -1331,7 +1316,7 @@ def __init__(self, db, query): self.name = 'Test' cols = db.get_query_metadata(query).data_table.cols() for col in cols: - col.name = re.sub('^t\.', '', col.name) + col.name = re.sub(r'^t\.', '', col.name) col.type = HiveFormat.FIELD_TYPE_TRANSLATE.get(col.type, 'string') self.cols = cols self.hdfs_link = None diff --git a/apps/beeswax/src/beeswax/server/dbms_tests.py b/apps/beeswax/src/beeswax/server/dbms_tests.py index 7911e774be5..973adfd7e41 100644 --- a/apps/beeswax/src/beeswax/server/dbms_tests.py +++ b/apps/beeswax/src/beeswax/server/dbms_tests.py @@ -16,19 +16,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys import logging +from unittest.mock import Mock, patch + import pytest -import sys +from django.core.cache import caches from django.test import TestCase -from beeswax.server.dbms import get_query_server_config + +from beeswax.server.dbms import HiveServer2Dbms, get_query_server_config from desktop.lib.exceptions_renderable import PopupException from desktop.settings import CACHES_HIVE_DISCOVERY_KEY -from django.core.cache import caches - -if sys.version_info[0] > 2: - from unittest.mock import patch, Mock -else: - from mock import patch, Mock LOG = logging.getLogger() cache = caches[CACHES_HIVE_DISCOVERY_KEY] @@ -173,3 +171,38 @@ def test_get_hs2_discovery(self): # TODO: all the combinations in new test methods, e.g.: # HIVE_DISCOVERY_LLAP_HA.get() --> True # ... 
+ + +class TestHiveServer2Dbms(): + + def test_get_nested_select_no_nested(self): + select_clause, from_clause = HiveServer2Dbms.get_nested_select(self, "db", "tbl", "col", None) + assert select_clause == "col" + assert from_clause == "db.tbl" + + def test_get_nested_select_with_array_item(self): + select_clause, from_clause = HiveServer2Dbms.get_nested_select(self, "db", "tbl", "col", "item") + assert select_clause == "temp_item0" + assert from_clause == "db.tbl LATERAL VIEW explode(col) temp_table AS temp_item0" + + def test_get_nested_select_with_map_key(self): + select_clause, from_clause = HiveServer2Dbms.get_nested_select(self, "db", "tbl", "col", "key") + assert select_clause == "temp_key0" + assert from_clause == "db.tbl LATERAL VIEW explode(col) temp_table AS temp_key0, temp_value0" + + def test_get_nested_select_with_map_value(self): + select_clause, from_clause = HiveServer2Dbms.get_nested_select(self, "db", "tbl", "col", "value") + assert select_clause == "temp_value0" + assert from_clause == "db.tbl LATERAL VIEW explode(col) temp_table AS temp_key0, temp_value0" + + def test_get_nested_select_with_nested_path(self): + select_clause, from_clause = HiveServer2Dbms.get_nested_select(self, "db", "tbl", "col", "item/key/value") + assert select_clause == "temp_value2" + assert from_clause == ("db.tbl LATERAL VIEW explode(col) temp_table AS temp_item0 " + "LATERAL VIEW explode(temp_item0) temp_table AS temp_key1, temp_value1 " + "LATERAL VIEW explode(temp_key1) temp_table AS temp_key2, temp_value2") + + def test_get_nested_select_with_struct_path(self): + select_clause, from_clause = HiveServer2Dbms.get_nested_select(self, "db", "tbl", "col", "item/field1") + assert select_clause == "temp_item0.field1" + assert from_clause == "db.tbl LATERAL VIEW explode(col) temp_table AS temp_item0" diff --git a/desktop/core/src/desktop/api_public.py b/desktop/core/src/desktop/api_public.py index d579e3c299d..5280fcab89b 100644 --- a/desktop/core/src/desktop/api_public.py +++ b/desktop/core/src/desktop/api_public.py @@ -167,12 +167,12 @@ def get_logs(request): @api_view(["POST"]) -def get_sample_data(request, server=None, database=None, table=None, column=None): +def get_sample_data(request, server=None, database=None, table=None, column=None, nested=None): django_request = get_django_request(request) _patch_operation_id_request(django_request) - return notebook_api.get_sample_data(django_request, server, database, table, column) + return notebook_api.get_sample_data(django_request, server, database, table, column, nested) @api_view(["POST"]) diff --git a/desktop/core/src/desktop/api_public_urls_v1.py b/desktop/core/src/desktop/api_public_urls_v1.py index d480e4c4988..71aceb7109f 100644 --- a/desktop/core/src/desktop/api_public_urls_v1.py +++ b/desktop/core/src/desktop/api_public_urls_v1.py @@ -86,6 +86,11 @@ api_public.get_sample_data, name='editor_sample_data_column', ), + re_path( + r"^editor/sample/(?P[^/?]*)/(?P[\w_\-]+)/(?P\w+)/(?P.+)/?$", + api_public.get_sample_data, + name="editor_sample_data_nested", + ), ] urlpatterns += [ diff --git a/desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js b/desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js index a27849ab13e..9b889f197a3 100644 --- a/desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js +++ b/desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js @@ -330,6 +330,43 @@ class CatalogEntriesList { } }); + /** + * Transforms the data and metadata of a complex response. 
+ * This function processes the response based on the column type (map, array, struct) + * and appropriately formats the data and metadata for sampling. + */ + self.transformData = function (response, isComplex, dataType) { + const encodeToHtml = value => JSON.stringify(value).replace(/"/g, '"'); + const decodeFromHtml = str => str.replace(/"/g, '"'); + + let { data, meta } = response; + + if (isComplex) { + if (dataType === 'array') { + meta = [{ name: 'item', type: '' }]; + } else if (dataType === 'map') { + meta = [ + { name: 'key', type: '' }, + { name: 'value', type: '' } + ]; + data = data.map(item => { + const jsonDict = JSON.parse(decodeFromHtml(item[0])); + const keys = Object.keys(jsonDict).map(encodeToHtml); + const values = Object.values(jsonDict).map(encodeToHtml); + return [keys, values]; + }); + } else if (dataType === 'struct' && data.length > 0 && data[0].length > 0) { + const jsonDict = JSON.parse(decodeFromHtml(data[0][0])); + meta = Object.keys(jsonDict).map(key => ({ name: key, type: '' })); + data = data.map(item => { + const jsonValues = Object.values(JSON.parse(decodeFromHtml(item[0]))); + return jsonValues.map(encodeToHtml); + }); + } + } + return { data, meta }; + }; + // TODO: Can be computed based on contents (instead of always suggesting all col types etc.) self.knownFacetValues = ko.pureComputed(() => { if (self.catalogEntry().isDatabase()) { @@ -583,6 +620,11 @@ class CatalogEntriesList { }); self.lastSamplePromise .then(sample => { + const isComplex = self.catalogEntry().isComplex(); + const dataType = self.catalogEntry().getType(); + const { data, meta } = self.transformData(sample, isComplex, dataType); + sample.meta = meta; + sample.data = data; childPromise .then(() => { if (sample.meta && sample.meta.length && sample.data && sample.data.length) { diff --git a/desktop/libs/notebook/src/notebook/api.py b/desktop/libs/notebook/src/notebook/api.py index fdd88b7576a..833bb9afc39 100644 --- a/desktop/libs/notebook/src/notebook/api.py +++ b/desktop/libs/notebook/src/notebook/api.py @@ -772,7 +772,7 @@ def autocomplete(request, server=None, database=None, table=None, column=None, n @require_POST @check_document_access_permission @api_error_handler -def get_sample_data(request, server=None, database=None, table=None, column=None): +def get_sample_data(request, server=None, database=None, table=None, column=None, nested=None): response = {'status': -1} # Passed by check_document_access_permission but unused by APIs @@ -781,7 +781,7 @@ def get_sample_data(request, server=None, database=None, table=None, column=None is_async = json.loads(request.POST.get('async', 'false')) operation = json.loads(request.POST.get('operation', '"default"')) - sample_data = get_api(request, snippet).get_sample_data(snippet, database, table, column, is_async=is_async, operation=operation) + sample_data = get_api(request, snippet).get_sample_data(snippet, database, table, column, nested, is_async=is_async, operation=operation) response.update(sample_data) response['status'] = 0 diff --git a/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py b/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py index 879402f2564..4a8eda35834 100644 --- a/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py +++ b/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py @@ -15,19 +15,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
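Before the connector changes below, a condensed sketch of how the new nested argument flows from the notebook API down to the generated Hive statement (method names are taken from this patch; the request and snippet objects come from the existing views):

  # notebook/api.py
  sample_data = get_api(request, snippet).get_sample_data(snippet, database, table, column, nested, is_async=is_async, operation=operation)
  # notebook/connectors/hiveserver2.py: HS2Api.get_sample_data() forwards to
  #   beeswax.api._get_sample_data(db, database, table, column, nested, is_async, operation=operation, cluster=self.interpreter)
  # beeswax/api.py: _get_sample_data() calls
  #   db.get_sample(database, table_obj, column, nested, generate_sql_only=is_async, operation=operation)
  # beeswax/server/dbms.py: HiveServer2Dbms.get_sample() builds the statement from
  #   get_nested_select(database, table.name, column, nested)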
-from __future__ import division -from future import standard_library -standard_library.install_aliases() -from builtins import next, object -import binascii +import re +import sys import copy import json -import logging -import re import struct -import sys +import logging +import binascii +from builtins import next, object +from urllib.parse import quote as urllib_quote, unquote as urllib_unquote from django.urls import reverse +from django.utils.translation import gettext as _ from beeswax.common import is_compute from desktop.auth.backend import is_admin @@ -40,16 +39,17 @@ from desktop.lib.rest.http_client import RestException from desktop.lib.thrift_util import unpack_guid, unpack_guid_base64 from desktop.models import DefaultConfiguration, Document2 - -from notebook.connectors.base import Api, QueryError, QueryExpired, OperationTimeout, OperationNotSupported, _get_snippet_name, Notebook, \ - get_interpreter, patch_snippet_for_connector - -if sys.version_info[0] > 2: - from urllib.parse import quote as urllib_quote, unquote as urllib_unquote - from django.utils.translation import gettext as _ -else: - from django.utils.translation import ugettext as _ - from urllib import quote as urllib_quote, unquote as urllib_unquote +from notebook.connectors.base import ( + Api, + Notebook, + OperationNotSupported, + OperationTimeout, + QueryError, + QueryExpired, + _get_snippet_name, + get_interpreter, + patch_snippet_for_connector, +) LOG = logging.getLogger() @@ -57,29 +57,36 @@ try: from beeswax import conf as beeswax_conf, data_export from beeswax.api import _autocomplete, _get_sample_data - from beeswax.conf import CONFIG_WHITELIST as hive_settings, DOWNLOAD_ROW_LIMIT, DOWNLOAD_BYTES_LIMIT, MAX_NUMBER_OF_SESSIONS, \ - has_session_pool, has_multiple_sessions, CLOSE_SESSIONS + from beeswax.conf import ( + CLOSE_SESSIONS, + CONFIG_WHITELIST as hive_settings, + DOWNLOAD_BYTES_LIMIT, + DOWNLOAD_ROW_LIMIT, + MAX_NUMBER_OF_SESSIONS, + has_multiple_sessions, + has_session_pool, + ) from beeswax.data_export import upload from beeswax.design import hql_query from beeswax.models import QUERY_TYPES, HiveServerQueryHandle, HiveServerQueryHistory, QueryHistory, Session from beeswax.server import dbms - from beeswax.server.dbms import get_query_server_config, QueryServerException, reset_ha + from beeswax.server.dbms import QueryServerException, get_query_server_config, reset_ha from beeswax.views import parse_out_jobs, parse_out_queries except ImportError as e: LOG.warning('Hive and HiveServer2 interfaces are not enabled: %s' % e) hive_settings = None try: - from impala import api # Force checking if Impala is enabled + from impala import api # Force checking if Impala is enabled from impala.conf import CONFIG_WHITELIST as impala_settings - from impala.server import get_api as get_impalad_api, ImpalaDaemonApiException, _get_impala_server_url + from impala.server import ImpalaDaemonApiException, _get_impala_server_url, get_api as get_impalad_api except ImportError as e: LOG.warning("Impala app is not enabled") impala_settings = None try: from jobbrowser.apis.query_api import _get_api - from jobbrowser.conf import ENABLE_QUERY_BROWSER, ENABLE_HIVE_QUERY_BROWSER + from jobbrowser.conf import ENABLE_HIVE_QUERY_BROWSER, ENABLE_QUERY_BROWSER from jobbrowser.views import get_job has_query_browser = ENABLE_QUERY_BROWSER.get() has_hive_query_browser = ENABLE_HIVE_QUERY_BROWSER.get() @@ -179,7 +186,6 @@ class HS2Api(Api): def get_properties(lang='hive'): return ImpalaConfiguration.PROPERTIES if lang == 'impala' else 
HiveConfiguration.PROPERTIES - @query_error_handler def create_session(self, lang='hive', properties=None): application = 'beeswax' if lang == 'hive' or lang == 'llap' else lang @@ -246,7 +252,6 @@ def create_session(self, lang='hive', properties=None): return response - @query_error_handler def close_session(self, session): app_name = session.get('type') @@ -281,7 +286,6 @@ def close_session(self, session): return response - def close_session_idle(self, notebook, session): idle = True response = {'result': []} @@ -324,9 +328,8 @@ def execute(self, notebook, snippet): query = self._prepare_hql_query(snippet, statement['statement'], session) _session = self._get_session_by_id(notebook, session_type) - try: - if statement.get('statement_id') == 0: # TODO: move this to client + if statement.get('statement_id') == 0: # TODO: move this to client if query.database and not statement['statement'].lower().startswith('set'): result = db.use(query.database, session=_session) if result.session: @@ -356,7 +359,6 @@ def execute(self, notebook, snippet): return response - @query_error_handler def check_status(self, notebook, snippet): response = {} @@ -384,7 +386,6 @@ def check_status(self, notebook, snippet): return response - @query_error_handler def fetch_result(self, notebook, snippet, rows, start_over): db = self._get_db(snippet, interpreter=self.interpreter) @@ -411,7 +412,6 @@ def fetch_result(self, notebook, snippet, rows, start_over): 'type': 'table' } - @query_error_handler def fetch_result_size(self, notebook, snippet): resp = { @@ -440,7 +440,6 @@ def fetch_result_size(self, notebook, snippet): return resp - @query_error_handler def cancel(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) @@ -449,7 +448,6 @@ def cancel(self, notebook, snippet): db.cancel_operation(handle) return {'status': 0} - @query_error_handler def get_log(self, notebook, snippet, startFrom=None, size=None): db = self._get_db(snippet, interpreter=self.interpreter) @@ -457,7 +455,6 @@ def get_log(self, notebook, snippet, startFrom=None, size=None): handle = self._get_handle(snippet) return db.get_log(handle, start_over=startFrom == 0) - @query_error_handler def close_statement(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) @@ -472,7 +469,6 @@ def close_statement(self, notebook, snippet): raise e return {'status': 0} - def can_start_over(self, notebook, snippet): try: db = self._get_db(snippet, interpreter=self.interpreter) @@ -484,13 +480,12 @@ def can_start_over(self, notebook, snippet): raise e return can_start_over - @query_error_handler def progress(self, notebook, snippet, logs=''): patch_snippet_for_connector(snippet) if snippet['dialect'] == 'hive': - match = re.search('Total jobs = (\d+)', logs, re.MULTILINE) + match = re.search(r'Total jobs = (\d+)', logs, re.MULTILINE) total = int(match.group(1)) if match else 1 started = logs.count('Starting Job') @@ -499,13 +494,12 @@ def progress(self, notebook, snippet, logs=''): progress = int((started + ended) * 100 / (total * 2)) return max(progress, 5) # Return 5% progress as a minimum elif snippet['dialect'] == 'impala': - match = re.findall('(\d+)% Complete', logs, re.MULTILINE) + match = re.findall(r'(\d+)% Complete', logs, re.MULTILINE) # Retrieve the last reported progress percentage if it exists return int(match[-1]) if match and isinstance(match, list) else 0 else: return 50 - @query_error_handler def get_jobs(self, notebook, snippet, logs): jobs = [] @@ -552,7 +546,6 @@ def get_jobs(self, 
notebook, snippet, logs): return jobs - @query_error_handler def autocomplete(self, snippet, database=None, table=None, column=None, nested=None, operation=None): db = self._get_db(snippet, interpreter=self.interpreter) @@ -577,16 +570,14 @@ def autocomplete(self, snippet, database=None, table=None, column=None, nested=N return resp - @query_error_handler - def get_sample_data(self, snippet, database=None, table=None, column=None, is_async=False, operation=None): + def get_sample_data(self, snippet, database=None, table=None, column=None, nested=None, is_async=False, operation=None): try: db = self._get_db(snippet, is_async=is_async, interpreter=self.interpreter) - return _get_sample_data(db, database, table, column, is_async, operation=operation, cluster=self.interpreter) + return _get_sample_data(db, database, table, column, nested, is_async, operation=operation, cluster=self.interpreter) except QueryServerException as ex: raise QueryError(ex.message) - @query_error_handler def explain(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) @@ -613,7 +604,6 @@ def explain(self, notebook, snippet): 'statement': statement, } - @query_error_handler def export_data_as_hdfs_file(self, snippet, target_file, overwrite): db = self._get_db(snippet, interpreter=self.interpreter) @@ -626,8 +616,7 @@ def export_data_as_hdfs_file(self, snippet, target_file, overwrite): return '/filebrowser/view=%s' % urllib_quote( urllib_quote(target_file.encode('utf-8'), safe=SAFE_CHARACTERS_URI_COMPONENTS) - ) # Quote twice, because of issue in the routing on client - + ) # Quote twice, because of issue in the routing on client def export_data_as_table(self, notebook, snippet, destination, is_temporary=False, location=None): db = self._get_db(snippet, interpreter=self.interpreter) @@ -654,7 +643,6 @@ def export_data_as_table(self, notebook, snippet, destination, is_temporary=Fals return hql, success_url - def export_large_data_to_hdfs(self, notebook, snippet, destination): response = self._get_current_statement(notebook, snippet) session = self._get_session(notebook, snippet['type']) @@ -684,7 +672,6 @@ def export_large_data_to_hdfs(self, notebook, snippet, destination): return hql, success_url - def upgrade_properties(self, lang='hive', properties=None): upgraded_properties = copy.deepcopy(self.get_properties(lang)) @@ -708,7 +695,6 @@ def upgrade_properties(self, lang='hive', properties=None): return upgraded_properties - def _get_session(self, notebook, type='hive'): session = next((session for session in notebook['sessions'] if session['type'] == type), None) return session @@ -723,7 +709,6 @@ def _get_session_by_id(self, notebook, type='hive'): filters['owner'] = self.user return Session.objects.get(**filters) - def _get_hive_execution_engine(self, notebook, snippet): # Get hive.execution.engine from snippet properties, if none, then get from session properties = snippet['properties'] @@ -746,7 +731,6 @@ def _get_hive_execution_engine(self, notebook, snippet): return engine - def _prepare_hql_query(self, snippet, statement, session): settings = snippet['properties'].get('settings', None) file_resources = snippet['properties'].get('files', None) @@ -775,7 +759,6 @@ def _prepare_hql_query(self, snippet, statement, session): database=database ) - def get_browse_query(self, snippet, database, table, partition_spec=None): db = self._get_db(snippet, interpreter=self.interpreter) table = db.get_table(database, table) @@ -789,7 +772,6 @@ def get_browse_query(self, snippet, database, table, 
partition_spec=None): else: return db.get_select_star_query(database, table, limit=100) - def _get_handle(self, snippet): try: handle = snippet['result']['handle'].copy() @@ -805,7 +787,6 @@ def _get_handle(self, snippet): return HiveServerQueryHandle(**handle) - def _get_db(self, snippet, is_async=False, interpreter=None): if interpreter and interpreter.get('dialect'): dialect = interpreter['dialect'] @@ -828,7 +809,6 @@ def _get_db(self, snippet, is_async=False, interpreter=None): # Note: name is not used if interpreter is present return dbms.get(self.user, query_server=get_query_server_config(name=name, connector=interpreter)) - def _parse_job_counters(self, job_id): # Attempt to fetch total records from the job's Hive counter total_records, total_size = None, None @@ -864,7 +844,6 @@ def _parse_job_counters(self, job_id): return total_records, total_size - def _get_hive_result_size(self, notebook, snippet): total_records, total_size, msg = None, None, None engine = self._get_hive_execution_engine(notebook, snippet).lower() @@ -879,8 +858,8 @@ def _get_hive_result_size(self, notebook, snippet): else: msg = _('Hive query did not execute any jobs.') elif engine == 'spark': - total_records_re = "RECORDS_OUT_0: (?P\d+)" - total_size_re = "Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P\d+)" + total_records_re = r"RECORDS_OUT_0: (?P\d+)" + total_size_re = r"Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P\d+)" total_records_match = re.search(total_records_re, logs, re.MULTILINE) total_size_match = re.search(total_size_re, logs, re.MULTILINE) @@ -891,7 +870,6 @@ def _get_hive_result_size(self, notebook, snippet): return total_records, total_size, msg - def _get_impala_result_size(self, notebook, snippet): total_records_match = None total_records, total_size, msg = None, None, None @@ -904,7 +882,7 @@ def _get_impala_result_size(self, notebook, snippet): fragment = self._get_impala_query_profile(server_url, query_id=query_id) total_records_re = \ - "Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P\d+)\).*?(Averaged Fragment F\d\d)" + r"Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? 
\((?P<total_records>\d+)\).*?(Averaged Fragment F\d\d)" total_records_match = re.search(total_records_re, fragment, re.MULTILINE | re.DOTALL) if total_records_match: @@ -917,7 +895,6 @@ def _get_impala_query_id(self, snippet): guid = None if 'result' in snippet and 'handle' in snippet['result'] and 'guid' in snippet['result']['handle']: @@ -929,7 +906,6 @@ def _get_impala_query_profile(self, server_url, query_id): api = get_impalad_api(user=self.user, url=server_url) @@ -944,18 +920,15 @@ return profile - def _get_impala_profile_plan(self, query_id, profile): - query_plan_re = "Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id} + query_plan_re = r"Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id} query_plan_match = re.search(query_plan_re, profile, re.MULTILINE | re.DOTALL) return query_plan_match.group() if query_plan_match else None - def describe_column(self, notebook, snippet, database=None, table=None, column=None): db = self._get_db(snippet, interpreter=self.interpreter) return db.get_table_columns_stats(database, table, column) - def describe_table(self, notebook, snippet, database=None, table=None): db = self._get_db(snippet, interpreter=self.interpreter) tb = db.get_table(database, table) diff --git a/desktop/libs/notebook/src/notebook/urls.py b/desktop/libs/notebook/src/notebook/urls.py index c651695fbed..ae7f1da37b4 100644 --- a/desktop/libs/notebook/src/notebook/urls.py +++ b/desktop/libs/notebook/src/notebook/urls.py @@ -17,13 +17,9 @@ import sys -from notebook import views as notebook_views -from notebook import api as notebook_api +from django.urls import re_path -if sys.version_info[0] > 2: - from django.urls import re_path -else: - from django.conf.urls import url as re_path +from notebook import api as notebook_api, views as notebook_views # Views urlpatterns = [ @@ -96,6 +92,11 @@ notebook_api.get_sample_data, name='api_sample_data_column' ), + re_path( + r'^api/sample/(?P<server>[^/?]*)/(?P
<database>[\w_\-]+)/(?P<table>\w+)/(?P<column>\w+)/(?P<nested>.+)/?$', notebook_api.get_sample_data, name='api_sample_data_nested' ), # SQLite re_path(r'^api/autocomplete//?(?P<database>[\w_\-/]+)/(?P<table>[^/?]*)/?$', notebook_api.autocomplete, name='api_autocomplete_tables'),
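Together, the api_sample_data_nested and editor_sample_data_nested routes expose nested-field sampling over HTTP. A usage sketch only, not part of the patch: it assumes the public API keeps its usual /api/v1/ mount and JWT bearer auth, and the host, token, table and column names (default.web_logs, a map column named properties) are placeholders:

  import json
  import requests

  # URL segments are server/database/table/column/nested; the POST fields mirror what
  # notebook_api.get_sample_data() reads from request.POST (snippet, async, operation).
  resp = requests.post(
      'https://hue.example.com/api/v1/editor/sample/hive/default/web_logs/properties/key',  # hypothetical host and objects
      headers={'Authorization': 'Bearer <jwt-token>'},
      data={'snippet': json.dumps({'type': 'hive'}), 'async': 'false', 'operation': '"default"'},
  )
  print(resp.json().get('rows'))  # sampled keys of the properties map column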