{
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
},
"orig_nbformat": 2,
"kernelspec": {
"name": "python3610jvsc74a57bd0eb5e09632d6ea1cbf3eb9da7e37b7cf581db5ed13074b21cc44e159dc62acdab",
"display_name": "Python 3.6.10 64-bit ('dataloader': conda)"
}
},
"nbformat": 4,
"nbformat_minor": 2,
"cells": [
{
"source": [
"## \\[RFC\\] How DataFrames (DF) and DataPipes (DP) work together"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from importlib import reload\n",
"import torch\n",
"reload(torch)\n",
"from torch.utils.data import IterDataPipe"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Example IterDataPipe that yields the integers 0 .. range - 1\n",
"class ExampleIterPipe(IterDataPipe):\n",
"    def __init__(self, range = 20):\n",
"        self.range = range\n",
"    def __iter__(self):\n",
"        for i in range(self.range):\n",
"            yield i\n",
"\n",
"# Maps each element to an (i, i % 3) row and groups the rows into DataFrames\n",
"def get_dataframes_pipe(range = 10, dataframe_size = 7):\n",
"    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))._to_dataframes_pipe(columns = ['i','j'], dataframe_size = dataframe_size)\n",
"\n",
"# Same rows as above, kept as plain tuples in a regular DataPipe\n",
"def get_regular_pipe(range = 10):\n",
"    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))\n"
]
},
{
"source": [
"Regardless of how the DataFrames are composed internally, iterating over a DF Pipe yields single rows to the user, just as iterating over a regular DataPipe does."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"DataFrames Pipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\nRegular DataPipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\n"
]
}
],
"source": [
"print('DataFrames Pipe')\n",
"dp = get_dataframes_pipe()\n",
"for i in dp:\n",
"    print(i)\n",
"\n",
"print('Regular DataPipe')\n",
"dp = get_regular_pipe()\n",
"for i in dp:\n",
"    print(i)"
]
},
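{
"source": [
"A minimal sketch of the same idea in plain pandas, independent of the DataPipe API: rows are grouped into DataFrames of a fixed size and then flattened back into row tuples. The helper names `rows_to_dataframes` and `iterate_rows` are hypothetical and are not part of `torch.utils.data`."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Hypothetical helper: group row tuples into DataFrames of at most dataframe_size rows.\n",
"def rows_to_dataframes(rows, columns, dataframe_size):\n",
"    buffer = []\n",
"    for row in rows:\n",
"        buffer.append(row)\n",
"        if len(buffer) == dataframe_size:\n",
"            yield pd.DataFrame(buffer, columns=columns)\n",
"            buffer = []\n",
"    if buffer:\n",
"        # The last DataFrame may be smaller than dataframe_size.\n",
"        yield pd.DataFrame(buffer, columns=columns)\n",
"\n",
"# Hypothetical helper: flatten DataFrames back into plain row tuples.\n",
"def iterate_rows(dataframes):\n",
"    for df in dataframes:\n",
"        yield from df.itertuples(index=False, name=None)\n",
"\n",
"rows = [(i, i % 3) for i in range(10)]\n",
"chunks = list(rows_to_dataframes(rows, ['i', 'j'], dataframe_size=7))\n",
"flat = list(iterate_rows(chunks))\n",
"print([len(c) for c in chunks])  # chunk sizes: [7, 3]\n",
"print(flat == rows)  # the rows survive the round trip unchanged"
]
},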
{
"source": [
"You can iterate over the raw DataFrames using `raw_iterator`."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
" i j\n0 0 0\n1 1 1\n2 2 2\n3 3 0\n4 4 1\n5 5 2\n6 6 0\n i j\n0 7 1\n1 8 2\n2 9 0\n"
]
}
],
"source": [
"dp = get_dataframes_pipe()\n",
"for i in dp.raw_iterator():\n",
"    print(i)"
]
},
{
"source": [
"Operations on a DF Pipe are captured rather than executed immediately."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"tags": []
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"var_3 = input_var_2.i * 100\nvar_4 = var_3 + input_var_2.j\nvar_5 = var_4 - 2.7\ninput_var_2[\"y\"] = var_5\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(dataframe_size = 3)\n",
"dp['y'] = dp.i * 100 + dp.j - 2.7\n",
"print(dp.ops_str())\n"
]
},
{
"source": [
"Captured operations are executed on `__next__` calls of the constructed DataPipe."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
" i j y\n0 0 0 -2.7\n1 1 1 98.3\n2 2 2 199.3\n i j y\n0 3 0 297.3\n1 4 1 398.3\n2 5 2 499.3\n i j y\n0 6 0 597.3\n1 7 1 698.3\n2 8 2 799.3\n i j y\n0 9 0 897.3\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(dataframe_size = 3)\n",
"dp['y'] = dp.i * 100 + dp.j - 2.7\n",
"for i in dp.raw_iterator():\n",
"    print(i)"
]
},
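{
"source": [
"The capture-then-replay behavior can be sketched in plain pandas as follows. This is only an illustration under stated assumptions, not how the DataPipe implementation works internally: `CapturedOps` and `lazy_frames` are hypothetical names, and each operation is stored as a function that is replayed when a DataFrame is pulled from the generator."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Hypothetical helper: records column assignments instead of running them.\n",
"class CapturedOps:\n",
"    def __init__(self):\n",
"        self.ops = []\n",
"\n",
"    def assign(self, column, fn):\n",
"        # Store the operation; nothing is computed yet.\n",
"        self.ops.append((column, fn))\n",
"\n",
"    def apply(self, df):\n",
"        # Replay every recorded operation on a copy of one DataFrame.\n",
"        df = df.copy()\n",
"        for column, fn in self.ops:\n",
"            df[column] = fn(df)\n",
"        return df\n",
"\n",
"# Hypothetical helper: operations run only as each DataFrame is requested.\n",
"def lazy_frames(dataframes, captured):\n",
"    for df in dataframes:\n",
"        yield captured.apply(df)\n",
"\n",
"captured = CapturedOps()\n",
"captured.assign('y', lambda df: df.i * 100 + df.j - 2.7)\n",
"\n",
"source = [pd.DataFrame({'i': [0, 1, 2], 'j': [0, 1, 2]})]\n",
"result = next(lazy_frames(source, captured))\n",
"print(result)"
]
},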
{
"source": [
"`shuffle` on a DataFrame Pipe affects rows individually: rows are shuffled across DataFrames rather than whole DataFrames being reordered."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Raw DataFrames iterator\n i j\n2 8 2\n2 2 2\n2 5 2\n i j\n1 4 1\n1 1 1\n0 3 0\n i j\n1 7 1\n0 9 0\n0 6 0\n i j\n0 0 0\nRegular DataFrames iterator\n(1, 1)\n(5, 2)\n(8, 2)\n(9, 0)\n(7, 1)\n(6, 0)\n(3, 0)\n(4, 1)\n(0, 0)\n(2, 2)\nRegular iterator\n(5, 2)\n(6, 0)\n(0, 0)\n(9, 0)\n(3, 0)\n(1, 1)\n(2, 2)\n(8, 2)\n(4, 1)\n(7, 1)\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(dataframe_size = 3)\n",
"dp = dp.shuffle()\n",
"print('Raw DataFrames iterator')\n",
"for i in dp.raw_iterator():\n",
"    print(i)\n",
"\n",
"print('Regular DataFrames iterator')\n",
"for i in dp:\n",
"    print(i)\n",
"\n",
"\n",
"# This is similar to shuffling a regular DataPipe\n",
"dp = get_regular_pipe()\n",
"dp = dp.shuffle()\n",
"print('Regular iterator')\n",
"for i in dp:\n",
"    print(i)"
]
},
{
"source": [
"You can keep mixing DF and DP operations."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
" i j y\n0 -17 -17 -197000.0\n1 -13 -16 3813000.0\n0 -11 -17 5803000.0\n i j y\n2 -12 -15 4823000.0\n1 -10 -16 6813000.0\n1 -16 -16 813000.0\n i j y\n0 -8 -17 8803000.0\n2 -9 -15 7823000.0\n0 -14 -17 2803000.0\n i j y\n2 -15 -15 1823000.0\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(dataframe_size = 3)\n",
"dp['y'] = dp.i * 100 + dp.j - 2.7\n",
"dp = dp.shuffle()\n",
"dp = dp - 17\n",
"dp['y'] = dp.y * 10000\n",
"for i in dp.raw_iterator():\n",
"    print(i)"
]
},
{
"source": [
"Batching combines everything into a `list`, and `list`s can be nested. A list may hold any number of DataFrames as long as the total number of rows equals the batch size."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Iterate over DataFrame batches\n[(6, 0),(0, 0)]\n[(4, 1),(1, 1)]\n[(2, 2),(9, 0)]\n[(3, 0),(5, 2)]\n[(7, 1),(8, 2)]\nIterate over regular batches\n[(1, 1),(4, 1)]\n[(2, 2),(3, 0)]\n[(6, 0),(7, 1)]\n[(8, 2),(0, 0)]\n[(5, 2),(9, 0)]\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(dataframe_size = 3)\n",
"dp = dp.shuffle()\n",
"dp = dp.batch(2)\n",
"print(\"Iterate over DataFrame batches\")\n",
"for i,v in enumerate(dp):\n",
"    print(v)\n",
"\n",
"# This is similar to batching a regular DataPipe\n",
"dp = get_regular_pipe()\n",
"dp = dp.shuffle()\n",
"dp = dp.batch(2)\n",
"print(\"Iterate over regular batches\")\n",
"for i in dp:\n",
"    print(i)"
]
},
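{
"source": [
"A rough sketch, in plain pandas, of how a batch can be a list of DataFrame pieces whose row counts add up to the batch size. The helper `batch_rows` is hypothetical and only illustrates the row-counting rule described above; the actual storage class used by DataPipes may differ."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Hypothetical helper: yield lists of DataFrame slices whose row counts sum to batch_size.\n",
"def batch_rows(dataframes, batch_size):\n",
"    batch, rows_in_batch = [], 0\n",
"    for df in dataframes:\n",
"        start = 0\n",
"        while start < len(df):\n",
"            # Take only as many rows as the current batch still needs.\n",
"            take = min(batch_size - rows_in_batch, len(df) - start)\n",
"            batch.append(df.iloc[start:start + take])\n",
"            rows_in_batch += take\n",
"            start += take\n",
"            if rows_in_batch == batch_size:\n",
"                yield batch\n",
"                batch, rows_in_batch = [], 0\n",
"    if batch:\n",
"        # The final batch may hold fewer rows than batch_size.\n",
"        yield batch\n",
"\n",
"frames = [pd.DataFrame({'i': [0, 1, 2]}), pd.DataFrame({'i': [3, 4, 5]})]\n",
"batches = list(batch_rows(frames, batch_size=2))\n",
"sizes = [[len(part) for part in batch] for batch in batches]\n",
"print(sizes)  # the middle batch spans two source DataFrames"
]
},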
{
"source": [
"Some details about the internal storage of batched DataFrames and how they are iterated:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
"As string: [(0, 0),(3, 0)]\n",
"Iterated regularly:\n",
"-- batch start --\n",
"(0, 0)\n",
"(3, 0)\n",
"-- batch end --\n",
"Iterated in inner format (for developers):\n",
"-- df batch start --\n",
" i j\n",
"0 0 0\n",
"0 3 0\n",
"-- df batch end --\n",
"Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
"As string: [(6, 0),(1, 1)]\n",
"Iterated regularly:\n",
"-- batch start --\n",
"(6, 0)\n",
"(1, 1)\n",
"-- batch end --\n",
"Iterated in inner format (for developers):\n",
"-- df batch start --\n",
" i j\n",
"0 6 0\n",
"1 1 1\n",
"-- df batch end --\n",
"Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
"As string: [(9, 0),(4, 1)]\n",
"Iterated regularly:\n",
"-- batch start --\n",
"(9, 0)\n",
"(4, 1)\n",
"-- batch end --\n",
"Iterated in inner format (for developers):\n",
"-- df batch start --\n",
" i j\n",
"0 9 0\n",
"1 4 1\n",
"-- df batch end --\n",
"Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
"As string: [(5, 2),(2, 2)]\n",
"Iterated regularly:\n",
"-- batch start --\n",
"(5, 2)\n",
"(2, 2)\n",
"-- batch end --\n",
"Iterated in inner format (for developers):\n",
"-- df batch start --\n",
" i j\n",
"2 5 2\n",
"2 2 2\n",
"-- df batch end --\n",
"Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
"As string: [(8, 2),(7, 1)]\n",
"Iterated regularly:\n",
"-- batch start --\n",
"(8, 2)\n",
"(7, 1)\n",
"-- batch end --\n",
"Iterated in inner format (for developers):\n",
"-- df batch start --\n",
" i j\n",
"2 8 2\n",
"1 7 1\n",
"-- df batch end --\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(dataframe_size = 3)\n",
"dp = dp.shuffle()\n",
"dp = dp.batch(2)\n",
"for i in dp:\n",
"    print(\"Type: \", type(i))\n",
"    print(\"As string: \", i)\n",
"    print(\"Iterated regularly:\")\n",
"    print('-- batch start --')\n",
"    for item in i:\n",
"        print(item)\n",
"    print('-- batch end --')\n",
"    print(\"Iterated in inner format (for developers):\")\n",
"    print('-- df batch start --')\n",
"    for item in i.raw_iterator():\n",
"        print(item)\n",
"    print('-- df batch end --')"
]
},
{
"source": [
"`concat` should work only on DFs with the same schema; this code should produce an error."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"# TODO!\n",
"# dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)\n",
"# dp = get_dataframes_pipe(range = 6, dataframe_size = 3)\n",
"# dp['y'] = dp.i * 100 + dp.j - 2.7\n",
"# dp = dp.concat(dp0)\n",
"# for i,v in enumerate(dp.raw_iterator()):\n",
"# print(v)"
]
},
{
"source": [
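"`unbatch` of a `list` containing DataFrames works similarly to regular unbatch.\n",
"Note: DataFrame sizes might change. The cell below currently fails with `AttributeError` because of a bug in unbatching, which does not detect the DF type."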
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"output_type": "error",
"ename": "AttributeError",
"evalue": "",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-12-fa80e9c68655>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;31m# Here is bug with unbatching which doesn't detect DF type.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m \u001b[0mdp\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'z'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0my\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0;36m100\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mraw_iterator\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/dataset/pytorch/torch/utils/data/dataset.py\u001b[0m in \u001b[0;36m__getattr__\u001b[0;34m(self, attribute_name)\u001b[0m\n\u001b[1;32m 222\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mfunction\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 223\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 224\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mAttributeError\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 225\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 226\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__reduce_ex__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mAttributeError\u001b[0m: "
]
}
],
"source": [
"dp = get_dataframes_pipe(range = 18, dataframe_size = 3)\n",
"dp['y'] = dp.i * 100 + dp.j - 2.7\n",
"dp = dp.batch(5).batch(3).batch(1).unbatch(unbatch_level = 3)\n",
"\n",
"# Here is bug with unbatching which doesn't detect DF type.\n",
"dp['z'] = dp.y - 100\n",
"\n",
"for i in dp.raw_iterator():\n",
"    print(i)"
]
},
{
"source": [
"`map` is applied to individual rows; the `nesting_level` argument is used to penetrate batching."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Iterate over DataFrame batches\n[(1111000, 1111000),(1112000, 1112000),(1113000, 1113000),(1114000, 1111000),(1115000, 1112000)]\n[(1116000, 1113000),(1117000, 1111000),(1118000, 1112000),(1119000, 1113000),(1120000, 1111000)]\nIterate over regular batches\n[(1111000, 0),(1112000, 1),(1113000, 2),(1114000, 0),(1115000, 1)]\n[(1116000, 2),(1117000, 0),(1118000, 1),(1119000, 2),(1120000, 0)]\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(range = 10, dataframe_size = 3)\n",
"dp = dp.map(lambda x: x + 1111)\n",
"dp = dp.batch(5).map(lambda x: x * 1000, nesting_level = 1)\n",
"print(\"Iterate over DataFrame batches\")\n",
"for i in dp:\n",
"    print(i)\n",
"\n",
"# Works similarly at the row level for classic DataPipe elements\n",
"dp = get_regular_pipe(range = 10)\n",
"dp = dp.map(lambda x: (x[0] + 1111, x[1]))\n",
"dp = dp.batch(5).map(lambda x: (x[0] * 1000, x[1]), nesting_level = 1)\n",
"print(\"Iterate over regular batches\")\n",
"for i in dp:\n",
"    print(i)\n",
"\n"
]
},
{
"source": [
"`filter` is applied to individual rows; the `nesting_level` argument is used to penetrate batching."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Iterate over DataFrame batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\nIterate over regular batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\n"
]
}
],
"source": [
"dp = get_dataframes_pipe(range = 30, dataframe_size = 3)\n",
"dp = dp.filter(lambda x: x.i > 5)\n",
"dp = dp.batch(5).filter(lambda x: x.i < 13, nesting_level = 1)\n",
"print(\"Iterate over DataFrame batches\")\n",
"for i in dp:\n",
"    print(i)\n",
"\n",
"# Works similarly at the row level for classic DataPipe elements\n",
"dp = get_regular_pipe(range = 30)\n",
"dp = dp.filter(lambda x: x[0] > 5)\n",
"dp = dp.batch(5).filter(lambda x: x[0] < 13, nesting_level = 1)\n",
"print(\"Iterate over regular batches\")\n",
"for i in dp:\n",
"    print(i)"
]
}
]
}