Skip to content

Query Performance

Andrew Robertson edited this page Jul 9, 2021 · 1 revision

Overview

The following guide will discuss presto splits, and overall query performance

The guide assumes some familiarity with pravega - https://pravega.io/docs/nightly/pravega-concepts

Presto Splits

The goal here is to optimally generate presto splits so that we can process a large dataset in pravega efficiently.

Stream Segments

The first factor of parallelization when querying from pravega is the number of segments in a stream. The number of segments is controlled by the ScalingPolicy set when creating the stream in pravega.

With nothing else configured or setup, there will be 1 presto split per stream segment. The more stream segments, the more splits we can create, and thus more active presto workers we can have.

While certainly in some cases this may work out just fine, it may be limiting.
For example, we have 4 presto worker nodes with max 100 splits each, we have 400 total available workers. If we have only 16 segments, that is a lot of idle worker power.

Stream Cuts

Stream Cuts in a pravega stream note a particular position (of all segments) within a stream. This allows us to get start/end event boundaries within a segment which is an opportunity to break a segment up for processing.

With this, the number of generated splits becomes num_segments * num_stream_cuts

Using Stream Cuts in Presto

The pravega connector can use stream cuts in order to create more optimal splits.

Currently, these stream cuts are generated and stored out-of-band.

The pravega connector will look for a specially named stream, given the stream you are querying from. For example, if you are querying from hr.employees, the pravega connector will look for a stream named hr.employees-SC

This will be known here as the "-SC stream".

"-SC stream" format

Each event in the stream should be a serialized StreamCut.

The following example/pseudo code should show how to create such a stream when ingesting data. (Also note it is possible to generate this stream later after data has been ingested).

*WARN: the -SC stream must have a fixed ScalingPolicy of 1

public class StreamCutSerializer implements Serializer<StreamCut> {
    @Override
    public ByteBuffer serialize(StreamCut value) {
        return value.toBytes();
    }

    @Override
    public StreamCut deserialize(ByteBuffer serializedValue) {
        return StreamCut.fromBytes(serializedValue);
    }
}

EventStreamWriter<StreamCut> streamCutWriter = 
	clientFactory.createEventWriter(stream + "-SC",
		new StreamCutSerializer(), EventWriterConfig.builder().build());

// write starting stream cut
streamCutWriter.writeEvent(streamManager.getStreamInfo(scope, stream).getTailStreamCut())

for each event to write:
	if total_size % size_per_stream_cut = 0:
		// interval hit, whether number of events, or size in MB, etc
		streamCutWriter.writeEvent(streamManager.getStreamInfo(scope, stream).getTailStreamCut())

// write terminal stream cut
if total_size % size_per_stream_cut != 0:
	streamCutWriter.writeEvent(streamManager.getStreamInfo(scope, stream).getTailStreamCut())

A Better Way (future work)

While this works, it is inflexible in a number of ways.

Ideally, we would create optimal splits without putting additional burden on a pravega application.

There are some constructs in pravega that we can use. 1 is Watermarking (described here: https://pravega.io/docs/nightly/watermarking)

If watermarking is enabled for a stream, the pravega connector can use it. This will allow pravega connector to create splits of user defined size (e.g. 32MB, 64MB, etc, whatever makes sense for presto).

There is a currently a POC for this undergoing development

Since watermarking must be enabled before ingest, in order to "future proof" your streams w.r.t presto, this option can be enabled for a pravega stream now.

This must be set before ingest into a pravega stream, and can be set via the automaticallyNoteTime property of your EventWriterConfig, for e.g.

EventWriterConfig.builder().automaticallyNoteTime(true).build()