Commit ae90dfe1 authored by Ulrich Kemloh's avatar Ulrich Kemloh

refactoring

parent dc3f5e9e
......@@ -14,7 +14,7 @@
<socket hostname="127.0.0.1" port="8989" />
</trajectories>
<!-- Parameters for the hybrid mode -->
<!-- Parameters for the hybrid (matsim) server mode -->
<hybrid_simulation server="localhost" port="9999" />
<!-- where to store the logs -->
......
......@@ -50,8 +50,13 @@ bool HybridSimulationManager::_shutdown = false;
HybridSimulationManager::HybridSimulationManager(const std::string& server,
int port)
{
_serverName = server;
_port = port;
_externalServerName = server;
_externalServerPort = port;
//get the canonical hostname
char hostname[1024];
gethostname(hostname, 1024);
_internalServerName=std::string(hostname);
//GOOGLE_PROTOBUF_VERIFY_VERSION;
//grpc_init();
......@@ -79,8 +84,6 @@ HybridSimulationManager::~HybridSimulationManager()
bool HybridSimulationManager::Init(Building* building)
{
// GOOGLE_PROTOBUF_VERIFY_VERSION;
_building = building;
return true;
}
......@@ -93,22 +96,21 @@ bool HybridSimulationManager::Run(Simulation& sim)
grpc_init();
//string extern_service_address("zam597:9999");
string extern_service_address("localhost:9999");
string extern_service_address(_externalServerName + ":" + std::to_string(_externalServerPort));
string jupedsim_service_address("localhost:9999")/*_serverName + ":" + std::to_string(_port)*/;
string jupedsim_service_address(_internalServerName + ":" + std::to_string(_internalServerPort));
//string jupedsim_service_address("0.0.0.0:9999")/*_serverName + ":" + std::to_string(_port)*/;
//create the client that will be running on its own thread
//_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel(extern_service_address,
// grpc::InsecureCredentials(), ChannelArguments())));
//_rpcClient->NotifyExternalService();
_rpcClient = std::shared_ptr<JPSclient>(new JPSclient( grpc::CreateChannel(extern_service_address,
grpc::InsecureCredentials(), ChannelArguments())));
//create the server
std::string server_address(_serverName + ":" + std::to_string(_port));
std::string server_address(_externalServerName + ":" + std::to_string(_externalServerPort));
JPSserver jupedsimService(sim,extern_service_address);
jupedsimService.SetDuplexClient(_rpcClient);
//MATSIMserver jupedsimService;
ServerBuilder builder;
......@@ -117,33 +119,32 @@ bool HybridSimulationManager::Run(Simulation& sim)
builder.RegisterService(&jupedsimService);
_rpcServer= builder.BuildAndStart();
//builder.
Log->Write("INFO:\tJuPedSim Server is up and running on " + jupedsim_service_address);
//_rpcServer->Wait();
Log->Write("INFO:\tJuPedSim is up and running on " + jupedsim_service_address);
Log->Write("INFO:\tNotifying Matsim at " + extern_service_address);
//_rpcClient->NotifyExternalService();
//notify the external service
if(jupedsimService.NotifyExternalService())
if(false==_rpcClient->NotifyExternalService(_internalServerName,_internalServerPort))
{
cout<<"Failure"<<endl;
//exit(0);
}
//else
{
cout<<"Success"<<endl;
Log->Write("ERROR:\tNotification failed");
}
// Pedestrian ped;
// ped.SetID(14);
// ped.SetFinalDestination(4);
// _rpcClient->SendAgentToMatsim(&ped);
// _rpcClient->NotifyExternalService();
//starting the simulation thread and waiting
//std::thread t2(&JPSserver::RunSimulation, &jupedsimService);
std::thread t2(&JPSserver::RunSimulation, &jupedsimService);
_rpcServer->Wait();
//TestWorkflow();
//t2.join();
//_rpcServer->Wait();
t2.join();
//create a socket and use it for the serveur and the client
//std::thread t1(&HybridSimulationManager::RunClient, this);
//std::thread t2(&HybridSimulationManager::RunServer, this);
......@@ -246,7 +247,7 @@ bool HybridSimulationManager::RunServer()
// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// } while (!_shutdown);
//
return true;
return true;
}
void HybridSimulationManager::Shutdown()
......@@ -264,7 +265,7 @@ void HybridSimulationManager::ProcessIncomingAgent()
void HybridSimulationManager::TestWorkflow()
{
// notify the external service that I am awake
_rpcClient->NotifyExternalService();
//_rpcClient->NotifyExternalService();
}
......@@ -276,8 +277,8 @@ void HybridSimulationManager::ProcessOutgoingAgent()
std::string HybridSimulationManager::ToString()
{
return "INFO:\tHybrid Simulation working on [" + _serverName + ":"
+ std::to_string(_port) + "]\n";
return "INFO:\tHybrid Simulation working on [" + _externalServerName + ":"
+ std::to_string(_externalServerPort) + "]\n";
}
void HybridSimulationManager::AttachSourceManager(const AgentsSourcesManager& src)
......
......@@ -54,14 +54,16 @@ private:
private:
//std::atomic<bool> _shutdown=false;
static bool _shutdown;
int _port=-1;
std::string _serverName="localhost";
int _externalServerPort=9999;
int _internalServerPort=9998;
std::string _externalServerName="localhost";
std::string _internalServerName="0.0.0.0";
Building* _building=nullptr;
//TODO: the method should be passed bz reference in the main function
//std::unique_ptr<grpc::Server> _rpcServer;
std::shared_ptr<grpc::Server> _rpcServer;
std::unique_ptr<grpc::Server> _rpcServer;
AgentsSourcesManager _agentSrcMng;
std::unique_ptr<JPSclient> _rpcClient;
std::shared_ptr<JPSclient> _rpcClient;
};
#endif /* HYBRIDSIMULATIONMANAGER_H_ */
......@@ -14,12 +14,16 @@
using namespace std;
JPSclient::JPSclient(std::shared_ptr<ChannelInterface> channel) :
_jupedsimChannel(ExternInterfaceService::NewStub(channel))
{
//stub for testing and to be removed in the final version
_matsimChannel=MATSimInterfaceService::NewStub(channel);
}
JPSclient::JPSclient(std::shared_ptr<ChannelInterface> channel)
//:_jupedsimChannel(ExternInterfaceService::NewStub(channel))
{
//communication channel to matsim
_matsimChannel = MATSimInterfaceService::NewStub(channel);
//communication channel to JuPedsim
// stub for testing and to be removed in the final version
_jupedsimChannel= ExternInterfaceService::NewStub(channel);
}
JPSclient::~JPSclient()
{
......@@ -33,6 +37,7 @@ void JPSclient::ProcessAgentQueue(Building* building)
for (auto && ped:peds)
{
/* for testing only, the agent is send back to jupedsim
if(HasSpaceOnJuPedSim(ped->GetFinalDestination())==true)
{
if(SendAgentToJuPedSim(ped)==false)
......@@ -44,7 +49,7 @@ void JPSclient::ProcessAgentQueue(Building* building)
{
Log->Write("ERROR:\t RPC:JPSclient request failed (space on jupedsim)");
}
*/
if(HasSpaceOnMatsim(ped->GetFinalDestination())==true)
{
......@@ -60,9 +65,10 @@ bool JPSclient::HasSpaceOnMatsim(int nodeID)
//Status status =_matsimChannel->reqExtern2MATSim(&context, request, &reply);
//if(status.IsOk())
//{
//
// }
return true;
//
//}
return (nodeID>0);
}
bool JPSclient::SendAgentToMatsim(Pedestrian* ped)
......@@ -78,6 +84,7 @@ bool JPSclient::SendAgentToMatsim(Pedestrian* ped)
request.mutable_agent()->set_leavenode(leave_node);
Status status =_matsimChannel->reqExtern2MATSim(&context, request, &reply);
std::cout<<"Details: "<<status.details()<<endl;
return status.IsOk();
}
......@@ -90,18 +97,28 @@ bool JPSclient::HasSpaceOnJuPedSim(int nodeID)
ClientContext context;
Status status = _jupedsimChannel->reqMATSim2ExternHasSpace(&context, request, &reply);
return status.IsOk();
if(status.IsOk())
{
return reply.hasspace();
}
else
{
Log->Write("ERROR:\t Rpc call failed");
}
return false;
}
bool JPSclient::NotifyExternalService()
bool JPSclient::NotifyExternalService(const std::string& host, int port)
{
ClientContext context;
ExternalConnect request;
ExternalConnectConfirmed reply;
request.set_accepted(true);
//reply.
request.set_host(host);
request.set_port(port);
Status status =_matsimChannel->reqExternalConnect(&context, request, &reply);
std::cout<<"Details: "<<status.details()<<endl;
//std::cout<<"Details: "<<status.details()<<endl;
return status.IsOk();
}
......@@ -126,3 +143,15 @@ bool JPSclient::SendAgentToJuPedSim(Pedestrian* ped)
return status.IsOk();
}
bool JPSclient::NotifyEndOfSimulation()
{
ClientContext context;
ExternSimStepFinished request;
ExternSimStepFinishedReceived reply;
Status status =_matsimChannel->reqExternSimStepFinished(&context, request, &reply);
Log->Write("INFO:\t Simulation step finished");
//std::cout<<"Details: "<<status.details()<<endl;
return status.IsOk();
}
......@@ -40,10 +40,10 @@ using grpc::ServerContext;
using grpc::Status;
using grpc::ChannelInterface;
using grpc::ClientContext;
using hybrid::MATSim2ExternHasSpace;
using hybrid::MATSim2ExternHasSpaceConfirmed;
using org::matsim::hybrid::MATSim2ExternHasSpace;
using org::matsim::hybrid::MATSim2ExternHasSpaceConfirmed;
using namespace hybrid;
using namespace org::matsim::hybrid;
class Building;
class Pedestrian;
......@@ -77,11 +77,14 @@ public:
_matsimChannel.reset();
}
bool NotifyExternalService();
bool NotifyExternalService(const std::string& host, int port);
bool SendAgentToMatsim(Pedestrian* ped);
bool NotifyEndOfSimulation();
private:
bool HasSpaceOnMatsim(int nodeID);
bool SendAgentToMatsim(Pedestrian* ped);
bool HasSpaceOnJuPedSim(int nodeID);
bool SendAgentToJuPedSim(Pedestrian* ped);
......
......@@ -32,33 +32,31 @@ using namespace std;
JPSserver::JPSserver(Simulation& src, const std::string& connection): _SimManager(src)
{
_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel(connection,
grpc::InsecureCredentials(), grpc::ChannelArguments())));
}
//_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel(connection,
// grpc::InsecureCredentials(), grpc::ChannelArguments())));
JPSserver::~JPSserver()
{
_doSimulation=false;
}
bool JPSserver::NotifyExternalService()
JPSserver::~JPSserver()
{
return _rpcClient->NotifyExternalService();
}
void JPSserver::RunSimulation()
{
while(true)
{
if(_doSimulation)
if(!_doSimulation)
{
Log->Write("INFO:\tRPC::JPSserver starting a new simulation");
_SimManager.RunSimulation(20);
_doSimulation=false;
//TODO: notify simulation finished
exit(0);
_jpsClient->NotifyEndOfSimulation();
//exit(0);
}
Log->Write("INFO:\tRPC::JPSserver idle for 3 second");
Log->Write("INFO:\tRPC::JPSserver idle for 3 seconds");
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
}
}
......@@ -71,6 +69,7 @@ Status JPSserver::reqMATSim2ExternHasSpace(ServerContext* context,
Log->Write("INFO:\tRPC::JPSserver I have space on node " + nodeID);
response->set_hasspace(true);
return Status::OK;
}
......@@ -114,9 +113,10 @@ Status JPSserver::reqMATSim2ExternPutAgent(ServerContext* context,
Status JPSserver::reqExternDoSimStep(ServerContext* context,
const ExternDoSimStep* request, ExternDoSimStepReceived* response)
{
std::cout << "Performing simulation step" << std::endl;
double from =request->fromtime();
double to=request->totime();
Log->Write("INFO:\tRPC::JPSserver I will perform a simulation step from %f to %f seconds",from,to);
_doSimulation=true;
return Status::OK;
}
......@@ -125,7 +125,7 @@ Status JPSserver::reqExternOnPrepareSim(ServerContext* context,
ExternOnPrepareSimConfirmed* response)
{
Log->Write("INFO:\tRPC::JPSserver I am ready for doing the simulation");
//response->
return Status::OK;
}
......@@ -135,3 +135,8 @@ Status JPSserver::reqExternAfterSim(ServerContext* context,
Log->Write("INFO:\tRPC::JPSserver I received shutdown order. But can I do that ?");
return Status::OK;
}
void JPSserver::SetDuplexClient(std::shared_ptr<JPSclient>& client)
{
_jpsClient=client;
}
......@@ -9,6 +9,7 @@
#define MATSIM_JPSSERVER_H_
#include <iostream>
#include <atomic>
#include <grpc/grpc.h>
#include <grpc++/server.h>
......@@ -22,7 +23,7 @@ using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using namespace hybrid;
using namespace org::matsim::hybrid;
//forward declarations
class Simulation;
......@@ -49,31 +50,16 @@ public:
virtual Status reqExternOnPrepareSim(ServerContext* context, const ExternOnPrepareSim* request, ExternOnPrepareSimConfirmed* response);
virtual Status reqExternAfterSim(ServerContext* context, const ExternAfterSim* request, ExternAfterSimConfirmed* response);
bool NotifyExternalService();
void RunSimulation();
//void SetDuplexClient(std::unique_ptr<JPSclient>& client);
void SetDuplexClient(std::shared_ptr<JPSclient>& client);
//void SetAgentsSourcesManager(const AgentsSourcesManager& src) const;
private:
Simulation& _SimManager;
bool _doSimulation=false;
std::unique_ptr<JPSclient> _rpcClient;
};
class MATSIMserver final : public MATSimInterfaceService::Service
{
virtual Status reqExternalConnect(::grpc::ClientContext* context, const ::hybrid::ExternalConnect& request, ::hybrid::ExternalConnectConfirmed* response)
{
std::cout<<"INFO:\tRPC::JPSserver I have space on node "<<std::endl;
return Status::OK;
}
//bool _doSimulation=false;
std::atomic<bool> _doSimulation;
std::shared_ptr<JPSclient> _jpsClient;
};
#endif /* MATSIM_JPSSERVER_H_ */
This diff is collapsed.
This diff is collapsed.
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
syntax = "proto2";
syntax = "proto3";
//option java_generic_services = true;
//option cc_generic_services = true;
option java_outer_classname="ProtoMATSimInterface";
......@@ -81,9 +81,21 @@ message AgentsStuckConfirmed{
}
message ExternalConnect {
optional string host =1;
optional int32 port=2;
}
message ExternalConnectConfirmed {
}
service MATSimInterfaceService {
// Send JuPedSim connection parameter
rpc reqExternalConnect(ExternalConnect) returns (ExternalConnectConfirmed);
// Sending Agents to MATsim
rpc reqExtern2MATSim(Extern2MATSim) returns (Extern2MATSimConfirmed);
// ?
rpc reqAgentStuck(AgentsStuck) returns (AgentsStuckConfirmed);
// Notifying Matsim about the end of the simulation
rpc reqExternSimStepFinished(ExternSimStepFinished) returns (ExternSimStepFinishedReceived);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment