Commit 9a39a4e: initial new JSON spiking
genzgd committed Sep 20, 2024 (1 parent: 58fbbc1)
Showing 11 changed files with 150 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .docker/clickhouse/single_node_tls/config.xml
@@ -29,7 +29,7 @@
<server>
    <certificateFile>/etc/clickhouse-server/certs/server.crt</certificateFile>
    <privateKeyFile>/etc/clickhouse-server/certs/server.key</privateKeyFile>
    <verificationMode>relaxed</verificationMode>
    <verificationMode>strict</verificationMode>
    <caConfig>/etc/clickhouse-server/certs/ca.crt</caConfig>
    <cacheSessions>true</cacheSessions>
    <disableProtocols>sslv2,sslv3,tlsv1</disableProtocols>
7 changes: 5 additions & 2 deletions clickhouse_connect/datatypes/base.py
@@ -104,10 +104,13 @@ def data_size(self, sample: Sequence) -> int:
            d_size += 1
        return d_size

    def _data_size(self, _sample: Collection) -> int:
    def _data_size(self, sample: Collection) -> int:
        if self.byte_size:
            return self.byte_size
        return 0
        total = 0
        for x in sample:
            total += len(str(x))
        return total // len(sample) + 1

    def write_column_prefix(self, dest: bytearray):
        """
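The new _data_size body replaces the flat 'return 0' fallback with a sampled estimate: the average stringified length across the sample, plus one. A standalone sketch of that heuristic (the function name and the empty-sample guard are illustrative additions, not library code):

from typing import Collection

def estimate_avg_row_bytes(sample: Collection) -> int:
    # Rough per-row byte estimate: mean len(str(x)) over the sample, plus one
    if not sample:  # guard against division by zero; the method above assumes a non-empty sample
        return 0
    total = sum(len(str(x)) for x in sample)
    return total // len(sample) + 1

assert estimate_avg_row_bytes(['ab', 'abcd']) == 4  # (2 + 4) // 2 + 1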
61 changes: 0 additions & 61 deletions clickhouse_connect/datatypes/container.py
@@ -252,64 +252,3 @@ def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContex
        keys = self.element_names
        data = [[tuple(sub_row[key] for key in keys) for sub_row in row] for row in column]
        self.tuple_array.write_column_data(data, dest, ctx)


class JSON(ClickHouseType):
    python_type = dict
    # Native is a Python type (primitive, dict, array), string is an actual JSON string
    valid_formats = 'string', 'native'

    def write_column_prefix(self, dest: bytearray):
        dest.append(0x01)

    def _data_size(self, sample: Collection) -> int:
        if len(sample) == 0:
            return 0
        total = 0
        for x in sample:
            if isinstance(x, str):
                total += len(x)
            elif x:
                total += len(any_to_json(x))
        return total // len(sample) + 1

    # pylint: disable=duplicate-code
    def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
        app = dest.append
        first = self._first_value(column)
        if isinstance(first, str) or self.write_format(ctx) == 'string':
            for x in column:
                v = x.encode()
                sz = len(v)
                while True:
                    b = sz & 0x7f
                    sz >>= 7
                    if sz == 0:
                        app(b)
                        break
                    app(0x80 | b)
                dest += v
        else:
            to_json = any_to_json
            for x in column:
                v = to_json(x)
                sz = len(v)
                while True:
                    b = sz & 0x7f
                    sz >>= 7
                    if sz == 0:
                        app(b)
                        break
                    app(0x80 | b)
                dest += v


class Object(JSON):
    python_type = dict

    def __init__(self, type_def):
        data_type = type_def.values[0].lower().replace(' ', '')
        if data_type not in ("'json'", "nullable('json')"):
            raise NotImplementedError('Only json or Nullable(json) Object type is currently supported')
        super().__init__(type_def)
        self._name_suffix = type_def.arg_str
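The removed writer frames each encoded JSON payload with an unsigned LEB128 length prefix (the 'while True' loop above). A self-contained sketch of that varint encoding, using only the standard library:

def write_leb128(value: int, dest: bytearray) -> None:
    # Emit 7 data bits per byte, setting the high bit on every byte except the last
    while True:
        b = value & 0x7f
        value >>= 7
        if value == 0:
            dest.append(b)
            return
        dest.append(0x80 | b)

buf = bytearray()
write_leb128(300, buf)
assert bytes(buf) == b'\xac\x02'  # 300 encodes low 7 bits first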
3 changes: 3 additions & 0 deletions clickhouse_connect/datatypes/registry.py
@@ -35,6 +35,9 @@ def parse_name(name: str) -> Tuple[str, str, TypeDef]:
    elif base.startswith('Tuple'):
        keys, values = parse_columns(base[5:])
        base = 'Tuple'
    elif base.startswith('Variant'):
        key, values = parse_columns(base[7:])
        base = 'Variant'
    elif base == 'Point':
        values = ('Float64', 'Float64')
    else:
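With the Variant branch in place, get_from_name can round-trip a Variant type name, as the new unit test at the bottom of this diff verifies:

from clickhouse_connect.datatypes.registry import get_from_name

ch_type = get_from_name('Variant(UInt64, String, Array(UInt64))')
assert ch_type.name == 'Variant(UInt64, String, Array(UInt64))'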
83 changes: 82 additions & 1 deletion clickhouse_connect/datatypes/special.py
@@ -1,12 +1,16 @@
from typing import Union, Sequence, MutableSequence
from typing import Union, Sequence, MutableSequence, Collection
from uuid import UUID as PYUUID

from clickhouse_connect.datatypes import registry
from clickhouse_connect.datatypes.base import TypeDef, ClickHouseType, ArrayType, UnsupportedType
from clickhouse_connect.datatypes.registry import get_from_name
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.driver.errors import handle_error
from clickhouse_connect.driver.exceptions import DataError
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.query import QueryContext
from clickhouse_connect.driver.types import ByteSource
from clickhouse_connect.json_impl import any_to_json

empty_uuid_b = bytes(b'\x00' * 16)

@@ -107,3 +111,80 @@ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: b

class AggregateFunction(UnsupportedType):
    pass


def json_sample_size(_, sample: Collection) -> int:
    if len(sample) == 0:
        return 0
    total = 0
    for x in sample:
        if isinstance(x, str):
            total += len(x)
        elif x:
            total += len(any_to_json(x))
    return total // len(sample) + 1


def write_json(self, column: Sequence, dest: bytearray, ctx: InsertContext):
    first = self._first_value(column)
    write_col = column
    encoding = ctx.encoding or self.encoding
    if not isinstance(first, str) and self.write_format(ctx) != 'string':
        to_json = any_to_json
        write_col = [to_json(v) for v in column]
        encoding = None
    handle_error(data_conv.write_str_col(write_col, self.nullable, encoding, dest))


class JSON(ClickHouseType):
    valid_formats = 'string', 'native'
    _data_size = json_sample_size
    write_column_data = write_json

    def read_column(self, source: ByteSource, num_rows: int, ctx: QueryContext):
        if source.read_uint64() != 0:  # object serialization version; currently only 0 is recognized
            raise DataError('unrecognized object serialization version')
        source.read_leb128()  # max number of dynamic paths; used to preallocate storage in ClickHouse, ignored here
        dynamic_path_cnt = source.read_leb128()
        dynamic_paths = [source.read_leb128_str() for _ in range(dynamic_path_cnt)]
        shared_col_type = registry.get_from_name('Array(Tuple(String, String))')
        shared_state = shared_col_type.read_column(source, num_rows, ctx)
        sub_columns = []
        sub_types = []
        for _ in dynamic_paths:
            type_name = source.read_leb128_str()
            col_type = registry.get_from_name(type_name)
            sub_types.append(col_type)
            sub_columns.append(col_type.read_column(source, num_rows, ctx))
        print(dynamic_paths)


class Variant(ClickHouseType):
    _slots = 'element_types'

    def __init__(self, type_def: TypeDef):
        super().__init__(type_def)
        self.element_types = [get_from_name(name) for name in type_def.values]
        self._name_suffix = f"({', '.join(ch_type.name for ch_type in self.element_types)})"


class Dynamic(ClickHouseType):
    python_type = object


class Object(ClickHouseType):
    python_type = dict
    # Native is a Python type (primitive, dict, array), string is an actual JSON string
    valid_formats = 'string', 'native'
    _data_size = json_sample_size
    write_column_data = write_json

    def __init__(self, type_def):
        data_type = type_def.values[0].lower().replace(' ', '')
        if data_type not in ("'json'", "nullable('json')"):
            raise NotImplementedError('Only json or Nullable(json) Object type is currently supported')
        super().__init__(type_def)
        self._name_suffix = type_def.arg_str

    def write_column_prefix(self, dest: bytearray):
        dest.append(0x01)
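The spike's read_column consumes the column prefix as: a UInt64 serialization version, a LEB128 maximum dynamic path count, a LEB128 actual path count, the path names, the shared-data column, and then one typed sub-column per dynamic path. A minimal decoder for the LEB128 pieces, written against a plain bytes buffer rather than the driver's ByteSource (names here are illustrative):

def read_leb128(buf: bytes, pos: int) -> tuple:
    # Returns (value, next_pos): accumulate 7 bits per byte until the high bit clears
    value = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        value |= (b & 0x7f) << shift
        if not b & 0x80:
            return value, pos
        shift += 7

assert read_leb128(b'\xac\x02', 0) == (300, 2)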
2 changes: 1 addition & 1 deletion clickhouse_connect/driver/__init__.py
@@ -84,7 +84,7 @@ def create_client(*,
    database = database or parsed.path
    for k, v in parse_qs(parsed.query).items():
        kwargs[k] = v[0]
    use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and port in (443, 8443))
    use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and str(port) in ('443', '8443'))
    if not host:
        host = 'localhost'
    if not interface:
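The use_tls tweak matters because a port parsed from a URL arrives as a str, so the old membership test against an int tuple could never match it; normalizing both sides through str() catches either form. A toy check of that reasoning:

assert '8443' not in (443, 8443)         # the old int-only test missed string ports
for port in (8443, '8443'):
    assert str(port) in ('443', '8443')  # the new comparison matches both forms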
3 changes: 2 additions & 1 deletion clickhouse_connect/driver/transform.py
@@ -96,7 +96,8 @@ def chunk_gen():
            col_enc = col_name.encode()
            write_leb128(len(col_enc), output)
            output += col_enc
            col_enc = col_type.name.encode()
            t_name = 'String' if col_type.name == 'JSON' else col_type.name
            col_enc = t_name.encode()
            write_leb128(len(col_enc), output)
            output += col_enc
            context.start_column(col_name)
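Taken by itself, the header substitution above declares JSON columns to the server as String in the native insert block while every other type name passes through unchanged (a toy illustration of the one-line change, with made-up column types):

col_types = ['Int32', 'JSON', 'UInt64']
wire_names = ['String' if name == 'JSON' else name for name in col_types]
assert wire_names == ['Int32', 'String', 'UInt64']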
5 changes: 3 additions & 2 deletions tests/integration_tests/conftest.py
@@ -46,7 +46,8 @@ def test_config_fixture() -> Iterator[TestConfig]:
    cloud = coerce_bool(os.environ.get('CLICKHOUSE_CONNECT_TEST_CLOUD', 'True'))
    username = os.environ.get('CLICKHOUSE_CONNECT_TEST_USER', 'default')
    password = os.environ.get('CLICKHOUSE_CONNECT_TEST_PASSWORD', '')
    test_database = f'ch_connect__{random.randint(100000, 999999)}__{int(time.time() * 1000)}'
    test_database = os.environ.get('CLICKHOUSE_CONNECT_TEST_DATABASE',
                                   f'ch_connect__{random.randint(100000, 999999)}__{int(time.time() * 1000)}')
    compress = os.environ.get('CLICKHOUSE_CONNECT_TEST_COMPRESS', 'True')
    insert_quorum = int(os.environ.get('CLICKHOUSE_CONNECT_TEST_INSERT_QUORUM', '0'))
    proxy_address = os.environ.get('CLICKHOUSE_CONNECT_TEST_PROXY_ADDR', '')
@@ -112,7 +113,7 @@ def test_client_fixture(test_config: TestConfig, test_db: str) -> Iterator[Clien
    client.database = test_db
    yield client

    client.command(f'DROP database IF EXISTS {test_db}', use_database=False)
    # client.command(f'DROP database IF EXISTS {test_db}', use_database=False)
    if test_config.docker:
        down_result = run_cmd(['docker', 'compose', '-f', compose_file, 'down', '-v'])
        if down_result[0]:
43 changes: 43 additions & 0 deletions tests/integration_tests/test_json.py
@@ -0,0 +1,43 @@
from typing import Callable

from clickhouse_connect.datatypes.format import set_write_format
from clickhouse_connect.driver import Client


def test_basic_json(test_client: Client, table_context: Callable):
    with table_context('new_json_basic', [
        'key Int32',
        'value JSON',
        'e2 Int32',
        "null_value JSON"
    ]):
        jv1 = {'key1': 337, 'value.2': 'vvvv', 'HKD@spéçiäl': 'Special K', 'blank': 'not_really_blank'}
        jv3 = {'key3': 752, 'value.2': 'v2_rules', 'blank': None}
        njv2 = {'nk1': -302, 'nk2': {'sub1': 372, 'sub2': 'a string'}}
        njv3 = {'nk1': 5832.44, 'nk2': {'sub1': 47788382, 'sub2': 'sub2val', 'sub3': 'sub3str', 'space key': 'spacey'}}
        test_client.insert('new_json_basic', [
            [5, jv1, -44, None],
            [20, None, 5200, njv2],
            [25, jv3, 7302, njv3]])

        result = test_client.query('SELECT * FROM new_json_basic ORDER BY key')
        json1 = result.result_set[0][1]
        assert json1['HKD@spéçiäl'] == 'Special K'
        assert json1['key3'] == 0
        json2 = result.result_set[1][3]
        assert json2['nk1'] == -302.0
        assert json2['nk2']['sub2'] == 'a string'
        assert json2['nk2']['sub3'] is None
        json3 = result.result_set[2][1]
        assert json3['value.2'] == 'v2_rules'
        assert json3['blank'] == ''
        assert json3['key1'] == 0
        assert json3['key3'] == 752
        null_json3 = result.result_set[2][3]
        assert null_json3['nk2']['space key'] == 'spacey'

        set_write_format('JSON', 'string')
        test_client.insert('new_json_basic', [[999, '{"key4": 283, "value.2": "str_value"}', 77, '{"nk1":53}']])
        result = test_client.query('SELECT value.key4, null_value.nk1 FROM new_json_basic ORDER BY key')
        assert result.result_set[3][0] == 283
        assert result.result_set[3][1] == 53
10 changes: 5 additions & 5 deletions tests/integration_tests/test_native.py
@@ -58,10 +58,10 @@ def test_nulls(test_client: Client, table_context: Callable):
    assert result[3] == (4, 'nonnull2', None)


def test_json(test_client: Client, table_context: Callable):
    if not coerce_bool(os.environ.get('CLICKHOUSE_CONNECT_TEST_JSON_TYPE')):
def test_old_json(test_client: Client, table_context: Callable):
    if not coerce_bool(os.environ.get('CLICKHOUSE_CONNECT_TEST_OLD_JSON_TYPE')):
        pytest.skip('Deprecated JSON type not tested')
    with table_context('native_json_test', [
    with table_context('old_json_test', [
        'key Int32',
        'value JSON',
        'e2 Int32',
@@ -71,12 +71,12 @@ def test_json(test_client: Client, table_context: Callable):
    jv3 = {'key3': 752, 'value.2': 'v2_rules', 'blank': None}
    njv2 = {'nk1': -302, 'nk2': {'sub1': 372, 'sub2': 'a string'}}
    njv3 = {'nk1': 5832.44, 'nk2': {'sub1': 47788382, 'sub2': 'sub2val', 'sub3': 'sub3str', 'space key': 'spacey'}}
    test_client.insert('native_json_test', [
    test_client.insert('old_json_test', [
        [5, jv1, -44, None],
        [20, None, 5200, njv2],
        [25, jv3, 7302, njv3]])

    result = test_client.query('SELECT * FROM native_json_test ORDER BY key')
    result = test_client.query('SELECT * FROM old_json_test ORDER BY key')
    json1 = result.result_set[0][1]
    assert json1['HKD@spéçiäl'] == 'Special K'
    assert json1['key3'] == 0
5 changes: 5 additions & 0 deletions tests/unit_tests/test_driver/test_parser.py
@@ -22,6 +22,11 @@ def test_map_type():
    assert ch_type.name == 'Map(String, Decimal(5, 5))'


def test_variant_type():
    ch_type = get_from_name('Variant(UInt64, String, Array(UInt64))')
    assert ch_type.name == 'Variant(UInt64, String, Array(UInt64))'


def test_remove_comments():
    sql = """SELECT -- 6dcd92a04feb50f14bbcf07c661680ba
* FROM benchmark_results /*With an inline comment */ WHERE result = 'True'
