Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-submit partial Task results right before Unit expiration #1264

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ mephisto:
task_tags: "test,simple,form,form-composer,interactive-image-prompts"
force_rebuild: true
max_num_concurrent_units: 1
auto_submit_before_expiration_sec: 10
assignment_duration_in_seconds: 120
aux_parameters:
max_answer_loops: 3
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function MainApp() {
remoteProcedure,
handleSubmit,
handleFatalError,
setTaskSubmitData,
} = useMephistoRemoteProcedureTask();

if (isLoading || !initialTaskData) {
Expand All @@ -40,6 +41,7 @@ function MainApp() {
onSubmit={handleSubmit}
onError={handleFatalError}
remoteProcedure={remoteProcedure}
setTaskSubmitData={setTaskSubmitData}
/>
</ErrorBoundary>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ function FormComposerBaseFrontend({
onError,
finalResults = null,
remoteProcedure,
setTaskSubmitData,
}) {
const [loadingFormData, setLoadingFormData] = React.useState(false);
const [formData, setFormData] = React.useState(null);
Expand Down Expand Up @@ -95,6 +96,7 @@ function FormComposerBaseFrontend({
<FormComposer
data={formData}
onSubmit={onSubmit}
setTaskSubmitData={setTaskSubmitData}
finalResults={finalResults}
setRenderingErrors={setFormComposerRenderingErrors}
remoteProcedureCollection={remoteProcedure}
Expand Down
24 changes: 8 additions & 16 deletions examples/static_react_task/webapp/src/app.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
* LICENSE file in the root directory of this source tree.
*/

import { ErrorBoundary, useMephistoTask } from "mephisto-core";
import React from "react";
import ReactDOM from "react-dom";
import {
BaseFrontend,
OnboardingComponent,
Instructions,
LoadingScreen,
OnboardingComponent,
StaticReactTaskFrontend,
} from "./components/core_components.jsx";
import { useMephistoTask, ErrorBoundary } from "mephisto-core";

/* ================= Application Components ================= */

Expand All @@ -38,18 +39,7 @@ function MainApp() {
}

if (isPreview) {
return (
<div className="card bg-primary mb-4">
<div className="card-body pt-xl-5 pb-xl-5">
<h2 className="text-white">
This is an incredibly simple React task working with Mephisto!
</h2>
<h5 className="text-white">
Inside you'll be asked to rate a given sentence as good or bad.
</h5>
</div>
</div>
);
return <Instructions />;
}
if (isLoading || !initialTaskData) {
return <LoadingScreen />;
Expand All @@ -61,7 +51,9 @@ function MainApp() {
return (
<div>
<ErrorBoundary handleError={handleFatalError}>
<BaseFrontend
<Instructions />

<StaticReactTaskFrontend
taskData={initialTaskData}
onSubmit={handleSubmit}
isOnboarding={isOnboarding}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ function OnboardingComponent({ onSubmit }) {
);
}

function Instructions() {
return (
<div className="card bg-primary mb-4">
<div className="card-body pt-xl-5 pb-xl-5">
<h2 className="text-white">
This is an incredibly simple React task working with Mephisto!
</h2>
<h5 className="text-white">
Inside you'll be asked to rate a given sentence as good or bad.
</h5>
</div>
</div>
);
}

function LoadingScreen() {
return <Directions>Loading...</Directions>;
}
Expand All @@ -50,7 +65,12 @@ function Directions({ children }) {
);
}

function SimpleFrontend({ taskData, isOnboarding, onSubmit, onError }) {
function StaticReactTaskFrontend({
taskData,
isOnboarding,
onSubmit,
onError,
}) {
const [resonseSubmitted, setResonseSubmitted] = useState(false);

return (
Expand Down Expand Up @@ -100,4 +120,9 @@ function SimpleFrontend({ taskData, isOnboarding, onSubmit, onError }) {
);
}

export { LoadingScreen, SimpleFrontend as BaseFrontend, OnboardingComponent };
export {
Instructions,
LoadingScreen,
OnboardingComponent,
StaticReactTaskFrontend,
};
31 changes: 25 additions & 6 deletions mephisto/abstractions/_subcomponents/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ def _launch_and_run_onboarding(
).track_inprogress(), EXECUTION_DURATION_SECONDS.labels(thread_type="onboarding").time():
live_run = onboarding_agent.get_live_run()
onboarding_id = onboarding_agent.get_agent_id()

logger.debug(f"Launching onboarding for {onboarding_agent}")

try:
self.run_onboarding(onboarding_agent)
except (
Expand All @@ -185,13 +187,16 @@ def _launch_and_run_onboarding(
if onboarding_agent.get_status() not in AgentState.complete():
# Absent agents at this stage should be disconnected
onboarding_agent.update_status(AgentState.STATUS_DISCONNECT)

self.cleanup_onboarding(onboarding_agent)
except Exception as e:
logger.exception(
f"Unhandled exception in onboarding {onboarding_agent}",
exc_info=True,
)

self.cleanup_onboarding(onboarding_agent)

del self.running_onboardings[onboarding_id]

# Onboarding now complete
Expand All @@ -207,6 +212,7 @@ async def register_then_cleanup():
f"Onboarding agent {onboarding_id} disconnected or errored, "
f"final status {onboarding_agent.get_status()}."
)

live_run.loop_wrap.execute_coro(cleanup_after())

def execute_unit(
Expand All @@ -218,6 +224,7 @@ def execute_unit(
if unit.db_id in self.running_units:
logger.debug(f"{unit} is already running")
return

unit_thread = threading.Thread(
target=self._launch_and_run_unit,
args=(unit, agent),
Expand Down Expand Up @@ -249,8 +256,11 @@ def _cleanup_special_units(self, unit: "Unit", agent: "Agent") -> None:
if agent.get_status() != AgentState.STATUS_COMPLETED:
if unit.unit_index == SCREENING_UNIT_INDEX:
blueprint = self.task_run.get_blueprint(args=self.args)

assert isinstance(blueprint, ScreenTaskRequired)

blueprint.screening_units_launched -= 1

unit.expire()

def _launch_and_run_unit(
Expand All @@ -272,14 +282,17 @@ def _launch_and_run_unit(
) as e:
# A returned Unit can be worked on again by someone else.
logger.exception(f"Handled exception in unit {unit}")

if unit.get_status() != AssignmentState.EXPIRED:
unit_agent = unit.get_assigned_agent()
if unit_agent is not None and unit_agent.db_id == agent.db_id:
logger.debug(f"Clearing {agent} from {unit} due to {e}")
unit.clear_assigned_agent()

if agent.get_status() not in AgentState.complete():
# Absent agents at this stage should be disconnected
agent.update_status(AgentState.STATUS_DISCONNECT)

self.cleanup_unit(unit)
except Exception as e:
logger.exception(f"Unhandled exception in unit {unit}", exc_info=True)
Expand All @@ -291,6 +304,7 @@ def _launch_and_run_unit(
if not agent.await_submit(timeout=None):
# Wait for a submit to occur
agent.await_submit(timeout=self.args.task.submission_timeout)

agent.update_status(AgentState.STATUS_COMPLETED)
agent.mark_done()

Expand All @@ -316,6 +330,7 @@ def execute_assignment(
if assignment.db_id in self.running_assignments:
logger.debug(f"Assignment {assignment} is already running")
return

assign_thread = threading.Thread(
target=self._launch_and_run_assignment,
args=(assignment, agents),
Expand All @@ -329,6 +344,7 @@ def execute_assignment(
agents=agents,
thread=assign_thread,
)

assign_thread.start()
return

Expand Down Expand Up @@ -359,6 +375,7 @@ def _launch_and_run_assignment(
# Must expire the disconnected unit so that
# new workers aren't shown it
agent.get_unit().expire()

if agent.get_status() not in AgentState.complete():
agent.update_status(AgentState.STATUS_DISCONNECT)
self.cleanup_assignment(assignment)
Expand All @@ -375,6 +392,7 @@ def _launch_and_run_assignment(
if not agent.await_submit(timeout=None):
# Wait for a submit to occur
agent.await_submit(timeout=self.args.task.submission_timeout)

agent.update_status(AgentState.STATUS_COMPLETED)
agent.mark_done()

Expand All @@ -386,6 +404,7 @@ def _launch_and_run_assignment(
f"Unhandled exception in on_unit_submitted for {unit}",
exc_info=True,
)

del self.running_assignments[assignment.db_id]

# Clear reservations
Expand All @@ -398,16 +417,12 @@ def _launch_and_run_assignment(

@staticmethod
def get_data_for_assignment(assignment: "Assignment") -> "InitializationData":
"""
Finds the right data to get for the given assignment.
"""
"""Finds the right data to get for the given assignment."""
return assignment.get_assignment_data()

@abstractmethod
def get_init_data_for_agent(self, agent: "Agent"):
"""
Return the data that an agent will need for their task.
"""
"""Return the data that an agent will need for their task."""
raise NotImplementedError()

def filter_units_for_worker(self, units: List["Unit"], worker: "Worker"):
Expand All @@ -433,17 +448,21 @@ def shutdown(self):
# Shut down the agents
for running_unit in running_units:
running_unit.agent.shutdown()

for running_assignment in running_assignments:
for agent in running_assignment.agents:
agent.shutdown()

for running_onboarding in running_onboardings:
running_onboarding.onboarding_agent.shutdown()

# Join the threads
for running_unit in running_units:
running_unit.thread.join()

for running_assignment in running_assignments:
running_assignment.thread.join()

for running_onboarding in running_onboardings:
running_onboarding.thread.join()

Expand Down
Loading
Loading