Add new dev_tests

these are tests for checking newly added features like outflow
from doors or waiting areas.

For now we have 1 test: outflow from doors: produces a figure.
parent e9e82ab1
Pipeline #20519 passed with stages
in 9 minutes and 37 seconds
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<geometry version="0.8" caption="corner" xmlns:xsi="" xsi:noNamespaceSchemaLocation="" unit="m">
<room id="0" caption="floor">
<subroom id="1" caption="Room 1" class="Corridor" A_x="0" B_y="0" C_z="0">
<polygon caption="wall">
<vertex px="15" py="10"/>
<vertex px="15" py="5.5"/>
<polygon caption="wall">
<vertex px="0" py="5.5"/>
<vertex px="0" py="10"/>
<polygon caption="wall">
<vertex px="0" py="4.5"/>
<vertex px="0" py="0"/>
<polygon caption="wall">
<vertex px="0" py="0"/>
<vertex px="15" py="0"/>
<polygon caption="wall">
<vertex px="15" py="0"/>
<vertex px="15" py="4.5"/>
<polygon caption="wall">
<vertex px="0" py="10"/>
<vertex px="15" py="10"/>
<transition id="0" caption="NaN" type="NaN" room1_id="0" subroom1_id="1" room2_id="-1" subroom2_id="-1">
<vertex px="15" py="5.5"/>
<vertex px="15" py="4.5"/>
<transition id="1" caption="NaN" type="NaN" room1_id="0" subroom1_id="1" room2_id="-1" subroom2_id="-1">
<vertex px="0" py="5.5"/>
<vertex px="0" py="4.5"/>
<?xml version="1.0" encoding="UTF-8" ?>
<JuPedSim project="Subway-Project" version="0.7"
<!-- seed used for initialising random generator -->
<!-- geometry file -->
<!-- trajectories file and format -->
<trajectories format="xml-plain" fps="64">
<file location="traj-door_capacity.xml" />
<!-- where to store the logs -->
<doors> <!-- doors states are: close or open -->
<door trans_id="0" caption="NaN" state="open" dn="2" outflow="0.5" max_agents="200"/>
<door trans_id="1" caption="NaN" state="open" dn="2" outflow="2" max_agents="200"/>
<!--persons information and distribution -->
<agents operational_model_id="3">
<group group_id="1" agent_parameter_id="1" room_id="0" subroom_id="1" number="50" goal_id="-1" router_id="1" />
<model operational_model_id="3" description="Tordeux2015">
<linkedcells enabled="true" cell_size="2.2" />
<force_ped a="8" D="0.1" />
<force_wall a="5" D="0.02" />
<agent_parameters agent_parameter_id="1">
<v0 mu="1.0" sigma="0.001" />
<bmax mu="0.15" sigma="0.0" /> <!-- this is l, assuming peds are circles with constant radius -->
<bmin mu="0.25" sigma="0.0" />
<amin mu="0.25" sigma="0.0" />
<tau mu="0.5" sigma="0.001" />
<atau mu="0.0" sigma="0.0" />
<T mu="1" sigma="0.001" />
<router router_id="1" description="ff_global_shortest">
#!/usr/bin/env python3
import os
import sys
import matplotlib.pyplot as plt
utestdir = os.path.abspath(os.path.dirname(os.path.dirname(sys.path[0])))
from sys import *
from JPSRunTest import JPSRunTestDriver
from utils import *
def runtest(inifile, trajfile):
tolerance = 0.1
failure = 0
ids, outflow = get_outflow(inifile)
for (i, o) in zip(ids, outflow):
filename = "flow_exit_id_%d_rate_%.2f.txt"%(i, o)
if not os.path.exists(filename):"ERROR: can not find statistics file %s"%filename)
data = np.loadtxt(filename)
N = data[:, 1]
T = data[:, 0]
J = N[-1]/T[-1]
plt.plot(T, N)
plt.plot(T, N, lw=2)
plt.plot(T, o*T, "-k", lw=2"outflow (file %s) | inifile: %.2f | state-file: %.2f (+-%.2f)"%(filename, o, J, abs(J-o)))
if abs(J-o) > tolerance:
failure = 1
if failure:"flow from statistics files does not much expected flow from inifile")
if __name__ == "__main__":
test = JPSRunTestDriver(1, argv0=argv[0], testdir=sys.path[0], utestdir=utestdir)
test.run_test(testfunction=runtest)"%s exits with SUCCESS" % (argv[0]))
......@@ -105,11 +105,11 @@ def PassedLineX(p, exit):
x = exit[0]
y1 = exit[1]
y2 = exit[2]
X = p[:, 2]
Y = p[:, 3]
return (np.logical_and(np.logical_and(X >= x-eps, X <= x+eps), np.logical_and(Y >= y1, Y <= y2) ) ).any()
return (np.logical_and(np.logical_and(X >= x-eps, X <= x+eps), np.logical_and(Y >= y1, Y <= y2) ) ).any()
def PassedLineY(p, exit):
......@@ -124,7 +124,7 @@ def PassedLineY(p, exit):
X = p[:, 2]
Y = p[:, 3]
return (np.logical_and(np.logical_and(Y >= y-eps, Y <= y+eps), np.logical_and(X >= x1, X <= x2) ) ).any()
return (np.logical_and(np.logical_and(Y >= y-eps, Y <= y+eps), np.logical_and(X >= x1, X <= x2) ) ).any()
def get_num_threads(filename):
......@@ -138,7 +138,7 @@ def get_num_threads(filename):
num_threads = float(xmldoc.getElementsByTagName('num_threads')[0].firstChild.nodeValue)
return num_threads
def get_maxtime(filename):
get max sim time
......@@ -152,6 +152,23 @@ def get_maxtime(filename):
maxtime = float(xmldoc.getElementsByTagName('max_sim_time')[0].firstChild.nodeValue)
return maxtime
def get_outflow(filename):
get outflow of doors
tree = ET.parse(filename)
root = tree.getroot()
ids = []
outflow = []
for node in root.iter():
tag = node.tag
if tag == "traffic_constraints":
for doors in node.getchildren(): # doors
for door in doors.getchildren(): # door
return ids, outflow
def parse_file(filename):
parse trajectories in Travisto-format and output results
......@@ -198,7 +215,7 @@ def parse_file(filename):
data = np.vstack((data, list(map(float, [ID, frame, x, y, z]) ) ) )
# data += [ID, frame, x, y, z]
# data += [ID, frame, x, y, z]
# data = np.array(data, dtype=float).reshape((-1, 5))
return fps, N, data
......@@ -277,9 +294,9 @@ def rolling_flow(fps, N, data, x0, name):
flow = fps*(windows-1)/(serie.rolling(windows, min_periods=minp).max() - serie.rolling(windows, min_periods=minp).min() )
flow = flow[~np.isnan(flow)] # remove NaN
wmean = flow.rolling(windows, min_periods=minp).mean()
np.savetxt(name, np.array(times))"min(times)=%f max(times)=%f"%(min(times), max(times)))
return np.mean(wmean) #fps * float(N-1) / (max(times) - min(times))
......@@ -314,19 +331,19 @@ def flow(fps, N, data, x0):
if len(times) < 2:
logging.warning("Number of pedestrians passing the line is small. return 0")
return 0"min(times)=%f max(times)=%f"%(min(times), max(times)))
return fps * float(N-1) / (max(times) - min(times))
def CalcBiVarCDF(x,y,xGrid,yGrid):
Calculate the bivariate CDF of two given input signals on predefined grids.
Calculate the bivariate CDF of two given input signals on predefined grids.
- x: array of size n1
- y: array of size n2
- xGrid: array of size m1
- yGrid: array of size m2
- CDF2D: matrix
nPoints = np.size(x);
......@@ -345,13 +362,13 @@ def CalcBiVarCDF(x,y,xGrid,yGrid):
def CDFDistance(x1, y1, x2, y2):
For two input 2D signals calculate the distance between their CDFs.
For two input 2D signals calculate the distance between their CDFs.
- x1: array of size n
- y2: array of size n
- x2: array of size m
- y2: array of size m
- KSD: not negative number
xPoints = 100;
......@@ -365,7 +382,3 @@ def CDFDistance(x1, y1, x2, y2):
# KSD = np.linalg.norm(CDF1-CDF2)/(np.linalg.norm(CDF1)+np.linalg.norm(CDF1); # Frobenius norm (p=2)
KSD = np.max(np.abs(CDF1-CDF2)); # Kolmogorov-Smirnov distance (p=inf)
return KSD
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment