Skip to content

Commit

Permalink
HSEARCH-5076 WIP batch rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Oct 25, 2024
1 parent 3037821 commit 2d20495
Show file tree
Hide file tree
Showing 25 changed files with 985 additions and 213 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import java.util.List;

import jakarta.persistence.LockModeType;

import org.hibernate.FlushMode;
import org.hibernate.Session;
import org.hibernate.query.Query;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntitySink;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;

public class DefaultHibernateOrmBatchEntityLoader<E> implements HibernateOrmBatchEntityLoader {
private static final String ID_PARAMETER_NAME = "ids";

private final HibernateOrmBatchEntitySink<E> sink;
private final Query<E> query;

public DefaultHibernateOrmBatchEntityLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
HibernateOrmBatchEntitySink<E> sink, HibernateOrmBatchEntityLoadingOptions options) {
this.sink = sink;

StringBuilder query = new StringBuilder();
query.append( "select e from " )
.append( typeContext.jpaEntityName() )
.append( " e where e." )
.append( typeContext.uniquePropertyName() )
.append( " in(:" )
.append( ID_PARAMETER_NAME )
.append( ")" );

this.query = options.context( Session.class ).createQuery( query.toString(), typeContext.javaClass() )
.setReadOnly( true )
.setCacheable( false )
.setLockMode( LockModeType.NONE )
.setCacheMode( options.cacheMode() )
.setHibernateFlushMode( FlushMode.MANUAL )
.setFetchSize( options.batchSize() );
}

@Override
public void close() {
}

