diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 60154c9..3a40545 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -22,7 +22,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine,
     for num, df in df_mapping.groupby("bunch_index"):
         df=df.copy()
         df["bunch_index"] = num
-        job = Job(stream_runner.run, [jar, xml, df, aux_source_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
+        job = Job(stream_runner.run, [jar, xml, df, aux_source_path, '{:.0f}m'.format(0.9 * vmem)], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
         jobs.append(job)
 
     return jobs
diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py
index 9a445a1..67232c5 100644
--- a/erna/scripts/process_fact_mc.py
+++ b/erna/scripts/process_fact_mc.py
@@ -36,7 +36,7 @@ def make_jobs(jar, xml, data_paths, drs_paths,
     data_partitions = np.array_split(data_paths, num_jobs)
     drs_partitions = np.array_split(drs_paths, num_jobs)
     if output_path:
-        logger.info("Using stream runner für local output")
+        logger.info("Using stream runner for local output")
     else:
         logger.debug("Using std stream runner gathering output from all nodes")
 
@@ -47,10 +47,10 @@ def make_jobs(jar, xml, data_paths, drs_paths,
             file_name, _ = path.splitext(path.basename(output_path))
             file_name = create_filename_from_format(filename_format, file_name, num)
             out_path = path.dirname(output_path)
-            run = [jar, xml, df, path.join(out_path, file_name)]
+            run = [jar, xml, df, path.join(out_path, file_name), None, '{:.0f}m'.format(0.9 * vmem)]
             stream_runner = stream_runner_local
         else:
-            run = [jar, xml, df]
+            run = [jar, xml, df, None, '{:.0f}m'.format(0.9 * vmem)]
             stream_runner = stream_runner_std
 
         jobs.append(
diff --git a/erna/scripts/process_fact_run_list.py b/erna/scripts/process_fact_run_list.py
index c83c80e..18ed4d3 100644
--- a/erna/scripts/process_fact_run_list.py
+++ b/erna/scripts/process_fact_run_list.py
@@ -21,7 +21,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine,
     for num, indices in enumerate(split_indices):
         df = df_mapping[indices.min(): indices.max()]
 
-        job = Job(stream_runner.run, [jar, xml, df, aux_source_path],
+        job = Job(stream_runner.run, [jar, xml, df, aux_source_path, '{:.0f}m'.format(0.9 * vmem)],
                   queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
         jobs.append(job)
 
@@ -41,7 +41,7 @@ def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine,
 @click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00')
 @click.option('--engine', help='Name of the grid engine used by the cluster.', type=click.Choice(['PBS', 'SGE',]), default='SGE')
 @click.option('--num_jobs', help='Number of jobs to start on the cluster.', default='4', type=click.INT)
-@click.option('--vmem', help='Amount of memory to use per node in MB.', default='400', type=click.INT)
+@click.option('--vmem', help='Amount of memory to use per node in MB.', default=2000, type=click.IntRange(2000, 1000000))
 @click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
 @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed localy .')
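Note on the heap-size strings passed in the `make_jobs` changes above: `0.9 * vmem` is a float, and the JVM rejects both fractional values and an `mb` suffix for `-XX:MaxHeapSize` (it expects an integer followed by a bare `k`/`m`/`g`), so the string must be formatted accordingly. A minimal sketch of the conversion the scripts inline; the helper name `heap_size_from_vmem` and the named `HEAP_FRACTION` constant are illustrative, not part of this PR:

```python
# Illustrative helper; the scripts above inline this expression directly.
HEAP_FRACTION = 0.9  # reserve ~10% of the grid slot's vmem for non-heap memory


def heap_size_from_vmem(vmem_mb, fraction=HEAP_FRACTION):
    """Turn a grid-engine vmem request in MB into a JVM heap-size string.

    '{:.0f}' drops the fractional part, and the suffix must be a bare
    'm' -- java refuses e.g. '-XX:MaxHeapSize=360.0mb'.
    """
    return '{:.0f}m'.format(fraction * vmem_mb)


assert heap_size_from_vmem(4000) == '3600m'
```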
diff --git a/erna/stream_runner.py b/erna/stream_runner.py
index 0a6c8c0..1e94015 100644
--- a/erna/stream_runner.py
+++ b/erna/stream_runner.py
@@ -11,7 +11,9 @@
 )
 
 
-def run(jar, xml, input_files_df, aux_source_path=None):
+def run(jar, xml, input_files_df, aux_source_path=None,
+        max_heap_size='1024m'
+        ):
     '''
     This is what will be executed on the cluster
     '''
@@ -23,7 +25,14 @@ def run(jar, xml, input_files_df, aux_source_path=None):
     output_path = os.path.join(output_directory, "output.json")
 
     input_files_df.to_json(input_path, orient='records', date_format='epoch')
-    call = assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path)
+    call = assemble_facttools_call(
+        jar,
+        xml,
+        input_path,
+        output_path,
+        aux_source_path,
+        max_heap_size,
+    )
 
     check_environment_on_node()
diff --git a/erna/stream_runner_local_output.py b/erna/stream_runner_local_output.py
index 747d842..d04ba6b 100644
--- a/erna/stream_runner_local_output.py
+++ b/erna/stream_runner_local_output.py
@@ -12,7 +12,9 @@
 )
 
 
-def run(jar, xml, input_files_df, output_path, aux_source_path=None):
+def run(jar, xml, input_files_df, output_path, aux_source_path=None,
+        max_heap_size='1024m'
+        ):
     '''
     This is a version of ernas stream runner that will be executed on the cluster,
     but writes its results directly to disk without sending them
@@ -26,7 +28,14 @@ def run(jar, xml, input_files_df, output_path, aux_source_path=None):
     tmp_output_path = os.path.join(output_directory, "output.json")
 
     input_files_df.to_json(input_path, orient='records', date_format='epoch')
-    call = assemble_facttools_call(jar, xml, input_path, tmp_output_path, aux_source_path)
+    call = assemble_facttools_call(
+        jar,
+        xml,
+        input_path,
+        tmp_output_path,
+        aux_source_path,
+        max_heap_size,
+    )
 
     check_environment_on_node()
diff --git a/erna/utils.py b/erna/utils.py
index 9a4cad8..49d8787 100644
--- a/erna/utils.py
+++ b/erna/utils.py
@@ -65,17 +65,23 @@ def date_to_night_int(night):
     return 10000 * night.year + 100 * night.month + night.day
 
 
-def assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path=None):
+def assemble_facttools_call(jar, xml, input_path, output_path,
+                            aux_source_path=None,
+                            max_heap_size='1024m',
+                            initial_heap_size='512m',
+                            compressed_class_space_size='64m',
+                            max_meta_size='128m'
+                            ):
     ''' Assemble the call for fact-tools with the given combinations
     of jar, xml, input_path and output_path. The db_path is optional
     for the case where a db_file is needed '''
 
     call = [
         'java',
-        '-XX:MaxHeapSize=1024m',
-        '-XX:InitialHeapSize=512m',
-        '-XX:CompressedClassSpaceSize=64m',
-        '-XX:MaxMetaspaceSize=128m',
+        '-XX:MaxHeapSize={}'.format(max_heap_size),
+        '-XX:InitialHeapSize={}'.format(initial_heap_size),
+        '-XX:CompressedClassSpaceSize={}'.format(compressed_class_space_size),
+        '-XX:MaxMetaspaceSize={}'.format(max_meta_size),
         '-XX:+UseConcMarkSweepGC',
         '-XX:+UseParNewGC',
         '-jar',
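For reference, with the parametrized `assemble_facttools_call` the java invocation can now be sized per job. A sketch of the resulting call, assuming the defaults above; the paths are placeholders, not from this PR:

```python
from erna.utils import assemble_facttools_call

# Placeholder paths, for illustration only.
call = assemble_facttools_call(
    '/opt/fact-tools.jar',
    'analysis.xml',
    '/tmp/job/input.json',
    '/tmp/job/output.json',
    aux_source_path=None,
    max_heap_size='3600m',  # e.g. '{:.0f}m'.format(0.9 * 4000) for --vmem 4000
)
# call now starts with:
# ['java', '-XX:MaxHeapSize=3600m', '-XX:InitialHeapSize=512m',
#  '-XX:CompressedClassSpaceSize=64m', '-XX:MaxMetaspaceSize=128m',
#  '-XX:+UseConcMarkSweepGC', '-XX:+UseParNewGC', '-jar', ...]
```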