diff --git a/core/src/assembly/resources/sbin/start_iginx.sh b/core/src/assembly/resources/sbin/start_iginx.sh index 955e8efd57..df2ae41f7a 100755 --- a/core/src/assembly/resources/sbin/start_iginx.sh +++ b/core/src/assembly/resources/sbin/start_iginx.sh @@ -21,24 +21,35 @@ # You can put your env variable here # export JAVA_HOME=$JAVA_HOME -if [[ -z "${IGINX_HOME}" ]]; then - export IGINX_HOME="$( - cd "$(dirname "$0")"/.. - pwd - )" +if [[ -z "$IGINX_HOME" ]]; then + export IGINX_HOME=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")"/.. && pwd) + # ${BASH_SOURCE[0]} instead of $0: $0 will be "bash" if the script is sourced (source / .) + # Double quoted ${BASH_SOURCE[0]} and $(dirname #): + # # Avoid Word Splitting and Filename Expansion, + # # deal with path contains IFSs and/or * + # Double dash "--": The end of command options, deal with path starts with "-" + # && instead of ; or newline: exit code needs to be checked + + # Even if IFS contains digit, $? is safe, + # since there is no Word Splitting in ((expression)) + if (($? != 0)); then + echo "Cannot determine IGINX_HOME, exit..." + exit 1 + fi fi MAIN_CLASS=cn.edu.tsinghua.iginx.Iginx CLASSPATH="" -for f in ${IGINX_HOME}/lib/*.jar; do - CLASSPATH=${CLASSPATH}":"$f +for f in "$IGINX_HOME"/lib/*.jar; do + CLASSPATH=${CLASSPATH}:${f} done +CLASSPATH=${CLASSPATH:1} # remove first ":" if [ -n "$JAVA_HOME" ]; then for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do if [ -x "$java" ]; then - JAVA="$java" + JAVA=$java break fi done @@ -50,12 +61,13 @@ fi calculate_heap_sizes() { case "$(uname)" in Linux) - system_memory_in_mb=$(free -m | sed -n '2p' | awk '{print $2}') + system_memory_in_kb=$(grep MemTotal /proc/meminfo | awk '{print $2}') + system_memory_in_mb=$((${system_memory_in_kb} / 1024)) system_cpu_cores=$(egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo) ;; FreeBSD) system_memory_in_bytes=$(sysctl hw.physmem | awk '{print $2}') - system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024) + system_memory_in_mb=$((${system_memory_in_bytes} / 1024 / 1024)) system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}') ;; SunOS) @@ -64,20 +76,20 @@ calculate_heap_sizes() { ;; Darwin) system_memory_in_bytes=$(sysctl hw.memsize | awk '{print $2}') - system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024) + system_memory_in_mb=$((${system_memory_in_bytes} / 1024 / 1024)) system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}') ;; *) # assume reasonable defaults for e.g. a modern desktop or # cheap server - system_memory_in_mb="2048" - system_cpu_cores="2" + system_memory_in_mb=2048 + system_cpu_cores=2 ;; esac # some systems like the raspberry pi don't report cores, use at least 1 - if [ "$system_cpu_cores" -lt "1" ]; then - system_cpu_cores="1" + if (( ${system_cpu_cores} < 1 )); then + system_cpu_cores=1 fi # set max heap size based on the following @@ -85,26 +97,29 @@ calculate_heap_sizes() { # calculate 1/2 ram and cap to 1024MB # calculate 1/4 ram and cap to 65536MB # pick the max - half_system_memory_in_mb=$(expr $system_memory_in_mb / 2) - quarter_system_memory_in_mb=$(expr $half_system_memory_in_mb / 2) - if [ "$half_system_memory_in_mb" -gt "1024" ]; then - half_system_memory_in_mb="1024" + half_system_memory_in_mb=$((${system_memory_in_mb} / 2)) + quarter_system_memory_in_mb=$((${half_system_memory_in_mb} / 2)) + if (( ${half_system_memory_in_mb} > 1024 )); then + half_system_memory_in_mb=1024 fi - if [ "$quarter_system_memory_in_mb" -gt "65536" ]; then - quarter_system_memory_in_mb="65536" + if (( ${quarter_system_memory_in_mb} > 65536 )); then + quarter_system_memory_in_mb=65536 fi - if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]; then - max_heap_size_in_mb="$half_system_memory_in_mb" + if (( ${half_system_memory_in_mb} > ${quarter_system_memory_in_mb} )); then + max_heap_size_in_mb=$half_system_memory_in_mb else - max_heap_size_in_mb="$quarter_system_memory_in_mb" + max_heap_size_in_mb=$quarter_system_memory_in_mb fi - MAX_HEAP_SIZE="${max_heap_size_in_mb}M" + MAX_HEAP_SIZE=${max_heap_size_in_mb}M } calculate_heap_sizes -JMX_OPTS="" -JMX_OPTS="$JMX_OPTS -Xms${MAX_HEAP_SIZE}" -JMX_OPTS="$JMX_OPTS -Xmx${MAX_HEAP_SIZE}" +# If we use string joined by instead of array, +# we need #unset IFS# or #IFS=' '# +# to ensure Word Splitting works as expected, +# i.e. split $HEAP_OPTS into two arguments +HEAP_OPTS[0]=-Xmx$MAX_HEAP_SIZE +HEAP_OPTS[1]=-Xms$MAX_HEAP_SIZE # continue to other parameters ICONF="$IGINX_HOME/conf/config.properties" @@ -113,6 +128,7 @@ IDRIVER="$IGINX_HOME/driver/" export IGINX_CONF=$ICONF export IGINX_DRIVER=$IDRIVER -exec "$JAVA" -Duser.timezone=GMT+8 -cp "$CLASSPATH" "$MAIN_CLASS" "$@" +exec "$JAVA" -Duser.timezone=GMT+8 ${HEAP_OPTS[@]} -cp "$CLASSPATH" "$MAIN_CLASS" "$@" -exit $? +# Double quoted to avoid Word Splitting when IFS contains digit +exit "$?" diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java index f6f5a4fa54..81d6341186 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java @@ -590,8 +590,8 @@ public ShowEligibleJobResp showEligibleJob(ShowEligibleJobReq req) { @Override public Status cancelTransformJob(CancelTransformJobReq req) { TransformJobManager manager = TransformJobManager.getInstance(); - manager.cancel(req.getJobId()); - return RpcUtils.SUCCESS; + boolean success = manager.cancel(req.getJobId()); + return success ? RpcUtils.SUCCESS : RpcUtils.FAILURE; } @Override diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java index e80b2c5920..afa9071cd1 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java @@ -105,14 +105,14 @@ private void process(Job job) throws Exception { logger.info(String.format("Job id=%s cost %s ms.", job.getJobId(), job.getEndTime() - job.getStartTime())); } - public void cancel(long jobId) { + public boolean cancel(long jobId) { Job job = jobMap.get(jobId); if (job == null) { - return; + return false; } JobRunner runner = jobRunnerMap.get(jobId); if (runner == null) { - return; + return false; } // Since job state is set to FINISHED/FAILING/FAILED before runner removed from jobRunnerMap, // if runner == null, we can confirm that job state is not RUNNING or CREATED. @@ -130,11 +130,11 @@ public void cancel(long jobId) { case JOB_CREATED: break; // continue execution default: - return; + return false; } // atomic guard if (!job.getActive().compareAndSet(true, false)) { - return; + return false; } // reorder as Normal run: [set-ING,] close, set-ED, remove[, set end time, log time cost]. job.setState(JobState.JOB_CLOSING); @@ -143,6 +143,7 @@ public void cancel(long jobId) { jobRunnerMap.remove(jobId); job.setEndTime(System.currentTimeMillis()); logger.info(String.format("Job id=%s cost %s ms.", job.getJobId(), job.getEndTime() - job.getStartTime())); + return true; } public JobState queryJobState(long jobId) { diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/udf/TransformIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/udf/TransformIT.java index e37f462551..2f4e9d44e2 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/udf/TransformIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/udf/TransformIT.java @@ -582,10 +582,12 @@ public void cancelJobTest() { logger.info("job {} state is {}", jobId, jobState.toString()); session.cancelTransformJob(jobId); + jobState = session.queryTransformJobStatus(jobId); + logger.info("After cancellation, job {} state is {}", jobId, jobState.toString()); + assertEquals(JobState.JOB_CLOSED, jobState); - List finishedJobIds = session.showEligibleJob(JobState.JOB_CLOSED); - - assertTrue(finishedJobIds.contains(jobId)); + List closedJobIds = session.showEligibleJob(JobState.JOB_CLOSED); + assertTrue(closedJobIds.contains(jobId)); } catch (SessionException | ExecutionException | InterruptedException e) { logger.error("Transform: execute fail. Caused by:", e); fail(); diff --git a/test/src/test/resources/transform/transformer_sleep.py b/test/src/test/resources/transform/transformer_sleep.py index 26d494192e..e8145a01c8 100644 --- a/test/src/test/resources/transform/transformer_sleep.py +++ b/test/src/test/resources/transform/transformer_sleep.py @@ -1,4 +1,3 @@ -import pandas as pd import time @@ -7,5 +6,5 @@ def __init__(self): pass def transform(self, rows): - time.sleep(5) - return rows + while True: + time.sleep(60)