{
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
},
"orig_nbformat": 2,
"kernelspec": {
"name": "python3610jvsc74a57bd0eb5e09632d6ea1cbf3eb9da7e37b7cf581db5ed13074b21cc44e159dc62acdab",
"display_name": "Python 3.6.10 64-bit ('dataloader': conda)"
}
},
"nbformat": 4,
"nbformat_minor": 2,
"cells": [
{
"source": [
"## Standard flow control and data processing DataPipes"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from torch.utils.data import IterDataPipe"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Example IterDataPipe\n",
"class ExampleIterPipe(IterDataPipe):\n",
" def __init__(self, range = 20):\n",
" self.range = range\n",
" def __iter__(self):\n",
" for i in range(self.range):\n",
" yield i"
]
},
{
"source": [
"## Batch\n",
"\n",
"Function: `batch`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" - `batch_size: int` desired batch size\n",
" - `unbatch_level:int = 0` if specified calls `unbatch(unbatch_level=unbatch_level)` on source datapipe before batching (see `unbatch`)\n",
" - `drop_last: bool = False`\n",
"\n",
"Example:\n",
"\n",
"Classic batching produce partial batches by default\n"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 1, 2]\n[3, 4, 5]\n[6, 7, 8]\n[9]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"To drop incomplete batches add `drop_last` argument"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 1, 2]\n[3, 4, 5]\n[6, 7, 8]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3, drop_last = True)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"Sequential calling of `batch` produce nested batches"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[[0, 1, 2], [3, 4, 5]]\n[[6, 7, 8], [9, 10, 11]]\n[[12, 13, 14], [15, 16, 17]]\n[[18, 19, 20], [21, 22, 23]]\n[[24, 25, 26], [27, 28, 29]]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(30).batch(3).batch(2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"It is possible to unbatch source data before applying the new batching rule using `unbatch_level` argument"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]\n[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(30).batch(3).batch(2).batch(10, unbatch_level=-1)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## Unbatch\n",
"\n",
"Function: `unbatch`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" `unbatch_level:int = 1`\n",
" \n",
"Example:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"9\n0\n1\n2\n6\n7\n8\n3\n4\n5\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).shuffle().unbatch()\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"By default unbatching is applied only on the first layer, to unbatch deeper use `unbatch_level` argument"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 1]\n[2, 3]\n[4, 5]\n[6, 7]\n[8, 9]\n[10, 11]\n[12, 13]\n[14, 15]\n[16, 17]\n[18, 19]\n[20, 21]\n[22, 23]\n[24, 25]\n[26, 27]\n[28, 29]\n[30, 31]\n[32, 33]\n[34, 35]\n[36, 37]\n[38, 39]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(40).batch(2).batch(4).batch(3).unbatch(unbatch_level = 2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"Setting `unbatch_level` to `-1` will unbatch to the lowest level"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14\n15\n16\n17\n18\n19\n20\n21\n22\n23\n24\n25\n26\n27\n28\n29\n30\n31\n32\n33\n34\n35\n36\n37\n38\n39\n"
]
}
],
"source": [
"dp = ExampleIterPipe(40).batch(2).batch(4).batch(3).unbatch(unbatch_level = -1)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## Map\n",
"\n",
"Function: `map`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" - `nesting_level: int = 0`\n",
" \n",
"Example:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"0\n2\n4\n6\n8\n10\n12\n14\n16\n18\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).map(lambda x: x * 2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"`map` by default applies function to every mini-batch as a whole\n"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 1, 2, 0, 1, 2]\n[3, 4, 5, 3, 4, 5]\n[6, 7, 8, 6, 7, 8]\n[9, 9]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).map(lambda x: x * 2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"To apply function on individual items of the mini-batch use `nesting_level` argument"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[[0, 2, 4], [6, 8, 10]]\n[[12, 14, 16], [18]]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).batch(2).map(lambda x: x * 2, nesting_level = 2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"Setting `nesting_level` to `-1` will apply `map` function to the lowest level possible"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[[[0, 2, 4], [6, 8, 10]], [[12, 14, 16], [18]]]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).batch(2).batch(2).map(lambda x: x * 2, nesting_level = -1)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## Filter\n",
"\n",
"Function: `filter`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" - `nesting_level: int = 0`\n",
" - `drop_empty_batches = True` whether empty many batches dropped or not.\n",
" \n",
"Example:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"0\n2\n4\n6\n8\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).filter(lambda x: x % 2 == 0)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"Classic `filter` by default applies filter function to every mini-batches as a whole \n"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 1, 2]\n[3, 4, 5]\n[6, 7, 8]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10)\n",
"dp = dp.batch(3).filter(lambda x: len(x) > 2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"You can apply filter function on individual elements by setting `nesting_level` argument"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[5]\n[6, 7, 8]\n[9]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10)\n",
"dp = dp.batch(3).filter(lambda x: x > 4, nesting_level = 1)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"If mini-batch ends with zero elements after filtering default behaviour would be to drop them from the response. You can override this behaviour using `drop_empty_batches` argument.\n"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[]\n[5]\n[6, 7, 8]\n[9]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10)\n",
"dp = dp.batch(3).filter(lambda x: x > 4, nesting_level = -1, drop_empty_batches = False)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[[[0, 1, 2], [3]], [[], [10, 11]]]\n[[[12, 13, 14], [15, 16, 17]], [[18, 19]]]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(20)\n",
"dp = dp.batch(3).batch(2).batch(2).filter(lambda x: x < 4 or x > 9 , nesting_level = -1, drop_empty_batches = False)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## Shuffle\n",
"\n",
"Function: `shuffle`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" - `unbatch_level:int = 0` if specified calls `unbatch(unbatch_level=unbatch_level)` on source datapipe before batching (see `unbatch`)\n",
" - `buffer_size: int = 10000`\n",
" \n",
"Example:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"2\n9\n4\n0\n3\n7\n8\n5\n6\n1\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).shuffle()\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"`shuffle` operates on input mini-batches similar as on individual items"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 1, 2]\n[3, 4, 5]\n[9]\n[6, 7, 8]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).shuffle()\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"To shuffle elements across batches use `shuffle(unbatch_level)` followed by `batch` pattern "
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[2, 1, 0]\n[7, 9, 6]\n[3, 5, 4]\n[8]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).shuffle(unbatch_level = -1).batch(3)\n",
"for i in dp:\n",
" print(i)"
]
},
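{
 "source": [
  "`buffer_size` limits how many elements the shuffle buffer holds; with a small buffer, elements are only reordered within that window, so the shuffle is only approximately uniform. A minimal sketch (not executed here), assuming the prototype `shuffle` accepts the `buffer_size` argument listed above:"
 ],
 "cell_type": "markdown",
 "metadata": {}
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Sketch: shuffling with a small buffer; items can only be reordered within a window of 4 buffered elements\n",
  "dp = ExampleIterPipe(10).shuffle(buffer_size = 4)\n",
  "for i in dp:\n",
  "    print(i)"
 ]
},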
{
"source": [
"## Collate\n",
"\n",
"Function: `collate`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" \n",
"Example:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"tensor([0, 1, 2])\ntensor([3, 4, 5])\ntensor([6, 7, 8])\ntensor([9])\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).collate()\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## GroupBy\n",
"\n",
"Function: `groupby`\n",
"\n",
"Usage: `dp.groupby(lambda x: x[0])`\n",
"\n",
"Description: Batching items by combining items with same key into same batch \n",
"\n",
"Arguments:\n",
" - `group_key_fn`\n",
" - `group_size` - yeild resulted group as soon as `group_size` elements accumulated\n",
" - `guaranteed_group_size:int = None`\n",
" - `unbatch_level:int = 0` if specified calls `unbatch(unbatch_level=unbatch_level)` on source datapipe before batching (see `unbatch`)\n",
"\n",
"#### Attention\n",
"As datasteam can be arbitrary large, grouping is done on best effort basis and there is no guarantee that same key will never present in the different groups. You can call it local groupby where locallity is the one DataPipe process/thread."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 3, 6, 9]\n[1, 4, 7]\n[5, 2, 8]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).shuffle().groupby(lambda x: x % 3)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"By default group key function is applied to entire input (mini-batch)"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[[0, 1, 2], [3, 4, 5], [6, 7, 8]]\n[[9]]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).groupby(lambda x: len(x))\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"It is possible to unnest items from the mini-batches using `unbatch_level` argument"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 3, 6, 9]\n[1, 4, 7]\n[2, 5, 8]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10).batch(3).groupby(lambda x: x % 3, unbatch_level = 1)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"When internal buffer (defined by `buffer_size`) is overfilled, groupby will yield biggest group available"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[9, 3]\n[13, 4, 7]\n[2, 11, 14, 5]\n[0, 6, 12]\n[1, 10]\n[8]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, buffer_size = 5)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"`groupby` will produce `group_size` sized batches on as fast as possible basis"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[6, 3, 12]\n[1, 16, 7]\n[2, 5, 8]\n[14, 11, 17]\n[15, 9, 0]\n[10, 4, 13]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(18).shuffle().groupby(lambda x: x % 3, group_size = 3)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"Remaining groups must be at least `guaranteed_group_size` big. "
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[11, 2, 5]\n[1, 4, 10]\n[0, 9, 6]\n[14, 8]\n[13, 7]\n[12, 3]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, group_size = 3, guaranteed_group_size = 2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"Without defined `group_size` function will try to accumulate at least `guaranteed_group_size` elements before yielding resulted group"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[3, 6, 9, 12, 0]\n[14, 2, 8, 11, 5]\n[7, 4, 1, 13, 10]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, guaranteed_group_size = 2)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"This behaviour becomes noticable when data is bigger than buffer and some groups getting evicted before gathering all potential items"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[0, 3]\n[1, 4, 7]\n[2, 5, 8]\n[6, 9, 12]\n[10, 13]\n[11, 14]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(15).groupby(lambda x: x % 3, guaranteed_group_size = 2, buffer_size = 6)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"With randomness involved you might end up with incomplete groups (so next example expected to fail in most cases)"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[14, 5, 11]\n[1, 7, 4, 10]\n[0, 12, 6]\n[8, 2]\n[9, 3]\n"
]
},
{
"output_type": "error",
"ename": "Exception",
"evalue": "('Failed to group items', '[13]')",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mException\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-31-673b9dd7fb43>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0mdp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mExampleIterPipe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m15\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mshuffle\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgroupby\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mx\u001b[0m \u001b[0;34m%\u001b[0m \u001b[0;36m3\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mguaranteed_group_size\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mbuffer_size\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;36m6\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mi\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/dataset/pytorch/torch/utils/data/datapipes/iter/grouping.py\u001b[0m in \u001b[0;36m__iter__\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 275\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 276\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mguaranteed_group_size\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m \u001b[0;32mand\u001b[0m \u001b[0mbiggest_size\u001b[0m \u001b[0;34m<\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mguaranteed_group_size\u001b[0m \u001b[0;32mand\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdrop_remaining\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 277\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'Failed to group items'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbuffer\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mbiggest_key\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 278\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 279\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mguaranteed_group_size\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mNone\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0mbiggest_size\u001b[0m \u001b[0;34m>=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mguaranteed_group_size\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mException\u001b[0m: ('Failed to group items', '[13]')"
]
}
],
"source": [
"dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, guaranteed_group_size = 2, buffer_size = 6)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"To avoid this error and drop incomplete groups, use `drop_remaining` argument"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"[5, 2, 14]\n[4, 7, 13, 1, 10]\n[12, 6, 3, 9]\n[8, 11]\n"
]
}
],
"source": [
"dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, guaranteed_group_size = 2, buffer_size = 6, drop_remaining = True)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## Zip\n",
"\n",
"Function: `zip`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" \n",
"Example:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"(0, 3)\n(1, 0)\n(2, 4)\n(3, 2)\n(4, 1)\n"
]
}
],
"source": [
"_dp = ExampleIterPipe(5).shuffle()\n",
"dp = ExampleIterPipe(5).zip(_dp)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## Fork\n",
"\n",
"Function: `fork`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" \n",
"Example:"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"0\n1\n0\n1\n0\n1\n"
]
}
],
"source": [
"dp = ExampleIterPipe(2)\n",
"dp1, dp2, dp3 = dp.fork(3)\n",
"for i in dp1 + dp2 + dp3:\n",
" print(i)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Demultiplexer\n",
"\n",
"Function: `demux`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" \n",
"Example:"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1\n",
"4\n",
"7\n"
]
}
],
"source": [
"dp = ExampleIterPipe(10)\n",
"dp1, dp2, dp3 = dp.demux(3, lambda x: x % 3)\n",
"for i in dp2:\n",
" print(i)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Multiplexer\n",
"\n",
"Function: `mux`\n",
"\n",
"Description: \n",
"\n",
"Alternatives:\n",
"\n",
"Arguments:\n",
" \n",
"Example:"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0\n",
"0\n",
"0\n",
"1\n",
"10\n",
"100\n",
"2\n",
"20\n",
"200\n"
]
}
],
"source": [
"dp1 = ExampleIterPipe(3)\n",
"dp2 = ExampleIterPipe(3).map(lambda x: x * 10)\n",
"dp3 = ExampleIterPipe(3).map(lambda x: x * 100)\n",
"\n",
"dp = dp1.mux(dp2, dp3)\n",
"for i in dp:\n",
" print(i)"
]
},
{
"source": [
"## Concat\n",
"\n",
"Function: `concat`\n",
"\n",
"Description: Returns DataPipes with elements from the first datapipe following by elements from second datapipes\n",
"\n",
"Alternatives:\n",
" \n",
" `dp = dp.concat(dp2, dp3)`\n",
" `dp = dp.concat(*datapipes_list)`\n",
"\n",
"Example:\n"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"0\n1\n2\n3\n0\n1\n2\n"
]
}
],
"source": [
"dp = ExampleIterPipe(4)\n",
"dp2 = ExampleIterPipe(3)\n",
"dp = dp.concat(dp2)\n",
"for i in dp:\n",
" print(i)"
]
}
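,
{
 "source": [
  "As listed in the alternatives above, `concat` also accepts multiple datapipes in a single call; a minimal sketch (not executed here):"
 ],
 "cell_type": "markdown",
 "metadata": {}
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Sketch: concatenating more than two datapipes in one call\n",
  "dp = ExampleIterPipe(4)\n",
  "dp2 = ExampleIterPipe(3)\n",
  "dp3 = ExampleIterPipe(2)\n",
  "dp = dp.concat(dp2, dp3)\n",
  "for i in dp:\n",
  "    print(i)"
 ]
}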
]
}