Commit dc3f5e9e authored by Ulrich Kemloh's avatar Ulrich Kemloh

New version of the proto file

parent 2931663a
......@@ -11,6 +11,7 @@
#include "../geometry/Building.h"
#include "../pedestrian/AgentsQueue.h"
#include "../pedestrian/AgentsSourcesManager.h"
#include "../Simulation.h"
#include "JPSserver.h"
#include "JPSclient.h"
......@@ -52,22 +53,22 @@ HybridSimulationManager::HybridSimulationManager(const std::string& server,
_serverName = server;
_port = port;
GOOGLE_PROTOBUF_VERIFY_VERSION;
grpc_init();
//GOOGLE_PROTOBUF_VERIFY_VERSION;
//grpc_init();
_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
grpc::InsecureCredentials(), ChannelArguments())));
//_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
// grpc::InsecureCredentials(), ChannelArguments())));
// std::string server_address(_serverName + ":" + std::to_string(_port));
// JPSserver service(_agentSrcMng);
//
// ServerBuilder builder;
// builder.AddListeningPort(server_address,
// grpc::InsecureServerCredentials());
// builder.RegisterService(&service);
// _rpcServer= builder.BuildAndStart();
// Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
// std::string server_address(_serverName + ":" + std::to_string(_port));
// JPSserver service(_agentSrcMng);
//
// ServerBuilder builder;
// builder.AddListeningPort(server_address,
// grpc::InsecureServerCredentials());
// builder.RegisterService(&service);
// _rpcServer= builder.BuildAndStart();
// Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
}
......@@ -88,28 +89,114 @@ bool HybridSimulationManager::Run(Simulation& sim)
{
//perform some initialisation stuff
GOOGLE_PROTOBUF_VERIFY_VERSION;
grpc_init();
_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
grpc::InsecureCredentials(), ChannelArguments())));
//string extern_service_address("zam597:9999");
string extern_service_address("localhost:9999");
string jupedsim_service_address("localhost:9999")/*_serverName + ":" + std::to_string(_port)*/;
//string jupedsim_service_address("0.0.0.0:9999")/*_serverName + ":" + std::to_string(_port)*/;
//create the client that will be running on its own thread
//_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel(extern_service_address,
// grpc::InsecureCredentials(), ChannelArguments())));
//_rpcClient->NotifyExternalService();
//create the server
std::string server_address(_serverName + ":" + std::to_string(_port));
JPSserver jupedsimService(sim,extern_service_address);
//MATSIMserver jupedsimService;
ServerBuilder builder;
builder.AddListeningPort(jupedsim_service_address,
grpc::InsecureServerCredentials());
builder.RegisterService(&jupedsimService);
_rpcServer= builder.BuildAndStart();
//builder.
Log->Write("INFO:\tJuPedSim Server is up and running on " + jupedsim_service_address);
//_rpcServer->Wait();
//_rpcClient->NotifyExternalService();
//notify the external service
if(jupedsimService.NotifyExternalService())
{
cout<<"Failure"<<endl;
//exit(0);
}
//else
{
cout<<"Success"<<endl;
}
//starting the simulation thread and waiting
//std::thread t2(&JPSserver::RunSimulation, &jupedsimService);
//TestWorkflow();
//t2.join();
//_rpcServer->Wait();
//create a socket and use it for the serveur and the client
//std::thread t1(&HybridSimulationManager::RunClient, this);
std::thread t2(&HybridSimulationManager::RunServer, this);
//std::thread t2(&HybridSimulationManager::RunServer, this);
//t1.join();
t2.join();
//t2.join();
//clean up everything
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return true;
}
//bool HybridSimulationManager::Run(Simulation& sim)
//{
// //perform some initialisation stuff
// GOOGLE_PROTOBUF_VERIFY_VERSION;
//
// grpc_init();
//
// //create the client that will be running on its own thread
// _rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
// grpc::InsecureCredentials(), ChannelArguments())));
//
// //create the server
// std::string server_address(_serverName + ":" + std::to_string(_port));
// JPSserver service(_agentSrcMng);
//
// ServerBuilder builder;
// builder.AddListeningPort(server_address,
// grpc::InsecureServerCredentials());
// builder.RegisterService(&service);
//
// _rpcServer= builder.BuildAndStart();
// Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
//
// _rpcServer->Wait();
// //create a socket and use it for the serveur and the client
// //std::thread t1(&HybridSimulationManager::RunClient, this);
// std::thread t2(&HybridSimulationManager::RunServer, this);
// //t1.join();
// t2.join();
//
// //clean up everything
// grpc_shutdown();
// google::protobuf::ShutdownProtobufLibrary();
// return true;
//}
void HybridSimulationManager::operator()()
{
//Run();
}
bool HybridSimulationManager::RunClient()
{
//check the message queue and send
JPSclient client(
grpc::CreateChannel("localhost:9999",
......@@ -122,7 +209,7 @@ bool HybridSimulationManager::RunClient()
client.ProcessAgentQueue(_building);
//wait some time, before a new attempt
cout << "waiting for input:" << _shutdown << endl;
cout << "processing Requests for input:" << _shutdown << endl;
//ProcessOutgoingAgent();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
......@@ -132,36 +219,34 @@ bool HybridSimulationManager::RunClient()
bool HybridSimulationManager::RunServer()
{
//check the message queue and send
//TODO: move this to the constructor
do
{
std::string server_address(_serverName + ":" + std::to_string(_port));
JPSserver service(_agentSrcMng);
ServerBuilder builder;
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
builder.RegisterService(&service);
_rpcServer= builder.BuildAndStart();
Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
_rpcServer->Wait();
//wait for incoming connection
//receive and parse message
//process message
//can insert pedestrian ?
//accept pedestrian and feed to the queue
//wait some time, before a new attempt
cout << "waiting for output:" << _shutdown << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
return true;
//
// do
// {
// std::string server_address(_serverName + ":" + std::to_string(_port));
// JPSserver service(_agentSrcMng);
//
// ServerBuilder builder;
// builder.AddListeningPort(server_address,
// grpc::InsecureServerCredentials());
// builder.RegisterService(&service);
//
// _rpcServer= builder.BuildAndStart();
// Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
//
// _rpcServer->Wait();
//
// //wait for incoming connection
// //receive and parse message
// //process message
// //can insert pedestrian ?
// //accept pedestrian and feed to the queue
//
// //wait some time, before a new attempt
// cout << "waiting for output:" << _shutdown << endl;
// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// } while (!_shutdown);
//
return true;
}
void HybridSimulationManager::Shutdown()
......@@ -176,32 +261,17 @@ void HybridSimulationManager::ProcessIncomingAgent()
}
void HybridSimulationManager::ProcessOutgoingAgent()
void HybridSimulationManager::TestWorkflow()
{
// notify the external service that I am awake
_rpcClient->NotifyExternalService();
}
_rpcClient->ProcessAgentQueue(_building);
/*
std::vector<Pedestrian*> peds;
AgentsQueueOut::GetandClear(peds);
for (auto && ped:peds)
{
//pick the agent from the queue
hybrid::Extern2MATSim msg;
msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));
//write to the disk or send to hell
//std::fstream output("test.buf", ios::out | ios::trunc | ios::binary);
//if (!msg.SerializeToOstream(&output))
//{
// cerr << "Failed to write address book." << endl;
//}
cout<<"deleting:"<<endl;
_building->DeletePedestrian(ped);
}
*/
//call after each simulation step
void HybridSimulationManager::ProcessOutgoingAgent()
{
_rpcClient->ProcessAgentQueue(_building);
}
std::string HybridSimulationManager::ToString()
......
......@@ -49,6 +49,8 @@ public:
private:
void ProcessIncomingAgent();
void TestWorkflow();
private:
//std::atomic<bool> _shutdown=false;
static bool _shutdown;
......
......@@ -15,10 +15,10 @@
using namespace std;
JPSclient::JPSclient(std::shared_ptr<ChannelInterface> channel) :
_stub(ExternInterfaceService::NewStub(channel))
_jupedsimChannel(ExternInterfaceService::NewStub(channel))
{
//stub for testing and to be removed in the final version
_stub_matsim=MATSimInterfaceService::NewStub(channel);
_matsimChannel=MATSimInterfaceService::NewStub(channel);
}
JPSclient::~JPSclient()
......@@ -57,17 +57,29 @@ void JPSclient::ProcessAgentQueue(Building* building)
bool JPSclient::HasSpaceOnMatsim(int nodeID)
{
return false;
//Status status =_matsimChannel->reqExtern2MATSim(&context, request, &reply);
//if(status.IsOk())
//{
//
// }
return true;
}
bool JPSclient::SendAgentToMatsim(Pedestrian* ped)
{
//pick the agent from the queue
//hybrid::Extern2MATSim msg;
//msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
//msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));
return false;
ClientContext context;
Extern2MATSim request;
Extern2MATSimConfirmed reply;
string leave_node=std::to_string(ped->GetFinalDestination());
string agent_id=std::to_string(ped->GetID());
request.mutable_agent()->set_id(agent_id);
request.mutable_agent()->set_leavenode(leave_node);
Status status =_matsimChannel->reqExtern2MATSim(&context, request, &reply);
return status.IsOk();
}
bool JPSclient::HasSpaceOnJuPedSim(int nodeID)
......@@ -77,14 +89,20 @@ bool JPSclient::HasSpaceOnJuPedSim(int nodeID)
MATSim2ExternHasSpaceConfirmed reply;
ClientContext context;
Status status = _stub->reqMATSim2ExternHasSpace(&context, request, &reply);
Status status = _jupedsimChannel->reqMATSim2ExternHasSpace(&context, request, &reply);
return status.IsOk();
}
// if (status.IsOk()) {
// return true;
// } else {
// return "Rpc failed";
// }
bool JPSclient::NotifyExternalService()
{
ClientContext context;
ExternalConnect request;
ExternalConnectConfirmed reply;
request.set_accepted(true);
//reply.
Status status =_matsimChannel->reqExternalConnect(&context, request, &reply);
std::cout<<"Details: "<<status.details()<<endl;
return status.IsOk();
}
bool JPSclient::SendAgentToJuPedSim(Pedestrian* ped)
......@@ -104,7 +122,7 @@ bool JPSclient::SendAgentToJuPedSim(Pedestrian* ped)
request.mutable_agent()->add_nodes(leave_node_id);
//request.mutable_agent()->add_nodes("0");
Status status =_stub->reqMATSim2ExternPutAgent(&context, request, &reply);
Status status =_jupedsimChannel->reqMATSim2ExternPutAgent(&context, request, &reply);
return status.IsOk();
}
......@@ -73,10 +73,11 @@ public:
*/
void Shutdown()
{
_stub.reset();
_jupedsimChannel.reset();
_matsimChannel.reset();
}
//void SetAgentsSourcesManager(const AgentsSourcesManager& src) const;
bool NotifyExternalService();
private:
bool HasSpaceOnMatsim(int nodeID);
......@@ -85,9 +86,8 @@ private:
bool SendAgentToJuPedSim(Pedestrian* ped);
private:
std::unique_ptr<ExternInterfaceService::Stub> _stub;
std::unique_ptr<MATSimInterfaceService::Stub> _stub_matsim;
//AgentsSourcesManager _agentSrcMng;
std::unique_ptr<ExternInterfaceService::Stub> _jupedsimChannel;
std::unique_ptr<MATSimInterfaceService::Stub> _matsimChannel;
};
#endif /* MATSIM_JPSCLIENT_H_ */
......@@ -6,29 +6,61 @@
*/
#include "JPSserver.h"
#include "JPSclient.h"
#include "../IO/OutputHandler.h"
#include "../pedestrian/AgentsSourcesManager.h"
#include "../pedestrian/AgentsSource.h"
#include "../pedestrian/AgentsQueue.h"
#include "../pedestrian/Pedestrian.h"
#include "../Simulation.h"
#include <iostream>
#include <thread>
//client stuff
#include <grpc++/channel_arguments.h>
//#include <grpc++/channel_interface.h>
//#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
//#include <grpc++/credentials.h>
// external variables
extern OutputHandler* Log;
using namespace std;
JPSserver::JPSserver(AgentsSourcesManager& src): _agentSrcMng(src)
JPSserver::JPSserver(Simulation& src, const std::string& connection): _SimManager(src)
{
_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel(connection,
grpc::InsecureCredentials(), grpc::ChannelArguments())));
}
JPSserver::~JPSserver()
{
}
bool JPSserver::NotifyExternalService()
{
return _rpcClient->NotifyExternalService();
}
void JPSserver::RunSimulation()
{
while(true)
{
if(_doSimulation)
{
Log->Write("INFO:\tRPC::JPSserver starting a new simulation");
_SimManager.RunSimulation(20);
_doSimulation=false;
//TODO: notify simulation finished
exit(0);
}
Log->Write("INFO:\tRPC::JPSserver idle for 3 second");
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
}
}
Status JPSserver::reqMATSim2ExternHasSpace(ServerContext* context,
......@@ -52,7 +84,8 @@ Status JPSserver::reqMATSim2ExternPutAgent(ServerContext* context,
string enter_node=request->agent().enternode();
Log->Write("INFO:\tRPC::JPSserver I am taking agent %s going to node %s ",agent_id.c_str(),leave_node.c_str());
auto srcs=_agentSrcMng.GetSources();
auto agentSrcMng=_SimManager.GetAgentSrcManager();
auto srcs=agentSrcMng.GetSources();
//cout<<"There are: "<<srcs.size()<<" options"<<endl;
for(auto&& src:srcs)
......@@ -61,7 +94,7 @@ Status JPSserver::reqMATSim2ExternPutAgent(ServerContext* context,
if(src->GetId()==std::stoi(enter_node))
{
std::vector<Pedestrian*> peds;
src->GenerateAgents(peds,1,_agentSrcMng.GetBuilding());
src->GenerateAgents(peds,1,agentSrcMng.GetBuilding());
//there should be only one agent in this vector
for(auto&& ped:peds)
{
......@@ -70,15 +103,11 @@ Status JPSserver::reqMATSim2ExternPutAgent(ServerContext* context,
//schedule the agent
src->AddToPool(ped);
}
_agentSrcMng.ProcessAllSources();
agentSrcMng.ProcessAllSources();
//AgentsQueue::Add(peds);
}
}
//take the nodeID and the destination ID
//find the corresponding source
//generate a new agent and add to the source
//call processAllSource on the AgentsoursceManager
return Status::OK;
}
......@@ -86,6 +115,8 @@ Status JPSserver::reqExternDoSimStep(ServerContext* context,
const ExternDoSimStep* request, ExternDoSimStepReceived* response)
{
std::cout << "Performing simulation step" << std::endl;
_doSimulation=true;
return Status::OK;
}
......@@ -93,13 +124,14 @@ Status JPSserver::reqExternOnPrepareSim(ServerContext* context,
const ExternOnPrepareSim* request,
ExternOnPrepareSimConfirmed* response)
{
std::cout << "I am preparing the simulation" << std::endl;
Log->Write("INFO:\tRPC::JPSserver I am ready for doing the simulation");
//response->
return Status::OK;
}
Status JPSserver::reqExternAfterSim(ServerContext* context,
const ExternAfterSim* request, ExternAfterSimConfirmed* response)
{
std::cout << "Simulation step completed" << std::endl;
Log->Write("INFO:\tRPC::JPSserver I received shutdown order. But can I do that ?");
return Status::OK;
}
......@@ -8,6 +8,8 @@
#ifndef MATSIM_JPSSERVER_H_
#define MATSIM_JPSSERVER_H_
#include <iostream>
#include <grpc/grpc.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
......@@ -23,18 +25,18 @@ using grpc::Status;
using namespace hybrid;
//forward declarations
class AgentsSourcesManager;
class Simulation;
class JPSclient;
class JPSserver final : public ExternInterfaceService::Service
{
public:
/**
* constructor with an agent source manager, which will be
* responsible for positioning the agents.
*/
JPSserver(AgentsSourcesManager& src);
JPSserver(Simulation& src, const std::string& connection);
/**
* Destructor
......@@ -47,13 +49,31 @@ public:
virtual Status reqExternOnPrepareSim(ServerContext* context, const ExternOnPrepareSim* request, ExternOnPrepareSimConfirmed* response);
virtual Status reqExternAfterSim(ServerContext* context, const ExternAfterSim* request, ExternAfterSimConfirmed* response);
/**
*
*/
bool NotifyExternalService();
void RunSimulation();
//void SetDuplexClient(std::unique_ptr<JPSclient>& client);
//void SetAgentsSourcesManager(const AgentsSourcesManager& src) const;
private:
AgentsSourcesManager& _agentSrcMng;
Simulation& _SimManager;
bool _doSimulation=false;
std::unique_ptr<JPSclient> _rpcClient;
};
class MATSIMserver final : public MATSimInterfaceService::Service
{
virtual Status reqExternalConnect(::grpc::ClientContext* context, const ::hybrid::ExternalConnect& request, ::hybrid::ExternalConnectConfirmed* response)
{
std::cout<<"INFO:\tRPC::JPSserver I have space on node "<<std::endl;
return Status::OK;
}
};
#endif /* MATSIM_JPSSERVER_H_ */
......@@ -11,6 +11,7 @@
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/stream.h>
namespace hybrid {
static const char* MATSimInterfaceService_method_names[] = {
......
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
syntax = "proto2";
option java_generic_services = true;
//option java_generic_services = true;
//option cc_generic_services = true;
option java_outer_classname="ProtoMATSimInterface";
option java_package = "playground.gregor.proto";
option java_package = "org.matsim.hybrid";
package hybrid;
package org.matsim.hybrid;
//for compilation
// protoc --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` MATSimInterface.proto
// protoc --cpp_out="./" MATSimInterface.proto
// protoc --cpp_out=. MATSimInterface.proto
// protoc -I ../../protos/ --cpp_out=. ../../protos/helloworld.proto
message MATSim2ExternPutAgent {
message Agent {
required string id = 1;
required string enterNode = 2;
optional string id = 1;
optional string enterNode = 2;
repeated string nodes = 3;
}
required Agent agent = 1;
optional Agent agent = 1;
}
message MATSim2ExternHasSpace {
required string nodeId = 1;
optional string nodeId = 1;
}
message MATSim2ExternHasSpaceConfirmed {
required bool hasSpace = 1;
optional bool hasSpace = 1;
}
message MATSim2ExternPutAgentConfirmed {
......@@ -32,41 +32,32 @@ message MATSim2ExternPutAgentConfirmed {
message Extern2MATSim {
message Agent {
required string id = 1;
required string leaveNode = 2;
optional string id = 1;
optional string leaveNode = 2;
}
required Agent agent = 15;
optional Agent agent = 1;
}
message Extern2MATSimConfirmed {
required bool accepted = 1;
optional bool accepted = 1;
}