Commit 8b5075a4 authored by Ulrich Kemloh's avatar Ulrich Kemloh

Refactoring server and client code

parent ab359c6a
...@@ -57,14 +57,15 @@ int main(int argc, char **argv) ...@@ -57,14 +57,15 @@ int main(int argc, char **argv)
if(status&&sim.InitArgs(*args)) if(status&&sim.InitArgs(*args))
{ {
//Start the threads for managing the sources of agents //Start the threads for managing the sources of agents
std::thread t1(sim.GetAgentSrcManager(),21); std::thread t1(sim.GetAgentSrcManager());
//Start the thread for managing incoming messages from MatSim //Start the thread for managing incoming messages from MatSim
std::thread t2; std::thread t2;
auto hybrid=args->GetHybridSimManager(); auto hybrid=args->GetHybridSimManager();
if(hybrid) if(hybrid)
t2 = std::thread(*hybrid); t2 = std::thread(*hybrid);
//t1.detach();
//t2.detach();
//main thread for the simulation //main thread for the simulation
Log->Write("INFO: \tStart runSimulation()"); Log->Write("INFO: \tStart runSimulation()");
......
...@@ -80,7 +80,7 @@ bool GompertzModel::Init (Building* building) const ...@@ -80,7 +80,7 @@ bool GompertzModel::Init (Building* building) const
//a destination could not be found for that pedestrian //a destination could not be found for that pedestrian
if (ped->FindRoute() == -1) { if (ped->FindRoute() == -1) {
Log->Write( Log->Write(
"WARNING: \allPeds::Init() cannot initialise route. ped %d is deleted.\n",ped->GetID()); "ERROR:\tGompertzModel::Init() cannot initialise route. ped %d is deleted.\n",ped->GetID());
building->DeletePedestrian(ped); building->DeletePedestrian(ped);
continue; continue;
} }
......
...@@ -18,23 +18,33 @@ ...@@ -18,23 +18,33 @@
#include <functional> #include <functional>
//google stuff //google stuff
#include <grpc/grpc.h> //server stuff
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc++/server.h> #include <grpc++/server.h>
#include <grpc++/server_builder.h> #include <grpc++/server_builder.h>
#include <grpc++/server_context.h> #include <grpc++/server_context.h>
#include <grpc++/server_credentials.h> #include <grpc++/server_credentials.h>
#include <grpc++/status.h> #include <grpc++/status.h>
//client stuff
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
bool HybridSimulationManager::_shutdown = false;
using grpc::Server; using grpc::Server;
using grpc::ServerBuilder; using grpc::ServerBuilder;
using grpc::ServerContext; using grpc::ServerContext;
using grpc::Status; using grpc::Status;
using grpc::ChannelArguments;
using grpc::ChannelInterface;
using grpc::ClientContext;
using grpc::Status;
using namespace std; using namespace std;
bool HybridSimulationManager::_shutdown = false;
HybridSimulationManager::HybridSimulationManager(const std::string& server, HybridSimulationManager::HybridSimulationManager(const std::string& server,
int port) int port)
{ {
...@@ -80,12 +90,16 @@ bool HybridSimulationManager::RunClient() ...@@ -80,12 +90,16 @@ bool HybridSimulationManager::RunClient()
do do
{ {
//check the message queue and send //check the message queue and send
JPSclient client(
grpc::CreateChannel("localhost:9999",
grpc::InsecureCredentials(), ChannelArguments()));
//check if agents enter/left a link and fire event //check if agents enter/left a link and fire event
client.ProcessAgentQueue(_building);
//wait some time, before a new attempt //wait some time, before a new attempt
cout << "waiting for input:" << _shutdown << endl; cout << "waiting for input:" << _shutdown << endl;
ProcessOutgoingAgent(); //ProcessOutgoingAgent();
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown); } while (!_shutdown);
......
...@@ -6,15 +6,97 @@ ...@@ -6,15 +6,97 @@
*/ */
#include "JPSclient.h" #include "JPSclient.h"
#include "../pedestrian/Pedestrian.h"
#include "../pedestrian/AgentsQueue.h"
#include "../geometry/Building.h"
JPSclient::JPSclient() #include "MATSimInterface.pb.h"
JPSclient::JPSclient(std::shared_ptr<ChannelInterface> channel) :
_stub(ExternInterfaceService::NewStub(channel))
{
//stub for testing and to be removed in the final version
_stub_matsim=MATSimInterfaceService::NewStub(channel);
}
JPSclient::~JPSclient()
{
}
void JPSclient::ProcessAgentQueue(Building* building)
{ {
// TODO Auto-generated constructor stub std::vector<Pedestrian*> peds;
AgentsQueueOut::GetandClear(peds);
for (auto && ped:peds)
{
if(HasSpaceOnJuPedSim(ped->GetExitIndex())==true)
{
if(SendAgentToJuPedSim(ped)==false)
{
Log->Write("ERROR:\t RPC:JPSclient request failed (send agent to jupedsim)");
}
}
else
{
Log->Write("ERROR:\t RPC:JPSclient request failed (space on jupedsim)");
}
if(HasSpaceOnMatsim(ped->GetExitIndex())==true)
{
SendAgentToMatsim(ped);
}
std::cout<<"deleting from the client:"<<std::endl;
building->DeletePedestrian(ped);
}
} }
JPSclient::~JPSclient() bool JPSclient::HasSpaceOnMatsim(int nodeID)
{ {
// TODO Auto-generated destructor stub
} }
bool JPSclient::SendAgentToMatsim(Pedestrian* ped)
{
//pick the agent from the queue
//hybrid::Extern2MATSim msg;
//msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
//msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));
}
bool JPSclient::HasSpaceOnJuPedSim(int nodeID)
{
MATSim2ExternHasSpace request;
request.set_nodeid(std::to_string(nodeID));
MATSim2ExternHasSpaceConfirmed reply;
ClientContext context;
Status status = _stub->reqMATSim2ExternHasSpace(&context, request, &reply);
return status.IsOk();
// if (status.IsOk()) {
// return true;
// } else {
// return "Rpc failed";
// }
}
bool JPSclient::SendAgentToJuPedSim(Pedestrian* ped)
{
ClientContext context;
MATSim2ExternPutAgent request;
MATSim2ExternPutAgentConfirmed reply;
request.mutable_agent()->set_id(std::to_string(ped->GetID()));
request.mutable_agent()->set_enternode(std::to_string(ped->GetExitIndex()));
//only one final destination is supported
request.mutable_agent()->add_nodes("-1");
request.mutable_agent()->add_nodes("0");
Status status =_stub->reqMATSim2ExternPutAgent(&context, request, &reply);
return status.IsOk();
}
...@@ -8,30 +8,83 @@ ...@@ -8,30 +8,83 @@
#ifndef MATSIM_JPSCLIENT_H_ #ifndef MATSIM_JPSCLIENT_H_
#define MATSIM_JPSCLIENT_H_ #define MATSIM_JPSCLIENT_H_
//#include <grpc++/status.h>
//client stuff
//#include <grpc++/channel_arguments.h>
//#include <grpc++/create_channel.h>
//#include <grpc++/credentials.h>
//using grpc::Server;
//using grpc::ServerBuilder;
//using grpc::ServerContext;
//using grpc::Status;
//using grpc::ChannelArguments;
//using grpc::ClientContext;
//using grpc::Status;
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc++/server.h> #include <grpc++/server.h>
#include <grpc++/server_builder.h> #include <grpc++/server_builder.h>
#include <grpc++/server_context.h> #include <grpc++/server_context.h>
#include <grpc++/server_credentials.h> #include <grpc++/server_credentials.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/status.h> #include <grpc++/status.h>
#include "MATSimInterface.grpc.pb.h" #include "MATSimInterface.grpc.pb.h"
using grpc::Server; using grpc::Server;
using grpc::ServerBuilder; using grpc::ServerBuilder;
using grpc::ServerContext; using grpc::ServerContext;
using grpc::Status; using grpc::Status;
//using helloworld::HelloRequest; using grpc::ChannelInterface;
//using helloworld::HelloReply; using grpc::ClientContext;
//using helloworld::Greeter; using hybrid::MATSim2ExternHasSpace;
using hybrid::MATSim2ExternHasSpaceConfirmed;
using namespace hybrid; using namespace hybrid;
class Building;
class Pedestrian;
class JPSclient class JPSclient
{ {
public: public:
JPSclient();
/**
* Constructor
* @param channel, the rpc channel for the communication
*/
JPSclient(std::shared_ptr<ChannelInterface> channel);
/**
* Destructor
*/
virtual ~JPSclient(); virtual ~JPSclient();
/**
* Process the finished agents queue
*/
void ProcessAgentQueue(Building* building);
/**
* Shutdown the channel
*/
void Shutdown()
{
_stub.reset();
}
private:
bool HasSpaceOnMatsim(int nodeID);
bool SendAgentToMatsim(Pedestrian* ped);
bool HasSpaceOnJuPedSim(int nodeID);
bool SendAgentToJuPedSim(Pedestrian* ped);
private:
std::unique_ptr<ExternInterfaceService::Stub> _stub;
std::unique_ptr<MATSimInterfaceService::Stub> _stub_matsim;
}; };
#endif /* MATSIM_JPSCLIENT_H_ */ #endif /* MATSIM_JPSCLIENT_H_ */
...@@ -6,42 +6,69 @@ ...@@ -6,42 +6,69 @@
*/ */
#include "JPSserver.h" #include "JPSserver.h"
#include "../IO/OutputHandler.h"
#include <iostream>
// external variables
extern OutputHandler* Log;
using namespace std;
JPSserver::JPSserver() JPSserver::JPSserver()
{ {
// TODO Auto-generated constructor stub
} }
JPSserver::~JPSserver() JPSserver::~JPSserver()
{ {
// TODO Auto-generated destructor stub
} }
Status JPSserver::reqMATSim2ExternHasSpace(ServerContext* context, Status JPSserver::reqMATSim2ExternHasSpace(ServerContext* context,
const MATSim2ExternHasSpace* request, const MATSim2ExternHasSpace* request,
MATSim2ExternHasSpaceConfirmed* response) MATSim2ExternHasSpaceConfirmed* response)
{ {
string nodeID=request->nodeid();
Log->Write("INFO:\tRPC::JPSserver I have space on node " + nodeID);
response->set_hasspace(true);
return Status::OK;
} }
Status JPSserver::reqMATSim2ExternPutAgent(ServerContext* context, Status JPSserver::reqMATSim2ExternPutAgent(ServerContext* context,
const MATSim2ExternPutAgent* request, const MATSim2ExternPutAgent* request,
MATSim2ExternPutAgentConfirmed* response) MATSim2ExternPutAgentConfirmed* response)
{ {
//Log->Write("INFO:\tRPC::JPSserver I have space on node " + nodeID);
std::cout << "I am taking the agent" << std::endl;
//take the nodeID and the destination ID
//find the corresponding source
//generate a new agent and add to the source
//call processAllSource on the AgentsoursceManager
return Status::OK;
} }
Status JPSserver::reqExternDoSimStep(ServerContext* context, Status JPSserver::reqExternDoSimStep(ServerContext* context,
const ExternDoSimStep* request, ExternDoSimStepReceived* response) const ExternDoSimStep* request, ExternDoSimStepReceived* response)
{ {
std::cout << "Performing simulation step" << std::endl;
return Status::OK;
} }
Status JPSserver::reqExternOnPrepareSim(ServerContext* context, Status JPSserver::reqExternOnPrepareSim(ServerContext* context,
const ExternOnPrepareSim* request, const ExternOnPrepareSim* request,
ExternOnPrepareSimConfirmed* response) ExternOnPrepareSimConfirmed* response)
{ {
std::cout << "I am preparing the simulation" << std::endl;
return Status::OK;
} }
Status JPSserver::reqExternAfterSim(ServerContext* context, Status JPSserver::reqExternAfterSim(ServerContext* context,
const ExternAfterSim* request, ExternAfterSimConfirmed* response) const ExternAfterSim* request, ExternAfterSimConfirmed* response)
{ {
std::cout << "Simulation step completed" << std::endl;
return Status::OK;
} }
...@@ -25,6 +25,10 @@ using grpc::Status; ...@@ -25,6 +25,10 @@ using grpc::Status;
//using helloworld::Greeter; //using helloworld::Greeter;
using namespace hybrid; using namespace hybrid;
class JPSserver final : public ExternInterfaceService::Service class JPSserver final : public ExternInterfaceService::Service
{ {
public: public:
......
...@@ -71,7 +71,6 @@ public: ...@@ -71,7 +71,6 @@ public:
const std::shared_ptr<StartDistribution> GetStartDistribution() const; const std::shared_ptr<StartDistribution> GetStartDistribution() const;
private: private:
int _id=-1; int _id=-1;
int _frequency=0; int _frequency=0;
......
...@@ -29,55 +29,62 @@ AgentsSourcesManager::~AgentsSourcesManager() ...@@ -29,55 +29,62 @@ AgentsSourcesManager::~AgentsSourcesManager()
{ {
} }
void AgentsSourcesManager::operator()(int value) void AgentsSourcesManager::operator()()
{ {
//cout<<"starting thread"<<endl; Log->Write("INFO:\tStarting agent manager thread");
//the loop is updated each second. //the loop is updated each second.
//it might be better to use a timer //it might be better to use a timer
_isCompleted=false; _isCompleted = false;
bool finished = false; bool finished = false;
long updateFrequency = 5; // 1 second long updateFrequency = 5; // 1 second
do do
{ {
int current_time = Pedestrian::GetGlobalTime(); int current_time = Pedestrian::GetGlobalTime();
if ((current_time != _lastUpdateTime) && ((current_time % updateFrequency) == 0)) if ((current_time != _lastUpdateTime)
&& ((current_time % updateFrequency) == 0))
{ {
//cout<<"TIME:"<<current_time<<endl; //cout<<"TIME:"<<current_time<<endl;
finished = true; finished=ProcessAllSources();
for (const auto& src : _sources)
{
if (src->GetPoolSize())
{
vector<Pedestrian*> peds;
src->GenerateByFrequency(peds);
ComputeBestPositionRandom(src.get(), peds);
// compute the optimal position for insertion
for (auto&& ped : peds)
{
//ComputeBestPositionVoronoi(src.get(), ped);
//ped->SetPos(Point(15,15),true);
//ped->Dump(ped->GetID());
}
AgentsQueue::Add(peds);
finished = false;
cout<<"Agents generated: "<<peds.size()<<endl;
}
//src->Dump();//exit(0);
}
_lastUpdateTime = current_time; _lastUpdateTime = current_time;
//cout<<"size: "<<_sources.size()<<endl; cout << "source size: " << _sources.size() << endl;
} }
//wait some time //wait some time
std::this_thread::sleep_for(std::chrono::milliseconds(10)); //cout<<"sleepinp..."<<endl;
//std::this_thread::sleep_for(std::chrono::milliseconds(1));
} while (!finished); } while (!finished);
Log->Write("INFO:\tTerminating agent manager thread");
_isCompleted = true;
}
_isCompleted=true; bool AgentsSourcesManager::ProcessAllSources()
{
bool empty=true;
for (const auto& src : _sources)
{
if (src->GetPoolSize())
{
vector<Pedestrian*> peds;
src->GenerateByFrequency(peds);
ComputeBestPositionRandom(src.get(), peds);
// compute the optimal position for insertion
for (auto&& ped : peds)
{
//ComputeBestPositionVoronoi(src.get(), ped);
//ped->SetPos(Point(15,15),true);
//ped->Dump(ped->GetID());
}
AgentsQueue::Add(peds);
empty = false;
cout << "Agents generated: " << peds.size() << endl;
}
//src->Dump();//exit(0);
}
return empty;
} }
void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestrian* agent) void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src,
Pedestrian* agent)
{ {
auto dist = src->GetStartDistribution(); auto dist = src->GetStartDistribution();
double bounds[4]; double bounds[4];
...@@ -99,8 +106,7 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr ...@@ -99,8 +106,7 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr
iter = peds.erase(iter); iter = peds.erase(iter);
cout << "removing..." << endl; cout << "removing..." << endl;
exit(0); exit(0);
} } else
else
{ {
++iter; ++iter;
} }
...@@ -115,8 +121,8 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr ...@@ -115,8 +121,8 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr
} }
// compute the cells and cut with the bounds // compute the cells and cut with the bounds
const int count = peds.size(); const int count = peds.size();
float* xValues= new float[count]; float* xValues = new float[count];
float* yValues= new float[count]; float* yValues = new float[count];
//float xValues[count]; //float xValues[count];
//float yValues[count]; //float yValues[count];
...@@ -127,7 +133,8 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr ...@@ -127,7 +133,8 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr
} }
VoronoiDiagramGenerator vdg; VoronoiDiagramGenerator vdg;
vdg.generateVoronoi(xValues, yValues, count, bounds[0], bounds[1], bounds[2], bounds[3], 3); vdg.generateVoronoi(xValues, yValues, count, bounds[0], bounds[1],
bounds[2], bounds[3], 3);
vdg.resetIterator(); vdg.resetIterator();
vdg.resetVerticesIterator(); vdg.resetVerticesIterator();
...@@ -162,7 +169,8 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr ...@@ -162,7 +169,8 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr
//list the result //list the result
for (auto&& mp : map_dist_to_position) for (auto&& mp : map_dist_to_position)
{ {
cout << "dist: " << mp.first << " pos: " << mp.second.toString() << endl; cout << "dist: " << mp.first << " pos: " << mp.second.toString()
<< endl;
//agent->SetPos(mp.second, true); //agent->SetPos(mp.second, true);
} }
...@@ -174,12 +182,11 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr ...@@ -174,12 +182,11 @@ void AgentsSourcesManager::ComputeBestPositionVoronoi(AgentsSource* src, Pedestr
cout << "position:" << agent->GetPos().toString() << endl; cout << "position:" << agent->GetPos().toString() << endl;
//exit(0); //exit(0);
} } else
else
{ {
cout << "position not set:" << endl; cout << "position not set:" << endl;
cout << "size: " << map_dist_to_position.size() << endl; cout << "size: " << map_dist_to_position.size() << endl;