Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set vmen correctly, fixes #66 #70

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion erna/scripts/process_fact_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine,
for num, df in df_mapping.groupby("bunch_index"):
df=df.copy()
df["bunch_index"] = num
job = Job(stream_runner.run, [jar, xml, df, aux_source_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
job = Job(stream_runner.run, [jar, xml, df, aux_source_path, '{}mb'.format(0.9*vmem)], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
jobs.append(job)

return jobs
Expand Down
4 changes: 2 additions & 2 deletions erna/scripts/process_fact_mc.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ def make_jobs(jar, xml, data_paths, drs_paths,
file_name, _ = path.splitext(path.basename(output_path))
file_name = create_filename_from_format(filename_format, file_name, num)
out_path = path.dirname(output_path)
run = [jar, xml, df, path.join(out_path, file_name)]
run = [jar, xml, df, path.join(out_path, file_name), None, '{}mb'.format(0.9*vmem)]
stream_runner = stream_runner_local
else:
run = [jar, xml, df]
run = [jar, xml, df, None, '{mb}'.format(0.9*vmem)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will raise a KeyError; I think you meant '{}mb'.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it should be '{}m', as that will work. I don't know whether Java accepts 'mb'; it should be checked while fixing this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shit, I thought I had them all. Missed that one. Good catch. 'mb' works — it worked before.

stream_runner = stream_runner_std

jobs.append(
Expand Down
2 changes: 1 addition & 1 deletion erna/scripts/process_fact_run_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine,
for num, indices in enumerate(split_indices):
df = df_mapping[indices.min(): indices.max()]

job = Job(stream_runner.run, [jar, xml, df, aux_source_path],
job = Job(stream_runner.run, [jar, xml, df, aux_source_path, '{}mb'.format(0.9*vmem)],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can result in bad errors if vmem is lower than 10 GB (the memory of everything else is set to around 1 GB). I think going for vmem-1000, and making sure that vmem can't be set lower than 2 GB for example, should be good. To create a check for vmem, change its click type to: type=click.IntRange(2000, 1000000) (a maximum of 1 TB should be good enough for a little while).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean. This way the heap_size is always set to 90% of vmem — so in the case of 10 GB vmem, to a 9 GB heap_size, and for 1 GB vmem, to a 900 MB heap_size. Why would vmem-1000 help?

queue=queue, walltime=walltime, engine=engine,
mem_free='{}mb'.format(vmem))
jobs.append(job)
Expand Down
13 changes: 11 additions & 2 deletions erna/stream_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
)


def run(jar, xml, input_files_df, aux_source_path=None):
def run(jar, xml, input_files_df, aux_source_path=None,
max_heap_size='2014m'
):
'''
This is what will be executed on the cluster
'''
Expand All @@ -23,7 +25,14 @@ def run(jar, xml, input_files_df, aux_source_path=None):
output_path = os.path.join(output_directory, "output.json")

input_files_df.to_json(input_path, orient='records', date_format='epoch')
call = assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path)
call = assemble_facttools_call(
jar,
xml,
input_path,
output_path,
aux_source_path,
max_heap_size,
)

check_environment_on_node()

Expand Down
13 changes: 11 additions & 2 deletions erna/stream_runner_local_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
)


def run(jar, xml, input_files_df, output_path, aux_source_path=None):
def run(jar, xml, input_files_df, output_path, aux_source_path=None
max_heap_size='2014m'
):
'''
This is a version of ernas stream runner that will be executed on the cluster,
but writes its results directly to disk without sending them
Expand All @@ -26,7 +28,14 @@ def run(jar, xml, input_files_df, output_path, aux_source_path=None):
tmp_output_path = os.path.join(output_directory, "output.json")

input_files_df.to_json(input_path, orient='records', date_format='epoch')
call = assemble_facttools_call(jar, xml, input_path, tmp_output_path, aux_source_path)
call = assemble_facttools_call(
jar,
xml,
input_path,
tmp_output_path,
aux_source_path,
max_heap_size,
)

check_environment_on_node()

Expand Down
16 changes: 11 additions & 5 deletions erna/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,23 @@ def date_to_night_int(night):
return 10000 * night.year + 100 * night.month + night.day


def assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path=None):
def assemble_facttools_call(jar, xml, input_path, output_path,
aux_source_path=None,
max_heap_size='2014m',
initial_heap_size='512m',
compressed_class_space_size='64m',
max_meta_size='128m'
):
''' Assemble the call for fact-tools with the given combinations
of jar, xml, input_path and output_path. The db_path is optional
for the case where a db_file is needed
'''
call = [
'java',
'-XX:MaxHeapSize=1024m',
'-XX:InitialHeapSize=512m',
'-XX:CompressedClassSpaceSize=64m',
'-XX:MaxMetaspaceSize=128m',
'-XX:MaxHeapSize={}'.format(max_heap_size),
'-XX:InitialHeapSize={}'.format(initial_heap_size),
'-XX:CompressedClassSpaceSize={}'.format(compressed_class_space_size),
'-XX:MaxMetaspaceSize={}'.format(max_meta_size),
'-XX:+UseConcMarkSweepGC',
'-XX:+UseParNewGC',
'-jar',
Expand Down