From cf8ea8ee5dafffc31f0870e0931bfc4e75559822 Mon Sep 17 00:00:00 2001 From: alexmorten Date: Thu, 29 Feb 2024 17:15:12 +0900 Subject: [PATCH 1/3] migrate away from travis --- .github/workflows/linting.yml | 24 ++++++++++++++++++++++++ .travis.yml | 23 ----------------------- 2 files changed, 24 insertions(+), 23 deletions(-) create mode 100644 .github/workflows/linting.yml delete mode 100644 .travis.yml diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml new file mode 100644 index 0000000..e034f49 --- /dev/null +++ b/.github/workflows/linting.yml @@ -0,0 +1,24 @@ +name: linting + + +on: + push + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: 3.7 + + - name: Install dependencies + run: cd lib && pip install -U pip && pip install -e .[dev] + + - name: Run flake8 + run: python -m flake8 lib/ diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 014361d..0000000 --- a/.travis.yml +++ /dev/null @@ -1,23 +0,0 @@ -sudo: false -language: python -python: - - 3.5 - - 3.6 - - 3.7 - -branches: - only: - - master - -before_install: - - 'cd lib' - - 'pip install -U pip' - - 'pip install -e .[dev]' - -script: - - 'python -m flake8 lib/' - -notifications: - email: false - slack: - secure: A/BZm/7ISFDUQi+IDYtmph8Jd7Un7Gpbl1N/L7J53dpvwirHEcoir4a105fgDAVw21JQ8YFQNZTtwR42YGKd1uUs3F+u5PwXk5e3FmU2yCfAhT7Fc7G7rlR4TzsF8llltKUqxv34yBU3P6pmosomMkyyV1ohbAqB10uctxqa02w= From a726f418f1fa4e4f8bb2342be0c9ea2a909292f0 Mon Sep 17 00:00:00 2001 From: alexmorten Date: Mon, 4 Mar 2024 15:12:14 +0900 Subject: [PATCH 2/3] autoformat helpers.py --- lib/helpers.py | 447 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 306 insertions(+), 141 deletions(-) diff --git a/lib/helpers.py b/lib/helpers.py index e4a4f57..73371a3 100644 --- a/lib/helpers.py +++ b/lib/helpers.py @@ -7,30 +7,43 @@ from datetime import datetime -from lib.const import __version__, TEST, CONTROL, CSV_SOURCE_MARKS_AND_SPEND, CSV_SOURCE_ATTRIBUTIONS, USER_ID_LENGTH +from lib.const import ( + __version__, + TEST, + CONTROL, + CSV_SOURCE_MARKS_AND_SPEND, + CSV_SOURCE_ATTRIBUTIONS, + USER_ID_LENGTH, +) + +START_APPSFLYER_SIDE_DEDUPLICATION = "17.09.2020" -START_APPSFLYER_SIDE_DEDUPLICATION = '17.09.2020' def log(*args): """ Print something in a cleanly formatted fashion, with a timestamp - :param args: A list of arguments to print. Internally, they will be passed to built-in print function + + :param args: A list of arguments to print. + Internally, they will be passed to the built-in print function :return None """ - print('[%s]:' % datetime.now(), *args) + print("[%s]:" % datetime.now(), *args) class Helpers(object): """ - A class, wrapping a set of helper functions, providing functionality to calculate uplift report, given the + A class, wrapping a set of helper functions, providing functionality to calculate uplift report, + given the setup arguments :param customer: Name of the customer the report is created for :param audiences: A list of audiences for which the report is going to be calculated :param revenue_event: An event which is going to be taken as a revenue event, e.g. 
"purchase" - :param dates: Date range of marks & spend, for which the report is to be generated (use pandas.date_range to generate a range) - :param attribution_dates: Optional date range of attributions defaults to dates (use pandas.date_range to generate a range) + :param dates: Date range of marks & spend, for which the report is to be generated + (use pandas.date_range to generate a range) + :param attribution_dates: Optional date range of attributions defaults to dates + (use pandas.date_range to generate a range) :param groups: An optional dictionary of named campaign groups, by which the report should be split. Example: { "All US campaigns": [1234, 3456, 5678], @@ -54,9 +67,20 @@ class Helpers(object): :type csv_helpers_kwargs: dict[string, object] """ - def __init__(self, customer, audiences, revenue_event, dates, attribution_dates=None, groups=None, per_campaign_results=False, - use_converters_for_significance=False, use_deduplication=False, export_user_ids=False, - csv_helpers_kwargs=None): + def __init__( + self, + customer, + audiences, + revenue_event, + dates, + attribution_dates=None, + groups=None, + per_campaign_results=False, + use_converters_for_significance=False, + use_deduplication=False, + export_user_ids=False, + csv_helpers_kwargs=None, + ): self.customer = customer self.audiences = audiences @@ -99,11 +123,15 @@ def load_marks_and_spend_data(self): :rtype: pandas.DataFrame """ df = pd.concat( - [self._csv_helpers.read_csv( - audience=audience, - source=CSV_SOURCE_MARKS_AND_SPEND, - date=date, - ) for audience in self.audiences for date in self.dates], + [ + self._csv_helpers.read_csv( + audience=audience, + source=CSV_SOURCE_MARKS_AND_SPEND, + date=date, + ) + for audience in self.audiences + for date in self.dates + ], ignore_index=True, verify_integrity=True, ) @@ -121,24 +149,30 @@ def load_attribution_data(self, marks_and_spend_df): :return: Resulting attributions dataframe :rtype: pandas.DataFrame """ - marked_user_ids = self._marked(marks_and_spend_df)['user_id'] + marked_user_ids = self._marked(marks_and_spend_df)["user_id"] df = pd.concat( - [self._filter_by_user_ids( - df=self._csv_helpers.read_csv( - audience=audience, - source=CSV_SOURCE_ATTRIBUTIONS, - date=date, - chunk_filter_fn=self._extract_revenue_events, - ), - user_ids=marked_user_ids, - ) for audience in self.audiences for date in self.attribution_dates], + [ + self._filter_by_user_ids( + df=self._csv_helpers.read_csv( + audience=audience, + source=CSV_SOURCE_ATTRIBUTIONS, + date=date, + chunk_filter_fn=self._extract_revenue_events, + ), + user_ids=marked_user_ids, + ) + for audience in self.audiences + for date in self.attribution_dates + ], ignore_index=True, verify_integrity=True, ) # AppsFlyer sends some events twice - we want to remove the duplicates before the analysis if self.use_deduplication: - df = self._drop_duplicates_in_attributions(df=df, max_timedelta=pd.Timedelta('1 minute')) + df = self._drop_duplicates_in_attributions( + df=df, max_timedelta=pd.Timedelta("1 minute") + ) return df @@ -166,18 +200,23 @@ def uplift_report(self, marks_and_spend_df, attributions_df): # if there are groups filter the events against the per campaign groups and generate report if report_df is not None and self.groups: for name, campaigns in self.groups.items(): - group_df = marks_and_spend_df[marks_and_spend_df.campaign_id.isin(campaigns)] + group_df = marks_and_spend_df[ + marks_and_spend_df.campaign_id.isin(campaigns) + ] report_df[name] = self._uplift( marks_and_spend_df=group_df, 
attributions_df=attributions_df, index_name=name, - m_hypothesis=len(self.groups)) + m_hypothesis=len(self.groups), + ) if report_df is not None and self.per_campaign_results: - campaigns = marks_and_spend_df['campaign_id'].unique() + campaigns = marks_and_spend_df["campaign_id"].unique() for campaign in campaigns: name = "c_{0}".format(campaign) - campaign_df = marks_and_spend_df[marks_and_spend_df.campaign_id == campaign] + campaign_df = marks_and_spend_df[ + marks_and_spend_df.campaign_id == campaign + ] report_df[name] = self._uplift( marks_and_spend_df=campaign_df, attributions_df=attributions_df, @@ -197,22 +236,35 @@ def remove_users_marked_as_control_and_test(self, marks_and_spend_df): :return: Dataframe without users marked as both test and control :rtype: pandas.DataFrame """ - user_to_ab_test_group_count = (marks_and_spend_df[marks_and_spend_df['event_type'] == 'mark'] - .groupby('user_id', as_index=False) - .agg({'ab_test_group': pd.Series.nunique })) + user_to_ab_test_group_count = ( + marks_and_spend_df[marks_and_spend_df["event_type"] == "mark"] + .groupby("user_id", as_index=False) + .agg({"ab_test_group": pd.Series.nunique}) + ) - users_with_double_marks = user_to_ab_test_group_count[user_to_ab_test_group_count['ab_test_group'] > 1] + users_with_double_marks = user_to_ab_test_group_count[ + user_to_ab_test_group_count["ab_test_group"] > 1 + ] number_of_users_with_double_marks = len(users_with_double_marks) if number_of_users_with_double_marks > 0: - percentage_removed = (number_of_users_with_double_marks / marks_and_spend_df['user_id'].nunique()) * 100 - - log("Removed %s users (%.5f%%) due to double marking." % (number_of_users_with_double_marks, percentage_removed)) + percentage_removed = ( + number_of_users_with_double_marks + / marks_and_spend_df["user_id"].nunique() + ) * 100 + + log( + "Removed %s users (%.5f%%) due to double marking." + % (number_of_users_with_double_marks, percentage_removed) + ) else: log("No users were double marked") - return marks_and_spend_df[~marks_and_spend_df['user_id'].isin(users_with_double_marks['user_id'])] + return marks_and_spend_df[ + ~marks_and_spend_df["user_id"].isin( + users_with_double_marks["user_id"]) + ] @staticmethod def export_csv(df, file_name): @@ -230,12 +282,12 @@ def export_csv(df, file_name): """ df.to_csv(file_name) - log('Stored results as a local CSV file', file_name) + log("Stored results as a local CSV file", file_name) try: import google.colab - log('The download of the results file should start automatically') + log("The download of the results file should start automatically") google.colab.files.download(file_name) except ImportError: # We are not in the colab, no need to run the download @@ -247,12 +299,12 @@ def _extract_revenue_events(df, revenue_event): Only keep rows where the event is a revenue event and drop the partner_event column afterwards """ df = df[df.partner_event == revenue_event] - return df.drop(columns=['partner_event']) + return df.drop(columns=["partner_event"]) @staticmethod def _filter_by_user_ids(df, user_ids): - if 'user_id' in df.columns: - return df[df['user_id'].isin(user_ids)] + if "user_id" in df.columns: + return df[df["user_id"].isin(user_ids)] else: return df @@ -272,28 +324,44 @@ def _drop_duplicates_in_attributions(df, max_timedelta): Therefore we rely on a heuristic. We consider an event a duplicate if the user and revenue are equal and the events are less than a minute apart. 
""" - sorted_values = df.sort_values(['user_id', 'revenue_eur']) + sorted_values = df.sort_values(["user_id", "revenue_eur"]) - pdDedupStarts = pd.Timestamp(START_APPSFLYER_SIDE_DEDUPLICATION).tz_localize(None) + pdDedupStarts = pd.Timestamp(START_APPSFLYER_SIDE_DEDUPLICATION).tz_localize( + None + ) - time_condition = pd.DatetimeIndex(pd.to_datetime(sorted_values['ts'])).tz_localize(None) >= pdDedupStarts + time_condition = ( + pd.DatetimeIndex(pd.to_datetime( + sorted_values["ts"])).tz_localize(None) + >= pdDedupStarts + ) filtered_by_time = sorted_values[time_condition] sorted_values = sorted_values[~time_condition] # Get values of the previous row - sorted_values['last_ts'] = sorted_values['ts'].shift(1) - sorted_values['last_user_id'] = sorted_values['user_id'].shift(1) - sorted_values['last_revenue'] = sorted_values['revenue_eur'].shift(1) + sorted_values["last_ts"] = sorted_values["ts"].shift(1) + sorted_values["last_user_id"] = sorted_values["user_id"].shift(1) + sorted_values["last_revenue"] = sorted_values["revenue_eur"].shift(1) # Remove rows if the previous row has the same revenue_eur and user id and the ts are less than max_timedelta # apart filtered = sorted_values[ - ('appsflyer_deduplicated' in sorted_values.columns and sorted_values['appsflyer_deduplicated']) | - (sorted_values['user_id'] != sorted_values['last_user_id']) | - (sorted_values['revenue_eur'] != sorted_values['last_revenue']) | - ((pd.to_datetime(sorted_values['ts']) - pd.to_datetime(sorted_values['last_ts'])) > max_timedelta)] + ( + "appsflyer_deduplicated" in sorted_values.columns + and sorted_values["appsflyer_deduplicated"] + ) + | (sorted_values["user_id"] != sorted_values["last_user_id"]) + | (sorted_values["revenue_eur"] != sorted_values["last_revenue"]) + | ( + ( + pd.to_datetime(sorted_values["ts"]) + - pd.to_datetime(sorted_values["last_ts"]) + ) + > max_timedelta + ) + ] - filtered = filtered[['ts', 'user_id', 'revenue_eur']] + filtered = filtered[["ts", "user_id", "revenue_eur"]] return pd.concat([filtered, filtered_by_time]) @@ -310,19 +378,25 @@ def _uplift(self, marks_and_spend_df, attributions_df, index_name, m_hypothesis= marks_df = self._marked(marks_and_spend_df) # calculate group sizes - test_group_size = marks_df[marks_df['ab_test_group'] == TEST]['user_id'].nunique() + test_group_size = marks_df[marks_df["ab_test_group"] == TEST][ + "user_id" + ].nunique() if test_group_size == 0: - log("WARNING: No users marked as test for ", index_name, 'skipping.. ') + log("WARNING: No users marked as test for ", index_name, "skipping.. ") return None - control_group_size = marks_df[marks_df['ab_test_group'] == CONTROL]['user_id'].nunique() + control_group_size = marks_df[marks_df["ab_test_group"] == CONTROL][ + "user_id" + ].nunique() if control_group_size == 0: - log("WARNING: No users marked as control for ", index_name, 'skipping.. ') + log("WARNING: No users marked as control for ", + index_name, "skipping.. 
") return None # join marks and revenue events - merged_df = self._merge(marks_df=marks_df, attributions_df=attributions_df) - grouped_revenue = merged_df.groupby(by='ab_test_group') + merged_df = self._merge( + marks_df=marks_df, attributions_df=attributions_df) + grouped_revenue = merged_df.groupby(by="ab_test_group") # init all KPIs with 0s first: test_revenue_micros = 0 @@ -336,30 +410,30 @@ def _uplift(self, marks_and_spend_df, attributions_df, index_name, m_hypothesis= # we might not have any events for a certain group in the time-period, if TEST in grouped_revenue.groups: test_revenue_df = grouped_revenue.get_group(TEST) - test_revenue_micros = test_revenue_df['revenue_eur'].sum() + test_revenue_micros = test_revenue_df["revenue_eur"].sum() # test_conversions = test_revenue_df['partner_event'].count() # as we filtered by revenue event and dropped the column we can just use - test_conversions = test_revenue_df['user_id'].count() - test_converters = test_revenue_df['user_id'].nunique() + test_conversions = test_revenue_df["user_id"].count() + test_converters = test_revenue_df["user_id"].nunique() if CONTROL in grouped_revenue.groups: control_revenue_df = grouped_revenue.get_group(CONTROL) - control_revenue_micros = control_revenue_df['revenue_eur'].sum() + control_revenue_micros = control_revenue_df["revenue_eur"].sum() # control_conversions = control_revenue_df['partner_event'].count() # as we filtered by revenue event and dropped the column we can just use - control_conversions = control_revenue_df['user_id'].count() - control_converters = control_revenue_df['user_id'].nunique() + control_conversions = control_revenue_df["user_id"].count() + control_converters = control_revenue_df["user_id"].nunique() # calculate KPIs - test_revenue = test_revenue_micros / 10 ** 6 - control_revenue = control_revenue_micros / 10 ** 6 + test_revenue = test_revenue_micros / 10**6 + control_revenue = control_revenue_micros / 10**6 ratio = float(test_group_size) / float(control_group_size) scaled_control_conversions = float(control_conversions) * ratio scaled_control_revenue_micros = float(control_revenue_micros) * ratio incremental_conversions = test_conversions - scaled_control_conversions incremental_revenue_micros = test_revenue_micros - scaled_control_revenue_micros - incremental_revenue = incremental_revenue_micros / 10 ** 6 + incremental_revenue = incremental_revenue_micros / 10**6 incremental_converters = test_converters - control_converters * ratio # calculate the ad spend @@ -387,15 +461,22 @@ def _uplift(self, marks_and_spend_df, attributions_df, index_name, m_hypothesis= control_successes, test_successes = control_conversions, test_conversions if self.use_converters_for_significance or max(test_cvr, control_cvr) > 1.0: control_successes, test_successes = control_converters, test_converters - chi_df = pd.DataFrame({ - "conversions": [control_successes, test_successes], - "total": [control_group_size, test_group_size] - }, index=['control', 'test']) + chi_df = pd.DataFrame( + { + "conversions": [control_successes, test_successes], + "total": [control_group_size, test_group_size], + }, + index=["control", "test"], + ) # CHI square calculation will fail with insufficient data # Fallback to no significance try: chi, p, _, _ = scipy.stats.chi2_contingency( - pd.concat([chi_df.total - chi_df.conversions, chi_df.conversions], axis=1), correction=False) + pd.concat( + [chi_df.total - chi_df.conversions, chi_df.conversions], axis=1 + ), + correction=False, + ) except: chi, p = 0, 1.0 @@ -416,7 +497,7 
@@ def _uplift(self, marks_and_spend_df, attributions_df, index_name, m_hypothesis= "control revenue": control_revenue, "ratio test/control": ratio, "control conversions (scaled)": scaled_control_conversions, - "control revenue (scaled)": scaled_control_revenue_micros / 10 ** 6, + "control revenue (scaled)": scaled_control_revenue_micros / 10**6, "incremental conversions": incremental_conversions, "incremental converters": incremental_converters, "incremental revenue": incremental_revenue, @@ -430,7 +511,7 @@ def _uplift(self, marks_and_spend_df, attributions_df, index_name, m_hypothesis= "iCPA": icpa, "chi^2": chi, "p-value": p, - "significant": significant + "significant": significant, } # show results as a dataframe @@ -449,21 +530,23 @@ def _marked(df): if df.empty: return df - mark_df = df[df.event_type == 'mark'] + mark_df = df[df.event_type == "mark"] # we dont need the event_type anymore (to save memory) - mark_df = mark_df.drop(columns=['event_type']) + mark_df = mark_df.drop(columns=["event_type"]) - sorted_mark_df = mark_df.sort_values('ts') + sorted_mark_df = mark_df.sort_values("ts") - deduplicated_mark_df = sorted_mark_df.drop_duplicates(['user_id']) + deduplicated_mark_df = sorted_mark_df.drop_duplicates(["user_id"]) return deduplicated_mark_df @staticmethod def _calculate_ad_spend(df): - ad_spend_micros = df[(df.event_type == 'buying_conversion') & (df.ab_test_group == TEST)]['cost_eur'].sum() - return ad_spend_micros / 10 ** 6 + ad_spend_micros = df[ + (df.event_type == "buying_conversion") & (df.ab_test_group == TEST) + ]["cost_eur"].sum() + return ad_spend_micros / 10**6 @staticmethod def _merge(marks_df, attributions_df): @@ -471,13 +554,15 @@ def _merge(marks_df, attributions_df): `merge` joins the marked users with the revenue events and excludes any revenue event that happened before the user was marked. 
""" - merged_df = pd.merge(attributions_df, marks_df, on='user_id') + merged_df = pd.merge(attributions_df, marks_df, on="user_id") return merged_df[merged_df.ts_x > merged_df.ts_y] class _CSVHelpers(object): - def __init__(self, customer, revenue_event, chunk_size=10 ** 3, export_user_ids=False): + def __init__( + self, customer, revenue_event, chunk_size=10**3, export_user_ids=False + ): """ Internal class, containing technical read-write related methods and helpers :param customer: Name of the customer the report is created for @@ -497,15 +582,31 @@ def __init__(self, customer, revenue_event, chunk_size=10 ** 3, export_user_ids= # columns to load from CSV self.columns = dict() - self.columns[CSV_SOURCE_MARKS_AND_SPEND] = ['ts', 'user_id', 'ab_test_group', 'campaign_id', 'cost_eur', - 'event_type'] - self.columns[CSV_SOURCE_ATTRIBUTIONS] = ['ts', 'user_id', 'partner_event', - 'revenue_eur', 'appsflyer_deduplicated'] + self.columns[CSV_SOURCE_MARKS_AND_SPEND] = [ + "ts", + "user_id", + "ab_test_group", + "campaign_id", + "cost_eur", + "event_type", + ] + self.columns[CSV_SOURCE_ATTRIBUTIONS] = [ + "ts", + "user_id", + "partner_event", + "revenue_eur", + "appsflyer_deduplicated", + ] def _export_user_ids(self, date, audience, test_users, control_users): if self.export_user_ids: - Helpers.export_csv(test_users, '{}_{}-{}.csv'.format(audience, date, 'test_users')) - Helpers.export_csv(control_users, '{}_{}-{}.csv'.format(audience, date, 'control_users')) + Helpers.export_csv( + test_users, "{}_{}-{}.csv".format(audience, date, "test_users") + ) + Helpers.export_csv( + control_users, "{}_{}-{}.csv".format( + audience, date, "control_users") + ) def read_csv(self, audience, source, date, chunk_filter_fn=None): """ @@ -514,20 +615,20 @@ def read_csv(self, audience, source, date, chunk_filter_fn=None): """ now = datetime.now() - date_str = date.strftime('%Y%m%d') + date_str = date.strftime("%Y%m%d") cache_folder = "cache-v{0}".format(__version__) if self.export_user_ids: cache_folder += "-user-export" - filename = '{0}/{1}/{2}.csv.gz'.format( + filename = "{0}/{1}/{2}.csv.gz".format( self._audience_data_path(audience), source, date_str, ) # local cache - cache_dir = '{0}/{1}/{2}'.format( + cache_dir = "{0}/{1}/{2}".format( cache_folder, audience, source, @@ -536,13 +637,13 @@ def read_csv(self, audience, source, date, chunk_filter_fn=None): if not os.path.exists(cache_dir): os.makedirs(cache_dir) - cache_file_name = '{0}/{1}.parquet'.format( + cache_file_name = "{0}/{1}.parquet".format( cache_dir, date_str, ) # s3 cache (useful if we don't have enough space on the Colab instance) - s3_cache_file_name = '{0}/{1}/{2}/{3}.parquet'.format( + s3_cache_file_name = "{0}/{1}/{2}/{3}.parquet".format( self._audience_data_path(audience), source, cache_folder, @@ -550,14 +651,14 @@ def read_csv(self, audience, source, date, chunk_filter_fn=None): ) if source == CSV_SOURCE_ATTRIBUTIONS: - cache_file_name = '{0}/{1}-{2}.parquet'.format( + cache_file_name = "{0}/{1}-{2}.parquet".format( cache_dir, date_str, self.revenue_event, ) # s3 cache (useful if we don't have enough space on the Colab instance) - s3_cache_file_name = '{0}/{1}/{2}/{3}-{4}.parquet'.format( + s3_cache_file_name = "{0}/{1}/{2}/{3}-{4}.parquet".format( self._audience_data_path(audience), source, cache_folder, @@ -572,24 +673,29 @@ def read_csv(self, audience, source, date, chunk_filter_fn=None): columns = self.columns.get(source) if os.path.exists(cache_file_name): - log('loading from', cache_file_name) + log("loading from", cache_file_name) 
ret, test_users, control_users = self._from_parquet_corrected( file_name=cache_file_name, s3_file_name=s3_cache_file_name, fs=fs, columns=columns, ) - self._export_user_ids(date=date, audience=audience, test_users=test_users, control_users=control_users) + self._export_user_ids( + date=date, + audience=audience, + test_users=test_users, + control_users=control_users, + ) return ret if fs.exists(path=s3_cache_file_name): - log('loading from S3 cache', s3_cache_file_name) + log("loading from S3 cache", s3_cache_file_name) # Download the file to local cache first to avoid timeouts during the load. # This way, if they happen, restart will be using local copies first. fs.get(s3_cache_file_name, cache_file_name) - log('stored S3 cache file to local drive, loading', cache_file_name) + log("stored S3 cache file to local drive, loading", cache_file_name) ret, test_users, control_users = self._from_parquet_corrected( file_name=cache_file_name, @@ -597,27 +703,41 @@ def read_csv(self, audience, source, date, chunk_filter_fn=None): fs=fs, columns=columns, ) - self._export_user_ids(date=date, audience=audience, test_users=test_users, control_users=control_users) + self._export_user_ids( + date=date, + audience=audience, + test_users=test_users, + control_users=control_users, + ) return ret - log('start loading CSV for', audience, source, date) - log('filename', filename) + log("start loading CSV for", audience, source, date) + log("filename", filename) - read_csv_kwargs = {'chunksize': self.chunk_size} + read_csv_kwargs = {"chunksize": self.chunk_size} if columns: - read_csv_kwargs['usecols'] = lambda c: c in columns + read_csv_kwargs["usecols"] = lambda c: c in columns df = pd.DataFrame() test_users = pd.DataFrame() control_users = pd.DataFrame() if not fs.exists(path=filename): - log('WARNING: no CSV file at for: ', audience, source, date, ', skipping the file: ', filename) + log( + "WARNING: no CSV file at for: ", + audience, + source, + date, + ", skipping the file: ", + filename, + ) return df with _S3CachedFile(fs, filename) as s3_file: - log('starting processing CSV for', date.strftime('%d.%m.%Y')) - for chunk in pd.read_csv(s3_file.local_path, escapechar='\\', low_memory=False, **read_csv_kwargs): + log("starting processing CSV for", date.strftime("%d.%m.%Y")) + for chunk in pd.read_csv( + s3_file.local_path, escapechar="\\", low_memory=False, **read_csv_kwargs + ): if chunk_filter_fn: filtered_chunk = chunk_filter_fn(chunk, self.revenue_event) else: @@ -625,37 +745,62 @@ def read_csv(self, audience, source, date, chunk_filter_fn=None): if source != CSV_SOURCE_ATTRIBUTIONS: # we are not interested in events that do not have a group amongst non-attribution events - filtered_chunk = filtered_chunk[filtered_chunk['ab_test_group'].isin(['test', 'control'])] + filtered_chunk = filtered_chunk[ + filtered_chunk["ab_test_group"].isin( + ["test", "control"]) + ] # remove events without a user id - filtered_chunk = filtered_chunk[filtered_chunk['user_id'].str.len() == USER_ID_LENGTH] + filtered_chunk = filtered_chunk[ + filtered_chunk["user_id"].str.len() == USER_ID_LENGTH + ] if self.export_user_ids: - test_users_chunk = filtered_chunk[filtered_chunk['ab_test_group'] == TEST][ - ['user_id']].drop_duplicates() - control_users_chunk = filtered_chunk[filtered_chunk['ab_test_group'] == CONTROL][ - ['user_id']].drop_duplicates() - test_users = pd.concat([test_users, test_users_chunk], ignore_index=True, verify_integrity=True) - control_users = pd.concat([control_users, control_users_chunk], 
ignore_index=True, - verify_integrity=True) + test_users_chunk = filtered_chunk[ + filtered_chunk["ab_test_group"] == TEST + ][["user_id"]].drop_duplicates() + control_users_chunk = filtered_chunk[ + filtered_chunk["ab_test_group"] == CONTROL + ][["user_id"]].drop_duplicates() + test_users = pd.concat( + [test_users, test_users_chunk], + ignore_index=True, + verify_integrity=True, + ) + control_users = pd.concat( + [control_users, control_users_chunk], + ignore_index=True, + verify_integrity=True, + ) filtered_chunk = self._improve_types(filtered_chunk) - df = pd.concat([df, filtered_chunk], - ignore_index=True, verify_integrity=True) + df = pd.concat( + [df, filtered_chunk], ignore_index=True, verify_integrity=True + ) - log('finished processing CSV for', date.strftime('%d.%m.%Y'), 'took', datetime.now() - now) + log( + "finished processing CSV for", + date.strftime("%d.%m.%Y"), + "took", + datetime.now() - now, + ) - self._export_user_ids(date=date, audience=audience, test_users=test_users, control_users=control_users) + self._export_user_ids( + date=date, + audience=audience, + test_users=test_users, + control_users=control_users, + ) if not os.path.exists(cache_dir): os.makedirs(cache_dir) - log('caching local as parquet', cache_file_name) + log("caching local as parquet", cache_file_name) self._to_parquet(df, cache_file_name) # write it to the S3 cache folder as well - log('caching on S3 as parquet', s3_cache_file_name) + log("caching on S3 as parquet", s3_cache_file_name) self._to_parquet(df, s3_cache_file_name) return df @@ -671,27 +816,29 @@ def _to_parquet(df, file_name): """ parquet save and load helper """ - df.to_parquet(file_name, engine='pyarrow') + df.to_parquet(file_name, engine="pyarrow") @staticmethod def _improve_types(df): """ Use more memory efficient types for ts,user_id and ab_test_group """ - df['ts'] = pd.to_datetime(df['ts']) - df['ts'] = (df['ts'].astype('int64') / 1e9).astype('int32') - df['user_id'] = df['user_id'].apply(xxhash.xxh64_intdigest).astype('int64') - if 'ab_test_group' in df.columns: - df['ab_test_group'] = df['ab_test_group'].transform(lambda g: g == 'test') + df["ts"] = pd.to_datetime(df["ts"]) + df["ts"] = (df["ts"].astype("int64") / 1e9).astype("int32") + df["user_id"] = df["user_id"].apply( + xxhash.xxh64_intdigest).astype("int64") + if "ab_test_group" in df.columns: + df["ab_test_group"] = df["ab_test_group"].transform( + lambda g: g == "test") return df @staticmethod def _from_parquet(filename, fs): - if filename.startswith('s3://'): + if filename.startswith("s3://"): with _S3CachedFile(fs, filename) as s3_file: - return pd.read_parquet(s3_file.local_path, engine='pyarrow') + return pd.read_parquet(s3_file.local_path, engine="pyarrow") else: - return pd.read_parquet(filename, engine='pyarrow') + return pd.read_parquet(filename, engine="pyarrow") def _from_parquet_corrected(self, file_name, s3_file_name, fs, columns): """ @@ -708,15 +855,22 @@ def _from_parquet_corrected(self, file_name, s3_file_name, fs, columns): test_users = pd.DataFrame() control_users = pd.DataFrame() if self.export_user_ids: - test_users = df[df['ab_test_group'] == TEST][['user_id']].drop_duplicates() - control_users = df[df['ab_test_group'] == CONTROL][['user_id']].drop_duplicates() + test_users = df[df["ab_test_group"] == + TEST][["user_id"]].drop_duplicates() + control_users = df[df["ab_test_group"] == CONTROL][ + ["user_id"] + ].drop_duplicates() - if df['ts'].dtype != 'int32': + if df["ts"].dtype != "int32": df = _CSVHelpers._improve_types(df) update_cache = True if 
update_cache: - log('rewritting cached file with correct types (local and S3)', file_name, s3_file_name) + log( + "rewritting cached file with correct types (local and S3)", + file_name, + s3_file_name, + ) _CSVHelpers._to_parquet(df=df, file_name=file_name) fs.put(file_name, s3_file_name) @@ -733,9 +887,10 @@ def __init__(self, fs, s3_path, local_path=None): if not self.local_path: import tempfile - original_extension = '.'.join(self.s3_path.split('/')[-1].split('.')[1:]) + original_extension = ".".join( + self.s3_path.split("/")[-1].split(".")[1:]) - _, tmp_path = tempfile.mkstemp(suffix='.' + original_extension) + _, tmp_path = tempfile.mkstemp(suffix="." + original_extension) self.local_path = tmp_path @@ -750,10 +905,20 @@ def __del__(self): def __enter__(self): self._del_local_file_if_exists() - log('starting loading s3 file by path', self.s3_path, 'to local cache', self.local_path) + log( + "starting loading s3 file by path", + self.s3_path, + "to local cache", + self.local_path, + ) # download it right away self.fs.get(self.s3_path, self.local_path) - log('finished loading s3 file by path', self.s3_path, 'to local cache', self.local_path) + log( + "finished loading s3 file by path", + self.s3_path, + "to local cache", + self.local_path, + ) return self From 59353b9e012d3ab91478e64c1d81e8b5f1b4f25d Mon Sep 17 00:00:00 2001 From: alexmorten Date: Mon, 4 Mar 2024 15:12:34 +0900 Subject: [PATCH 3/3] ignore line break before/after binary operator --- lib/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/setup.cfg b/lib/setup.cfg index ba92f5c..9afde2a 100644 --- a/lib/setup.cfg +++ b/lib/setup.cfg @@ -7,5 +7,5 @@ ignore = E402, E722 max-line-length = 120 [flake8] -ignore = E402, E722 +ignore = E402, E722, W503, W504 max-line-length = 120
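
A note on patch 3/3: W503 ("line break before binary operator") and W504 ("line break after binary operator") are mutually exclusive, so any expression wrapped at an operator trips one of them. The reformatted multi-line filters in `_drop_duplicates_in_attributions` now break before `|` and `>=`, which would raise W503, and because setup.cfg sets `ignore =` explicitly (replacing flake8's default ignore list, which already contains both codes), the two codes have to be listed by hand.

For reference, a minimal usage sketch of the `Helpers` API reformatted in patch 2/3, derived only from the constructor signature and docstrings visible above. The import path, customer name, audience names, and date range are illustrative assumptions; "purchase" and the campaign-group mapping are the examples from the class docstring, and the call order below is one plausible flow, not the project's documented entry point.

import pandas as pd

from lib.helpers import Helpers  # assumed import path for lib/helpers.py

helpers = Helpers(
    customer="example-customer",                      # assumed
    audiences=["example-audience"],                    # assumed
    revenue_event="purchase",                          # example from the docstring
    dates=pd.date_range("2024-01-01", "2024-01-31"),   # assumed reporting window
    groups={"All US campaigns": [1234, 3456, 5678]},   # example from the docstring
    use_deduplication=True,  # drop AppsFlyer duplicates (same user/revenue, < 1 minute apart)
)

# Load marks & spend, drop users marked as both test and control,
# then load the matching attribution (revenue) events.
marks_and_spend_df = helpers.load_marks_and_spend_data()
marks_and_spend_df = helpers.remove_users_marked_as_control_and_test(marks_and_spend_df)
attributions_df = helpers.load_attribution_data(marks_and_spend_df)

# Build the uplift report and export it as a CSV file.
report_df = helpers.uplift_report(marks_and_spend_df, attributions_df)
Helpers.export_csv(report_df, "uplift_report.csv")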