Skip to content

Commit

Permalink
When retrieving time-series from a database, adjust the minimum valid…
Browse files Browse the repository at this point in the history
… time to capture sufficient data for upscaling, when required, #357.
  • Loading branch information
james-d-brown committed Nov 21, 2024
1 parent 034282d commit e913a48
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 29 deletions.
6 changes: 4 additions & 2 deletions test/wres/pipeline/pooling/PoolSupplierTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -559,10 +559,12 @@ void runAfterEachTest() throws Exception
void testGetReturnsPoolThatContainsSevenPairsInOneSeries()
{
// Pool One actual
Mockito.when( this.observationRetriever.get() ).thenReturn( Stream.of( this.observations ) );
Mockito.when( this.observationRetriever.get() )
.thenReturn( Stream.of( this.observations ) );
Supplier<Stream<TimeSeries<Double>>> obsSupplier = CachingRetriever.of( this.observationRetriever );

Mockito.when( this.forecastRetriever.get() ).thenReturn( Stream.of( this.forecastOne ) );
Mockito.when( this.forecastRetriever.get() )
.thenReturn( Stream.of( this.forecastOne ) );

Supplier<Stream<TimeSeries<Double>>> forcSupplierOne = this.forecastRetriever;

Expand Down
4 changes: 3 additions & 1 deletion wres-datamodel/src/wres/datamodel/time/TimeSeriesSlicer.java
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ public static <T> TimeSeries<T> filter( TimeSeries<T> input,
builder.setMetadata( metadata );

// Some reference times existed and none were within the filter bounds?
if ( !input.getReferenceTimes().isEmpty() && notConsideredOrWithinBounds.isEmpty() )
if ( !input.getReferenceTimes()
.isEmpty()
&& notConsideredOrWithinBounds.isEmpty() )
{
return builder.build();
}
Expand Down
124 changes: 98 additions & 26 deletions wres-io/src/wres/io/retrieving/database/TimeSeriesRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,18 @@
import wres.io.database.Database;
import wres.io.retrieving.Retriever;
import wres.io.retrieving.DataAccessException;
import wres.statistics.MessageFactory;
import wres.statistics.generated.TimeScale.TimeScaleFunction;
import wres.statistics.generated.ReferenceTime.ReferenceTimeType;
import wres.statistics.generated.TimeWindow;

/**
* Abstract base class for retrieving {@link TimeSeries} from a database.
* <p>Abstract base class for retrieving {@link TimeSeries} from a database.
*
* <p>When retrieving time-series events within a time window, the timescale period is subtracted from the lower bound
* of the lead duration and valid time dimensions to ensure that sufficient data is returned to produce upscaled values
* that fall within the time window, as the event valid time represents the time at which the scale period ends and
* events that end within the window are selected.
*
* @author James Brown
*/
Expand Down Expand Up @@ -516,6 +523,10 @@ void addTimeWindowClause( DataScripter script )
{
TimeWindowOuter filter = this.getTimeWindow();

// Subtract any non-instantaneous desired timescale period from the lower bound of the lead duration and
// valid time
filter = TimeSeriesRetriever.adjustTimeWindowForTimeScale( filter, this.desiredTimeScale );

// Forecasts?
if ( this.isForecast() )
{
Expand Down Expand Up @@ -1305,6 +1316,84 @@ private <S> void addEventToTimeSeries( Event<S> event, TimeSeries.Builder<S> bui
}
}

/**
* Subtracts any non-instantaneous desired timescale from the lower bound of the lead duration and valid time to
* ensure that sufficient data is retrieved for upscaling.
*
* @param timeWindow the time window to adjust
* @param timeScale the timescale to use
* @return the adjusted time window
*/

private static TimeWindowOuter adjustTimeWindowForTimeScale( TimeWindowOuter timeWindow,
TimeScaleOuter timeScale )
{
TimeWindow.Builder adjusted = timeWindow.getTimeWindow()
.toBuilder();

// Earliest lead duration
if ( !timeWindow.getEarliestLeadDuration()
.equals( TimeWindowOuter.DURATION_MIN ) )
{
Duration period = Duration.ZERO;

// Adjust the lower bound of the lead duration window by the non-instantaneous desired timescale
if ( Objects.nonNull( timeScale )
&& !timeScale.isInstantaneous() )
{
period = TimeScaleOuter.getOrInferPeriodFromTimeScale( timeScale );
}

Duration lowered = timeWindow.getEarliestLeadDuration()
.minus( period );

if ( Objects.nonNull( timeScale )
&& LOGGER.isDebugEnabled() )
{
LOGGER.debug( "Adjusting the lower lead duration of time window {} from {} to {} "
+ "in order to acquire data at the desired timescale of {}.",
timeWindow,
timeWindow.getEarliestLeadDuration(),
lowered,
timeScale );
}

adjusted.setEarliestLeadDuration( MessageFactory.getDuration( lowered ) );
}

// Earliest valid time
if ( !timeWindow.getEarliestValidTime()
.equals( Instant.MIN ) )
{
Duration period = Duration.ZERO;

// Adjust the lower bound of the lead duration window by the non-instantaneous desired timescale
if ( Objects.nonNull( timeScale )
&& !timeScale.isInstantaneous() )
{
period = TimeScaleOuter.getOrInferPeriodFromTimeScale( timeScale );
}

Instant lowered = timeWindow.getEarliestValidTime()
.minus( period );

if ( Objects.nonNull( timeScale )
&& LOGGER.isDebugEnabled() )
{
LOGGER.debug( "Adjusting the lower valid datetime of time window {} from {} to {} "
+ "in order to acquire data at the desired timescale of {}.",
timeWindow,
timeWindow.getEarliestValidTime(),
lowered,
timeScale );
}

adjusted.setEarliestValidTime( MessageFactory.getTimestamp( lowered ) );
}

return TimeWindowOuter.of( adjusted.build() );
}

/**
* Adds the lead duration bounds (if any) to the script. The interval is left-closed.
*
Expand All @@ -1327,28 +1416,8 @@ private void addLeadBoundsToScript( DataScripter script, TimeWindowOuter filter,
if ( !filter.getEarliestLeadDuration()
.equals( TimeWindowOuter.DURATION_MIN ) )
{
Duration period = Duration.ZERO;

// Adjust the lower bound of the lead duration window by the non-instantaneous desired timescale
if ( Objects.nonNull( this.desiredTimeScale ) && !this.desiredTimeScale.isInstantaneous() )
{
period = TimeScaleOuter.getOrInferPeriodFromTimeScale( this.desiredTimeScale );
}

Duration lowered = filter.getEarliestLeadDuration()
.minus( period );

if ( Objects.nonNull( this.desiredTimeScale ) && LOGGER.isDebugEnabled() )
{
LOGGER.debug( "Adjusting the lower lead duration of time window {} from {} to {} "
+ "in order to acquire data at the desired timescale of {}.",
filter,
filter.getEarliestLeadDuration(),
lowered,
this.desiredTimeScale );
}

lowerLead = lowered.toMinutes();
lowerLead = filter.getEarliestLeadDuration()
.toMinutes();
}
// Upper bound
if ( !filter.getLatestLeadDuration()
Expand Down Expand Up @@ -1413,7 +1482,8 @@ private void addValidTimeBoundsToScriptUsingReferenceTimeAndLeadDuration( DataSc
Objects.requireNonNull( filter );

// Lower and upper bounds are equal
if ( filter.getEarliestValidTime().equals( filter.getLatestValidTime() ) )
if ( filter.getEarliestValidTime()
.equals( filter.getLatestValidTime() ) )
{
OffsetDateTime validTime = OffsetDateTime.ofInstant( filter.getEarliestValidTime(),
ZoneId.of( "UTC" ) );
Expand All @@ -1432,7 +1502,8 @@ private void addValidTimeBoundsToScriptUsingReferenceTimeAndLeadDuration( DataSc
else
{

if ( !filter.getEarliestValidTime().equals( Instant.MIN ) )
if ( !filter.getEarliestValidTime()
.equals( Instant.MIN ) )
{
OffsetDateTime lowerValidTime = OffsetDateTime.ofInstant( filter.getEarliestValidTime(),
ZoneId.of( "UTC" ) );
Expand All @@ -1449,7 +1520,8 @@ private void addValidTimeBoundsToScriptUsingReferenceTimeAndLeadDuration( DataSc
}

// Upper bound
if ( !filter.getLatestValidTime().equals( Instant.MAX ) )
if ( !filter.getLatestValidTime()
.equals( Instant.MAX ) )
{
OffsetDateTime upperValidTime = OffsetDateTime.ofInstant( filter.getLatestValidTime(),
ZoneId.of( "UTC" ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -201,6 +202,67 @@ public void testRetrievalOfObservedTimeSeriesWithTenEvents()
assertEquals( expectedSeries, actualSeries );
}

@Test
public void testRetrievalOfObservedTimeSeriesReturnsTenEventsWhenValidTimeLowerBoundAdjustedByScalePeriod()
{
// GitHub issue #357

// Time window filter
TimeWindow timeWindowInner = MessageFactory.getTimeWindow( Instant.parse( "2023-04-01T02:00:00Z" ),
Instant.parse( "2023-04-01T10:00:00Z" ) );
TimeWindowOuter timeWindow = TimeWindowOuter.of( timeWindowInner );

TimeScaleOuter timeScale = TimeScaleOuter.of( Duration.ofHours( 2 ) );

// Build the retriever
Retriever<TimeSeries<Double>> observedRetriever =
new ObservationRetriever.Builder().setDatabase( this.wresDatabase )
.setFeaturesCache( this.caches.getFeaturesCache() )
.setMeasurementUnitsCache( this.caches.getMeasurementUnitsCache() )
.setProjectId( PROJECT_ID )
.setVariable( VARIABLE )
.setFeatures( Set.of( FEATURE ) )
.setDatasetOrientation( ORIENTATION )
.setTimeWindow( timeWindow )
.setDesiredTimeScale( timeScale )
.build();

// Get the time-series
Stream<TimeSeries<Double>> observedSeries = observedRetriever.get();

// Stream into a collection
List<TimeSeries<Double>> actualCollection = observedSeries.toList();

// There is only one time-series, so assert that
assertEquals( 1, actualCollection.size() );
TimeSeries<Double> actualSeries = actualCollection.get( 0 );

// Create the expected series
TimeSeriesMetadata expectedMetadata =
TimeSeriesMetadata.of( Collections.emptyMap(),
TimeScaleOuter.of(),
VARIABLE_NAME,
FEATURE,
UNIT );
TimeSeries.Builder<Double> builder = new TimeSeries.Builder<>();
TimeSeries<Double> expectedSeries =
builder.setMetadata( expectedMetadata )
.addEvent( Event.of( Instant.parse( "2023-04-01T01:00:00Z" ), 30.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T02:00:00Z" ), 37.0 ) )
.addEvent( Event.of( Instant.parse( FIRST_TIME ), 44.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T04:00:00Z" ), 51.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T05:00:00Z" ), 58.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T06:00:00Z" ), 65.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T07:00:00Z" ), 72.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T08:00:00Z" ), 79.0 ) )
.addEvent( Event.of( Instant.parse( SECOND_TIME ), 86.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T10:00:00Z" ), 93.0 ) )
.build();

// Actual series equals expected series
assertEquals( expectedSeries, actualSeries );
}

@Test
public void testRetrievalOfPoolShapedObservedTimeSeriesWithSevenEvents()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,63 @@ public void setup() throws SQLException, LiquibaseException
this.addTwoForecastTimeSeriesEachWithFiveEventsToTheDatabase();
}

@Test
public void testRetrievalOfForecastTimeSeriesReturnsFiveEventsWhenValidTimeLowerBoundAdjustedByScalePeriod()
{
// GitHub issue #357

// Time window filter
TimeWindow timeWindowInner = MessageFactory.getTimeWindow( Instant.parse( "2023-04-01T02:00:00Z" ),
Instant.parse( "2023-04-01T05:00:00Z" ) );
TimeWindowOuter timeWindow = TimeWindowOuter.of( timeWindowInner );

TimeScaleOuter timeScale = TimeScaleOuter.of( Duration.ofHours( 2 ) );

// Build the retriever
Retriever<TimeSeries<Double>> forecastRetriever =
new SingleValuedForecastRetriever.Builder().setDatabase( this.wresDatabase )
.setFeaturesCache( this.caches.getFeaturesCache() )
.setMeasurementUnitsCache( this.caches.getMeasurementUnitsCache() )
.setProjectId( PROJECT_ID )
.setVariable( VARIABLE )
.setFeatures( Set.of( FEATURE ) )
.setDatasetOrientation( DatasetOrientation.RIGHT )
.setTimeWindow( timeWindow )
.setDesiredTimeScale( timeScale )
.build();

// Get the time-series
Stream<TimeSeries<Double>> forecastSeries = forecastRetriever.get();

// Stream into a collection
List<TimeSeries<Double>> actualCollection = forecastSeries.toList();

// There are two time-series, so assert that
assertEquals( 1, actualCollection.size() );
TimeSeries<Double> actualSeriesOne = actualCollection.get( 0 );

// Create the first expected series
TimeSeriesMetadata expectedMetadata =
TimeSeriesMetadata.of( Map.of( ReferenceTimeType.T0,
T2023_04_01T00_00_00Z ),
TimeScaleOuter.of(),
VARIABLE_NAME,
FEATURE,
UNIT );
TimeSeries.Builder<Double> builderOne = new TimeSeries.Builder<>();
TimeSeries<Double> expectedSeriesOne =
builderOne.setMetadata( expectedMetadata )
.addEvent( Event.of( Instant.parse( "2023-04-01T01:00:00Z" ), 30.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T02:00:00Z" ), 37.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T03:00:00Z" ), 44.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T04:00:00Z" ), 51.0 ) )
.addEvent( Event.of( Instant.parse( "2023-04-01T05:00:00Z" ), 58.0 ) )
.build();

// Actual series equals expected series
assertEquals( expectedSeriesOne, actualSeriesOne );
}

@Test
public void testRetrievalOfTwoForecastTimeSeriesEachWithFiveEvents()
{
Expand Down

0 comments on commit e913a48

Please sign in to comment.