Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new event/jobrunner for beta #5678

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions EventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use MediaWiki\Extension\EventBus\Adapters\RCFeed\EventBusRCFeedEngine;
use MediaWiki\Extension\EventBus\Adapters\RCFeed\EventBusRCFeedFormatter;

require_once '/srv/mediawiki/config/JobQueueEventBusBeta.php';

$wgEnableEventBus = 'TYPE_ALL';

if ( $cwPrivate ) {
Expand Down Expand Up @@ -39,14 +41,14 @@
'class' => EventBusRCFeedEngine::class,
];

$beta = preg_match( '/^(.*)\.(mirabeta|nexttide)\.org$/', $wi->server );
$wgJobTypeConf['default'] = [
'class' => JobQueueEventBus::class,
'class' => preg_match( '/^(.*)\.(mirabeta|nexttide)\.org$/', $wi->server ) ?
JobQueueEventBusBeta::class :
JobQueueEventBus::class,
'readOnlyReason' => false
];

$wgEventBusEnableRunJobAPI =
wfHostname() === 'mwtask151' ||
wfHostname() === 'mwtask161' ||
wfHostname() === 'mwtask171' ||
wfHostname() === 'mwtask181' ||
wfHostname() === 'test151';
strpos( wfHostname(), 'mwtask' ) === 0 ||
strpos( wfHostname(), 'test' ) === 0;
53 changes: 30 additions & 23 deletions EventStreamConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,106 +9,113 @@
'canary_events_enabled' => true,
];

$betaStream = preg_match( '/^(.*)\.(mirabeta|nexttide)\.org$/', $wi->server ) ? 'beta/' : '';

$wgEventStreams = [
'/^mediawiki\\.job\\..+/' => [
'schema_title' => 'mediawiki/job',
'schema_title' => "mediawiki/job",
'destination_event_service' => 'eventgate',
'canary_events_enabled' => false,
],
'/^mediawiki\\.beta\\.job\\..+/' => [
'schema_title' => "mediawiki/beta/job",
'destination_event_service' => 'eventgate',
'canary_events_enabled' => false,
],
'mediawiki.centralnotice.campaign-change' => [
'schema_title' => 'mediawiki/centralnotice/campaign/change',
'schema_title' => "mediawiki/{$betaStream}centralnotice/campaign/change",
'destination_event_service' => 'eventgate',
],
'mediawiki.centralnotice.campaign-create' => [
'schema_title' => 'mediawiki/centralnotice/campaign/create',
'schema_title' => "mediawiki/{$betaStream}centralnotice/campaign/create",
'destination_event_service' => 'eventgate',
],
'mediawiki.centralnotice.campaign-delete' => [
'schema_title' => 'mediawiki/centralnotice/campaign/delete',
'schema_title' => "mediawiki/{$betaStream}centralnotice/campaign/delete",
'destination_event_service' => 'eventgate',
],
'mediawiki.cirrussearch.page_rerender.v1' => [
'schema_title' => 'mediawiki/cirrussearch/page_rerender',
'schema_title' => "mediawiki/{$betaStream}cirrussearch/page_rerender",
'destination_event_service' => 'eventgate',
'message_key_fields' => [
'wiki_id' => 'wiki_id',
'page_id' => 'page_id',
],
],
'mediawiki.page-create' => [
'schema_title' => 'mediawiki/revision/create',
'schema_title' => "mediawiki/{$betaStream}revision/create",
'destination_event_service' => 'eventgate',
],
'mediawiki.page-delete' => [
'schema_title' => 'mediawiki/page/delete',
'schema_title' => "mediawiki/{$betaStream}page/delete",
'destination_event_service' => 'eventgate',
],
'mediawiki.page-links-change' => [
'schema_title' => 'mediawiki/page/links-change',
'schema_title' => "mediawiki/{$betaStream}page/links-change",
'destination_event_service' => 'eventgate',
],
'mediawiki.page-move' => [
'schema_title' => 'mediawiki/page/move',
'schema_title' => "mediawiki/{$betaStream}page/move",
'destination_event_service' => 'eventgate',
],
'mediawiki.page-properties-change' => [
'schema_title' => 'mediawiki/page/properties-change',
'schema_title' => "mediawiki/{$betaStream}page/properties-change",
'destination_event_service' => 'eventgate',
],
'mediawiki.page-restrictions-change' => [
'schema_title' => 'mediawiki/page/restrictions-change',
'schema_title' => "mediawiki/{$betaStream}page/restrictions-change",
'destination_event_service' => 'eventgate',
],
'mediawiki.page-suppress' => [
'schema_title' => 'mediawiki/page/delete',
'schema_title' => "mediawiki/{$betaStream}page/delete",
'destination_event_service' => 'eventgate',
],
'mediawiki.page-undelete' => [
'schema_title' => 'mediawiki/page/undelete',
'schema_title' => "mediawiki/{$betaStream}page/undelete",
'destination_event_service' => 'eventgate',
],
'mediawiki.recentchange' => [
'schema_title' => 'mediawiki/recentchange',
'schema_title' => "mediawiki/{$betaStream}recentchange",
'destination_event_service' => 'eventgate',
],
'mediawiki.revision-create' => [
'schema_title' => 'mediawiki/revision/create',
'schema_title' => "mediawiki/{$betaStream}revision/create",
'destination_event_service' => 'eventgate',
],
'mediawiki.revision-tags-change' => [
'schema_title' => 'mediawiki/revision/tags-change',
'schema_title' => "mediawiki/{$betaStream}revision/tags-change",
'destination_event_service' => 'eventgate',
],
'mediawiki.revision-visibility-change' => [
'schema_title' => 'mediawiki/revision/visibility-change',
'schema_title' => "mediawiki/{$betaStream}revision/visibility-change",
'destination_event_service' => 'eventgate',
],
'mediawiki.user-blocks-change' => [
'schema_title' => 'mediawiki/user/blocks-change',
'schema_title' => "mediawiki/{$betaStream}user/blocks-change",
'destination_event_service' => 'eventgate',
],
'resource_change' => [
'schema_title' => 'resource_change',
'schema_title' => "{$betaStream}resource_change",
'destination_event_service' => 'eventgate',
'canary_events_enabled' => false,
],
'resource-purge' => [
'schema_title' => 'resource_change',
'schema_title' => "{$betaStream}resource_change",
'destination_event_service' => 'eventgate',
'canary_events_enabled' => false,
],
'change-prop.transcludes.resource-change' => [
'schema_title' => 'resource_change',
'schema_title' => "{$betaStream}resource_change",
'destination_event_service' => 'eventgate',
'canary_events_enabled' => false,
],
// These are logging channels
'mediawiki.api-request' => [
'schema_title' => 'mediawiki/api/request',
'schema_title' => "mediawiki/{$betaStream}api/request",
'destination_event_service' => 'eventgate',
],
'mediawiki.cirrussearch-request' => [
'schema_title' => 'mediawiki/cirrussearch/request',
'schema_title' => "mediawiki/{$betaStream}cirrussearch/request",
'destination_event_service' => 'eventgate',
],
];
164 changes: 164 additions & 0 deletions JobQueueEventBusBeta.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
<?php

