def map_range(x, from_min, from_max, to_min, to_max):
    """Linearly rescale ``x`` from one interval to another.

    Args:
        x: Scalar or array-like (e.g. a ``pd.Series``) to rescale.
        from_min, from_max: Bounds of the source interval.
        to_min, to_max: Bounds of the target interval.

    Returns:
        ``x`` rescaled so that ``from_min`` maps to ``to_min``. The widths of
        both intervals are taken as absolute values.
        If the source interval has zero width, every finite ``x`` maps to the
        midpoint of the target interval instead of dividing by zero.
    """
    from_span = abs(from_max - from_min)
    to_span = abs(to_max - to_min)
    if from_span == 0:
        # Constant input: `* 0` keeps the shape (and NaNs) of `x` while
        # discarding its value, so scalars and Series are both supported.
        return (x - from_min) * 0 + (to_min + to_span / 2)
    percent = (x - from_min) / from_span
    return percent * to_span + to_min


def prepare_dataset(dataset: pd.DataFrame):
    """Encode the ``"Crop"`` column as integers and scale the rest to [-1, 1].

    Args:
        dataset: Frame with a ``"Crop"`` label column; every other column is
            treated as a numeric feature.

    Returns:
        A tuple ``(prepared_dataset, crops)``: a deep copy of ``dataset`` in
        which ``"Crop"`` holds each label's position in ``crops`` and every
        other column is min-max scaled to [-1, 1], plus the sorted array of
        unique crop labels. The input frame is not modified.
    """
    prepared_dataset = dataset.copy(True)

    crops = np.sort(np.asarray(prepared_dataset["Crop"].unique()))
    # Dict lookup instead of scanning `crops` once per row.
    crop_codes = {crop: code for code, crop in enumerate(crops)}
    prepared_dataset["Crop"] = prepared_dataset["Crop"].map(crop_codes)

    for column_name in prepared_dataset.columns:
        if column_name == "Crop":
            continue
        column = prepared_dataset[column_name]
        # map_range works on the whole column at once.
        prepared_dataset[column_name] = map_range(column, column.min(), column.max(), -1, 1)

    return prepared_dataset, crops


def calc_distance(a: pd.Series, b: pd.Series):
    """Return the Euclidean distance between two Series.

    The subtraction aligns ``a`` and ``b`` on their index labels, and
    ``Series.sum`` skips NaN terms by default.
    """
    return (a.sub(b) ** 2).sum() ** 0.5


def _row_distances(points: np.ndarray, target: np.ndarray) -> np.ndarray:
    """Euclidean distance from every row of ``points`` to ``target``.

    NaN components are skipped, mirroring what ``calc_distance`` does.
    """
    return np.sqrt(np.nansum((points - target) ** 2, axis=1))


def _distance_matrix(points: np.ndarray, centroid_matrix: np.ndarray) -> np.ndarray:
    """Return an array whose ``[r, c]`` entry is the distance from row ``r`` to centroid ``c``."""
    return np.column_stack([_row_distances(points, centroid) for centroid in centroid_matrix])


def find_farthest_neighbours(k: int, dataset: pd.DataFrame, seed=None) -> list[pd.Series]:
    """Pick ``k`` mutually distant rows using farthest-point sampling.

    The first row is drawn at random; each following row is the one whose
    distance to its nearest already-chosen row is largest (the first such row
    on ties).

    Args:
        k: Number of rows to pick. Values below 1 yield an empty list.
        dataset: Numeric frame to pick rows from.
        seed: Seed for the ``random.Random`` used to draw the first row.

    Returns:
        The chosen rows as a list of ``pd.Series`` in the order picked. If
        ``k`` exceeds the number of distinct rows, rows are repeated.

    Raises:
        ValueError: If ``dataset`` has no rows.
    """
    if k < 1:
        return []
    if len(dataset) == 0:
        raise ValueError("dataset must contain at least one row")

    rng = random.Random(seed)
    points = dataset.to_numpy(dtype=float)

    # randrange excludes the upper bound; randint(0, n) could return n and
    # index one past the last row.
    first_idx = rng.randrange(len(points))
    solutions_indexes = [first_idx]

    # nearest[i] is the distance from row i to its closest chosen row; it is
    # updated incrementally instead of being recomputed from scratch.
    nearest = _row_distances(points, points[first_idx])
    while len(solutions_indexes) < k:
        farthest_idx = int(np.argmax(nearest))
        solutions_indexes.append(farthest_idx)
        nearest = np.minimum(nearest, _row_distances(points, points[farthest_idx]))

    return [dataset.iloc[idx].copy() for idx in solutions_indexes]


def plot_dataset_by_label(x, y, labels, x_column, y_column):
    """Plot two feature columns against each other, one series per label.

    Args:
        x: Feature frame.
        y: Integer-encoded labels used to build a boolean mask over ``x``.
        labels: Display names; the position of a name is the encoded label it
            stands for.
        x_column, y_column: Names of the columns drawn on each axis.

    Draws on the current matplotlib figure. Axis limits are fixed to
    [-1.1, 1.1]. No legend is drawn here.
    """
    for label_idx, label in enumerate(labels):
        label_rows = x[y == label_idx]
        plt.plot(label_rows[x_column], label_rows[y_column], marker='o', linewidth=0, label=label)
    plt.xlim(-1.1, 1.1)
    plt.ylim(-1.1, 1.1)
    plt.xlabel(x_column)
    plt.ylabel(y_column)


