Commit 079b7777 authored by Ulrich Kemloh's avatar Ulrich Kemloh

Integrating rpc with current architecture

parent b60385dc
......@@ -30,6 +30,7 @@
#include "general/ArgumentParser.h"
#include "./Simulation.h"
#include "pedestrian/AgentsSourcesManager.h"
#include "matsim/HybridSimulationManager.h"
#include <thread>
#include <functional>
......@@ -56,11 +57,14 @@ int main(int argc, char **argv)
if(status&&sim.InitArgs(*args))
{
//Start the threads for managing the sources of agents
//AgentsSourcesManager sManager;
std::thread t1(sim.GetAgentSrcManager(),21);
//Start the thread for managing incoming messages from MatSim
std::thread t2;
auto hybrid=args->GetHybridSimManager();
if(hybrid)
t2 = std::thread(*hybrid);
//Start the thread for managing outgoing messages to MatSim
//main thread for the simulation
Log->Write("INFO: \tStart runSimulation()");
......@@ -72,8 +76,14 @@ int main(int argc, char **argv)
//so join the other threads
t1.join();
//only if in hybrid mode
if(hybrid)
{
hybrid->Shutdown();
t2.join();
}
// some output
// some statistics output
if(args->ShowStatistics())
{
sim.PrintStatistics();
......
......@@ -6,41 +6,87 @@
*/
#include "HybridSimulationManager.h"
#include "MATSimInterface.pb.h"
#include "../pedestrian/Pedestrian.h"
#include "../geometry/Building.h"
#include "../pedestrian/AgentsQueue.h"
#include "JPSserver.h"
#include "JPSclient.h"
// std stuffs
#include <thread>
#include <functional>
HybridSimulationManager::HybridSimulationManager()
{
// TODO Auto-generated constructor stub
//google stuff
#include <grpc/grpc.h>
#include <grpc/grpc.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
bool HybridSimulationManager::_shutdown = false;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using namespace std;
HybridSimulationManager::HybridSimulationManager(const std::string& server,
int port)
{
_serverName = server;
_port = port;
}
HybridSimulationManager::~HybridSimulationManager()
{
// TODO Auto-generated destructor stub
}
bool HybridSimulationManager::Init()
bool HybridSimulationManager::Init(Building* building)
{
// GOOGLE_PROTOBUF_VERIFY_VERSION;
_building = building;
return true;
}
bool HybridSimulationManager::Run()
{
//perform some initialisation stuff
GOOGLE_PROTOBUF_VERIFY_VERSION;
grpc_init();
//create a socket and use it for the serveur and the client
std::thread t1(&HybridSimulationManager::RunClient, this);
std::thread t2(&HybridSimulationManager::RunServer, this);
t1.join();
t2.join();
return true;
//clean up everything
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return true;
}
void HybridSimulationManager::operator()()
{
Run();
}
bool HybridSimulationManager::RunClient()
{
//check the message queue and send
do
{
//check the message queue and send
//check if agents enter/left a link and fire event
//wait some time, before a new attempt
cout << "waiting for input:" << _shutdown << endl;
ProcessOutgoingAgent();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
return true;
......@@ -51,7 +97,26 @@ bool HybridSimulationManager::RunServer()
//check the message queue and send
do
{
std::string server_address(_serverName + ":" + std::to_string(_port));
JPSserver service;
ServerBuilder builder;
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
builder.RegisterService(&service);
_rpcServer= builder.BuildAndStart();
Log->Write("INFO:\tJuPedSim Server listening on " + server_address);
_rpcServer->Wait();
//wait for incoming connection
//receive and parse message
//process message
//can insert pedestrian ?
//accept pedestrian and feed to the queue
//wait some time, before a new attempt
cout << "waiting for output:" << _shutdown << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (!_shutdown);
return true;
......@@ -60,87 +125,40 @@ bool HybridSimulationManager::RunServer()
void HybridSimulationManager::Shutdown()
{
_shutdown = true;
if (_rpcServer) _rpcServer->Shutdown();
//cout<<"shutting down: "<<_shutdown<<endl;
}
/*
//
// blocking_tcp_echo_server.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
using boost::asio::ip::tcp;
const int max_length = 1024;
typedef boost::shared_ptr<tcp::socket> socket_ptr;
void session(socket_ptr sock)
void HybridSimulationManager::ProcessIncomingAgent()
{
try
{
for (;;)
{
char data[max_length];
boost::system::error_code error;
size_t length = sock->read_some(boost::asio::buffer(data), error);
if (error == boost::asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
boost::asio::write(*sock, boost::asio::buffer(data, length));
}
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
}
}
void server(boost::asio::io_service& io_service, short port)
void HybridSimulationManager::ProcessOutgoingAgent()
{
tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
for (;;)
{
socket_ptr sock(new tcp::socket(io_service));
a.accept(*sock);
boost::thread t(boost::bind(session, sock));
}
std::vector<Pedestrian*> peds;
AgentsQueueOut::GetandClear(peds);
for (auto && ped:peds)
{
//pick the agent from the queue
hybrid::Extern2MATSim msg;
msg.mutable_agent()->set_id(std::to_string(ped->GetID()));
msg.mutable_agent()->set_leavenode(std::to_string(ped->GetExitIndex()));
//write to the disk or send to hell
//std::fstream output("test.buf", ios::out | ios::trunc | ios::binary);
//if (!msg.SerializeToOstream(&output))
//{
// cerr << "Failed to write address book." << endl;
//}
cout<<"deleting:"<<endl;
_building->DeletePedestrian(ped);
}
}
int main(int argc, char* argv[])
std::string HybridSimulationManager::ToString()
{
try
{
if (argc != 2)
{
std::cerr << "Usage: blocking_tcp_echo_server <port>\n";
return 1;
}
boost::asio::io_service io_service;
using namespace std; // For atoi.
server(io_service, atoi(argv[1]));
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
return "INFO:\tHybrid Simulation working on [" + _serverName + ":"
+ std::to_string(_port) + "]\n";
}
*/
......@@ -8,20 +8,48 @@
#ifndef HYBRIDSIMULATIONMANAGER_H_
#define HYBRIDSIMULATIONMANAGER_H_
#include <string>
#include <atomic>
#include <memory>
#include <grpc++/server.h>
//forward classes
class Building;
//class grpc::Server;
class HybridSimulationManager
{
public:
HybridSimulationManager();
HybridSimulationManager(const std::string& server,int port);
virtual ~HybridSimulationManager();
bool Init();
bool Init(Building* building);
bool Run();
bool RunClient();
bool RunServer();
void Shutdown();
std::string ToString();
/**
* Make the class "runnable" by overloading the operator
* @param value
*/
void operator()();
private:
void ProcessIncomingAgent();
void ProcessOutgoingAgent();
private:
bool _shutdown=false;
//std::atomic<bool> _shutdown=false;
static bool _shutdown;
int _port=-1;
std::string _serverName="localhost";
Building* _building=nullptr;
//grpc::Server* _rpcServer_;
//TODO: the method should be passed bz reference in the main function
//std::unique_ptr<grpc::Server> _rpcServer;
std::shared_ptr<grpc::Server> _rpcServer;
};
#endif /* HYBRIDSIMULATIONMANAGER_H_ */
......@@ -11,37 +11,21 @@
using namespace std;
vector<Pedestrian*> AgentsQueue::_agentsQueue;
vector<Pedestrian*> AgentsQueueOut::_agentsQueue;
mutex AgentsQueue::_queueMutex;
mutex AgentsQueueOut::_queueMutex;
void AgentsQueue::Add(vector<Pedestrian*>& peds)
{
_queueMutex.lock();
_agentsQueue.insert(_agentsQueue.end(),peds.begin(),peds.end());
_queueMutex.unlock();
// while (true)
// {
// // try to lock mutex to modify 'job_shared'
// if (_queueMutex.try_lock())
// {
// _agentsQueue.insert(_agentsQueue.end(),peds.begin(),peds.end());
// _queueMutex.unlock();
// return;
// }
// else
// {
// //maybe the mutex is beeing used
// std::cout << "unable to lock the mutex for adding 1" <<endl;
// //std::this_thread::sleep_for(100);
// }
// }
}
void AgentsQueue::GetandClear(std::vector<Pedestrian*>& peds)
{
//while (true)
//{
// try to lock mutex to modify _agentsQueue
_queueMutex.lock();
if(_agentsQueue.size()!=0)
......@@ -51,14 +35,6 @@ void AgentsQueue::GetandClear(std::vector<Pedestrian*>& peds)
_agentsQueue.clear();
}
_queueMutex.unlock();
// }
// else
// {
// //maybe the mutex is beeing used
// std::cout << "unable to lock the mutex for removing 2" <<endl;
// //std::this_thread::sleep_for(100);
// }
//}
}
bool AgentsQueue::IsEmpty()
......@@ -66,3 +42,44 @@ bool AgentsQueue::IsEmpty()
return (_agentsQueue.size()==0);
}
/////////////////////////////////////////////////////
////////////////////////////////////////////////////
void AgentsQueueOut::Add(vector<Pedestrian*>& peds)
{
_queueMutex.lock();
_agentsQueue.insert(_agentsQueue.end(),peds.begin(),peds.end());
//todo: avoid this by using a map
std::sort( _agentsQueue.begin(), _agentsQueue.end() );
_agentsQueue.erase( unique( _agentsQueue.begin(), _agentsQueue.end() ), _agentsQueue.end() );
//std::cout<<"queue size:"<<_agentsQueue.size()<<endl;
///
_queueMutex.unlock();
}
void AgentsQueueOut::Add(Pedestrian* ped)
{
_queueMutex.lock();
_agentsQueue.push_back(ped);
_queueMutex.unlock();
}
void AgentsQueueOut::GetandClear(std::vector<Pedestrian*>& peds)
{
_queueMutex.lock();
if(_agentsQueue.size()!=0)
{
peds.insert(peds.end(),_agentsQueue.begin(), _agentsQueue.end());
//_agentsQueue.pop_back();
_agentsQueue.clear();
}
_queueMutex.unlock();
}
bool AgentsQueueOut::IsEmpty()
{
return (_agentsQueue.size()==0);
}
......@@ -15,6 +15,9 @@
//forward declaration
class Pedestrian;
/**
* Queue for incoming agents
*/
class AgentsQueue
{
public:
......@@ -30,4 +33,21 @@ private:
static std::mutex _queueMutex;
};
/**
* Queue for outgoing agents
*/
class AgentsQueueOut
{
public:
static void Add(std::vector<Pedestrian*>& ped);
static void Add(Pedestrian* ped);
static void GetandClear(std::vector<Pedestrian*>& peds);
static bool IsEmpty();
private:
AgentsQueueOut(){};
virtual ~AgentsQueueOut(){};
static std::vector<Pedestrian*> _agentsQueue;
static std::mutex _queueMutex;
};
#endif /* AGENTSQUEUE_H_ */
......@@ -49,7 +49,7 @@ Pedestrian::Pedestrian()
_oldRoomID = -1;
_oldSubRoomID = -1;
_exitIndex = -1;
_id = 0;
_id = _agentsCreated;//default id
_mass = 1;
_tau = 0.5;
_newOrientationFlag = false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment