HybridSimulationManager.cpp 8.54 KB
Newer Older
1 2 3 4 5 6 7 8
/*
 * HybridSimulationManager.cpp
 *
 *  Created on: Apr 20, 2015
 *      Author: piccolo
 */

#include "HybridSimulationManager.h"
9 10 11
#include "MATSimInterface.pb.h"
#include "../pedestrian/Pedestrian.h"
#include "../geometry/Building.h"
12
#include "../pedestrian/AgentsSourcesManager.h"
Ulrich Kemloh's avatar
Ulrich Kemloh committed
13
#include "../Simulation.h"
14 15 16 17
#include "JPSserver.h"
#include "JPSclient.h"

// std stuffs
18 19 20
#include <thread>
#include <functional>

21
//google stuff
22
//server stuff
23 24 25 26 27 28
#include <grpc/grpc.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
29 30 31 32 33 34
//client stuff
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
35
#include "../pedestrian/AgentsQueue.h"
36 37 38 39 40 41


using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
42 43 44 45
using grpc::ChannelArguments;
using grpc::ChannelInterface;
using grpc::ClientContext;
using grpc::Status;
46 47
using namespace std;

48 49
bool HybridSimulationManager::_shutdown = false;

50
HybridSimulationManager::HybridSimulationManager(const std::string& server,
51
          int port)
52
{
Ulrich Kemloh's avatar
Ulrich Kemloh committed
53 54 55 56
     _externalServerName = server;
     _externalServerPort = port;

     //get the canonical hostname
57 58 59
     char hostname[1024];
     gethostname(hostname, 1024);
     _internalServerName=std::string(hostname);
60

Ulrich Kemloh's avatar
Ulrich Kemloh committed
61 62
     //GOOGLE_PROTOBUF_VERIFY_VERSION;
     //grpc_init();
63

Ulrich Kemloh's avatar
Ulrich Kemloh committed
64 65
     //_rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
     //          grpc::InsecureCredentials(), ChannelArguments())));
66 67


Ulrich Kemloh's avatar
Ulrich Kemloh committed
68 69 70 71 72 73 74 75 76
     //     std::string server_address(_serverName + ":" + std::to_string(_port));
     //     JPSserver service(_agentSrcMng);
     //
     //     ServerBuilder builder;
     //     builder.AddListeningPort(server_address,
     //               grpc::InsecureServerCredentials());
     //     builder.RegisterService(&service);
     //     _rpcServer= builder.BuildAndStart();
     //     Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
77 78


79 80 81 82 83 84
}

HybridSimulationManager::~HybridSimulationManager()
{
}

85
bool HybridSimulationManager::Init(Building* building)
86
{
87 88
     _building = building;
     return true;
89 90
}

