From 5f68b59e43b6f1de0e41d859e91a027e83fe7971 Mon Sep 17 00:00:00 2001 From: Max Mynter <32773644+maxmynter@users.noreply.github.com> Date: Thu, 26 Oct 2023 11:24:00 +0200 Subject: [PATCH] Implement Demo Feedback from [ENG-111] (#117) --- README.md | 2 +- demos/.lakectl.yaml | 11 + demos/demo_data_science_project.ipynb | 758 ++++++++++++++++++++++++++ demos/rain_prediction.ipynb | 517 ------------------ demos/requirements.txt | 2 +- 5 files changed, 771 insertions(+), 519 deletions(-) create mode 100644 demos/.lakectl.yaml create mode 100644 demos/demo_data_science_project.ipynb delete mode 100644 demos/rain_prediction.ipynb diff --git a/README.md b/README.md index 55e743df..e44ea19f 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Its main goal is to facilitate versioned data operations in lakeFS directly from See the examples below ([features](#usage), [versioning best-practices](#reproducibility-through-data-versioning-with-lakefs-and-lakefs-spec)) below for inspiration. -A more detailed example is in the notebook in the [`/demos` directory](/demos/rain_prediction.ipynb). +A more detailed example is in the notebook in the [`/demos` directory](/demos/demo_data_science_project.ipynb). ## Installation diff --git a/demos/.lakectl.yaml b/demos/.lakectl.yaml new file mode 100644 index 00000000..bab4838e --- /dev/null +++ b/demos/.lakectl.yaml @@ -0,0 +1,11 @@ +credentials: + access_key_id: AKIAIOSFOLQUICKSTART + secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY +metastore: + glue: + catalog_id: "" + hive: + db_location_uri: file:/user/hive/warehouse/ + uri: "" +server: + endpoint_url: http://127.0.0.1:8000 diff --git a/demos/demo_data_science_project.ipynb b/demos/demo_data_science_project.ipynb new file mode 100644 index 00000000..91405718 --- /dev/null +++ b/demos/demo_data_science_project.ipynb @@ -0,0 +1,758 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "06fa6255", + "metadata": {}, + "source": [ + "# Introduction" + ] + }, + { + "cell_type": "markdown", + "id": "eed38acb", + "metadata": {}, + "source": [ + "In this notebook, we will complete a small end-to-end data science tutorial that employs lakeFS-spec for data versioning. We will use weather data to train a random forest classifier to predict whether it is raining a day from now given the current weather.\n", + "\n", + "We will do the following:\n", + "* Environment Setup\n", + "* lakeFS Setup\n", + "* Data Ingestion\n", + " * Event Hooks\n", + " * PUT a File\n", + "* Model Training\n", + "* Updating Data and Retraining Model\n", + "* Accessing Data Version and Reproducing Experiment\n", + "* Using a Tag instead of a commit SHA for semantic versioning\n", + "\n", + "To execute the code in this tutorial as a jupyter notebook, download this `.ipynb` file to a convenient location on your machine. You can also clone the whole `lakefs-spec` repository. During the execution of this tutorial, in the same directory, a folder, 'data', will be created. 
We will also download a file `.lakectl.yaml`.\n",
+    "\n",
+    "Prerequisites before we start:\n",
+    "* Python 3.9 or higher\n",
+    "* Docker Desktop installed - [see guidance](https://www.docker.com/get-started/)\n",
+    "* git installed"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "dd441d97",
+   "metadata": {},
+   "source": [
+    "# Environment Setup"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "79ba23c1",
+   "metadata": {},
+   "source": [
+    "To set up the environment, run the following commands in your console.\n",
+    "\n",
+    "Create a virtual environment:\n",
+    "\n",
+    " `python -m venv .venv`\n",
+    "\n",
+    "Activate the environment:\n",
+    "- macOS and Linux:\n",
+    "\n",
+    " `source .venv/bin/activate`\n",
+    "\n",
+    "- Windows:\n",
+    "\n",
+    " `.venv\\Scripts\\activate`\n",
+    "\n",
+    "Install the required libraries into the environment you have just created:\n",
+    "\n",
+    " `pip install -r https://raw.githubusercontent.com/appliedAI-Initiative/lakefs-spec/main/demos/requirements.txt`\n",
+    "\n",
+    "From a terminal, start Jupyter and open this notebook (if it is not running already).\n",
+    "\n",
+    "In the notebook's \"Kernel\" menu, select the environment you just created as the Jupyter kernel."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "8cc48a58",
+   "metadata": {},
+   "source": [
+    "# lakeFS Setup"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f47c370c",
+   "metadata": {},
+   "source": [
+    "Ensure Docker Desktop is running.\n",
+    "\n",
+    "Set up lakeFS by executing the `docker run` command from the [lakeFS quickstart](https://docs.lakefs.io/quickstart/launch.html) in your console:\n",
+    "\n",
+    "`docker run --name lakefs --pull always --rm --publish 8000:8000 treeverse/lakefs:latest run --quickstart`\n",
+    "\n",
+    "Open a browser and navigate to the lakeFS instance - by default: http://localhost:8000/.\n",
+    "\n",
+    "Authenticate with the credentials provided in the [quickstart documentation](https://docs.lakefs.io/quickstart/launch.html):\n",
+    "\n",
+    " Access Key ID : AKIAIOSFOLQUICKSTART\n",
+    " Secret Access Key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY\n",
+    "\n",
+    "As an email address, you can enter anything; we won't need it in this example.\n",
+    "\n",
+    "Proceed to create an empty repository and call it 'weather'.\n",
+    "\n",
+    "In your Jupyter notebook, create a variable `REPO_NAME` and set its value to the name of the repository you just created in the lakeFS web interface:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "6008bb4e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "REPO_NAME = 'weather'"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7088b2f8",
+   "metadata": {},
+   "source": [
+    "There are many ways to authenticate to lakeFS from Python code - in this tutorial we use the convenient YAML file configuration. Execute the code below to download a `.lakectl.yaml` file containing the lakeFS quickstart credentials and server URL. The file will be downloaded to the same location as your notebook."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b50593a7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!curl -o .lakectl.yaml \"https://raw.githubusercontent.com/appliedAI-Initiative/lakefs-spec/main/demos/.lakectl.yaml\""
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "cb9d3502",
+   "metadata": {},
+   "source": [
+    "# Data Ingestion"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1d37daf2",
+   "metadata": {},
+   "source": [
+    "Now it's time to get some data. 
We will use the [Open-Meteo API](https://open-meteo.com/), from which we can pull weather data for free (for non-commercial use) and without an API token.\n",
+    "\n",
+    "First, create a folder 'data' in the directory where your notebook is located:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "eb8d3608",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!mkdir -p data"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "9ad33235",
+   "metadata": {},
+   "source": [
+    "Then, for the purpose of training, get the full data of the 2010s (the coordinates in the query below point to Berlin):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "bf4386cb",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!curl -o data/weather-2010s.json \"https://archive-api.open-meteo.com/v1/archive?latitude=52.52&longitude=13.41&start_date=2010-01-01&end_date=2019-12-31&hourly=temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m\""
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "dd51df25",
+   "metadata": {},
+   "source": [
+    "# PUT a file"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "a85df281",
+   "metadata": {},
+   "source": [
+    "The data is in JSON format. Therefore, we need to wrangle the data a bit to make it usable. But first we will save it into our lakeFS instance.\n",
+    "\n",
+    "lakeFS works similarly to `git` as a versioning system. You can create `commits` that encapsulate specific changes to the data. You can also work with `branches` to fork off your own copy of the data such that you don't interfere with your colleagues. Every commit (on any branch) is identified by a `commit-SHA`. This can be used to programmatically interact with specific states of your data and enables logging of the specific versions used to create a certain model. We will cover all this later in this demo.\n",
+    "\n",
+    "For now, we will `put` the file we have. For that, we will create a new branch, `transform-raw-data`, for our data."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b2aa0207",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from lakefs_spec import LakeFSFileSystem\n",
+    "\n",
+    "NEW_BRANCH_NAME = 'transform-raw-data'\n",
+    "\n",
+    "\n",
+    "fs = LakeFSFileSystem()\n",
+    "fs.put('./data/weather-2010s.json', f'{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c793a244",
+   "metadata": {},
+   "source": [
+    "Now, in the lakeFS UI in your browser, you can change the branch to `transform-raw-data` and see the saved file. However, the change is not yet committed. While you could do that manually via the uncommitted changes tab in the lakeFS UI, we will commit the file in a different way."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "8bd285eb",
+   "metadata": {},
+   "source": [
+    "# Event Hooks"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "729cadd9",
+   "metadata": {},
+   "source": [
+    "To commit changes programmatically, we can register a hook. This hook needs to have the signature `(client, context) -> None`, where the `client` is the file system's LakeFS client. The context object contains information about the requested resource. Within this hook, we can automatically create a commit. We will register the hook for the `PUT_FILE` and `FILEUPLOAD` events. Pandas uses the latter in `DataFrame.to_csv()`, and hence we commit when using `DataFrame.to_csv()` as well."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "593d3b3b", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "from lakefs_sdk.client import LakeFSClient\n", + "from lakefs_spec.client_helpers import commit\n", + "from lakefs_spec.hooks import FSEvent, HookContext\n", + "\n", + "# Define the commit hook\n", + "def commit_on_put(client: LakeFSClient, ctx:HookContext) -> None:\n", + " commit_message = f\"Add file {ctx.resource}\"\n", + " print(f\"Attempting Commit: {commit_message}\")\n", + " commit(client, repository=ctx.repository, branch=ctx.ref, message=commit_message)\n", + " \n", + "\n", + "# Register the commit hook to be executed after the PUT_FILE and FILEUPLOAD events\n", + "fs.register_hook(FSEvent.PUT_FILE, commit_on_put)\n", + "fs.register_hook(FSEvent.FILEUPLOAD, commit_on_put)" + ] + }, + { + "cell_type": "markdown", + "id": "35a7bd34", + "metadata": {}, + "source": [ + "Now uploading the file will create a commit. Since we already uploaded the file, lakeFS will skip the upload as the checksums of the local and remote file match. The hook will be executed regardless.\n", + "\n", + "If we want to execute the upload even for an unchanged file, we can do so by passing `precheck=False` to the `fs.put()` operation." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "53165cc0", + "metadata": {}, + "outputs": [], + "source": [ + "fs.put('data/weather-2010s.json', f'{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json', precheck=True)" + ] + }, + { + "cell_type": "markdown", + "id": "8584fb6c", + "metadata": {}, + "source": [ + "# Data Transformation" + ] + }, + { + "cell_type": "markdown", + "id": "1c14e122", + "metadata": {}, + "source": [ + "Now let's transform the data for our use case. We put the transformation into a function such that we can reuse it later\n", + "\n", + "In this notebook, we follow a simple toy example to predict whether it is raining at the same time tomorrow given weather data from right now.\n", + "\n", + "We will skip a lot of possible feature engineering etc. in order to focus on the application of lakeFS and the `LakeFSFileSystem`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1598343b", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import numpy as np\n", + "import json \n", + "\n", + "def transform_json_weather_data(filepath):\n", + " with open(filepath,\"r\") as f:\n", + " data = json.load(f)\n", + "\n", + " df = pd.DataFrame.from_dict(data[\"hourly\"])\n", + " df.time = pd.to_datetime(df.time)\n", + " df['is_raining'] = df.rain > 0\n", + " df['is_raining_in_1_day'] = df.is_raining.shift(24)\n", + " df = df.dropna()\n", + " return df\n", + " \n", + "df = transform_json_weather_data('data/weather-2010s.json')\n", + "df.head(5)" + ] + }, + { + "cell_type": "markdown", + "id": "a90d5134", + "metadata": {}, + "source": [ + "Now we save this data as a CSV file into the main branch. The `DataFrame.to_csv` method calls an `open` operation behind the scenes, our commit hook is called and the file is committed. You can verify the saving worked in the LakeFS UI in your browser by switching to the commits tab of the main branch." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "32a47df2", + "metadata": {}, + "outputs": [], + "source": [ + "df.to_csv(f'lakefs://{REPO_NAME}/main/weather_2010s.csv')" + ] + }, + { + "cell_type": "markdown", + "id": "1ff5ae50", + "metadata": {}, + "source": [ + "# Model Training" + ] + }, + { + "cell_type": "markdown", + "id": "b50c3599", + "metadata": {}, + "source": [ + "First we will do a train-test split:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "db4921f7", + "metadata": {}, + "outputs": [], + "source": [ + "import sklearn.model_selection\n", + "\n", + "model_data=df.drop('time', axis=1)\n", + "\n", + "train, test = sklearn.model_selection.train_test_split(model_data, random_state=7)" + ] + }, + { + "cell_type": "markdown", + "id": "4f4d2ec4", + "metadata": {}, + "source": [ + "We save these train and test datasets into a new `training` branch. If the branch does not yet exist, as in this case, it is implicitly created by default. You can control this behaviour with the `create_branch_ok` flag when initializing the 'LakeFSFileSystem'. By default `create_branch_ok=True`, so that we needed only `fs = LakeFSFileSystem()` to enable implicit branch creation." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1de56b2d", + "metadata": {}, + "outputs": [], + "source": [ + "TRAINING_BRANCH = 'training'\n", + "train.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv')\n", + "test.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv')" + ] + }, + { + "cell_type": "markdown", + "id": "4956865e", + "metadata": {}, + "source": [ + "Implicit branch creation is a convenient way to create new branches programmatically. However, one drawback is that typos in your code might result in new accidental branch creations. If you want to avoid this implicit behavior and raise errors instead, you can disable implicit branch creation by setting `fs.create_branch_ok=False`.\n", + "\n", + "We can now read train and test files directly from the remote LakeFS instance. (You can verify that neither the train nor the test file are saved in the `/data` directory)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4be4e17c", + "metadata": {}, + "outputs": [], + "source": [ + "train = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)\n", + "test = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv', index_col=0)\n", + "\n", + "train.head()" + ] + }, + { + "cell_type": "markdown", + "id": "9e51cc98", + "metadata": {}, + "source": [ + "Let's check the shape of train and test data. Later on we will train to get back to this data version and reproduce the results of the experiment." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5890cfc8", + "metadata": {}, + "outputs": [], + "source": [ + "print(f'Train initial data shape: {train.shape}')\n", + "print(f'Test initial data shape: {test.shape}')" + ] + }, + { + "cell_type": "markdown", + "id": "5d277134", + "metadata": {}, + "source": [ + "We now proceed to train a random forest classifier and evaluate it on the test set:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4a285d9f", + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.ensemble import RandomForestClassifier\n", + "\n", + "dependent_variable = 'is_raining_in_1_day'\n", + "\n", + "model = RandomForestClassifier(random_state=7)\n", + "x_train, y_train = train.drop(dependent_variable, axis=1), train[dependent_variable].astype(bool)\n", + "x_test, y_test = test.drop(dependent_variable, axis=1), test[dependent_variable].astype(bool)\n", + "\n", + "model.fit(x_train, y_train)\n", + "\n", + "test_acc = model.score(x_test, y_test)\n", + "\n", + "print(f\"Test accuracy: {round(test_acc, 4) * 100 } %\")" + ] + }, + { + "cell_type": "markdown", + "id": "cc5559ba", + "metadata": {}, + "source": [ + "# Updating Data and Model" + ] + }, + { + "cell_type": "markdown", + "id": "6421366b", + "metadata": {}, + "source": [ + "Until now, we only have used data from the 2010s. Let's download additional 2020s data, transform it, and save it to LakeFS." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9a68a5a3", + "metadata": {}, + "outputs": [], + "source": [ + "!curl -o data/weather-2020s.json \"https://archive-api.open-meteo.com/v1/archive?latitude=52.52&longitude=13.41&start_date=2020-01-01&end_date=2023-08-31&hourly=temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m\"\n", + "\n", + "new_data = transform_json_weather_data('./data/weather-2020s.json')\n", + "new_data.to_csv(f'lakefs://{REPO_NAME}/main/weather_2020s.csv')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11242a6d", + "metadata": {}, + "outputs": [], + "source": [ + "new_data = new_data.drop('time', axis=1)" + ] + }, + { + "cell_type": "markdown", + "id": "8e9fbc98", + "metadata": {}, + "source": [ + "Let's concatenate the old data and the new data, create a new train-test split, and overwrite the files on lakeFS:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d005bffd", + "metadata": {}, + "outputs": [], + "source": [ + "df_train = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)\n", + "df_test = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv', index_col=0)\n", + "\n", + "full_data = pd.concat([new_data, df_train, df_test])\n", + "\n", + "train_df, test_df = sklearn.model_selection.train_test_split(full_data, random_state=7)\n", + "\n", + "train_df.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv')\n", + "test_df.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv')" + ] + }, + { + "cell_type": "markdown", + "id": "1ac797cf", + "metadata": {}, + "source": [ + "We may now read the updated data directly from lakeFS and check their shape to insure that initial files `train_weather.csv` and `test_weather.csv` have been overwritten successfully (number of rows should be significantly higher as 2020 data were added):" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "id": "3f9b49d9", + "metadata": {}, + "outputs": [], + "source": [ + "train = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)\n", + "test = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv', index_col=0)\n", + "print(f'train updated data shape: {train.shape}')\n", + "print(f'test updated data shape: {test.shape}')" + ] + }, + { + "cell_type": "markdown", + "id": "46c1f6d4", + "metadata": {}, + "source": [ + "Now we may train the model based on the new train data and validate based on the new test data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "78ff3de1", + "metadata": {}, + "outputs": [], + "source": [ + "x_train, y_train = train.drop(dependent_variable, axis=1), train[dependent_variable].astype(bool)\n", + "x_test, y_test = test.drop(dependent_variable, axis=1), test[dependent_variable].astype(bool)\n", + "\n", + "model.fit(x_train, y_train)\n", + "\n", + "test_acc = model.score(x_test, y_test)\n", + "\n", + "print(f\"Test accuracy: {round(test_acc, 4) * 100 } %\")" + ] + }, + { + "cell_type": "markdown", + "id": "71bac6b7", + "metadata": {}, + "source": [ + "# Accessing Data Version and Reproducing Experiment" + ] + }, + { + "cell_type": "markdown", + "id": "fc9e66f2", + "metadata": {}, + "source": [ + "Let's assume we need to go to our initial data and reproduce initial experiment (initial model with its initial accuracy). This might be tricky as we have overwritten initial train and test data on lakeFS.\n", + "\n", + "To enable data versioning we should save the `ref` of the specific datasets. `ref` can be a branch we are pulling a file from LakeFS. `ref` can be also a commit id - then you can access different data versions within the same branch and not only the version from the latest commit. Therefore, we will use explicit versioning and get the actual commit SHA. We have multiple ways to do this. Manually, we could go into the lakeFS UI, select the training branch, and navigate to the \"Commits\" tab. There, we could see the latest two commits, titled `Add file test_weather.csv` and `Add file train_weather.csv`, and copy their IDs.\n", + "\n", + "However, we want to automate as much as possible and therefore use a helper function. You find pre-written helper functions in the `lakefs_spec.client_helpers` module:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c7d67ec8", + "metadata": {}, + "outputs": [], + "source": [ + "from lakefs_spec.client_helpers import rev_parse\n", + "\n", + "fixed_commit_id = rev_parse(fs.client, REPO_NAME, TRAINING_BRANCH, parent=2) # parent is a relative number of a commit when 0 is the latest\n", + "print(fixed_commit_id)" + ] + }, + { + "cell_type": "markdown", + "id": "02e2558b", + "metadata": {}, + "source": [ + "With our commit hook setup, both DataFrame.to_csv() operations create an individual commit. lakeFS saves the state of every file at every commit. To get other commits with the rev_parse function, you can change the repo, branch parameters. To go back in the chosen branch's commit history, you can increase the `parent` parameter. In our case the initial data was commited 3 commits ago - we count the latest commit on a branch as 0, thus `parent` = 2." 
+ ] + }, + { + "cell_type": "markdown", + "id": "9adf24fc", + "metadata": {}, + "source": [ + "Let's check whether we manage to get the initial train and test data with this commit SHA - let's compare the shape to the initial data shape:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2ca3fa7b", + "metadata": {}, + "outputs": [], + "source": [ + "train = pd.read_csv(f\"lakefs://{REPO_NAME}/{fixed_commit_id}/train_weather.csv\", index_col=0)\n", + "test = pd.read_csv(f\"lakefs://{REPO_NAME}/{fixed_commit_id}/test_weather.csv\", index_col=0)\n", + "\n", + "print(f'train data shape: {train.shape}')\n", + "print(f'test data shape: {test.shape}')" + ] + }, + { + "cell_type": "markdown", + "id": "893dde83", + "metadata": {}, + "source": [ + "Let's train and validate the model based on re-fetched data and see whether we manage to reproduce the initial accuracy ratio: " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "51b3fbc2", + "metadata": {}, + "outputs": [], + "source": [ + "x_train, y_train = train.drop(dependent_variable, axis=1), train[dependent_variable].astype(bool)\n", + "x_test, y_test = test.drop(dependent_variable, axis=1), test[dependent_variable].astype(bool)\n", + "\n", + "model.fit(x_train, y_train)\n", + "\n", + "test_acc = model.score(x_test, y_test)\n", + "\n", + "print(f\"Test accuracy: {round(test_acc, 4) * 100 } %\")" + ] + }, + { + "cell_type": "markdown", + "id": "def6cddd", + "metadata": {}, + "source": [ + "# Using a Tag instead of a commit SHA for semantic versioning\n", + "The above method for data versioning works great when you have experiment tracking tools to store and retrieve the commit SHA in automated pipelines. But it is hard to remember and tedious to retrieve in manual prototyping. We can make selected versions of the dataset more accessible with semantic versioning. We attach a human-interpretable tag that points to a specific commit SHA.\n", + "\n", + "The `client_helpers` module of the `lakefs-spec` library provides the helper function `create_tag` to achieve this. We make a semantic tag point to the `fixed_commit_id`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cbfbcc3e", + "metadata": {}, + "outputs": [], + "source": [ + "from lakefs_spec.client_helpers import create_tag\n", + "\n", + "TAG='train-test-split'\n", + "\n", + "create_tag(client=fs.client, repository=REPO_NAME,ref=fixed_commit_id, tag=TAG)" + ] + }, + { + "cell_type": "markdown", + "id": "362797ae", + "metadata": {}, + "source": [ + "Now we can access the specific files with the semantic tag. Now the `fixed_commit_id` and `TAG` reference the same version `ref` in lakeFS whereas specifying a branch points to the latest version on the respective branch." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "84a271c9", + "metadata": {}, + "outputs": [], + "source": [ + "train_from_branch_head = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)\n", + "train_from_commit_sha = pd.read_csv(f'lakefs://{REPO_NAME}/{fixed_commit_id}/train_weather.csv', index_col=0)\n", + "train_from_semantic_tag = pd.read_csv(f'lakefs://{REPO_NAME}/{TAG}/train_weather.csv', index_col=0)" + ] + }, + { + "cell_type": "markdown", + "id": "8c9fabff", + "metadata": {}, + "source": [ + "We can verify this by looking at the lengths of the `DataFrame`s. We see that the `train_from_commit_sha` and `train_from_semantic_tag` are equal. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7a12ffc3", + "metadata": {}, + "outputs": [], + "source": [ + "print(len(train_from_branch_head))\n", + "print(len(train_from_commit_sha))\n", + "print(len(train_from_semantic_tag))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/demos/rain_prediction.ipynb b/demos/rain_prediction.ipynb deleted file mode 100644 index cea2c304..00000000 --- a/demos/rain_prediction.ipynb +++ /dev/null @@ -1,517 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "In this notebook we will complete a small end to end data science tutorial that employs LakeFS-spec for data versioning.\n", - "We will use weather data to train a random forest classifier and aim to predict whether it is raining a day from now given the current weather. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We will do the following:\n", - "- Environment and lakeFS Setup\n", - "- Data Ingestion\n", - " - Event Hooks\n", - " - PUT a File\n", - "- Model Training\n", - "- Updating Data\n", - " - Accessing specific Data Versions" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Environment and lakeFS Setup \n", - "Lets set the environment up. First create an environment (`python -m venv .venv`) and activate it (`source .venv/bin/activate`). Execute the comands in the parenthesis in your console. Select the environment you just created as a Jupyter kernel. Then install the dependencies by executing the cell below. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%pip install -r requirements.txt" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, let us set up LakeFS. You can do this by executing the `docker run` command given here [lakeFS quickstart](https://docs.lakefs.io/quickstart/launch.html) in a terminal of your choice. Open a browser and navigate to the lakeFS instance (by default`localhost:8000`). Authenticate with the credentials given in the terminal where you executed the docker container. As an email, you can enter anything, we won't need it in this example. Proceed to create an empty repository. You may call it `weather`.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "REPO_NAME = 'weather'" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We will also set-up automatic authentication. We can do this via two approaches. \n", - "\n", - "Either we install `lakectl`. Then lakeFS-spec can automatically handle authentication. To do so, open a terminal of your choice and `brew install lakefs`. Then use `lakectl config`. You find the authentication information in the terminal window where you started the LakeFS Docker container. \n", - "\n", - "As a second option, if we don't want to install `lakectl`, you can manually create the `.lakectl.yaml` in the default location `~/.lakectl.yaml`. 
It needs to contain the following information (replace the values if applicable): \n", - "\n", - "```yaml\n", - "credentials:\n", - " access_key_id: AKIAIOSFOLQUICKSTART\n", - " secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY\n", - "server:\n", - " endpoint_url: http://127.0.0.1:8000\n", - "```\n", - "\n", - "Note: for the automatic authentication to work, you need the `pyyaml` package which is not a default dependency of LakeFS-spec. We already installed via the `requirements.txt`. In you own projects you need to add the dependency manually, for example by running `pip install --upgrade pyyaml`, if you want to use automatic authentication." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Data Ingestion\n", - "Now it's time to get some data. We will use the [Open-Meteo api](https://open-meteo.com/), where we can pull weather data from an API for free (as long as we are non-commercial) and without an API-token. \n", - "\n", - "For training, we get the full data of the 2010s from Munich (where I am writing this right now ;) ) " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!curl -o 'data/weather-2010s.json' 'https://archive-api.open-meteo.com/v1/archive?latitude=52.52&longitude=13.41&start_date=2010-01-01&end_date=2019-12-31&hourly=temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m'" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## PUT a file \n", - "The data is in JSON format so we need to wrangle the data a bit to make it usable. But first we will save it into our lakeFS instance. \n", - "\n", - "lakeFS works similar to `git` as a versioning system. You can create `commits` that encapsulate specific changes to the data. You can also work with `branches` to fork of your own copy of the data such that you don't interfere with your colleagues. Every commit (on any branch) is identified by a `commit-SHA`. This can be used to programmatically interact with specific states of your data and enables logging of the specific versions used to create a certain model. We will cover all this later in this demo. \n", - "\n", - "\n", - "For now, we will `put` the file we have. Therefore, we will create a new branch, `transform-raw-data` for our data." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from lakefs_spec import LakeFSFileSystem\n", - "\n", - "NEW_BRANCH_NAME = 'transform-raw-data'\n", - "\n", - "\n", - "fs = LakeFSFileSystem()\n", - "fs.put('./data/weather-2010s.json', f'{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json')\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now, on LakeFS in your browser, can change the branch to `transform-raw-data` and see the saved file. However, the change is not yet committed. \n", - "While you can do that manually via the uncommitted changes tab in the lakeFS UI, we will commit the file in a different way. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Event Hooks\n", - "To commit changes programmatically, we can register a hook. This hook needs to have the signature `(client, context) -> None`, where the `client` is the file system's LakeFS client. The context object contains information about the requested resource. 
Within this hook, we can automatically create a commit. We will register the hook for the `PUT_FILE` and `FILEUPLOAD` events. Pandas uses the latter in `DataFrame.to_csv()` and hence we commit when using `DataFrame.to_csv()` as well. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from lakefs_sdk.client import LakeFSClient\n", - "from lakefs_spec.client_helpers import commit\n", - "from lakefs_spec.hooks import FSEvent, HookContext\n", - "\n", - "# Define the commit hook\n", - "def commit_on_put(client: LakeFSClient, ctx:HookContext) -> None:\n", - " commit_message = f\"Add file {ctx.resource}\"\n", - " print(f\"Attempting Commit: {commit_message}\")\n", - " commit(client, repository=ctx.repository, branch=ctx.ref, message=commit_message)\n", - " \n", - "\n", - "# Register the commit hook to be executed after the PUT_FILE and FILEUPLOAD events\n", - "fs.register_hook(FSEvent.PUT_FILE, commit_on_put)\n", - "fs.register_hook(FSEvent.FILEUPLOAD, commit_on_put)\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now uploading the file will create a commit. Since we already uploaded the file, lakeFS will skip the upload as the checksums of the local and remote file match. The hook will be executed regardless. \n", - "\n", - "If we want to execute the upload even for an unchanged file, we can do so by passing `precheck=False` to the `fs.put()` operation." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fs.put('data/weather-2010s.json', f'{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json', precheck=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "However, especially when dealing with large files, using prechecks makes sense to avoid repeated unnecessary transfers of large files." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now let's transform the data for our use case. We put the transformation into a function such that we can reuse it later\n", - "\n", - "In this notebook, we follow a simple toy example to predict whether it is raining at the same time tomorrow given weather data from right now. \n", - "\n", - "We will skip a lot of possible feature engineering etc. in order to focus on the application of lakeFS and the `LakeFSFileSystem`." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "import numpy as np\n", - "import json \n", - "\n", - "def transform_json_weather_data(filepath):\n", - " with open(filepath,\"r\") as f:\n", - " data = json.load(f)\n", - "\n", - " df = pd.DataFrame.from_dict(data[\"hourly\"])\n", - " df.time = pd.to_datetime(df.time)\n", - " df['is_raining'] = df.rain > 0\n", - " df['is_raining_in_1_day'] = df.is_raining.shift(24)\n", - " df = df.dropna()\n", - " return df\n", - " \n", - "df = transform_json_weather_data('data/weather-2010s.json')\n", - "df.head(5)\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now we save this data as a CSV file into the main branch. The `DataFrame.to_csv` method calls an `open` operation behind the scenes, our commit hook is called and the file is committed. You can verify the saving worked in the LakeFS UI in your browser by switching to the commits tab of the main branch. 
" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df.to_csv(f'lakefs://{REPO_NAME}/main/weather_2010s.csv')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Model Training\n", - "First we will do a train-test split." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import sklearn.model_selection\n", - "\n", - "model_data=df.drop('time', axis=1)\n", - "\n", - "train, test = sklearn.model_selection.train_test_split(model_data)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We save these train and test datasets into a new `training` branch. If the branch does not yet exist, as in this case, it is implicitly created by default. You can control this behaviour with the `create_branch_ok` flag when initializing the `LakeFSFileSystem`. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "TRAINING_BRANCH = 'training'\n", - "train.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv')\n", - "test.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv')\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Implicit branch creation is a convenient way to create new branches programmatically. However, one drawback is that typos in your code might result in new accidental branch creations. If you want to avoid this implicit behavior and raise errors instead, you can disable implicit branch creation by setting `fs.create_branch_ok=False`." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can now read these files directly from the remote LakeFS instance. (You can verify that neither the train nor the test file are saved in the `/data` directory). " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "train = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)\n", - "test = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv', index_col=0)\n", - "\n", - "train.head()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We now proceed to train a random forest classifier and evaluate it on the test set. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from sklearn.ensemble import RandomForestClassifier\n", - "\n", - "dependent_variable = 'is_raining_in_1_day'\n", - "\n", - "model = RandomForestClassifier()\n", - "x_train, y_train = train.drop(dependent_variable, axis=1), train[dependent_variable].astype(bool)\n", - "x_test, y_test = test.drop(dependent_variable, axis=1), test[dependent_variable].astype(bool)\n", - "\n", - "model.fit(x_train, y_train)\n", - "\n", - "test_acc = model.score(x_test, y_test)\n", - "\n", - "print(f\"Test accuracy: {round(test_acc, 4) * 100 } %\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Updating Data\n", - "Until now, we only have used data from the 2010s. Let's download additional 2020s data, transform it, and save it to LakeFS. 
" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!curl -o './data/weather-2020s.json' 'https://archive-api.open-meteo.com/v1/archive?latitude=52.52&longitude=13.41&start_date=2020-01-01&end_date=2023-08-31&hourly=temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m'\n", - "\n", - "new_data = transform_json_weather_data('./data/weather-2020s.json')\n", - "new_data.to_csv(f'lakefs://{REPO_NAME}/main/weather_2020s.csv')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Let's test how well our model performs on 2020s data.\n", - "\n", - "First, we drop the `time` column such that we have the same variables as during the fit in the data. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "new_data = new_data.drop('time', axis=1)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "acc_2020s = model.score(new_data.drop(dependent_variable, axis=1), new_data[dependent_variable].astype(bool))\n", - "\n", - "print(f\"Test accuracy: {round(acc_2020s, 4) * 100 } %\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Accessing specific Data Versions\n", - "We have an accuracy similar to the one we had on the 2020s data. Yet, it makes sense to utilize this data for training as well. We will create a concatenated dataframe and perform a new train test split. \n", - "\n", - "However, this means that we now have multiple models which will perform differently on different datasets. For example, if someone takes the model we are about to train and evaluates it on the 2020s data, the accuracy will probably be higher, because of data leakage. We are going to use some of the data points in the 2020s data to train. \n", - "\n", - "To circumvent this issue (or at least enable the traceability and reproducibility) we should save the `ref` of the specific datasets. `ref` can be the branch we are pulling the file from LakeFS from. \n", - "\n", - "\n", - "We are going to implement versioning with the commit ids now.\n", - "\n", - "First we create the new train test split and save it in the training branch.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df_train = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)\n", - "df_test = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv', index_col=0)\n", - "\n", - "full_data = pd.concat([new_data, df_train, df_test])\n", - "\n", - "train_df, test_df = sklearn.model_selection.train_test_split(full_data, test_size=0.9)\n", - "\n", - "train_df.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv')\n", - "test_df.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "This concatenates the old data, creates a new train-test split, and overwrites the files. Of course, this presents problems with respect to strict versioning. When we get the data directly from a branch, we only get the version from the latest commit. \n", - "\n", - "Let's use explicit versioning instead and get the actual commit SHA. We have multiple ways to do this. 
Manually, we could go into the lakeFS UI, select the training branch, and navigate to the \"Commits\" tab. There, we could see the latest two commits, titled `Add file test_weather.csv` and `Add file train_weather.csv`, and copy their IDs. \n", - "\n", - "However, we want to automate as much as possible and therefore use a helper function. You find pre-written helper functions in the `lakefs_spec.client_helpers` module." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from lakefs_spec.client_helpers import rev_parse\n", - "fixed_commit_id = rev_parse(fs.client, REPO_NAME, TRAINING_BRANCH, parent=0)\n", - "print(fixed_commit_id)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now, obtain the ID of the latest commit. With our commit hook setup, both `DataFrame.to_csv()` operations create an individual commit. But as lakeFS saves the state of every file at every commit, it suffices to get the latest commit only. To get other commits with the `rev_parse` function, you can change the repo and branch parameters. To go back in the chosen branch's commit history, you can increase the `parent` parameter. \n", - "\n", - "Since we now have the commit ID fixed, we can now get the specific dataset versions irrespective of subsequent changes to the files on any other branch." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "train = pd.read_csv(f\"lakefs://{REPO_NAME}/{fixed_commit_id}/train_weather.csv\", index_col=0)\n", - "test = pd.read_csv(f\"lakefs://{REPO_NAME}/{fixed_commit_id}/test_weather.csv\", index_col=0)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Let's train the model" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "model.fit(train.drop(dependent_variable, axis=1), train[dependent_variable].astype(bool))\n", - "\n", - "test_acc = model.score(test.drop(dependent_variable, axis=1), test[dependent_variable].astype(bool))\n", - "\n", - "print(f\"Test accuracy: {round(test_acc, 4) * 100 } %\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We're done. We have a model trained on the new data. \n", - "\n", - "Now, we can save the commit SHAs as well as the model and accuracy metrics into an experiment tracking tool of our choice. With this, we achieve reproducible experiments and have a clear trail on what input data and hyperparameters resulted in which set of model weights. \n", - "\n", - "If in the future we decide to train another model, use different data, or invest more time in feature engineering, we can always come back to the current state to study the model performance and draw insights that might help us down the road. 
" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.3" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/demos/requirements.txt b/demos/requirements.txt index 294643af..bef787fb 100644 --- a/demos/requirements.txt +++ b/demos/requirements.txt @@ -13,7 +13,7 @@ joblib==1.3.2 jupyter_client==8.3.1 jupyter_core==5.3.1 lakefs-sdk==1.0.0 -lakefs-spec @ git+https://github.com/appliedAI-Initiative/lakefs-spec.git@main +lakefs-spec @ git+https://github.com/appliedAI-Initiative/lakefs-spec.git@ce1b44c513a994f46800fbdc0c673ee17817e299 matplotlib-inline==0.1.6 nest-asyncio==1.5.7 numpy==1.25.2