Commit a8f2c93d authored by Ulrich Kemloh's avatar Ulrich Kemloh

Agents can now be introduced in the simulation using RPC

parent d3501d69
......@@ -526,11 +526,18 @@ const AgentsSourcesManager& Simulation::GetAgentSrcManager()
void Simulation::ProcessAgentsQueue()
{
//incoming pedestrians
vector<Pedestrian*> peds;
AgentsQueue::GetandClear(peds);
for(auto&& ped: peds)
{
_building->AddPedestrian(ped);
}
//outgoing pedestrians
if (_hybridSimManager)
{
_hybridSimManager->ProcessOutgoingAgent();
}
}
......@@ -81,7 +81,7 @@
</agents_distribution>
<agents_sources><!-- frequency in persons/seconds -->
<source id="1" frequency="5" agents_max="15" group_id="0" caption="source 1" />
<sources id="1" frequency="5" agents_max="15" group_id="0" caption="source 1" />
<sources id="1" frequency="5" agents_max="15" group_id="0" caption="source 1" />
</agents_sources>
......
......@@ -11,12 +11,12 @@
<!-- trajectories file and format -->
<trajectories format="xml-plain" fps="8">
<file location="big_room_trajectories.xml" />
<sockett hostname="127.0.0.1" port="8989" />
<socket hostname="127.0.0.1" port="8989" />
</trajectories>
<!-- Parameters for the hybrid mode -->
<hybrid_simulation server="localhost" port="9999" />
<!-- where to store the logs -->
<!--<logfile>outputfiles/log.txt</logfile> -->
......@@ -71,14 +71,21 @@
<!--persons information and distribution -->
<agents operational_model_id="2">
<agents_distribution>
<group group_id="0" room_id="0" subroom_id="0" number="0" router_id="1" agent_parameter_id="1" x_min="14" x_max="16" y_min="14" y_max="16" />
<group group_id="1" room_id="0" subroom_id="0" number="10" router_id="1" agent_parameter_id="1" goal_id="4"/>
<group group_id="0" room_id="0" subroom_id="0" number="0" router_id="1" agent_parameter_id="1" x_min="14" x_max="16" y_min="14" y_max="16" />
<group group_id="1" room_id="1" subroom_id="0" number="0" router_id="1" agent_parameter_id="1" />
<group group_id="2" room_id="2" subroom_id="0" number="0" router_id="1" agent_parameter_id="1" />
<group group_id="3" room_id="3" subroom_id="0" number="0" router_id="1" agent_parameter_id="1" />
<group group_id="4" room_id="4" subroom_id="0" number="0" router_id="1" agent_parameter_id="1" />
<!-- real start group -->
<group group_id="5" room_id="1" subroom_id="0" number="5" router_id="1" agent_parameter_id="1" goal_id="3" />
<group group_id="6" room_id="2" subroom_id="0" number="5" router_id="1" agent_parameter_id="1" goal_id="4" />
</agents_distribution>
<agents_sources><!-- frequency in persons/seconds -->
<sources id="1" frequency="5" agents_max="15" group_id="0" caption="source 1" />
<sources id="1" frequency="5" agents_max="15" group_id="0" caption="source 1" />
<source id="1" frequency="1" agents_max="5" group_id="1" caption="source 1" />
<source id="2" frequency="1" agents_max="5" group_id="2" caption="source 2" />
<source id="3" frequency="1" agents_max="5" group_id="3" caption="source 3" />
<source id="4" frequency="1" agents_max="5" group_id="4" caption="source 4" />
</agents_sources>
</agents>
......@@ -143,6 +150,9 @@
<router router_id="1" description="global_shortest">
<parameters></parameters>
</router>
<router router_id="2" description="quickest">
<parameters></parameters>
</router>
</route_choice_models>
......
......@@ -62,10 +62,12 @@ int main(int argc, char **argv)
//Start the thread for managing incoming messages from MatSim
std::thread t2;
auto hybrid=args->GetHybridSimManager();
if(hybrid)
if(hybrid){
hybrid->AttachSourceManager(sim.GetAgentSrcManager());
t2 = std::thread(*hybrid);
//t1.detach();
//t2.detach();
}
//t1.detach();
//t2.detach();
//main thread for the simulation
Log->Write("INFO: \tStart runSimulation()");
......@@ -104,7 +106,7 @@ int main(int argc, char **argv)
Log->Write("Warnings : %d", Log->GetWarnings());
Log->Write("Errors : %d", Log->GetErrors());
if (NULL == dynamic_cast<STDIOHandler*>(Log))
if (nullptr == dynamic_cast<STDIOHandler*>(Log))
{
printf("\nExec Time [s] : %4.2f\n", execTime);
printf("Evac Time [s] : %d\n", evacTime);
......
......@@ -47,10 +47,29 @@ using namespace std;
bool HybridSimulationManager::_shutdown = false;
HybridSimulationManager::HybridSimulationManager(const std::string& server,
int port)
int port)
{
_serverName = server;
_port = port;
_serverName = server;
_port = port;
GOOGLE_PROTOBUF_VERIFY_VERSION;
grpc_init();
_rpcClient = std::shared_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
grpc::InsecureCredentials(), ChannelArguments())));
// std::string server_address(_serverName + ":" + std::to_string(_port));
// JPSserver service(_agentSrcMng);
//
// ServerBuilder builder;
// builder.AddListeningPort(server_address,
// grpc::InsecureServerCredentials());
// builder.RegisterService(&service);
// _rpcServer= builder.BuildAndStart();
// Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
}
HybridSimulationManager::~HybridSimulationManager()
......@@ -59,98 +78,97 @@ HybridSimulationManager::~HybridSimulationManager()
bool HybridSimulationManager::Init(Building* building)
{
// GOOGLE_PROTOBUF_VERIFY_VERSION;
_building = building;
return true;
}
void HybridSimulationManager::AttachSourceManager(const AgentsSourcesManager& src)
{
//create a source manager and copy
// GOOGLE_PROTOBUF_VERIFY_VERSION;
_building = building;
return true;
}
bool HybridSimulationManager::Run()
{
//perform some initialisation stuff
GOOGLE_PROTOBUF_VERIFY_VERSION;
grpc_init();
//create a socket and use it for the serveur and the client
std::thread t1(&HybridSimulationManager::RunClient, this);
std::thread t2(&HybridSimulationManager::RunServer, this);
t1.join();
t2.join();
//clean up everything
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return true;
//perform some initialisation stuff
GOOGLE_PROTOBUF_VERIFY_VERSION;
grpc_init();
_rpcClient = std::shared_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
grpc::InsecureCredentials(), ChannelArguments())));
//create a socket and use it for the serveur and the client
//std::thread t1(&HybridSimulationManager::RunClient, this);
std::thread t2(&HybridSimulationManager::RunServer, this);
//t1.join();
t2.join();
//clean up everything
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return true;
}
void HybridSimulationManager::operator()()
{
Run();
Run();
}
bool HybridSimulationManager::RunClient()
{
//check the message queue and send
JPSclient client(
grpc::CreateChannel("localhost:9999",
grpc::InsecureCredentials(), ChannelArguments()));
//client.SetSourceManager();
//check the message queue and send
JPSclient client(
grpc::CreateChannel("localhost:9999",
grpc::InsecureCredentials(), ChannelArguments()));
do
{
do
{
//check if agents enter/left a link and fire event
client.ProcessAgentQueue(_building);
//check if agents enter/left a link and fire event
client.ProcessAgentQueue(_building);
//wait some time, before a new attempt
cout << "waiting for input:" << _shutdown << endl;
//ProcessOutgoingAgent();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
//wait some time, before a new attempt
cout << "waiting for input:" << _shutdown << endl;
//ProcessOutgoingAgent();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
return true;
return true;
}
bool HybridSimulationManager::RunServer()
{
//check the message queue and send
do
{
std::string server_address(_serverName + ":" + std::to_string(_port));
JPSserver service;
ServerBuilder builder;
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
builder.RegisterService(&service);
_rpcServer= builder.BuildAndStart();
Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
//_rpcServer->Wait();
//wait for incoming connection
//receive and parse message
//process message
//can insert pedestrian ?
//accept pedestrian and feed to the queue
//wait some time, before a new attempt
cout << "waiting for output:" << _shutdown << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
return true;
//check the message queue and send
//TODO: move this to the constructor
do
{
std::string server_address(_serverName + ":" + std::to_string(_port));
JPSserver service(_agentSrcMng);
ServerBuilder builder;
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
builder.RegisterService(&service);
_rpcServer= builder.BuildAndStart();
Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
_rpcServer->Wait();
//wait for incoming connection
//receive and parse message
//process message
//can insert pedestrian ?
//accept pedestrian and feed to the queue
//wait some time, before a new attempt
cout << "waiting for output:" << _shutdown << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
return true;
}
void HybridSimulationManager::Shutdown()
{
_shutdown = true;
if (_rpcServer) _rpcServer->Shutdown();
//cout<<"shutting down: "<<_shutdown<<endl;
_shutdown = true;
if (_rpcServer) _rpcServer->Shutdown();
//cout<<"shutting down: "<<_shutdown<<endl;
}
void HybridSimulationManager::ProcessIncomingAgent()
......@@ -160,29 +178,41 @@ void HybridSimulationManager::ProcessIncomingAgent()
void HybridSimulationManager::ProcessOutgoingAgent()
{
std::vector<Pedestrian*> peds;
AgentsQueueOut::GetandClear(peds);
for (auto && ped:peds)
{
//pick the agent from the queue
hybrid::Extern2MATSim msg;
msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));
//write to the disk or send to hell
//std::fstream output("test.buf", ios::out | ios::trunc | ios::binary);
//if (!msg.SerializeToOstream(&output))
//{
// cerr << "Failed to write address book." << endl;
//}
cout<<"deleting:"<<endl;
_building->DeletePedestrian(ped);
}
_rpcClient->ProcessAgentQueue(_building);
/*
std::vector<Pedestrian*> peds;
AgentsQueueOut::GetandClear(peds);
for (auto && ped:peds)
{
//pick the agent from the queue
hybrid::Extern2MATSim msg;
msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));
//write to the disk or send to hell
//std::fstream output("test.buf", ios::out | ios::trunc | ios::binary);
//if (!msg.SerializeToOstream(&output))
//{
// cerr << "Failed to write address book." << endl;
//}
cout<<"deleting:"<<endl;
_building->DeletePedestrian(ped);
}
*/
}
std::string HybridSimulationManager::ToString()
{
return "INFO:\tHybrid Simulation working on [" + _serverName + ":"
+ std::to_string(_port) + "]\n";
return "INFO:\tHybrid Simulation working on [" + _serverName + ":"
+ std::to_string(_port) + "]\n";
}
void HybridSimulationManager::AttachSourceManager(const AgentsSourcesManager& src)
{
//TODO create a deep copy of the manager
// and copy the sources without copying the generated pedestrian
_agentSrcMng=src;
}
......@@ -13,10 +13,11 @@
#include <memory>
#include <grpc++/server.h>
#include "../pedestrian/AgentsSourcesManager.h"
//forward classes
class Building;
class AgentsSourcesManager;
//class grpc::Server;
class JPSclient;
class HybridSimulationManager
{
......@@ -38,13 +39,14 @@ public:
void operator()();
/**
*
* Create another source manager
*/
void AttachSourceManager(const AgentsSourcesManager& src);
void ProcessOutgoingAgent();
private:
void ProcessIncomingAgent();
void ProcessOutgoingAgent();
private:
//std::atomic<bool> _shutdown=false;
......@@ -52,10 +54,11 @@ private:
int _port=-1;
std::string _serverName="localhost";
Building* _building=nullptr;
//grpc::Server* _rpcServer_;
//TODO: the method should be passed bz reference in the main function
//std::unique_ptr<grpc::Server> _rpcServer;
std::shared_ptr<grpc::Server> _rpcServer;
AgentsSourcesManager _agentSrcMng;
std::shared_ptr<JPSclient> _rpcClient;
};
#endif /* HYBRIDSIMULATIONMANAGER_H_ */
......@@ -12,6 +12,7 @@
#include "MATSimInterface.pb.h"
using namespace std;
JPSclient::JPSclient(std::shared_ptr<ChannelInterface> channel) :
_stub(ExternInterfaceService::NewStub(channel))
......@@ -32,7 +33,7 @@ void JPSclient::ProcessAgentQueue(Building* building)
for (auto && ped:peds)
{
if(HasSpaceOnJuPedSim(ped->GetExitIndex())==true)
if(HasSpaceOnJuPedSim(ped->GetFinalDestination())==true)
{
if(SendAgentToJuPedSim(ped)==false)
{
......@@ -45,7 +46,7 @@ void JPSclient::ProcessAgentQueue(Building* building)
}
if(HasSpaceOnMatsim(ped->GetExitIndex())==true)
if(HasSpaceOnMatsim(ped->GetFinalDestination())==true)
{
SendAgentToMatsim(ped);
}
......@@ -56,6 +57,7 @@ void JPSclient::ProcessAgentQueue(Building* building)
bool JPSclient::HasSpaceOnMatsim(int nodeID)
{
return false;
}
bool JPSclient::SendAgentToMatsim(Pedestrian* ped)
......@@ -64,6 +66,7 @@ bool JPSclient::SendAgentToMatsim(Pedestrian* ped)
//hybrid::Extern2MATSim msg;
//msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
//msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));
return false;
}
......@@ -89,12 +92,17 @@ bool JPSclient::SendAgentToJuPedSim(Pedestrian* ped)
ClientContext context;
MATSim2ExternPutAgent request;
MATSim2ExternPutAgentConfirmed reply;
string leave_node_id=std::to_string(ped->GetFinalDestination());
string enter_node_id="-1";
if(leave_node_id=="3") enter_node_id="1";
if(leave_node_id=="4") enter_node_id="2";
request.mutable_agent()->set_id(std::to_string(ped->GetID()));
request.mutable_agent()->set_enternode(std::to_string(ped->GetExitIndex()));
request.mutable_agent()->set_enternode(enter_node_id);
//only one final destination is supported
request.mutable_agent()->add_nodes("-1");
request.mutable_agent()->add_nodes("0");
request.mutable_agent()->add_nodes(leave_node_id);
//request.mutable_agent()->add_nodes("0");
Status status =_stub->reqMATSim2ExternPutAgent(&context, request, &reply);
return status.IsOk();
......
......@@ -7,6 +7,10 @@
#include "JPSserver.h"
#include "../IO/OutputHandler.h"
#include "../pedestrian/AgentsSourcesManager.h"
#include "../pedestrian/AgentsSource.h"
#include "../pedestrian/AgentsQueue.h"
#include "../pedestrian/Pedestrian.h"
#include <iostream>
......@@ -18,12 +22,13 @@ extern OutputHandler* Log;
using namespace std;
JPSserver::JPSserver()
JPSserver::JPSserver(AgentsSourcesManager& src): _agentSrcMng(src)
{
}
JPSserver::~JPSserver()
{
}
Status JPSserver::reqMATSim2ExternHasSpace(ServerContext* context,
......@@ -42,7 +47,33 @@ Status JPSserver::reqMATSim2ExternPutAgent(ServerContext* context,
MATSim2ExternPutAgentConfirmed* response)
{
//Log->Write("INFO:\tRPC::JPSserver I have space on node " + nodeID);
std::cout << "I am taking the agent" << std::endl;
string agent_id=request->agent().id();
string leave_node=request->agent().nodes(0);//just the first node
string enter_node=request->agent().enternode();
std::cout << "I am taking the agent: " << agent_id<< " going to "<<leave_node<<std::endl;
auto srcs=_agentSrcMng.GetSources();
cout<<"There are: "<<srcs.size()<<" options"<<endl;
for(auto&& src:srcs)
{
cout<<"size: "<<src->GetPoolSize()<<endl;
if(src->GetId()==std::stoi(enter_node))
{
std::vector<Pedestrian*> peds;
src->GenerateAgents(peds,1,_agentSrcMng.GetBuilding());
//there should be only one agent in this vector
for(auto&& ped:peds)
{
ped->SetID(std::stoi(agent_id));
ped->SetFinalDestination(std::stoi(leave_node));
//schedule the agent
src->AddToPool(ped);
}
_agentSrcMng.ProcessAllSources();
//AgentsQueue::Add(peds);
}
}
//take the nodeID and the destination ID
//find the corresponding source
......
......@@ -20,20 +20,27 @@ using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
//using helloworld::HelloRequest;
//using helloworld::HelloReply;
//using helloworld::Greeter;
using namespace hybrid;
//forward declarations
class AgentsSourcesManager;
class JPSserver final : public ExternInterfaceService::Service
{
public:
JPSserver();
/**
* constructor with an agent source manager, which will be
* responsible for positioning the agents.
*/
JPSserver(AgentsSourcesManager& src);
/**
* Destructor
*/
virtual ~JPSserver();
virtual Status reqMATSim2ExternHasSpace(ServerContext* context, const MATSim2ExternHasSpace* request, MATSim2ExternHasSpaceConfirmed* response);
virtual Status reqMATSim2ExternPutAgent(ServerContext* context, const MATSim2ExternPutAgent* request, MATSim2ExternPutAgentConfirmed* response);
virtual Status reqExternDoSimStep(ServerContext* context, const ExternDoSimStep* request, ExternDoSimStepReceived* response);
......@@ -46,7 +53,7 @@ public:
//void SetAgentsSourcesManager(const AgentsSourcesManager& src) const;
private:
AgentsSourcesManager& _agentSrcMng;
};
#endif /* MATSIM_JPSSERVER_H_ */
......@@ -40,6 +40,7 @@
using namespace std;
#define MAX_AGENT_COUNT 1000
LCGrid::LCGrid(double boundaries[4], double cellsize, int nPeds)
{
......@@ -48,8 +49,7 @@ LCGrid::LCGrid(double boundaries[4], double cellsize, int nPeds)
_gridYmin=boundaries[2];
_gridYmax=boundaries[3];
_cellSize=cellsize;
_nPeds=nPeds;
_nPeds = nPeds + MAX_AGENT_COUNT;
// add 1 to ensure that the whole area is covered by cells if not divisible without remainder
_gridSizeX = (int) ((_gridXmax - _gridXmin) / _cellSize) + 1 + 2; // 1 dummy cell on each side
......@@ -67,12 +67,12 @@ LCGrid::LCGrid(double boundaries[4], double cellsize, int nPeds)
}
// creating and resetting the pedestrians list
_list = new int[nPeds];
for(int i=0; i<nPeds; i++) _list[i]=0;
_list = new int[_nPeds];
for(int i=0; i<_nPeds; i++) _list[i]=0;
//allocating the place for the peds copy
_localPedsCopy=new Pedestrian*[nPeds];
for(int i=0; i<nPeds; i++) _localPedsCopy[i]=NULL;
_localPedsCopy=new Pedestrian*[_nPeds];
for(int i=0; i<_nPeds; i++) _localPedsCopy[i]=nullptr;
}
......@@ -90,8 +90,8 @@ LCGrid::~LCGrid()
void LCGrid::ShallowCopy(const vector<Pedestrian*>& peds)
{
for(unsigned int p=0; p<peds.size(); p++) {
for(unsigned int p=0; p<peds.size(); p++)
{
int id= peds[p]->GetID()-1;
_localPedsCopy[id]=peds[p];
}
......@@ -99,12 +99,11 @@ void LCGrid::ShallowCopy(const vector<Pedestrian*>& peds)
void LCGrid::Update(const vector<Pedestrian*>& peds)
{
int nSize=peds.size();
ClearGrid();
for (int p = 0; p < nSize; p++) {
Pedestrian* ped = peds[p];
for (auto& ped: peds)
{
//Pedestrian* ped = peds[p];
int id=ped->GetID()-1;
// determine the cell coordinates of pedestrian i
int ix = (int) ((ped->GetPos().GetX() - _gridXmin) / _cellSize) + 1; // +1 because of dummy cells
......@@ -146,7 +145,7 @@ void LCGrid::ClearGrid()
for(int i=0; i<_nPeds; i++) {
_list[i]=LIST_EMPTY;
_localPedsCopy[i]=NULL;
_localPedsCopy[i]=nullptr;
}
}
......
......@@ -12,13 +12,14 @@
#include <iostream>
AgentsSource::AgentsSource(int id,std::string caption,int max_agents,int group_id,int frequency)
AgentsSource::AgentsSource(int id, const std::string& caption,int max_agents,int group_id,int frequency)
{
_id=id;
_caption=caption;
_maxAgents=max_agents;
_groupID=group_id;
_frequency=frequency;
_agentsGenerated=0;
_agents.clear();
}
......@@ -26,12 +27,20 @@ AgentsSource::~AgentsSource()
{
}
void AgentsSource::GenerateByFrequency(std::vector<Pedestrian*>& ped)
void AgentsSource::GenerateAgentsAndAddToPool(int count, Building* building)
{
if((int)_agents.size()>=_frequency)
std::vector<Pedestrian*> peds;
GenerateAgents(peds, count, building);
_agents.insert(_agents.begin(),peds.begin(),peds.end());
_agentsGenerated+=count;
}
void AgentsSource::RemoveAgentsFromPool(std::vector<Pedestrian*>& ped, int count)
{
if((int)_agents.size()>=count)
{
ped.insert(ped.begin(),_agents.begin(),_agents.begin()+_frequency);
_agents.erase(_agents.begin(),_agents.begin()+_frequency);
ped.insert(ped.begin(),_agents.begin(),_agents.begin()+count);
_agents.erase(_agents.begin(),_agents.begin()+count);
}
else
{
......@@ -45,7 +54,7 @@ int AgentsSource::GetPoolSize() const
return _agents.size();
}