91
bool HybridSimulationManager::Run(Simulation& sim)
92
{
93 94
     //perform some initialisation stuff
     GOOGLE_PROTOBUF_VERIFY_VERSION;
Ulrich Kemloh's avatar
Ulrich Kemloh committed
95

96 97
     grpc_init();

Ulrich Kemloh's avatar
Ulrich Kemloh committed
98
     //string extern_service_address("zam597:9999");
Ulrich Kemloh's avatar
Ulrich Kemloh committed
99
     string extern_service_address(_externalServerName + ":" + std::to_string(_externalServerPort));
Ulrich Kemloh's avatar
Ulrich Kemloh committed
100

101 102 103 104
     ///0.0.0.0 means to listen on all devices
     string jupedsim_service_address("0.0.0.0:" + std::to_string(_internalServerPort));
     //string jupedsim_service_address(_internalServerName + ":" + std::to_string(_internalServerPort));
     //string jupedsim_service_address("0.0.0.0:9998")/*_serverName + ":" + std::to_string(_port)*/;
Ulrich Kemloh's avatar
Ulrich Kemloh committed
105 106 107


     //create the client that will be running on its own thread
Ulrich Kemloh's avatar
Ulrich Kemloh committed
108 109
     _rpcClient = std::shared_ptr<JPSclient>(new JPSclient( grpc::CreateChannel(extern_service_address,
               grpc::InsecureCredentials(), ChannelArguments())));
Ulrich Kemloh's avatar
Ulrich Kemloh committed
110 111

     //create the server
Ulrich Kemloh's avatar
Ulrich Kemloh committed
112
     std::string server_address(_externalServerName + ":" + std::to_string(_externalServerPort));
Ulrich Kemloh's avatar
Ulrich Kemloh committed
113

Ulrich Kemloh's avatar
Ulrich Kemloh committed
114
     JPSserver jupedsimService(sim);
Ulrich Kemloh's avatar
Ulrich Kemloh committed
115
     jupedsimService.SetDuplexClient(_rpcClient);
Ulrich Kemloh's avatar
Ulrich Kemloh committed
116 117 118 119 120 121 122 123 124
     //MATSIMserver jupedsimService;

     ServerBuilder builder;
     builder.AddListeningPort(jupedsim_service_address,
               grpc::InsecureServerCredentials());
     builder.RegisterService(&jupedsimService);

     _rpcServer= builder.BuildAndStart();

Ulrich Kemloh's avatar
Ulrich Kemloh committed
125 126
     Log->Write("INFO:\tJuPedSim is up and running on " + jupedsim_service_address);
     Log->Write("INFO:\tNotifying Matsim at " + extern_service_address);
Ulrich Kemloh's avatar
Ulrich Kemloh committed
127

Ulrich Kemloh's avatar
Ulrich Kemloh committed
128
     //if(false==_rpcClient->NotifyExternalService(_exinternalServerName,_internalServerPort))
129
     if(false==_rpcClient->NotifyExternalService(_internalServerName,_internalServerPort))
Ulrich Kemloh's avatar
Ulrich Kemloh committed
130
     {
Ulrich Kemloh's avatar
Ulrich Kemloh committed
131
          Log->Write("ERROR:\tNotification failed");
Ulrich Kemloh's avatar
Ulrich Kemloh committed
132 133
     }

Ulrich Kemloh's avatar
Ulrich Kemloh committed
134

Ulrich Kemloh's avatar
Ulrich Kemloh committed
135
     //starting the simulation thread and waiting
Ulrich Kemloh's avatar
Ulrich Kemloh committed
136 137
     std::thread t2(&JPSserver::RunSimulation, &jupedsimService);

Ulrich Kemloh's avatar
Ulrich Kemloh committed
138
     //_rpcServer->Wait();
Ulrich Kemloh's avatar
Ulrich Kemloh committed
139 140

     //TestWorkflow();
Ulrich Kemloh's avatar
Ulrich Kemloh committed
141 142 143
     t2.join();


144 145
     //create a socket and use it for the serveur and the client
     //std::thread t1(&HybridSimulationManager::RunClient, this);
Ulrich Kemloh's avatar
Ulrich Kemloh committed
146
     //std::thread t2(&HybridSimulationManager::RunServer, this);
147
     //t1.join();
Ulrich Kemloh's avatar
Ulrich Kemloh committed
148
     //t2.join();
149 150 151 152 153

     //clean up everything
     grpc_shutdown();
     google::protobuf::ShutdownProtobufLibrary();
     return true;
154
}
Ulrich Kemloh's avatar
Ulrich Kemloh committed
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190

