Skip to content

Commit

Permalink
Merge pull request #153 from ssl-hep:gordonwatts/issue143
Browse files Browse the repository at this point in the history
gordonwatts/issue143
  • Loading branch information
gordonwatts authored Mar 29, 2021
2 parents 34233d3 + 4214aa2 commit b8c3a72
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 57 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"fname",
"getenv",
"gitlab",
"idna",
"inmem",
"isabstractmethod",
"jupyter",
Expand Down
4 changes: 2 additions & 2 deletions scripts/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ async def run_query(endpoint: Optional[ServiceXAdaptor], dest: str) -> None:
r = ds.get_data_rootfiles(request)
print(r)
elif dest == 'rootfiles-minio':
r = ds.get_data_rootfiles_minio_async(request)
async for f in r:
r = ds.get_data_rootfiles_async(request)
async for f in r: # type: ignore
print(f)


Expand Down
36 changes: 13 additions & 23 deletions servicex/data_conversions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Iterable, Optional, Union
from awkward.array.chunked import ChunkedArray
from awkward.array.table import Table
from typing import Iterable, Optional

import pandas as pd
import awkward as ak
Expand Down Expand Up @@ -65,13 +63,13 @@ def combine_pandas(self, dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
'''
return pd.concat(dfs)

def combine_awkward(self, awks: Iterable[Union[Table, ChunkedArray]]) -> Table:
def combine_awkward(self, awks: Iterable[ak.Array]) -> ak.Array:
'''Combine many awkward arrays into a single one, in order.
Args:
awks (Iterable[ak.Array]): The input list of awkward arrays
'''
return ak.concatenate(awks)
return ak.concatenate(awks) # type: ignore

async def _convert_root_to_pandas(self, file: Path):
'''
Expand All @@ -94,14 +92,11 @@ async def _convert_root_to_pandas(self, file: Path):
from pandas import DataFrame

def do_the_work(file: Path) -> DataFrame:
import uproot
import uproot as uproot

f_in = uproot.open(file)
try:
with uproot.open(file) as f_in:
r = f_in[f_in.keys()[0]]
return r.pandas.df() # type: ignore
finally:
f_in._context.source.close()
return r.arrays(library='pd') # type: ignore

return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))

Expand Down Expand Up @@ -150,15 +145,13 @@ async def _convert_root_to_awkward(self, file: Path):
this will leak filehandles - as that has to be left open.
'''
from numpy import ndarray
from awkward import JaggedArray
def do_the_work(file: Path) -> ak.Array:
import uproot as uproot

def do_the_work(file: Path) -> Dict[Union[str, bytes], Union[ndarray, JaggedArray]]:
import uproot
with uproot.open(file) as f_in:
tree_name = f_in.keys()[0]

f_in = uproot.open(file)
r = f_in[f_in.keys()[0]]
return r.lazyarrays() # type: ignore
return uproot.lazy(f'{file}:{tree_name}')

return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))

Expand All @@ -180,11 +173,8 @@ async def _convert_parquet_to_awkward(self, file: Path):
- Pandas is only imported if this is called.
'''
import awkward as ak

def do_the_work(file: Path) -> \
Union[Dict[Union[str, bytes], ak.ChunkedArray], ChunkedArray]:
def do_the_work(file: Path) -> ak.Array:
# TODO: Now that we are on awkward1, make sure this becomes lazy
return ak.fromparquet(str(file))
return ak.from_parquet(str(file)) # type: ignore

return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
3 changes: 0 additions & 3 deletions servicex/minio_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ def get_access_url(self, request_id: str, object_name: str) -> str:
request_id (str): The request id guid (that is the bucket in minio)
object_name (str): The file (the object in the minio bucket)
Raises:
NotImplementedError: [description]
Returns:
str: A url good for some amount of time to access the bucket.
'''
Expand Down
14 changes: 8 additions & 6 deletions servicex/servicex.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# Main front end interface
import asyncio
import functools
import logging
Expand Down Expand Up @@ -255,7 +254,8 @@ async def get_data_rootfiles_stream(self, selection_query: str) \
a `StreamInfoPath` which can be used to access the
file locally.
'''
async for f_info in self._stream_local_files(selection_query, 'root-files'):
async for f_info in \
self._stream_local_files(selection_query, 'root-files'): # type: ignore
yield f_info

@functools.wraps(ServiceXABC.get_data_parquet_async, updated=())
Expand All @@ -278,7 +278,7 @@ async def get_data_parquet_stream(self, selection_query: str) \
a `StreamInfoPath` which can be used to access the
file locally.
'''
async for f_info in self._stream_local_files(selection_query, 'parquet'):
async for f_info in self._stream_local_files(selection_query, 'parquet'): # type: ignore
yield f_info

@functools.wraps(ServiceXABC.get_data_pandas_df_async, updated=())
Expand Down Expand Up @@ -338,7 +338,8 @@ async def get_data_rootfiles_url_stream(self, selection_query: str) \
Args:
selection_query (str): The ServiceX Selection
'''
async for f_info in self._stream_url_buckets(selection_query, 'root-files'):
async for f_info in \
self._stream_url_buckets(selection_query, 'root-files'): # type: ignore
yield f_info

