Multiple NodeShared per process #535

Closed
44 changes: 44 additions & 0 deletions python/test/nodeDiscoveryPubs_TEST.py
@@ -0,0 +1,44 @@
from threading import Thread
import time


def create_pubs(id_):
    '''Create publishers on an isolated discovery network derived from id_.'''
    import os
    isolated_env = {'GZ_DISCOVERY_MSG_PORT': f'{11320 + id_*10}',
                    'GZ_DISCOVERY_SRV_PORT': f'{11321 + id_*10}',
                    'GZ_DISCOVERY_MULTICAST_IP': f'239.255.0.{10 + id_}',
                    'GZ_PARTITION': f'env{id_}',
                    }

    print("Run the following in a shell to listen to topics in isolated env", id_)
    for key, value in isolated_env.items():
        print(f'export {key}={value}')
    # Set the env vars before importing gz.transport and creating the Node,
    # so that this thread's NodeShared picks them up.
    os.environ.update(isolated_env)
    import gz.transport14 as transport
    from gz.msgs11.empty_pb2 import Empty
    node = transport.Node()
    pub = node.advertise(f'/empty{id_}', Empty)
    while True:
        print(f"publishing at env {id_}")
        pub.publish(Empty())
        time.sleep(1.0)



def pub_proc(n_threads):
    '''Spawn multiple publisher threads, each in its own isolated discovery env.'''

    pub_threads = [Thread(target=create_pubs, args=[i]) for i in range(n_threads)]
    for pi in pub_threads:
        pi.start()
        # Stagger startups: os.environ is process-wide, so give each thread
        # time to set and read its env vars before the next thread overwrites them.
        time.sleep(2.0)
    for pi in pub_threads:
        pi.join()



main_pub = Thread(target=pub_proc, args=[2])
main_pub.start()
main_pub.join()
64 changes: 64 additions & 0 deletions python/test/nodeDiscoverySubs_TEST.py
@@ -0,0 +1,64 @@
from threading import Thread
import time



def create_subs(id_):
    '''Create subscribers on an isolated discovery network derived from id_.'''
    import os
    isolated_env = {'GZ_DISCOVERY_MSG_PORT': f'{11320 + id_*10}',
                    'GZ_DISCOVERY_SRV_PORT': f'{11321 + id_*10}',
                    'GZ_DISCOVERY_MULTICAST_IP': f'239.255.0.{10 + id_}',
                    'GZ_PARTITION': f'env{id_}',
                    }

    print("Run the following in a shell to listen to topics in isolated env", id_)
    for key, value in isolated_env.items():
        print(f'export {key}={value}')
    # Set the env vars before importing gz.transport and creating the Node,
    # so that this thread's NodeShared picks them up.
    os.environ.update(isolated_env)
    import gz.transport14 as transport
    from gz.msgs11.empty_pb2 import Empty
    node = transport.Node()

    def cb(msg):
        print(f"Received msg at env {id_}")

    sub = node.subscribe(Empty, f'/empty{id_}', cb)

    # Keep the thread alive so the subscription stays active.
    while True:
        time.sleep(1.0)




def sub_proc(n_threads):
    '''Spawn multiple subscriber threads, each in its own isolated discovery env.'''
    # We reverse the order of the env ids here to verify that the per-thread
    # env var changes actually take effect.
    # Consider two publishers created with ids 0 and 1:
    # - If the env var changes took effect, each publisher uses its own
    #   discovery ports and partition.
    # - If they did not, every thread in a process ends up sharing whichever
    #   env was written last.
    # Reversing the env id order on the subscriber side distinguishes the
    # two cases:
    # 1. Pubs created in env order 0->1, subs created in env order 0->1:
    #    even if all threads collapsed into one shared discovery setup, both
    #    processes would end up in the same env, so the subs would still
    #    receive messages. This ordering cannot detect the failure.
    # 2. Pubs created in env order 0->1, subs created in env order 1->0:
    #    if all threads collapsed into one shared setup, the pub process
    #    would end up in env1 and the sub process in env0, so the subs would
    #    NOT receive any messages. Receiving messages with this ordering
    #    therefore confirms each thread kept its own discovery environment.

    sub_threads = [Thread(target=create_subs, args=[i]) for i in reversed(range(n_threads))]
    for si in sub_threads:
        si.start()
        # Stagger startups: os.environ is process-wide, so give each thread
        # time to set and read its env vars before the next thread overwrites them.
        time.sleep(2.0)
    for si in sub_threads:
        si.join()

main_sub = Thread(target=sub_proc, args=[2])
main_sub.start()
main_sub.join()
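
Taken together, the two scripts above form a manual end-to-end check: run nodeDiscoveryPubs_TEST.py in one terminal and nodeDiscoverySubs_TEST.py in another. With the per-thread NodeShared change applied, each subscriber should print "Received msg at env N" for its own id; without it, the reversed env ordering in sub_proc leaves the publisher and subscriber processes in different discovery environments, and the subscribers stay silent.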
18 changes: 13 additions & 5 deletions src/NodeShared.cc
@@ -159,11 +159,19 @@ NodeShared *NodeShared::Instance()
// is not shared between different processes.

static std::shared_mutex mutex;
- static std::unordered_map<unsigned int, NodeShared*> nodeSharedMap;
+ static std::unordered_map<std::string, NodeShared*> nodeSharedMap;

// Get current process ID.
auto pid = getProcessId();

+ // Get the current thread ID as well, so that multiple NodeShared
+ // instances can exist per process: one per thread.
+ auto tid = std::this_thread::get_id();
+ // Build a unique "<pid>-<tid>" key from the two IDs.
+ std::stringstream unique_ss;
+ unique_ss << std::hex << pid << std::dec << "-" << tid;
+ auto uid = unique_ss.str();
// Check if there's already a NodeShared instance for this process.
// Use a shared_lock so multiple threads can read simultaneously.
// This will only block if there's another thread locking exclusively
@@ -173,7 +181,7 @@ NodeShared *NodeShared::Instance()
try
{
std::shared_lock readLock(mutex);
- return nodeSharedMap.at(pid);
+ return nodeSharedMap.at(uid);
}
catch (...)
{
@@ -182,15 +190,15 @@ NodeShared *NodeShared::Instance()
// not an already constructed NodeShared instance for this process.
std::lock_guard writeLock(mutex);

- auto iter = nodeSharedMap.find(pid);
+ auto iter = nodeSharedMap.find(uid);
if (iter != nodeSharedMap.end())
{
// There's already an instance for this process, return it.
return iter->second;
}

// No instance, construct a new one.
- auto ret = nodeSharedMap.insert({pid, new NodeShared});
+ auto ret = nodeSharedMap.insert({uid, new NodeShared});
assert(ret.second); // Insert operation should be successful.
return ret.first->second;
}
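
With this change, NodeShared instances are keyed by a "<pid>-<tid>" string rather than by the process ID alone, so each thread that calls `NodeShared::Instance()` receives its own instance. Below is a minimal sketch (not part of this PR) of how the new behavior could be exercised from C++; it assumes the `gz/transport/NodeShared.hh` header and the `gz::transport` namespace of gz-transport 14:

```cpp
// Sketch: two concurrently running threads should now obtain distinct
// NodeShared instances, where previously they shared one per-process
// singleton.
#include <cassert>
#include <thread>

#include <gz/transport/NodeShared.hh>

int main()
{
  gz::transport::NodeShared *first = nullptr;
  gz::transport::NodeShared *second = nullptr;

  // Run both threads concurrently so their thread IDs are guaranteed
  // to be distinct.
  std::thread t1([&first] { first = gz::transport::NodeShared::Instance(); });
  std::thread t2([&second] { second = gz::transport::NodeShared::Instance(); });
  t1.join();
  t2.join();

  // Each thread's "<pid>-<tid>" key maps to its own NodeShared.
  assert(first != nullptr && second != nullptr);
  assert(first != second);
  return 0;
}
```

Keeping both threads alive at the same time matters here: `std::thread::id` values can be reused once a thread has exited, so two strictly sequential threads could in principle map to the same key.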