Zookeeper stores data in a path hierarchy, just like a standard file system, e.g., entry-{priority}-{dataset}:{groupid}-{sequence number}. This way we can read some important data from the node name itself. This is useful when we want to read important data from all children, because Zookeeper does not support reading the data of all children in one call; get_children() only returns the name of each child node. Therefore, we can save read workload on the client by putting the important information in the name of each node.
Using the sequence number generated by Zookeeper is also a good idea. Each node in Zookeeper has to be unique, and the sequence numbers are assigned in creation order. Thus, there is no need to put a timestamp in the name if we want to know which node was created first.
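For illustration, creating a node that follows this naming scheme with Kazoo could look roughly like the sketch below; the host, path, and field values here are only assumptions for the example, not taken from the project code.

from kazoo.client import KazooClient

# Minimal sketch of the naming scheme described above (illustrative values).
client = KazooClient(hosts='127.0.0.1:2181')
client.start()
client.ensure_path('/unowned')
# Encode priority, dataset and groupid in the name; sequence=True makes
# Zookeeper append an increasing sequence number so names stay unique.
name = '/unowned/entry-{priority:03d}-{dataset}:{groupid}-'.format(
    priority=100, dataset='dataset1', groupid=7)
created = client.create(name, b'{"state": "pending"}', sequence=True)
print(created)  # e.g. /unowned/entry-100-dataset1:7-0000000012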
This example is based on the code in zk_compute.py. In this code, we sort the children nodes by job priority and by the time each job was put into the system. Therefore, it is possible to build a priority work queue from the code shown here.
self.unowned_job = self.client.get_children('/unowned')
# Node name format is entry-priority-dataset:groupid-sequence
# -int(entry.split("-")[1]) changes the sort order to descending order
self.unowned_job = sorted(self.unowned_job, key=lambda entry: (-int(entry.split("-")[1]), entry.split("-")[-1]))
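For example, with a few made-up child names, the sort above puts the highest priority first and breaks ties by creation order (the sequence suffix):

entries = [
    "entry-100-dataset1:2-0000000005",
    "entry-200-dataset3:1-0000000007",
    "entry-100-dataset2:4-0000000003",
]
ordered = sorted(entries, key=lambda entry: (-int(entry.split("-")[1]), entry.split("-")[-1]))
# priority 200 comes first, then the two priority-100 entries in creation order
print(ordered)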
As shown in the previous section, if we design the node names properly we can create the priority queue we want. An ordinary FIFO queue can be created using just the sequence part of the node name, which records each node's creation order, as in the small sketch below.
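A minimal sketch of that FIFO ordering, again with made-up names:

# Sort only on the sequence suffix appended by Zookeeper, ignoring priority.
entries = ["entry-100-a:1-0000000009", "entry-200-b:2-0000000004"]
fifo = sorted(entries, key=lambda entry: entry.split("-")[-1])
# the node with sequence 0000000004 was created first, so it comes first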
In this project, a compute node uses this behavior to get all jobs from the /unowned path and then chooses a job related to its local cache, ordered by job priority and job creation time.
This part of the code comes from the run() method of the Slave class.
# Get all unowned jobs from the /unowned path
self.unowned_job = self.client.get_children('/unowned')
if len(self.unowned_job) == 0:
    # no more tasks to do
    continue
# sort the unowned jobs so the list behaves like a priority queue
self.unowned_job = sorted(self.unowned_job, key=lambda entry: (-int(entry.split("-")[1]), entry.split("-")[-1]))
# This function returns the job if it is in the local machine cache
job = self.get_job_from_list()
If there is a job that should be calculated by this compute node, the compute node claims the job as follows. KazooRetry is suggested here because the request to Zookeeper might fail on the way there or on the way back. We define a separate _inner_get so that it can be passed to KazooRetry, raising ForceRetryError() whenever a retry is needed. In _inner_get we also delete the job node from the /unowned path, because we want to re-create it under the /owned path.
def get_job(self, entry):
    path = self.unowned_path + "/" + str(entry)
    kr = KazooRetry(max_tries=3, ignore_expire=False)
    try:
        result = kr(self._inner_get, path)
    except RetryFailedError:
        return None
    return result

def _inner_get(self, path):
    try:
        data, stat = self.client.get(path)
    except NoNodeError:
        # the first node has vanished in the meantime, try to
        # get another one
        raise ForceRetryError()
    try:
        self.client.delete(path)
    except NoNodeError:
        # we were able to get the data but someone else has removed
        # the node in the meantime. consider the item as processed
        # by the other process
        raise ForceRetryError()
    del self.unowned_job[:]
    return data
def own_job(self, value, priority=100):
    # move job to owned part
    self._check_put_arguments(value, priority)
    self._ensure_paths()
    value_dict = json.loads(value)
    value_dict["state"] = Jobstate.RUNNING
    value_dict["worker"] = str(self.slave_id)
    path = '{path}/{prefix}{priority:03d}-{dataset}:{groupid}-'.format(
        path=self.owned_path, prefix=self.prefix, priority=priority,
        dataset=value_dict.get("dataset"),
        groupid=value_dict.get("groupid")
    )
    final_val = json.dumps(value_dict)
    self.running_job_path.put(self.client.create(path, final_val, sequence=True))
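Putting these pieces together, the claim step inside the compute node's loop might look roughly like the simplified sketch below; the real code picks the entry through get_job_from_list and its local cache rather than iterating over everything.

# Simplified sketch of the claim flow: read-and-delete a node from
# /unowned via get_job, then re-create it under /owned via own_job.
for entry in self.unowned_job:
    data = self.get_job(entry)
    if data is None:
        continue  # another compute node claimed it first; try the next one
    self.own_job(data.decode("utf-8"))
    break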
The compute node is also responsible for moving the job to the /done path when the job has been successfully calculated. The update_state function, which creates a new job node under that path, is passed as a callback to the running compute node.
def update_state(self, state):
    # update state in owned job
    current_job_path = self.running_job_path.get()
    job = self.get_job_from_path(current_job_path)
    if not job:
        logging.debug("Fail to update State, No Job in {path}".format(path=self.running_job_path))
        return
    priority = int(current_job_path.split("-")[1])
    job_object = json.loads(job)
    job_object["state"] = state
    job_updated = json.dumps(job_object)
    if state == Jobstate.SUCCESSFUL:
        finish_path = '{path}/{prefix}{priority:03d}-{dataset}:{groupid}-'.format(
            path=self.done_path, prefix=self.prefix, priority=priority,
            dataset=job_object.get("dataset"),
            groupid=job_object.get("groupid")
        )
        self.client.create(finish_path, job_updated, sequence=True)
        # save in Mongo
        job_mongo_id = self.results.insert_one(job_object).inserted_id
        logging.debug("finish saving job_id:{job_mongo_id} in mongo".format(job_mongo_id=job_mongo_id))
        try:
            self.client.delete(current_job_path)
        except NoNodeError:
            raise ForceRetryError()
    else:
        self.client.retry(self.client.set, current_job_path, job_updated)
    # if state == Jobstate.SUCCESSFUL:
    #     self.clear_running_path()
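How this callback might be handed to the computation can be sketched as follows; run_job and its internals are purely illustrative assumptions, not the project's actual compute routine.

# Hypothetical wiring: the compute routine receives update_state as a
# callback and reports the final state when it finishes.
def run_job(job_data, report_state):
    # ... perform the actual computation on job_data here ...
    report_state(Jobstate.SUCCESSFUL)  # moves the finished job to /done

# inside the Slave's run loop, its own update_state method is the callback
run_job(job, self.update_state)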
The Dispatcher is responsible for putting jobs into Zookeeper and reading job status back from Zookeeper. There are two main functions where the Dispatcher has to deal with Zookeeper. The first is to check and decide whether it should put a job into Zookeeper or not. The other is a watch: the Dispatcher has to watch for jobs that have been successfully done. These functions are simpler than the ones used in the compute node. The two main calls used here are client.create() and client.get().
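A minimal sketch of those two responsibilities could look like this; the paths, payload fields, and the ChildrenWatch callback are assumptions based on the description above, not the project's exact code.

import json
from kazoo.client import KazooClient

client = KazooClient(hosts='127.0.0.1:2181')
client.start()
client.ensure_path('/unowned')
client.ensure_path('/done')

# 1) Decide to submit a job, then create it under /unowned with client.create().
job = json.dumps({"dataset": "dataset1", "groupid": 7, "state": "pending"})
client.create('/unowned/entry-100-dataset1:7-', job.encode('utf-8'), sequence=True)

# 2) Watch /done and read finished jobs back with client.get().
@client.ChildrenWatch('/done')
def on_done(children):
    for child in children:
        data, stat = client.get('/done/' + child)
        print("finished:", json.loads(data.decode('utf-8')))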