@Override
public void load(List<Object> identifiers) {
sink.accept( query.setParameter( ID_PARAMETER_NAME, identifiers ).list() );
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import java.util.HashSet;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.persistence.LockModeType;

import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.StatelessSession;
import org.hibernate.query.Query;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.IdOrder;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchReindexCondition;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Closer;

public class DefaultHibernateOrmBatchIdentifierLoader<E> implements HibernateOrmBatchIdentifierLoader {

private final StatelessSession session;
private final String ormEntityName;
private final String uniquePropertyName;
private final IdOrder idOrder;
private final HibernateOrmBatchIdentifierLoadingOptions options;
private final IdLoader idLoader;

public DefaultHibernateOrmBatchIdentifierLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
HibernateOrmBatchIdentifierLoadingOptions options, IdOrder idOrder) {
this.session = options.context( StatelessSession.class );
this.ormEntityName = typeContext.jpaEntityName();
this.uniquePropertyName = typeContext.uniquePropertyName();
this.idOrder = idOrder;
this.options = options;
this.idLoader = options.maxResults().orElse( -1 ) == 1 ? new QuerySingleIdLoader() : new ScrollIdLoader();
}

@Override
public void close() {
try ( Closer<RuntimeException> closer = new Closer<>() ) {
if ( idLoader != null ) {
closer.push( IdLoader::close, idLoader );
}
}
}

@Override
public OptionalLong totalCount() {
StringBuilder query = new StringBuilder();
query.append( "select count(e) from " )
.append( ormEntityName )
.append( " e " );

return OptionalLong.of( createQuery( session, query,
options.reindexOnlyCondition().map( Set::of ).orElseGet( Set::of ), Long.class, Optional.empty() )
.uniqueResult() );
}

@Override
public Object next() {
return idLoader.next();
}

@Override
public boolean hasNext() {
return idLoader.hasNext();
}

private Query<Object> createQueryLoading(StatelessSession session) {
StringBuilder query = new StringBuilder();
query.append( "select e." )
.append( uniquePropertyName )
.append( " from " )
.append( ormEntityName )
.append( " e " );
Set<HibernateOrmBatchReindexCondition> conditions = new HashSet<>();
options.reindexOnlyCondition().ifPresent( conditions::add );
options.lowerBound().ifPresent( b -> conditions
.add( idOrder.idGreater( "HIBERNATE_SEARCH_ID_LOWER_BOUND_", b, options.lowerBoundInclusive() ) ) );
options.upperBound().ifPresent( b -> conditions
.add( idOrder.idLesser( "HIBERNATE_SEARCH_ID_UPPER_BOUND_", b, options.upperBoundInclusive() ) ) );

Query<Object> select = createQuery( session, query, conditions, Object.class, Optional.of( idOrder.ascOrder() ) )
.setFetchSize( options.fetchSize() )
.setReadOnly( true )
.setCacheable( false )
.setLockMode( LockModeType.NONE );
options.offset().ifPresent( select::setFirstResult );
options.maxResults().ifPresent( select::setMaxResults );
return select;
}

private <T> Query<T> createQuery(StatelessSession session,
StringBuilder hql, Set<HibernateOrmBatchReindexCondition> conditions, Class<T> returnedType,
Optional<String> order) {
if ( !conditions.isEmpty() ) {
hql.append( " where " );
hql.append( conditions.stream()
.map( c -> "( " + c.conditionString() + " )" )
.collect( Collectors.joining( " AND ", " ", " " ) )
);
}
order.ifPresent( o -> hql.append( " ORDER BY " ).append( o ) );
Query<T> query = session.createQuery( hql.toString(), returnedType )
.setCacheable( false );

for ( var condition : conditions ) {
for ( var entry : condition.params().entrySet() ) {
query.setParameter( entry.getKey(), entry.getValue() );
}
}

return query;
}

private interface IdLoader {
Object next();

boolean hasNext();

void close();
}

private class QuerySingleIdLoader implements IdLoader {

private boolean hasNextCalled = false;
private boolean nextCalled = false;

private Query<Object> id = createQueryLoading( session );
private Object currentId;

@Override
public Object next() {
if ( hasNextCalled ) {
nextCalled = true;
hasNextCalled = false;
return currentId;
}
else {
throw new AssertionFailure( "Cannot call next() before calling hasNext()" );
}
}

@Override
public boolean hasNext() {
if ( nextCalled ) {
// we expect to have just a single ID, so if we called next and got the id we don't need to execute the query anymore:
return false;
}
currentId = id.getSingleResultOrNull();
hasNextCalled = true;
return currentId != null;
}

@Override
public void close() {
id = null;
}
}

private class ScrollIdLoader implements IdLoader {
private ScrollableResults<Object> id = createQueryLoading( session ).scroll( ScrollMode.FORWARD_ONLY );

@Override
public Object next() {
return id.get();
}

@Override
public boolean hasNext() {
return id.next();
}

@Override
public void close() {
try ( Closer<RuntimeException> closer = new Closer<>() ) {
if ( id != null ) {
closer.push( ScrollableResults::close, id );
id = null;
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import org.hibernate.metamodel.mapping.EmbeddableMappingType;
import org.hibernate.metamodel.mapping.EntityIdentifierMapping;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.CompositeIdOrder;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.IdOrder;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.SingularIdOrder;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntitySink;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingStrategy;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
import org.hibernate.search.mapper.orm.loading.spi.HibernateOrmLoadingTypeContext;

public class DefaultHibernateOrmBatchLoadingStrategy<E, I> implements HibernateOrmBatchLoadingStrategy<E, I> {

private final IdOrder idOrder;

public DefaultHibernateOrmBatchLoadingStrategy(HibernateOrmLoadingTypeContext<E> type) {
EntityIdentifierMapping identifierMapping = type.entityMappingType().getIdentifierMapping();
if ( identifierMapping.getPartMappingType() instanceof EmbeddableMappingType ) {
idOrder = new CompositeIdOrder<>( type );
}
else {
idOrder = new SingularIdOrder<>( type );
}
}

@Override
public HibernateOrmBatchIdentifierLoader createIdentifierLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
HibernateOrmBatchIdentifierLoadingOptions options) {
return new DefaultHibernateOrmBatchIdentifierLoader<>( typeContext, options, idOrder );
}

@Override
public HibernateOrmBatchEntityLoader createEntityLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
HibernateOrmBatchEntitySink<E> sink, HibernateOrmBatchEntityLoadingOptions options) {
return new DefaultHibernateOrmBatchEntityLoader<>( typeContext, sink, options );
}
}
Loading

0 comments on commit 2d20495

Please sign in to comment.