class KMeansClassifier:
    """Nearest-centroid classifier whose centroids are fitted with k-means.

    After training, each centroid carries the label whose training points are
    closest to it on average; ``predict`` returns the label of the nearest
    centroid.
    """

    def __init__(self, k):
        # Fitted centroids (list of pd.Series); empty until train() is called.
        self.centroids = []
        # Label assigned to each centroid, in the same order as `centroids`.
        self.centroid_labels = []
        # Number of centroids.
        self.k = k

    def train(self,
        x: pd.DataFrame,
        y: pd.Series,
        epochs=1,
        starting_points: Optional[list[pd.Series]] = None,
        seed = None,
    ):
        """Fit the centroids on ``x`` and label them using ``y``.

        Args:
            x: Numeric feature frame, one row per sample.
            y: Label of each row of ``x``; matched to ``x`` by index.
            epochs: Number of assign-and-update passes over ``x``.
            starting_points: Optional list of ``k`` initial centroids. When
                omitted, ``find_farthest_neighbours`` chooses them. The list
                passed in is not modified.
            seed: Forwarded to ``find_farthest_neighbours``.

        Raises:
            ValueError: If ``k`` is below 1, ``x`` is empty, or ``y`` does not
                provide exactly one label per row of ``x``.
        """
        if self.k < 1:
            raise ValueError("k must be at least 1")
        if len(x) == 0:
            raise ValueError("x must contain at least one row")

        if starting_points is None:
            self.centroids = find_farthest_neighbours(self.k, x, seed)
        else:
            assert len(starting_points) == self.k
            self.centroids = starting_points.copy()

        points = x.to_numpy(dtype=float)

        if isinstance(y, pd.Series) and not y.index.equals(x.index):
            y = y.loc[x.index]
        labels = np.asarray(y)
        if len(labels) != len(points):
            raise ValueError("x and y must describe the same rows")

        centroid_matrix = np.vstack([
            centroid.reindex(x.columns).to_numpy(dtype=float)
            for centroid in self.centroids
        ])

        for epoch in range(epochs):
            print(f"Epoch: {epoch+1}/{epochs}")

            nearest_centroid = _distance_matrix(points, centroid_matrix).argmin(axis=1)
            for i in range(self.k):
                members = points[nearest_centroid == i]
                # A centroid that attracted no points keeps its position;
                # averaging zero points would produce NaN coordinates.
                if len(members) > 0:
                    centroid_matrix[i] = members.mean(axis=0)

        self.centroids = [
            pd.Series(row.copy(), index=x.columns, dtype=float)
            for row in centroid_matrix
        ]

        # Label each centroid with the class that is closest on average.
        # The mean (not the sum) keeps rare classes from winning just because
        # they have fewer rows, and only labels present in `y` are candidates.
        distances = _distance_matrix(points, centroid_matrix)
        known_labels = np.unique(labels).tolist()
        self.centroid_labels = []
        for i in range(self.k):
            mean_distance_by_label = [
                distances[labels == label, i].mean() for label in known_labels
            ]
            best = mean_distance_by_label.index(min(mean_distance_by_label))
            self.centroid_labels.append(known_labels[best])

    def predict(self, x: pd.Series) -> int:
        """Return the label of the centroid nearest to ``x``.

        Raises:
            RuntimeError: If the model has no centroids yet.
        """
        if not self.centroids:
            raise RuntimeError("train() must be called before predict()")

        distances = [calc_distance(x, centroid) for centroid in self.centroids]
        # list.index returns the first minimum, so ties go to the lowest index.
        return self.centroid_labels[distances.index(min(distances))]


# Parameters
dataset_filename = "dataset.csv"
test_size = 0.2
seed = 42
\n", " | Nitrogen | \n", "Phosphorus | \n", "Potassium | \n", "Temperature | \n", "Humidity | \n", "pH_Value | \n", "Rainfall | \n", "Crop | \n", "
---|---|---|---|---|---|---|---|---|
0 | \n", "0.285714 | \n", "-0.471429 | \n", "-0.62 | \n", "-0.308228 | \n", "0.580534 | \n", "-0.067473 | \n", "0.312916 | \n", "20 | \n", "
1 | \n", "0.214286 | \n", "-0.242857 | \n", "-0.64 | \n", "-0.257110 | \n", "0.541266 | \n", "0.098961 | \n", "0.483349 | \n", "20 | \n", "
2 | \n", "-0.142857 | \n", "-0.285714 | \n", "-0.61 | \n", "-0.186292 | \n", "0.587953 | \n", "0.348438 | \n", "0.751421 | \n", "20 | \n", "
3 | \n", "0.057143 | \n", "-0.571429 | \n", "-0.65 | \n", "0.013803 | \n", "0.537503 | \n", "0.081016 | \n", "0.599811 | \n", "20 | \n", "
4 | \n", "0.114286 | \n", "-0.471429 | \n", "-0.63 | \n", "-0.351245 | \n", "0.571251 | \n", "0.282583 | \n", "0.742461 | \n", "20 | \n", "