use MediaWiki\Extension\EventBus\EventBus;

/**
* Class is a copy of https://github.com/wikimedia/mediawiki-extensions-EventBus/blob/ea8ab686e80f56ac1e1c44296b8c823d4d769ee7/includes/Adapters/JobQueue/JobQueueEventBus.php
* with changes for beta.
*/
class JobQueueEventBusBeta extends JobQueue {
/**
* Get the allowed queue orders for configuration validation
*
* @return array Subset of (random, timestamp, fifo, undefined)
*/
protected function supportedOrders() {
return [ 'fifo' ];
}

/**
* Find out if delayed jobs are supported for configuration validation
*
* @return bool Whether delayed jobs are supported
*/
protected function supportsDelayedJobs() {
return true;
}

/**
* Get the default queue order to use if configuration does not specify one
*
* @return string One of (random, timestamp, fifo, undefined)
*/
protected function optimalOrder() {
return 'fifo';
}

/**
* @see JobQueue::isEmpty()
* @return bool
*/
protected function doIsEmpty() {
// not implemented
return false;
}

/**
* @see JobQueue::getSize()
* @return int
*/
protected function doGetSize() {
// not implemented
return 0;
}

/**
* @see JobQueue::getAcquiredCount()
* @return int
*/
protected function doGetAcquiredCount() {
// not implemented
return 0;
}

/**
* @see JobQueue::supportsTypeAgnostic()
* @return bool
*/
protected function supportsTypeAgnostic(): bool {
return true;
}

/**
* @param IJobSpecification[] $jobs
* @param int $flags
* @throws JobQueueError
* @see JobQueue::batchPush()
*/
protected function doBatchPush( array $jobs, $flags ) {
$streamEvents = [];
$streamBuses = [];
$count = 0;

foreach ( $jobs as $job ) {
$stream = 'mediawiki.beta.job.' . $job->getType();
if ( !isset( $streamBuses[$stream] ) ) {
$streamBuses[$stream] = EventBus::getInstanceForStream( $stream );
}
$item = $streamBuses[$stream]->getFactory()->createJobEvent(
$stream,
$this->getDomain(),
$job
);

if ( $item === null ) {
continue;
}

$count++;
// hash identifier => de-duplicate
if ( isset( $item['sha1'] ) ) {
$streamEvents[$stream][$item['sha1']] = $item;
} else {
$streamEvents[$stream][$item['meta']['id']] = $item;
}
}

if ( !$count ) {
// nothing to do
return;
}

foreach ( array_keys( $streamEvents ) as $stream ) {
$result = $streamBuses[$stream]->send(
array_values( $streamEvents[$stream] ),
EventBus::TYPE_JOB
);

// This means sending jobs to the $stream has failed.
if ( is_array( $result ) || is_string( $result ) ) {
// Details of backend failure are logged by EventBus::send().
// Details of which job failed is logged here.
EventBus::logger()->error( 'Could not enqueue jobs for stream {stream}', [
'stream' => $stream,
'exception' => new JobQueueError( "Could not enqueue jobs" ),
'invalidresponse' => (object)$result
] );
// Avoid fragmenting exception by job or stream name, since backend
// issues are generally unrelated to the job (T249745).
throw new JobQueueError( "Could not enqueue jobs" );
}
}
}

/**
* @see JobQueue::pop()
* @return Job|bool
*/
protected function doPop() {
// not implemented
return false;
}

/**
* @see JobQueue::ack()
*
* @param RunnableJob $job
*/
protected function doAck( RunnableJob $job ) {
// not implemented
}

/**
* Get an iterator to traverse over all available jobs in this queue.
* This does not include jobs that are currently acquired or delayed.
* Note: results may be stale if the queue is concurrently modified.
*
* @return Iterator
* @throws JobQueueError
*/
public function getAllQueuedJobs() {
// not implemented
return new ArrayIterator( [] );
}
}
Loading