//bool HybridSimulationManager::Run(Simulation& sim)
//{
//     //perform some initialisation stuff
//     GOOGLE_PROTOBUF_VERIFY_VERSION;
//
//     grpc_init();
//
//     //create the client that will be running on its own thread
//     _rpcClient = std::unique_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
//                         grpc::InsecureCredentials(), ChannelArguments())));
//
//     //create the server
//     std::string server_address(_serverName + ":" + std::to_string(_port));
//               JPSserver service(_agentSrcMng);
//
//               ServerBuilder builder;
//               builder.AddListeningPort(server_address,
//                         grpc::InsecureServerCredentials());
//               builder.RegisterService(&service);
//
//               _rpcServer= builder.BuildAndStart();
//               Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
//
//               _rpcServer->Wait();
//     //create a socket and use it for the serveur and the client
//     //std::thread t1(&HybridSimulationManager::RunClient, this);
//     std::thread t2(&HybridSimulationManager::RunServer, this);
//     //t1.join();
//     t2.join();
//
//     //clean up everything
//     grpc_shutdown();
//     google::protobuf::ShutdownProtobufLibrary();
//     return true;
//}
191 192
void HybridSimulationManager::operator()()
{
193
     //Run();
194 195 196
}
bool HybridSimulationManager::RunClient()
{
197 198 199 200
     //check the message queue and send
     JPSclient client(
               grpc::CreateChannel("localhost:9999",
                         grpc::InsecureCredentials(), ChannelArguments()));
201

202 203
     do
     {
204

205 206
          //check if agents enter/left a link and fire event
          client.ProcessAgentQueue(_building);
207

208
          //wait some time, before a new attempt
Ulrich Kemloh's avatar
Ulrich Kemloh committed
209
          cout << "processing Requests for input:" << _shutdown << endl;
210 211 212
          //ProcessOutgoingAgent();
          std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     } while (!_shutdown);
213

214
     return true;
215 216 217 218
}

bool HybridSimulationManager::RunServer()
{
Ulrich Kemloh's avatar
Ulrich Kemloh committed
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
     //
     //     do
     //     {
     //          std::string server_address(_serverName + ":" + std::to_string(_port));
     //          JPSserver service(_agentSrcMng);
     //
     //          ServerBuilder builder;
     //          builder.AddListeningPort(server_address,
     //                    grpc::InsecureServerCredentials());
     //          builder.RegisterService(&service);
     //
     //          _rpcServer= builder.BuildAndStart();
     //          Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
     //
     //          _rpcServer->Wait();
     //
     //          //wait for incoming connection
     //          //receive and parse message
     //          //process message
     //          //can insert pedestrian ?
     //          //accept pedestrian and feed to the queue
     //
     //          //wait some time, before a new attempt
     //          cout << "waiting for output:" << _shutdown << endl;
     //          std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     //     } while (!_shutdown);
     //
Ulrich Kemloh's avatar
Ulrich Kemloh committed
246
     return true;
247 248 249 250
}

void HybridSimulationManager::Shutdown()
{
251 252 253
     _shutdown = true;
     if (_rpcServer) _rpcServer->Shutdown();
     //cout<<"shutting down: "<<_shutdown<<endl;
254 255
}

256
void HybridSimulationManager::ProcessIncomingAgent()
257
{
258

259 260
}

Ulrich Kemloh's avatar
Ulrich Kemloh committed
261
void HybridSimulationManager::TestWorkflow()
262
{
Ulrich Kemloh's avatar
Ulrich Kemloh committed
263
     // notify the external service that I am awake
Ulrich Kemloh's avatar
Ulrich Kemloh committed
264
     //_rpcClient->NotifyExternalService();
Ulrich Kemloh's avatar
Ulrich Kemloh committed
265
}
266 267


Ulrich Kemloh's avatar
Ulrich Kemloh committed
268 269 270 271
//call after each simulation step
void HybridSimulationManager::ProcessOutgoingAgent()
{
     _rpcClient->ProcessAgentQueue(_building);
272 273
}

274
std::string HybridSimulationManager::ToString()
275
{
Ulrich Kemloh's avatar
Ulrich Kemloh committed
276 277
     return "INFO:\tHybrid Simulation working on [" + _externalServerName + ":"
               + std::to_string(_externalServerPort) + "]\n";
278 279
}

280 281 282 283 284 285
//void HybridSimulationManager::AttachSourceManager(const AgentsSourcesManager& src)
//{
//     //TODO create a deep copy of the manager
//     // and copy the sources without copying the generated pedestrian
//     _agentSrcMng=src;
//}