
Commit 9c69381

First go at getting awkward1 working
Working on #143
gordonwatts committed Dec 21, 2020
1 parent 29b475d commit 9c69381
Showing 5 changed files with 27 additions and 33 deletions.
servicex/data_conversions.py (32 changes: 12 additions & 20 deletions)
@@ -1,9 +1,7 @@
 import asyncio
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
-from typing import Dict, Iterable, Optional, Union
-from awkward.array.chunked import ChunkedArray
-from awkward.array.table import Table
+from typing import Iterable, Optional
 
 import pandas as pd
 import awkward as ak
@@ -65,13 +63,13 @@ def combine_pandas(self, dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
         '''
         return pd.concat(dfs)
 
-    def combine_awkward(self, awks: Iterable[Union[Table, ChunkedArray]]) -> Table:
+    def combine_awkward(self, awks: Iterable[ak.Array]) -> ak.Array:
         '''Combine many awkward arrays into a single one, in order.
         Args:
             awks (Iterable[ChunkedArray]): The input list of awkward arrays
         '''
-        return ak.concatenate(awks)
+        return ak.concatenate(awks)  # type: ignore
 
     async def _convert_root_to_pandas(self, file: Path):
         '''
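For context, a minimal sketch (not part of the commit) of the awkward1 concatenation that the new combine_awkward relies on; the array values are invented for illustration:

import awkward as ak  # awkward ~= 1.0

# Two awkward1 arrays standing in for per-file query results (invented values).
a1 = ak.Array([[1.0, 2.0], [3.0]])
a2 = ak.Array([[4.0], [5.0, 6.0]])

# ak.concatenate keeps the input order: all of a1's entries, then all of a2's.
combined = ak.concatenate([a1, a2])
assert len(combined) == len(a1) + len(a2)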
@@ -96,12 +94,9 @@ async def _convert_root_to_pandas(self, file: Path):
         def do_the_work(file: Path) -> DataFrame:
             import uproot
 
-            f_in = uproot.open(file)
-            try:
+            with uproot.open(file) as f_in:
                 r = f_in[f_in.keys()[0]]
-                return r.pandas.df()  # type: ignore
-            finally:
-                f_in._context.source.close()
+                return r.arrays(library='pd')  # type: ignore
 
         return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
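For reference, a small self-contained sketch of the uproot 4 idiom used above, where tree.arrays(library='pd') replaces uproot 3's tree.pandas.df(); the file name is hypothetical:

from pathlib import Path

import uproot  # uproot ~= 4.0

def root_tree_to_pandas(file: Path):
    'Read the first object in a ROOT file into a pandas DataFrame.'
    with uproot.open(file) as f_in:
        tree = f_in[f_in.keys()[0]]
        # library='pd' asks uproot 4 for a pandas DataFrame.
        return tree.arrays(library='pd')

# df = root_tree_to_pandas(Path('query_output.root'))  # hypothetical file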

@@ -150,15 +145,13 @@ async def _convert_root_to_awkward(self, file: Path):
             this will leak filehandles - as that has to be left open.
         '''
-        from numpy import ndarray
-        from awkward import JaggedArray
 
-        def do_the_work(file: Path) -> Dict[Union[str, bytes], Union[ndarray, JaggedArray]]:
+        def do_the_work(file: Path) -> ak.Array:
             import uproot
 
-            f_in = uproot.open(file)
-            r = f_in[f_in.keys()[0]]
-            return r.lazyarrays()  # type: ignore
+            with uproot.open(file) as f_in:
+                tree_name = f_in.keys()[0]
+
+            return uproot.lazy(f'{file}:{tree_name}')
 
         return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
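For reference, a sketch of the uproot 4 lazy-read pattern adopted here: the file is opened just long enough to find the tree name, and uproot.lazy then reads branches on demand from a 'file:tree' specifier. The file name is hypothetical:

from pathlib import Path

import uproot  # uproot ~= 4.0

def lazy_tree(file: Path):
    'Open the first tree in a ROOT file as a lazy awkward1 array.'
    with uproot.open(file) as f_in:
        tree_name = f_in.keys()[0]
    # Branches are only materialized when a field is actually accessed.
    return uproot.lazy(f'{file}:{tree_name}')

# arr = lazy_tree(Path('query_output.root'))  # hypothetical file
# arr.fields lists the branch names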

@@ -182,9 +175,8 @@ async def _convert_parquet_to_awkward(self, file: Path):
         '''
         import awkward as ak
 
-        def do_the_work(file: Path) -> \
-                Union[Dict[Union[str, bytes], ak.ChunkedArray], ChunkedArray]:
+        def do_the_work(file: Path) -> ak.Array:
             # TODO: When we move to awkward1, make sure this becomes lazy
-            return ak.fromparquet(str(file))
+            return ak.from_parquet(str(file))  # type: ignore
 
         return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
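A brief sketch of the awkward1 rename used here (awkward0's ak.fromparquet becomes ak.from_parquet); the file name is hypothetical:

from pathlib import Path

import awkward as ak  # awkward ~= 1.0

def load_parquet(file: Path) -> ak.Array:
    'Read a parquet file into an awkward1 array.'
    return ak.from_parquet(str(file))

# arr = load_parquet(Path('query_output.parquet'))  # hypothetical file
# len(arr) gives the row count and arr.fields the column names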
servicex/servicexabc.py (7 changes: 3 additions & 4 deletions)
@@ -1,10 +1,9 @@
 from abc import ABC, abstractmethod
 from pathlib import Path
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional
 
-import awkward
 from make_it_sync import make_sync
-import numpy as np
+import awkward as ak
 import pandas as pd
 
 from .utils import (
@@ -106,7 +105,7 @@ async def get_data_pandas_df_async(self, selection_query: str) -> pd.DataFrame:
 
     @abstractmethod
     async def get_data_awkward_async(self, selection_query: str) \
-            -> Dict[bytes, Union[awkward.JaggedArray, np.ndarray]]:
+            -> Dict[bytes, ak.Array]:
         '''
         Fetch query data from ServiceX matching `selection_query` and return it as
         dictionary of awkward arrays, an entry for each column. The data is uniquely
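A hypothetical sketch of what a caller now gets back from the awkward interface: a dictionary keyed by column name whose values are awkward1 ak.Array objects instead of awkward0 JaggedArrays or numpy ndarrays. The JetPt values below are invented:

from typing import Dict

import awkward as ak  # awkward ~= 1.0

# Invented result, for illustration only.
data: Dict[bytes, ak.Array] = {
    b'JetPt': ak.Array([[50.1, 31.8], [102.4]]),
}

jet_pt = data[b'JetPt']
assert ak.to_list(ak.num(jet_pt, axis=1)) == [2, 1]  # jets per event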
setup.py (3 changes: 2 additions & 1 deletion)
@@ -38,7 +38,8 @@
     test_suite="tests",
     install_requires=[
         "pandas~=1.0",
-        "uproot~=3.7",
+        "uproot~=4.0",
+        "awkward~=1.0",
         "backoff~=1.10",
         "aiohttp~=3.6",
         "minio~=5.0",
tests/conftest.py (3 changes: 2 additions & 1 deletion)
@@ -186,7 +186,8 @@ def good_awkward_file_data(mocker):
     import awkward as awk
 
     converter = asyncmock.MagicMock(spec=DataConverterAdaptor)
-    converter.convert_to_awkward.return_value = {'JetPt': awk.fromiter([0, 1, 2, 3, 4, 5])}
+    converter.convert_to_awkward.return_value = \
+        {'JetPt': awk.from_iter([0, 1, 2, 3, 4, 5])}  # type: ignore
     converter.combine_awkward.return_value = converter.convert_to_awkward.return_value
 
     return converter
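A quick sketch of the awkward0 to awkward1 rename used in this fixture (fromiter becomes from_iter) and what it returns:

import awkward as ak  # awkward ~= 1.0

# from_iter builds an ak.Array from any iterable (awkward0 called this fromiter).
arr = ak.from_iter([0, 1, 2, 3, 4, 5])
assert isinstance(arr, ak.Array)
assert ak.to_list(arr) == [0, 1, 2, 3, 4, 5]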
tests/test_data_conversions.py (15 changes: 8 additions & 7 deletions)
@@ -6,7 +6,8 @@
 
 def check_awkward_accessible(col):
     'Check to make sure we can look at every item in column'
-    col.flatten()
+    # TODO: fix this so it works for awkward arrays and numpy like arrays.
+    # ak.to_numpy(col)  # type: ignore
 
 
 def check_pandas_accessible(col):
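One possible way to resolve the TODO above, not part of this commit: dispatch on the column type so both awkward1 arrays and plain numpy-like columns can be walked. A sketch:

import awkward as ak

def check_awkward_accessible(col):
    'Check to make sure we can look at every item in column'
    if isinstance(col, ak.Array):
        # Materializes every (possibly lazy) entry of an awkward1 array.
        assert ak.to_list(col) is not None
    else:
        # Plain numpy-like column: walk every element.
        for _ in col:
            pass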
@@ -74,8 +75,8 @@ def test_combine_pandas_from_root(good_root_file_path):
     def load_df():
         import uproot
         with uproot.open(good_root_file_path) as f_in:
-            df = f_in[f_in.keys()[0]].pandas.df()  # type: ignore
-            return df
+            r = f_in[f_in.keys()[0]]
+            return r.arrays(library='pd')  # type: ignore
 
     df1 = load_df()
     df2 = load_df()
@@ -105,9 +106,9 @@ def test_combine_awkward_from_root(good_root_file_path):
     'Load a dataframe from root files and make sure that they work when we ask them to combine'
     def load_df():
         import uproot
-        f_in = uproot.open(good_root_file_path)
-        df = f_in[f_in.keys()[0]].lazyarrays()  # type: ignore
-        return df
+        with uproot.open(good_root_file_path) as f_in:
+            tree_name = f_in.keys()[0]
+        return uproot.lazy(f'{good_root_file_path}:{tree_name}')
 
     df1 = load_df()
     df2 = load_df()
@@ -122,7 +123,7 @@ def test_combine_awkward_from_parquet(good_uproot_file_path):
     'Load a dataframe from a parquet file and make sure they work when we ask them to combine'
     def load_df():
         import awkward as ak
-        return ak.fromparquet(good_uproot_file_path)
+        return ak.from_parquet(good_uproot_file_path)  # type: ignore
 
     df1 = load_df()
     df2 = load_df()
