/*
 * HybridSimulationManager.cpp
 *
 *  Created on: Apr 20, 2015
 *      Author: piccolo
 */

#include "HybridSimulationManager.h"
#include "MATSimInterface.pb.h"
#include "../pedestrian/Pedestrian.h"
#include "../geometry/Building.h"
#include "../pedestrian/AgentsQueue.h"
#include "../pedestrian/AgentsSourcesManager.h"
#include "JPSserver.h"
#include "JPSclient.h"

// std stuffs
#include <thread>
#include <functional>

//google stuff
//server stuff
#include <grpc/grpc.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
//client stuff
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>


using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ChannelArguments;
using grpc::ChannelInterface;
using grpc::ClientContext;
using grpc::Status;
using namespace std;

// Definition of the static stop flag. It starts out false, is set to true by
// Shutdown(), and is polled by the loops in RunClient() and RunServer().
// NOTE(review): this is a plain bool; RunServer() is executed on a std::thread
// started in Run(), so if Shutdown() is invoked from a different thread the
// access is unsynchronised — confirm and consider std::atomic<bool>.
bool HybridSimulationManager::_shutdown = false;

49
HybridSimulationManager::HybridSimulationManager(const std::string& server,
50
          int port)
51
{
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
     _serverName = server;
     _port = port;

     GOOGLE_PROTOBUF_VERIFY_VERSION;
     grpc_init();

     _rpcClient = std::shared_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
               grpc::InsecureCredentials(), ChannelArguments())));


//     std::string server_address(_serverName + ":" + std::to_string(_port));
//     JPSserver service(_agentSrcMng);
//
//     ServerBuilder builder;
//     builder.AddListeningPort(server_address,
//               grpc::InsecureServerCredentials());
//     builder.RegisterService(&service);
//     _rpcServer= builder.BuildAndStart();
//     Log->Write("INFO:\tJuPedSim Server listening on " + server_address);


73 74 75 76 77 78
}

/**
 * Destructor. Performs no explicit clean-up; the gRPC/protobuf shutdown calls
 * in this file are issued at the end of Run().
 */
HybridSimulationManager::~HybridSimulationManager() = default;

79
bool HybridSimulationManager::Init(Building* building)
80
{
81
     //     GOOGLE_PROTOBUF_VERIFY_VERSION;
82

83 84
     _building = building;
     return true;
85 86 87 88
}

bool HybridSimulationManager::Run()
{
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
     //perform some initialisation stuff
     GOOGLE_PROTOBUF_VERIFY_VERSION;
     grpc_init();
     _rpcClient = std::shared_ptr<JPSclient>(new JPSclient( grpc::CreateChannel("localhost:9999",
                         grpc::InsecureCredentials(), ChannelArguments())));

     //create a socket and use it for the serveur and the client
     //std::thread t1(&HybridSimulationManager::RunClient, this);
     std::thread t2(&HybridSimulationManager::RunServer, this);
     //t1.join();
     t2.join();

     //clean up everything
     grpc_shutdown();
     google::protobuf::ShutdownProtobufLibrary();
     return true;
105 106 107
}
/**
 * Call operator: makes the manager usable as a callable (e.g. as a thread
 * entry point). Simply forwards to Run() and discards its result.
 */
void HybridSimulationManager::operator()()
{
     static_cast<void>(Run());
}
bool HybridSimulationManager::RunClient()
{
112

113 114 115 116
     //check the message queue and send
     JPSclient client(
               grpc::CreateChannel("localhost:9999",
                         grpc::InsecureCredentials(), ChannelArguments()));
117

118 119
     do
     {
120

121 122
          //check if agents enter/left a link and fire event
          client.ProcessAgentQueue(_building);
123

124 125 126 127 128
          //wait some time, before a new attempt
          cout << "waiting for input:" << _shutdown << endl;
          //ProcessOutgoingAgent();
          std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     } while (!_shutdown);
129

130
     return true;
131 132 133 134
}

bool HybridSimulationManager::RunServer()
{
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
     //check the message queue and send
     //TODO: move this to the constructor

     do
     {
          std::string server_address(_serverName + ":" + std::to_string(_port));
          JPSserver service(_agentSrcMng);

          ServerBuilder builder;
          builder.AddListeningPort(server_address,
                    grpc::InsecureServerCredentials());
          builder.RegisterService(&service);

          _rpcServer= builder.BuildAndStart();
          Log->Write("INFO:\tJuPedSim Server listening on " + server_address);

          _rpcServer->Wait();

          //wait for incoming connection
          //receive and parse message
          //process message
          //can insert pedestrian ?
          //accept pedestrian and feed to the queue

          //wait some time, before a new attempt
          cout << "waiting for output:" << _shutdown << endl;
          std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     } while (!_shutdown);

     return true;
165 166 167 168
}

void HybridSimulationManager::Shutdown()
{
169 170 171
     _shutdown = true;
     if (_rpcServer) _rpcServer->Shutdown();
     //cout<<"shutting down: "<<_shutdown<<endl;
172 173
}

174
void HybridSimulationManager::ProcessIncomingAgent()
175
{
176

177 178
}

179
void HybridSimulationManager::ProcessOutgoingAgent()
180
{
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204

     _rpcClient->ProcessAgentQueue(_building);
/*
     std::vector<Pedestrian*> peds;
     AgentsQueueOut::GetandClear(peds);

     for (auto && ped:peds)
     {
          //pick the agent from the queue
          hybrid::Extern2MATSim msg;
          msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
          msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));

          //write to the disk or send to hell
          //std::fstream output("test.buf", ios::out | ios::trunc | ios::binary);
          //if (!msg.SerializeToOstream(&output))
          //{
          //    cerr << "Failed to write address book." << endl;
          //}
          cout<<"deleting:"<<endl;
          _building->DeletePedestrian(ped);
     }

     */
205 206
}

207
std::string HybridSimulationManager::ToString()
208
{
209 210 211 212 213 214 215 216 217
     return "INFO:\tHybrid Simulation working on [" + _serverName + ":"
               + std::to_string(_port) + "]\n";
}

/**
 * Attach the agents source manager that RunServer() passes to the JPSserver
 * service.
 *
 * @param src manager to take over; it is copy-assigned into _agentSrcMng
 */
void HybridSimulationManager::AttachSourceManager(const AgentsSourcesManager& src)
{
     //TODO create a deep copy of the manager
     // and copy the sources without copying the generated pedestrian
     _agentSrcMng = src;
}