Skip to content

Commit

Permalink
Fix & Refactor start_iginx.sh and Try fix job cancel (#7)
Browse files Browse the repository at this point in the history
* Fix & Refactor `start_iginx.sh`

* Make heap size computation compatible with Linux without `free` utility.
* Pass the heap options to the `java` command line so that they actually take effect.
* Unify script style and fix corner cases.

* Try to fix job cancel

* set sleep time to infinity
* add more log
* let cancel fail fast

---------

Co-authored-by: Yuqing Zhu <[email protected]>
  • Loading branch information
YouJiacheng and zhuyuqing authored Feb 23, 2023
1 parent 189b80a commit daf6d21
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 43 deletions.
76 changes: 46 additions & 30 deletions core/src/assembly/resources/sbin/start_iginx.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,35 @@
# You can put your env variable here
# export JAVA_HOME=$JAVA_HOME

if [[ -z "${IGINX_HOME}" ]]; then
export IGINX_HOME="$(
cd "$(dirname "$0")"/..
pwd
)"
if [[ -z "$IGINX_HOME" ]]; then
  # Resolve IGINX_HOME as the parent of the directory containing this script.
  # - ${BASH_SOURCE[0]} instead of $0: $0 is "bash" when the script is sourced
  #   (source / .).
  # - Double quotes around ${BASH_SOURCE[0]} and $(dirname ...): prevent Word
  #   Splitting and Filename Expansion, so paths containing IFS characters
  #   and/or "*" are handled.
  # - Double dash "--": marks the end of command options, so paths starting
  #   with "-" are handled.
  # - "&&" instead of ";" or a newline: pwd must not run if cd failed.
  #
  # Assign first and export afterwards: `export VAR=$(cmd)` returns the exit
  # status of export itself, which hides a failure of the command substitution
  # and would make the error check below dead code.
  if ! IGINX_HOME=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")"/.. && pwd); then
    # Diagnostics go to stderr so they are not mixed into regular output.
    echo "Cannot determine IGINX_HOME, exit..." >&2
    exit 1
  fi
  export IGINX_HOME
fi
fi

MAIN_CLASS=cn.edu.tsinghua.iginx.Iginx

