Skip to content

Commit

Permalink
[Hive] Support complex data type sampling
Browse files Browse the repository at this point in the history
  • Loading branch information
agl29 committed Jun 25, 2024
1 parent d34fd43 commit 49fd0db
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 16 deletions.
10 changes: 5 additions & 5 deletions apps/beeswax/src/beeswax/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def _autocomplete(db, database=None, table=None, column=None, nested=None, query
response = parse_tree
# If column or nested type is scalar/primitive, add sample of values
if parser.is_scalar_type(parse_tree['type']):
sample = _get_sample_data(db, database, table, column, cluster=cluster)
sample = _get_sample_data(db, database, table, column, nested, cluster=cluster)
if 'rows' in sample:
response['sample'] = sample['rows']
else:
Expand Down Expand Up @@ -713,18 +713,18 @@ def clear_history(request):


@error_handler
def get_sample_data(request, database, table, column=None):
def get_sample_data(request, database, table, column=None, nested=None):
app_name = get_app_name(request)
cluster = json.loads(request.POST.get('cluster', '{}'))

query_server = get_query_server_config(app_name, connector=cluster)
db = dbms.get(request.user, query_server)

response = _get_sample_data(db, database, table, column, cluster=cluster)
response = _get_sample_data(db, database, table, column, nested, cluster=cluster)
return JsonResponse(response)


def _get_sample_data(db, database, table, column, is_async=False, cluster=None, operation=None):
def _get_sample_data(db, database, table, column, nested, is_async=False, cluster=None, operation=None):
if operation == 'hello':
table_obj = None
else:
Expand All @@ -738,7 +738,7 @@ def _get_sample_data(db, database, table, column, is_async=False, cluster=None,
response['message'] = _('Not getting sample data as this is a view which can be expensive when run.')
return response

sample_data = db.get_sample(database, table_obj, column, generate_sql_only=is_async, operation=operation)
sample_data = db.get_sample(database, table_obj, column, nested, generate_sql_only=is_async, operation=operation)
response = {'status': -1}

if sample_data:
Expand Down
37 changes: 32 additions & 5 deletions apps/beeswax/src/beeswax/server/dbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,15 +666,15 @@ def get_sample(self, database, table, column=None, nested=None, limit=100, gener
else:
hql = "SELECT * FROM `%s`.`%s` LIMIT %s;" % (database, table.name, limit)
else:
select_clause, from_clause = self.get_nested_select(database, table.name, column, nested)
if operation == 'distinct':
hql = "SELECT DISTINCT %s FROM `%s`.`%s` LIMIT %s;" % (column, database, table.name, limit)
hql = 'SELECT DISTINCT %s FROM %s LIMIT %s;' % (select_clause, from_clause, limit)
elif operation == 'max':
hql = "SELECT max(%s) FROM `%s`.`%s`;" % (column, database, table.name)
hql = 'SELECT max(%s) FROM %s;' % (select_clause, from_clause)
elif operation == 'min':
hql = "SELECT min(%s) FROM `%s`.`%s`;" % (column, database, table.name)
hql = 'SELECT min(%s) FROM %s;' % (select_clause, from_clause)
else:
hql = "SELECT %s FROM `%s`.`%s` LIMIT %s;" % (column, database, table.name, limit)
# TODO: Add nested select support for HS2
hql = 'SELECT %s FROM %s LIMIT %s;' % (select_clause, from_clause, limit)

if hql:
if generate_sql_only:
Expand All @@ -690,6 +690,33 @@ def get_sample(self, database, table, column=None, nested=None, limit=100, gener
return result


def get_nested_select(self, database, table, column, nested):
    """
    Build the SELECT and FROM clauses used to sample a column, or a member nested inside it, in Hive.

    `nested` is a '/'-separated path below `column`. Each path segment is handled as follows:
      - 'item':          the current expression is exploded with a LATERAL VIEW and the exploded
                         alias (temp_item<i>) becomes the new select expression.
      - 'key' / 'value': the current expression is exploded into (temp_key<i>, temp_value<i>) and the
                         requested side becomes the new select expression.
      - anything else:   treated as a struct field and appended to the select expression with a dot.

    <i> is the position of the segment in the path, which keeps the generated column aliases unique.
    Empty segments (from a leading/trailing/doubled '/') are ignored so that they cannot produce a
    dangling "col." expression.

    :param database: database name, used verbatim in the FROM clause
    :param table: table name, used verbatim in the FROM clause
    :param column: top-level column name the path starts from
    :param nested: '/'-separated nested path, or None / '' for a plain column
    :return: (select_clause, from_clause) tuple of SQL fragments
    """
    select_clause = column
    # NOTE(review): unlike the other sample queries, the table reference is not backtick-quoted here;
    # the unit tests pin the unquoted "db.tbl" form, so it is kept as is.
    from_clause = f"{database}.{table}"

    # NOTE(review): path segments are interpolated into the statement without quoting or validation;
    # callers must make sure `nested` does not carry untrusted SQL.
    path_segments = [segment for segment in (nested or '').split('/') if segment]

    for i, segment in enumerate(path_segments):
        if segment == 'item':
            exploded_aliases = f"temp_item{i}"
            selected_alias = exploded_aliases
        elif segment in ('key', 'value'):
            # explode() on a map always yields both columns, whichever side is being sampled.
            exploded_aliases = f"temp_key{i}, temp_value{i}"
            selected_alias = f"temp_{segment}{i}"
        else:
            # Struct field: no explode needed, just drill down with dot notation.
            select_clause += f".{segment}"
            continue

        from_clause += f" LATERAL VIEW explode({select_clause}) temp_table AS {exploded_aliases}"
        select_clause = selected_alias

    return select_clause, from_clause


def _get_sample_partition_query(self, database, table, column='*', limit=100, operation=None):
max_parts = QUERY_PARTITIONS_LIMIT.get()
partitions = self.get_partitions(database, table, partition_spec=None, max_parts=max_parts)
Expand Down
36 changes: 36 additions & 0 deletions apps/beeswax/src/beeswax/server/dbms_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sys
from django.test import TestCase
from beeswax.server.dbms import get_query_server_config
from beeswax.server.dbms import HiveServer2Dbms
from desktop.lib.exceptions_renderable import PopupException
from desktop.settings import CACHES_HIVE_DISCOVERY_KEY
from django.core.cache import caches
Expand Down Expand Up @@ -173,3 +174,38 @@ def test_get_hs2_discovery(self):
# TODO: all the combinations in new test methods, e.g.:
# HIVE_DISCOVERY_LLAP_HA.get() --> True
# ...


class TestHiveServer2Dbms():
    """
    Unit tests for HiveServer2Dbms.get_nested_select.

    The method is called unbound with the test instance standing in for `self`, so no real
    HiveServer2Dbms (and therefore no server connection) has to be constructed.
    """

    def _nested_select(self, nested):
        # Every test samples the same db.tbl.col; only the nested path varies.
        select_part, from_part = HiveServer2Dbms.get_nested_select(self, "db", "tbl", "col", nested)
        return select_part, from_part

    def test_get_nested_select_no_nested(self):
        # Without a nested path the column is selected straight from the table.
        expected = ("col", "db.tbl")
        assert self._nested_select(None) == expected

    def test_get_nested_select_with_array_item(self):
        # 'item' explodes the array column and selects the exploded alias.
        expected = ("temp_item0", "db.tbl LATERAL VIEW explode(col) temp_table AS temp_item0")
        assert self._nested_select("item") == expected

    def test_get_nested_select_with_map_key(self):
        # 'key' explodes the map column and selects the key alias.
        expected = ("temp_key0", "db.tbl LATERAL VIEW explode(col) temp_table AS temp_key0, temp_value0")
        assert self._nested_select("key") == expected

    def test_get_nested_select_with_map_value(self):
        # 'value' explodes the map column and selects the value alias.
        expected = ("temp_value0", "db.tbl LATERAL VIEW explode(col) temp_table AS temp_key0, temp_value0")
        assert self._nested_select("value") == expected

    def test_get_nested_select_with_nested_path(self):
        # Each path segment adds one LATERAL VIEW that explodes the previous segment's alias.
        expected_from = " ".join([
            "db.tbl LATERAL VIEW explode(col) temp_table AS temp_item0",
            "LATERAL VIEW explode(temp_item0) temp_table AS temp_key1, temp_value1",
            "LATERAL VIEW explode(temp_key1) temp_table AS temp_key2, temp_value2",
        ])
        assert self._nested_select("item/key/value") == ("temp_value2", expected_from)

    def test_get_nested_select_with_struct_path(self):
        # A struct field is reached with dot notation on the exploded item, without another explode.
        expected = ("temp_item0.field1", "db.tbl LATERAL VIEW explode(col) temp_table AS temp_item0")
        assert self._nested_select("item/field1") == expected
4 changes: 2 additions & 2 deletions desktop/core/src/desktop/api_public.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ def get_logs(request):


@api_view(["POST"])
def get_sample_data(request, server=None, database=None, table=None, column=None):
def get_sample_data(request, server=None, database=None, table=None, column=None, nested=None):
django_request = get_django_request(request)

_patch_operation_id_request(django_request)

return notebook_api.get_sample_data(django_request, server, database, table, column)
return notebook_api.get_sample_data(django_request, server, database, table, column, nested)


@api_view(["POST"])
Expand Down
5 changes: 5 additions & 0 deletions desktop/core/src/desktop/api_public_urls_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
api_public.get_sample_data,
name='editor_sample_data_column',
),
re_path(
r"^editor/sample/(?P<database>[^/?]*)/(?P<table>[\w_\-]+)/(?P<column>\w+)/(?P<nested>.+)/?$",
api_public.get_sample_data,
name="editor_sample_data_nested",
),
]

urlpatterns += [
Expand Down
40 changes: 40 additions & 0 deletions desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,41 @@ class CatalogEntriesList {
}
});

/**
 * Reshapes a sample response for a complex-typed column so that it can be shown as a table.
 *
 * - array:  the rows are kept as they are and the single column is labelled 'item'.
 * - map:    every row's first cell is parsed as JSON and split into a 'key' and a 'value' cell.
 * - struct: the field names of the first row become the columns and every row's first cell is
 *           parsed as JSON and spread over those columns.
 *
 * Any other combination returns the response's data and meta untouched. Cells are expected to
 * hold JSON with double quotes encoded as &quot;, and produced cells are encoded the same way.
 *
 * @param {Object} response - Sample response holding `data` (rows) and `meta` (column descriptors)
 * @param {boolean} isComplex - Whether the sampled catalog entry is of a complex type
 * @param {string} dataType - Type of the sampled catalog entry, e.g. 'map', 'array' or 'struct'
 * @return {{data: *, meta: *}} The data and meta to display
 */
self.transformData = function (response, isComplex, dataType) {
  const toHtmlJson = value => JSON.stringify(value).replace(/"/g, '&quot;');
  const parseHtmlJson = text => JSON.parse(text.replace(/&quot;/g, '"'));
  const asColumn = name => ({ name: name, type: '' });

  const { data, meta } = response;

  if (!isComplex) {
    return { data, meta };
  }

  if (dataType === 'array') {
    return { data, meta: [asColumn('item')] };
  }

  if (dataType === 'map') {
    return {
      data: data.map(row => {
        const entries = parseHtmlJson(row[0]);
        return [Object.keys(entries).map(toHtmlJson), Object.values(entries).map(toHtmlJson)];
      }),
      meta: [asColumn('key'), asColumn('value')]
    };
  }

  if (dataType === 'struct' && data.length > 0 && data[0].length > 0) {
    // The first row decides which struct fields are shown as columns.
    const fieldNames = Object.keys(parseHtmlJson(data[0][0]));
    return {
      data: data.map(row => Object.values(parseHtmlJson(row[0])).map(toHtmlJson)),
      meta: fieldNames.map(asColumn)
    };
  }

  return { data, meta };
};

Check failure on line 366 in desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js

View workflow job for this annotation

GitHub Actions / build

Delete `····`

// TODO: Can be computed based on contents (instead of always suggesting all col types etc.)
self.knownFacetValues = ko.pureComputed(() => {
if (self.catalogEntry().isDatabase()) {
Expand Down Expand Up @@ -583,6 +618,11 @@ class CatalogEntriesList {
});
self.lastSamplePromise
.then(sample => {
let isComplex = self.catalogEntry().isComplex();

Check warning on line 621 in desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js

View workflow job for this annotation

GitHub Actions / build

'isComplex' is never reassigned. Use 'const' instead
let dataType = self.catalogEntry().getType();

Check warning on line 622 in desktop/core/src/desktop/js/ko/components/ko.catalogEntriesList.js

View workflow job for this annotation

GitHub Actions / build

'dataType' is never reassigned. Use 'const' instead
const { data, meta } = self.transformData(sample, isComplex, dataType);
sample.meta = meta;
sample.data = data;
childPromise
.then(() => {
if (sample.meta && sample.meta.length && sample.data && sample.data.length) {
Expand Down
4 changes: 2 additions & 2 deletions desktop/libs/notebook/src/notebook/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ def autocomplete(request, server=None, database=None, table=None, column=None, n
@require_POST
@check_document_access_permission
@api_error_handler
def get_sample_data(request, server=None, database=None, table=None, column=None):
def get_sample_data(request, server=None, database=None, table=None, column=None, nested=None):
response = {'status': -1}

# Passed by check_document_access_permission but unused by APIs
Expand All @@ -781,7 +781,7 @@ def get_sample_data(request, server=None, database=None, table=None, column=None
is_async = json.loads(request.POST.get('async', 'false'))
operation = json.loads(request.POST.get('operation', '"default"'))

sample_data = get_api(request, snippet).get_sample_data(snippet, database, table, column, is_async=is_async, operation=operation)
sample_data = get_api(request, snippet).get_sample_data(snippet, database, table, column, nested, is_async=is_async, operation=operation)
response.update(sample_data)

response['status'] = 0
Expand Down
4 changes: 2 additions & 2 deletions desktop/libs/notebook/src/notebook/connectors/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,10 @@ def autocomplete(self, snippet, database=None, table=None, column=None, nested=N


@query_error_handler
def get_sample_data(self, snippet, database=None, table=None, column=None, is_async=False, operation=None):
def get_sample_data(self, snippet, database=None, table=None, column=None, nested=None, is_async=False, operation=None):
try:
db = self._get_db(snippet, is_async=is_async, interpreter=self.interpreter)
return _get_sample_data(db, database, table, column, is_async, operation=operation, cluster=self.interpreter)
return _get_sample_data(db, database, table, column, nested, is_async, operation=operation, cluster=self.interpreter)
except QueryServerException as ex:
raise QueryError(ex.message)

Expand Down
5 changes: 5 additions & 0 deletions desktop/libs/notebook/src/notebook/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
notebook_api.get_sample_data,
name='api_sample_data_column'
),
re_path(
r'^api/sample/(?P<database>[^/?]*)/(?P<table>[\w_\-]+)/(?P<column>\w+)/(?P<nested>.+)/?$',
notebook_api.get_sample_data,
name='api_sample_data_nested'
),

# SQLite
re_path(r'^api/autocomplete//?(?P<server>[\w_\-/]+)/(?P<database>[^/?]*)/?$', notebook_api.autocomplete, name='api_autocomplete_tables'),
Expand Down

0 comments on commit 49fd0db

Please sign in to comment.