Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ jobs:
- name: run test
run: |
ls -lhtr
./bin/run-clara -y ./etc/services/rgd-clarode.yml -t 4 -n 500 -c ./clara -o ./tmp ./clas_018779.evio.00001
./bin/run-clara -y ./etc/services/rgd-clarode.yml -t 4 -n 500 -c ./clara -o ./tmp ./clas_018779.evio.00001
ls -lhtr

test_coatjava:
Expand Down
26 changes: 19 additions & 7 deletions bin/run-clara
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Options:\n
\t-t number of threads (default=2)\n
\t-n number of events (default=-1)\n
\t-m merge output files (see dependencies below)\n
\t-l larger JVM memory requests\n
\t-h print this help and exit\n\n
Merging outputs (-m) requires hipo-utils and yq (https://github.com/mikefarah/yq).'

Expand All @@ -25,7 +26,7 @@ function error() {
threads=2
prefix=rec_
CLARA_USER_DATA=.
while getopts y:o:p:c:t:n:qmh opt
while getopts y:o:p:c:t:n:lqmh opt
do
case $opt in
y) yaml=$OPTARG ;;
Expand All @@ -34,7 +35,7 @@ do
c) CLARA_HOME=$OPTARG ;;
t) threads=$OPTARG && echo $threads | grep -q -E '^[0-9]+$' || error "-t must be an integer, threads" ;;
n) nevents="-e $OPTARG" && echo "$nevents" | grep -q -E '^-e [0-9]+$' || error "-n must be an integer, events" ;;
g) memory_gb="-e $OPTARG" && echo "$memory_gb" | grep -q -E '^-e [0-9]+$' || error "-g must be an integer, GB of memory" ;;
l) large=1 ;;
m) merge=1 ;;
q) quiet=1 ;;
h) echo -e "\n$usage" && echo -e $info && exit 0 ;;
Expand All @@ -43,8 +44,16 @@ done
shift $((OPTIND-1))
inputs=$@

# Configure JVM -Xmx memory setting:
[ -z ${memory_gb+x} ] && memory_gb=$((threads+2))
# Configure JVM memory settings (overridedable via $JAVA_OPTS):
if [ -z ${large+x} ]
then
gb_max=$((threads<3?threads+2:threads+2-threads/4))
gb_init=$((threads<3?threads:threads-threads/3))
else
gb_max=$((threads<3?threads+2:threads+2-threads/8))
gb_init=$((threads<3?threads:threads-threads/6))
fi
java_opts="-Xms${gb_init}g -Xmx${gb_max}g"

# Check configuration:
[ $# -lt 1 ] && error "Input data files are required"
Expand Down Expand Up @@ -76,11 +85,14 @@ done
[ $(cat $CLARA_USER_DATA/filelist.txt | wc -l) -gt 0 ] || error "Found no input files"

# Set some JVM options:
export JAVA_OPTS="$JAVA_OPTS -Xmx${memory_gb}g -XX:+IgnoreUnrecognizedVMOptions"
export JAVA_OPTS="$JAVA_OPTS -Djava.io.tmpdir=$CLARA_USER_DATA -Dorg.sqlite.tmpdir=$CLARA_USER_DATA"
JAVA_OPTS="$java_opts $JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions"
JAVA_OPTS="$JAVA_OPTS -Djava.io.tmpdir=$CLARA_USER_DATA -Dorg.sqlite.tmpdir=$CLARA_USER_DATA"

# Set verbosity:
[ -z ${quiet+x} ] && stub=fine || stub=info
export JAVA_OPTS="$JAVA_OPTS -Djava.util.logging.config.file=$CLAS12DIR/etc/logging/$stub.properties"
JAVA_OPTS="$JAVA_OPTS -Djava.util.logging.config.file=$CLAS12DIR/etc/logging/$stub.properties"

export JAVA_OPTS

function get_host_ip() {
if command -v ip >/dev/null 2>&1
Expand Down
102 changes: 102 additions & 0 deletions libexec/scaling
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python3

def cli():
import os,argparse
cli = argparse.ArgumentParser(description='CLARA scaling test')
cli.add_argument('-y',help='YAML file',required=True)
cli.add_argument('-c',help='CLARA_HOME path',default=os.getenv('CLARA_HOME',None))
cli.add_argument('-t',help='threads',default=[2,4],type=int,action='append')
cli.add_argument('-e',help='events per thread',default=100,type=int)
cli.add_argument('input',help='input data file')
cfg = cli.parse_args()
import sys
if cfg.c is None: sys.exit('-c or $CLARA_HOME is required')
return cfg

def run(cmd):
import subprocess
print('scaling >>> '+' '.join(cmd))
p = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,encoding='latin-1')
for line in iter(p.stdout.readline, ''):
line = line.strip()
if len(line) > 0:
yield line
p.wait()
if p.returncode != 0:
pass

def benchmark(cfg, threads):
import collections
exiting,benchmarks = False,collections.OrderedDict()
cmd = ['run-clara',
'-c',cfg.c,
'-n',str(cfg.e*threads),
'-t',str(threads),
'-y',cfg.y,
'-o',f'tmp-scaling-{threads}',
cfg.input]
for line in run(cmd):
cols = line.split()
print(line)
try:
if line.find('Benchmark results:') >= 0:
exiting = True
elif line.find('Processing is complete') >= 0:
exiting = False
elif len(cols) > 20:
if line.find('Processed') >= 0:
benchmarks['event'] = float(cols[12])
elif exiting:
# catch-all for services:
if len(cols) > 14:
if 'services' not in benchmarks:
benchmarks['services'] = collections.OrderedDict()
benchmarks['services'][cols[2]] = float(cols[14])
# FIXME: what are these, why don't they add up?
elif line.find('Average processing time') >= 0:
benchmarks['avg'] = float(cols[6])
elif line.find('Total processing time') >= 0:
benchmarks['total'] = float(cols[6])
elif line.find('Total orchestrator time') >= 0:
benchmarks['orch'] = float(cols[6])
except ValueError:
pass
return benchmarks

def table(benchmarks):
table = []
header = [ 'threads' ]
b = benchmarks[0][1]
header.extend([x for x in b if x != 'services'])
if 'services' in b:
header.extend(b['services'].keys())
table.append(header)
for b in benchmarks:
threads,benchmark = b[0],b[1]
row = [threads]
for k in ['event','avg','total','orch','services']:
if k in benchmark:
if k == 'services':
row.extend(benchmark[k].values())
else:
row.append(benchmark[k])
table.append(row)
return table

def show(benchmarks):
for row in table(benchmarks):
print(' '.join([str(x) for x in row]))

def save(benchmarks):
with open('scaling.txt','w') as f:
for row in table(benchmarks):
f.write(' '.join([str(x) for x in row]))

if __name__ == '__main__':
cfg = cli()
benchmarks = []
for threads in cfg.t:
benchmarks.append([threads, benchmark(cfg, threads)])
show(benchmarks)
save(benchmarks)