async def get_data_parquet_url_stream(self, selection_query: str) \
Expand All @@ -349,7 +350,7 @@ async def get_data_parquet_url_stream(self, selection_query: str) \
Args:
selection_query (str): The ServiceX Selection
'''
async for f_info in self._stream_url_buckets(selection_query, 'parquet'):
async for f_info in self._stream_url_buckets(selection_query, 'parquet'): # type: ignore
yield f_info

async def _file_return(self, selection_query: str, data_format: str):
Expand Down Expand Up @@ -489,7 +490,8 @@ async def _stream_return(self, selection_query: str,
on the converter call.
'''
as_data = (StreamInfoData(f.file, await asyncio.ensure_future(converter(f.path)))
async for f in self._stream_local_files(selection_query, data_format))
async for f in
self._stream_local_files(selection_query, data_format)) # type: ignore

async for r in as_data:
yield r
Expand Down
7 changes: 3 additions & 4 deletions servicex/servicexabc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional

import awkward
from make_it_sync import make_sync
import numpy as np
import awkward as ak
import pandas as pd

from .utils import (
Expand Down Expand Up @@ -106,7 +105,7 @@ async def get_data_pandas_df_async(self, selection_query: str) -> pd.DataFrame:

@abstractmethod
async def get_data_awkward_async(self, selection_query: str) \
-> Dict[bytes, Union[awkward.JaggedArray, np.ndarray]]:
-> Dict[bytes, ak.Array]:
'''
Fetch query data from ServiceX matching `selection_query` and return it as
dictionary of awkward arrays, an entry for each column. The data is uniquely
Expand Down
4 changes: 2 additions & 2 deletions servicex/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,10 @@ class translator(Transformer):
def record(self, children):
if (len(children) == 0
or isinstance(children[0], Token)
and children[0].type == 'WHITESPACE'):
and children[0].type == 'WHITESPACE'): # type: ignore
return ""
else:
return children[0].text
return children[0].text # type: ignore

def expression(self, children):
for child in children:
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
install_requires=[
"idna==2.10", # Required to thread version needle with requests library
"pandas~=1.0",
"uproot~=3.7",
"uproot>=4.0.1, <5",
"awkward>=1.0.1, <2",
"backoff~=1.10",
"aiohttp~=3.6",
"minio~=5.0",
Expand Down
11 changes: 6 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ def __init__(self, text, status):
self._status_iter = iter([status])

async def text(self):
return next(self._text_iter)
return next(self._text_iter) # type: ignore

async def json(self):
return loads(next(self._text_iter))
return loads(next(self._text_iter)) # type: ignore

@property
def status(self):
return next(self._status_iter)
return next(self._status_iter) # type: ignore

async def __aexit__(self, exc_type, exc, tb):
pass
Expand Down Expand Up @@ -192,10 +192,11 @@ def good_pandas_file_data(mocker):

@pytest.fixture
def good_awkward_file_data(mocker):
import awkward as awk
import awkward as ak

converter = asyncmock.MagicMock(spec=DataConverterAdaptor)
converter.convert_to_awkward.return_value = {'JetPt': awk.fromiter([0, 1, 2, 3, 4, 5])}
converter.convert_to_awkward.return_value = \
{'JetPt': ak.from_iter([0, 1, 2, 3, 4, 5])} # type: ignore
converter.combine_awkward.return_value = converter.convert_to_awkward.return_value

return converter
Expand Down
22 changes: 11 additions & 11 deletions tests/test_data_conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from servicex.data_conversions import DataConverterAdaptor
import pytest
import pandas as pd
import awkward as ak


def check_awkward_accessible(col):
def check_awkward_accessible(col: ak.Array):
'Check to make sure we can look at every item in column'
col.flatten()
ak.repartition(col, 3) # type: ignore


def check_pandas_accessible(col):
Expand Down Expand Up @@ -72,10 +73,10 @@ async def test_to_panads_fail(good_root_file_path):
def test_combine_pandas_from_root(good_root_file_path):
'Load a dataframe from root files and make sure that they work when we ask them to combine'
def load_df():
import uproot
import uproot as uproot
with uproot.open(good_root_file_path) as f_in:
df = f_in[f_in.keys()[0]].pandas.df() # type: ignore
return df
r = f_in[f_in.keys()[0]]
return r.arrays(library='pd') # type: ignore

df1 = load_df()
df2 = load_df()
Expand Down Expand Up @@ -104,10 +105,10 @@ def load_df():
def test_combine_awkward_from_root(good_root_file_path):
'Load a dataframe from root files and make sure that they work when we ask them to combine'
def load_df():
import uproot
f_in = uproot.open(good_root_file_path)
df = f_in[f_in.keys()[0]].lazyarrays() # type: ignore
return df
import uproot as uproot
with uproot.open(good_root_file_path) as f_in:
tree_name = f_in.keys()[0]
return uproot.lazy(f'{good_root_file_path}:{tree_name}')

df1 = load_df()
df2 = load_df()
Expand All @@ -121,8 +122,7 @@ def load_df():
def test_combine_awkward_from_parquet(good_uproot_file_path):
'Load a dataframe from a parquet file and make sure they work when we ask them to combine'
def load_df():
import awkward as ak
return ak.fromparquet(good_uproot_file_path)
return ak.from_parquet(good_uproot_file_path) # type: ignore

df1 = load_df()
df2 = load_df()
Expand Down

0 comments on commit b8c3a72

Please sign in to comment.