CLASSPATH=""
for f in ${IGINX_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
for f in "$IGINX_HOME"/lib/*.jar; do
CLASSPATH=${CLASSPATH}:${f}
done
CLASSPATH=${CLASSPATH:1} # remove first ":"

if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
if [ -x "$java" ]; then
JAVA="$java"
JAVA=$java
break
fi
done
Expand All @@ -50,12 +61,13 @@ fi
calculate_heap_sizes() {
case "$(uname)" in
Linux)
system_memory_in_mb=$(free -m | sed -n '2p' | awk '{print $2}')
system_memory_in_kb=$(grep MemTotal /proc/meminfo | awk '{print $2}')
system_memory_in_mb=$((${system_memory_in_kb} / 1024))
system_cpu_cores=$(egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo)
;;
FreeBSD)
system_memory_in_bytes=$(sysctl hw.physmem | awk '{print $2}')
system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024)
system_memory_in_mb=$((${system_memory_in_bytes} / 1024 / 1024))
system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
;;
SunOS)
Expand All @@ -64,47 +76,50 @@ calculate_heap_sizes() {
;;
Darwin)
system_memory_in_bytes=$(sysctl hw.memsize | awk '{print $2}')
system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024)
system_memory_in_mb=$((${system_memory_in_bytes} / 1024 / 1024))
system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
;;
*)
# assume reasonable defaults for e.g. a modern desktop or
# cheap server
system_memory_in_mb="2048"
system_cpu_cores="2"
system_memory_in_mb=2048
system_cpu_cores=2
;;
esac

# some systems like the raspberry pi don't report cores, use at least 1
if [ "$system_cpu_cores" -lt "1" ]; then
system_cpu_cores="1"
if (( ${system_cpu_cores} < 1 )); then
system_cpu_cores=1
fi

# set max heap size based on the following
# max(min(1/2 ram, 1024MB), min(1/4 ram, 64GB))
# calculate 1/2 ram and cap to 1024MB
# calculate 1/4 ram and cap to 65536MB
# pick the max
half_system_memory_in_mb=$(expr $system_memory_in_mb / 2)
quarter_system_memory_in_mb=$(expr $half_system_memory_in_mb / 2)
if [ "$half_system_memory_in_mb" -gt "1024" ]; then
half_system_memory_in_mb="1024"
half_system_memory_in_mb=$((${system_memory_in_mb} / 2))
quarter_system_memory_in_mb=$((${half_system_memory_in_mb} / 2))
if (( ${half_system_memory_in_mb} > 1024 )); then
half_system_memory_in_mb=1024
fi
if [ "$quarter_system_memory_in_mb" -gt "65536" ]; then
quarter_system_memory_in_mb="65536"
if (( ${quarter_system_memory_in_mb} > 65536 )); then
quarter_system_memory_in_mb=65536
fi
if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]; then
max_heap_size_in_mb="$half_system_memory_in_mb"
if (( ${half_system_memory_in_mb} > ${quarter_system_memory_in_mb} )); then
max_heap_size_in_mb=$half_system_memory_in_mb
else
max_heap_size_in_mb="$quarter_system_memory_in_mb"
max_heap_size_in_mb=$quarter_system_memory_in_mb
fi
MAX_HEAP_SIZE="${max_heap_size_in_mb}M"
MAX_HEAP_SIZE=${max_heap_size_in_mb}M
}

calculate_heap_sizes
JMX_OPTS=""
JMX_OPTS="$JMX_OPTS -Xms${MAX_HEAP_SIZE}"
JMX_OPTS="$JMX_OPTS -Xmx${MAX_HEAP_SIZE}"
# Use an array rather than a space-joined string: a string would rely on
# Word Splitting to produce two arguments from $HEAP_OPTS, which only works
# as expected after `unset IFS` or `IFS=' '`.
HEAP_OPTS[0]=-Xmx$MAX_HEAP_SIZE
HEAP_OPTS[1]=-Xms$MAX_HEAP_SIZE

# continue to other parameters
ICONF="$IGINX_HOME/conf/config.properties"
Expand All @@ -113,6 +128,7 @@ IDRIVER="$IGINX_HOME/driver/"
export IGINX_CONF=$ICONF
export IGINX_DRIVER=$IDRIVER

exec "$JAVA" -Duser.timezone=GMT+8 -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
# Launch the JVM in place of this shell. The array expansion is double-quoted
# so each heap option is passed as exactly one argument, whatever IFS contains.
exec "$JAVA" -Duser.timezone=GMT+8 "${HEAP_OPTS[@]}" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"

exit $?
# Double-quoted to avoid Word Splitting when IFS contains a digit
exit "$?"
4 changes: 2 additions & 2 deletions core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,8 @@ public ShowEligibleJobResp showEligibleJob(ShowEligibleJobReq req) {
@Override
public Status cancelTransformJob(CancelTransformJobReq req) {
TransformJobManager manager = TransformJobManager.getInstance();
manager.cancel(req.getJobId());
return RpcUtils.SUCCESS;
boolean success = manager.cancel(req.getJobId());
return success ? RpcUtils.SUCCESS : RpcUtils.FAILURE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ private void process(Job job) throws Exception {
logger.info(String.format("Job id=%s cost %s ms.", job.getJobId(), job.getEndTime() - job.getStartTime()));
}

public void cancel(long jobId) {
public boolean cancel(long jobId) {
Job job = jobMap.get(jobId);
if (job == null) {
return;
return false;
}
JobRunner runner = jobRunnerMap.get(jobId);
if (runner == null) {
return;
return false;
}
// Since the job state is set to FINISHED/FAILING/FAILED before the runner is removed from jobRunnerMap,
// a null runner confirms that the job state is neither RUNNING nor CREATED.
Expand All @@ -130,11 +130,11 @@ public void cancel(long jobId) {
case JOB_CREATED:
break; // continue execution
default:
return;
return false;
}
// atomic guard
if (!job.getActive().compareAndSet(true, false)) {
return;
return false;
}
// reorder as Normal run: [set-ING,] close, set-ED, remove[, set end time, log time cost].
job.setState(JobState.JOB_CLOSING);
Expand All @@ -143,6 +143,7 @@ public void cancel(long jobId) {
jobRunnerMap.remove(jobId);
job.setEndTime(System.currentTimeMillis());
logger.info(String.format("Job id=%s cost %s ms.", job.getJobId(), job.getEndTime() - job.getStartTime()));
return true;
}

public JobState queryJobState(long jobId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,12 @@ public void cancelJobTest() {
logger.info("job {} state is {}", jobId, jobState.toString());

session.cancelTransformJob(jobId);
jobState = session.queryTransformJobStatus(jobId);
logger.info("After cancellation, job {} state is {}", jobId, jobState.toString());
assertEquals(JobState.JOB_CLOSED, jobState);

List<Long> finishedJobIds = session.showEligibleJob(JobState.JOB_CLOSED);

assertTrue(finishedJobIds.contains(jobId));
List<Long> closedJobIds = session.showEligibleJob(JobState.JOB_CLOSED);
assertTrue(closedJobIds.contains(jobId));
} catch (SessionException | ExecutionException | InterruptedException e) {
logger.error("Transform: execute fail. Caused by:", e);
fail();
Expand Down
5 changes: 2 additions & 3 deletions test/src/test/resources/transform/transformer_sleep.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import pandas as pd
import time


Expand All @@ -7,5 +6,5 @@ def __init__(self):
pass

def transform(self, rows):
time.sleep(5)
return rows
while True:
time.sleep(60)

0 comments on commit daf6d21

Please sign in to comment.