| ## @package pipeline |
# Module caffe2.python.pipeline

| from caffe2.python import core, queue_util |
| from caffe2.python.dataio import Reader, Writer |
| from caffe2.python.net_builder import NetBuilder, ops |
| from caffe2.python.schema import as_record, Field |
| from caffe2.python.task import Node, Task, TaskGroup |
| |
| |
| class Output: |
| """ |
| Represents the result of a processor function. A processor can either |
| return an Output, or it can return a record, in which case an Output will be |
| created for it afterwards. |
| """ |
| def __init__(self, nets=None, record=None, should_stop=None): |
| builder_children = NetBuilder.current().get() |
| assert nets is None or len(builder_children) == 0, ( |
| 'Cannot both use `ops` syntax and return a list of nets.') |
| if nets is None: |
| nets = builder_children |
| if isinstance(nets, core.Net): |
| nets = [nets] |
| self.nets = [] if nets is None else list(nets) |
| self.record = None if record is None else as_record(record) |
| self.should_stop = should_stop |
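
# A processor may construct an Output explicitly, or rely on the enclosing
# NetBuilder's `ops` syntax and return just a record. Minimal sketch
# (hypothetical record layout; assumes `numpy as np` and `schema` imports):
#
#     def proc(rec):
#         out = ops.Copy([rec.field_blobs()[0]])
#         return Output(record=schema.Scalar(np.float32, out))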
| |
| |
| DEFAULT_QUEUE_CAPACITY = 100 |
| |
| |
| def _init_output(output, capacity, global_init_net, global_exit_net): |
| if output is None: |
| out_queue = queue_util.Queue( |
| capacity=( |
| capacity if capacity is not None |
| else DEFAULT_QUEUE_CAPACITY)) |
| writer = out_queue.writer() |
| elif isinstance(output, Writer): |
| assert capacity is None, 'capacity would not be used.' |
| out_queue = None |
| writer = output |
| elif hasattr(output, 'writer'): |
| assert capacity is None, 'capacity would not be used.' |
| out_queue = output |
| writer = output.writer() |
| else: |
        raise ValueError('output must be a writer, queue or stream.')
| writer.setup_ex(global_init_net, global_exit_net) |
| return out_queue, writer |
| |
| |
| def make_processor(processor, reader=None): |
| if processor is None: |
| return lambda rec: rec |
| elif isinstance(processor, core.Net): |
| return NetProcessor(processor) |
| else: |
| if reader is not None and hasattr(processor, "schema_func"): |
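            # Bind the processor's schema_func to this reader so that
            # processor.schema() can be computed lazily from it.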
| def processor_schema(): |
| return processor.schema_func(reader) |
| |
| processor.schema = processor_schema |
| return processor |
| |
| |
| def normalize_processor_output(output): |
| """ |
| Allow for processors to return results in several formats. |
| TODO(azzolini): simplify once all processors use NetBuilder API. |
| """ |
    if isinstance(output, Output):
        # Processor returned an Output.
        return output
    elif isinstance(output, Field):
        # Processor returned a record.
        return Output(record=output)
    elif isinstance(output, tuple):
        is_record_and_blob = (
            len(output) == 2 and
            isinstance(output[0], Field) and
            isinstance(output[1], core.BlobReference))
        if is_record_and_blob:
            # Processor returned (record, stop_blob).
            return Output(None, *output)
        else:
            # Processor returned (nets, record, stop_blob).
            return Output(*output)
    else:
        # Processor returned nets, no output record.
        return Output(output)
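
# As a sketch, the return shapes accepted from a processor are:
#
#     return Output(nets, rec, stop_blob)  # explicit Output
#     return rec                           # a record (Field)
#     return rec, stop_blob                # (record, stop_blob)
#     return nets, rec, stop_blob          # (nets, record, stop_blob)
#     return nets                          # nets only, no output record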
| |
| |
| def pipe( |
| input, output=None, num_threads=1, processor=None, name=None, |
| capacity=None, group=None, num_runtime_threads=1): |
| """ |
| Given a Reader, Queue or DataStream in `input`, and optionally, a Writer, |
| Queue or DataStream in `output`, creates a Task that, when run, will |
| pipe the input into the output, using multiple parallel threads. |
| Additionally, if a processor is given, it will be called between reading |
| and writing steps, allowing it to transform the record. |
| |
| Args: |
| input: either a Reader, Queue or DataStream that will be read |
| until a stop is signaled either by the reader or the |
| writer. |
| output: either a Writer, a Queue or a DataStream that will be |
| written to as long as neither reader nor writer signal |
| a stop condition. If output is not provided or is None, |
| a Queue is created with given `capacity` and written to. |
| num_threads: number of concurrent threads used for processing and |
| piping. If set to 0, no Task is created, and a |
| reader is returned instead -- the reader returned will |
| read from the reader passed in and process it. |
| ** DEPRECATED **. Use `num_runtime_threads` instead. |
| This option will be removed once all readers/processors |
| support `num_runtime_threads`. |
| processor: (optional) function that takes an input record and |
| optionally returns a record; this will be called |
| between read and write steps. If the processor does |
| not return a record, a writer will not be instantiated. |
| Processor can also be a core.Net with input and output |
| records properly set. In that case, a NetProcessor is |
| instantiated, cloning the net for each of the threads. |
| name: (optional) name of the task to be created. |
| capacity: when output is not passed, a queue of given `capacity` |
| is created and written to. |
| group: (optional) explicitly add the created Task to this |
| TaskGroup, instead of using the currently active one. |
        num_runtime_threads: Similar to `num_threads`, but instead of expanding
                         the tasks with a `for` loop in Python, does so at
                         runtime. This is preferable to `num_threads`, but some
                         processors/readers still need to be called multiple
                         times in Python.
| |
| Returns: |
| Output Queue, DataStream, Reader, or None, depending on the parameters |
| passed. |
| """ |
| result, _ = _pipe_step( |
| input, output, num_threads, processor, name, capacity, group, |
| num_runtime_threads) |
| return result |
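
# Usage sketch (hypothetical names; assumes `src_queue` is a queue_util.Queue
# whose records match what `my_proc` expects):
#
#     with TaskGroup() as tg:
#         mid_queue = pipe(src_queue, processor=my_proc,
#                          num_runtime_threads=4)
#         pipe(mid_queue, output=dst_writer, num_runtime_threads=2)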
| |
| |
| def pipe_and_output( |
| input, output=None, num_threads=1, processor=None, name=None, |
| capacity=None, group=None, num_runtime_threads=1, final_outputs=None): |
| """ |
| Similar to `pipe`, with the additional ability for the pipe Task to |
| return output values to the `Session` once done. |
| |
| Returns: |
| Tuple (out_queue, *task_outputs) |
| out_queue: same as return value of `pipe`. |
        task_outputs: TaskOutput object (or a list of them when
                      `final_outputs` is a list or tuple), fetchable from
                      the client after session.run() returns.
| """ |
| assert num_threads > 0 |
| result, task = _pipe_step( |
| input, output, num_threads, processor, name, capacity, group, |
| num_runtime_threads, final_outputs) |
| output = None |
| if final_outputs is not None: |
| output = task.outputs() |
| if type(final_outputs) not in (list, tuple): |
| output = output[0] |
| return result, output |
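
# Sketch (hypothetical names): fetch a blob produced inside the pipe task
# once session.run() returns; `total_blob` is assumed to be written by
# `my_proc`.
#
#     q, total = pipe_and_output(src_queue, processor=my_proc,
#                                final_outputs=total_blob)
#     session.run(tg)
#     print(total.fetch())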
| |
| |
def processor_name(processor):
    if hasattr(processor, 'name'):
        return processor.name
    if hasattr(processor, '__name__'):
        if processor.__name__ == '<lambda>':
            return processor.__module__
        if hasattr(processor, '__self__'):
            # Bound method: report as `ClassName.method_name`.
            return '%s.%s' % (
                type(processor.__self__).__name__, processor.__name__)
        return processor.__name__
    return processor.__class__.__name__
| |
| |
| def _runtime_threads_task(name, group, final_outputs, reader, num_threads, |
| output, capacity): |
| node_name = str(Node.current()) |
| profiler_name = "{0}/{1}/{2}/{3}/{4}".format( |
| node_name, |
| "pipe", |
| name, |
        processor_name(reader) if reader else "NoInput",
        processor_name(output) if output else "NoOutput")
| |
| with Task(name=name, group=group, outputs=final_outputs, |
| num_instances=num_threads) as task: |
| global_exit_net = core.Net('pipe:exit') |
| global_init_net = core.Net('pipe:init') |
| reader.setup_ex(global_init_net, global_exit_net) |
| |
| init_net = core.Net('pipe:instance:init') |
| exit_net = core.Net('pipe:instance:exit') |
| read_nets, status, rec = reader.read_record_ex(init_net, exit_net) |
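        # Reset the stop-signal blob to False before the read loop starts.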
| init_net.ConstantFill( |
| [], [status], |
| shape=[], |
| value=False, |
| dtype=core.DataType.BOOL |
| ) |
| |
| if rec is not None: |
| out_queue, writer = _init_output( |
| output, capacity, global_init_net, global_exit_net) |
| write_nets, _ = writer.write_record_ex( |
| rec, init_net, exit_net, status) |
| else: |
| out_queue = None |
| write_nets = [] |
| |
| with ops.task_init(): |
| ops.net(global_init_net) |
| with ops.task_instance_init(): |
| ops.net(init_net) |
| |
| timer_start_net = core.Net('timer_start') |
| timer = timer_start_net.TimerBegin([], counter_name=profiler_name) |
| timer_end_net = core.Net('timer_end') |
| timer_end_net.TimerEnd(timer, []) |
| |
| ops.net(core.execution_step( |
| 'body', |
| [timer_start_net] + list(read_nets) + list(write_nets) + |
| [timer_end_net], |
| should_stop_blob=status)) |
| ops.net(timer_end_net) |
| |
| with ops.task_instance_exit(): |
| ops.net(exit_net) |
| with ops.task_exit(): |
| ops.net(global_exit_net) |
| |
| return out_queue, task |
| |
| |
| def _static_threads_task(name, group, final_outputs, reader, num_threads, |
| output, capacity): |
| node_name = str(Node.current()) |
| profiler_name = "{0}/{1}/{2}/{3}/{4}".format( |
| node_name, |
| "pipe", |
| name, |
        processor_name(reader) if reader else "NoInput",
        processor_name(output) if output else "NoOutput")
| |
| with Task(name=name, group=group, outputs=final_outputs) as task: |
| global_exit_net = core.Net('exit') |
| global_init_net = core.Net('init') |
| reader.setup_ex(global_init_net, global_exit_net) |
| |
| out_queue = None |
| writer = None |
| |
| steps = [] |
| for thread_id in range(num_threads): |
| with NetBuilder(name='t:%d' % thread_id) as nb: |
| init_net = core.Net('init') |
| exit_net = core.Net('exit') |
| read_nets, status, rec = reader.read_record_ex( |
| init_net, exit_net) |
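                # Reset the stop-signal blob to False before the loop starts.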
| init_net.ConstantFill( |
| [], [status], |
| shape=[], |
| value=False, |
| dtype=core.DataType.BOOL |
| ) |
| |
| if rec is not None: |
| if writer is None: |
                        # Hack so that the out queue gets the right name prefix
                        # (otherwise it would be prefixed with the thread id).
| with NetBuilder(_fullname=task.name): |
| out_queue, writer = _init_output( |
| output, capacity, global_init_net, |
| global_exit_net) |
| write_nets, _ = writer.write_record_ex( |
| rec, init_net, exit_net, status) |
| else: |
| write_nets = [] |
| |
| timer_start_net = core.Net('timer_start') |
| timer = timer_start_net.TimerBegin([], counter_name=profiler_name) |
| timer_end_net = core.Net('timer_end') |
| timer_end_net.TimerEnd(timer, []) |
| |
| ops.net(init_net) |
| ops.net(core.execution_step( |
| 'body', |
| [timer_start_net] + list(read_nets) + list(write_nets) + |
| [timer_end_net], |
| should_stop_blob=status)) |
| ops.net(timer_end_net) |
| ops.net(exit_net) |
| steps.append(core.to_execution_step(nb)) |
| ops.net(global_init_net) |
| ops.net(core.execution_step('body', steps, concurrent_substeps=True)) |
| ops.net(global_exit_net) |
| return out_queue, task |
| |
| |
| def _pipe_step( |
| input, output=None, num_threads=1, processor=None, name=None, |
| capacity=None, group=None, num_runtime_threads=None, final_outputs=None): |
| """ |
| """ |
| assert num_threads <= 1 or num_runtime_threads <= 1, ( |
| 'Only one of num_threads or num_runtime_threads must be set.') |
| |
| if isinstance(input, Reader): |
| reader = input |
| elif hasattr(input, 'reader'): |
| reader = input.reader() |
| else: |
| raise ValueError( |
| 'Input must be a reader, queue or stream. Got {}'.format(type(input))) |
| |
| if processor is not None: |
| reader = ProcessingReader(reader, processor) |
| |
| if num_threads == 0 or num_runtime_threads == 0: |
| assert output is None |
| return reader, None |
| |
| if name is None and processor is not None: |
| name = processor_name(processor) |
| if name is None and output is not None: |
| name = 'pipe_into:%s' % processor_name(output) |
| if name is None: |
| name = 'pipe_from:%s' % processor_name(input) |
| |
| if num_threads > 1: |
| return _static_threads_task( |
| name, group, final_outputs, reader, num_threads, output, capacity) |
| else: |
| return _runtime_threads_task( |
| name, group, final_outputs, reader, num_runtime_threads, output, |
| capacity) |
| |
| |
| class ProcessingReader(Reader): |
| """ |
| Reader that reads from an upstream reader, calls the processor, and returns |
| the processed record. |
| """ |
| def __init__(self, reader, processor): |
| Reader.__init__(self) |
| self.reader = reader |
| self.processor = make_processor(processor, reader) |
| |
| def schema(self): |
| return self.processor.schema() |
| |
| def setup_ex(self, init_net, finish_net): |
| self.reader.setup_ex(init_net, finish_net) |
| |
| def read_ex(self, init_net, exit_net): |
| read_nets, status, rec = self.reader.read_record_ex(init_net, exit_net) |
        # We don't use `status` as the stop_blob of the NetBuilder because it
        # is not guaranteed to end up being the true stop blob. For example,
        # ReaderWithLimitBase doesn't pass the status through, but rather
        # copies from it.
| with NetBuilder() as nb: |
| # Current NetBuilder is optionally used inside the processor, |
| # then its children are retrieved inside of |
| # normalize_processor_output. |
| # Once readers and writers also use NetBuilder, |
| # this logic will be more natural. |
| result = normalize_processor_output(self.processor(rec)) |
| read_nets += result.nets |
| if result.should_stop or nb._stop_blob: |
| stop_net = core.Net('stop_net') |
| if result.should_stop: |
| stop_net.Or([status, result.should_stop], [status]) |
| if nb._stop_blob: |
| stop_net.Or([status, nb._stop_blob], [status]) |
| read_nets.append(stop_net) |
| if hasattr(self.processor, 'setup'): |
| init_net.add_attribute(TaskGroup.LOCAL_SETUP, self.processor) |
| self._set_schema(result.record) |
| fields = result.record.field_blobs() if result.record else None |
| return read_nets, status, fields |
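
# Sketch: with `num_threads=0` (or `num_runtime_threads=0`) and no output,
# `pipe` creates no Task and returns a ProcessingReader instead, which can
# then feed a later pipeline stage (hypothetical names):
#
#     reader = pipe(src_queue, processor=my_proc, num_runtime_threads=0)
#     pipe(reader, output=dst_queue)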
| |
| |
| class NetProcessor: |
| """ |
| Processor that clones a core.Net each time it's called, executing |
| the cloned net as the processor. It requires the Net to have input |
| and (optionally) output records set, with net.set_input_record() and |
| net.set_output_record(). |
| """ |
| def __init__(self, net, stop_signal=None, thread_init_nets=None, name=None): |
| assert isinstance(net, core.Net) |
| assert stop_signal is None or isinstance( |
| stop_signal, core.BlobReference) |
| self.name = name or str(net) |
| self.thread_init_nets = thread_init_nets or [] |
| self.net = net |
| self._stop_signal = stop_signal |
| self._blob_maps = [] |
| self._frozen = False |
| self._cloned_init_nets = [] |
| |
| def schema(self): |
| return self.net.output_record() |
| |
| def setup(self, init_net): |
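        # Registered via TaskGroup.LOCAL_SETUP (see ProcessingReader.read_ex):
        # hand back the init nets cloned so far and freeze the processor
        # against further cloning.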
| self._frozen = True |
| cloned_init_nets = self._cloned_init_nets |
| self._cloned_init_nets = [] |
| return cloned_init_nets |
| |
| def __call__(self, rec): |
| assert not self._frozen |
| prefix = NetBuilder.current().name + '/' |
| blob_remap = {} |
| for net in self.thread_init_nets: |
| new_net, _ = core.clone_and_bind_net( |
| net, str(net) + prefix, prefix, blob_remap) |
| self._cloned_init_nets.append(new_net) |
| |
| new_net, remappings = core.clone_and_bind_net( |
| self.net, str(self.net) + prefix, prefix, blob_remap, rec) |
| |
| if self._stop_signal is None: |
| stop_signal = None |
| elif str(self._stop_signal) in remappings: |
| stop_signal = core.BlobReference( |
| remappings[str(self._stop_signal)], |
| net=new_net) |
| else: |
| stop_signal = self._stop_signal |
| |
| self._blob_maps.append(remappings) |
| return Output([new_net], new_net.output_record(), stop_signal) |
| |
| def blob_maps(self): |
| self._frozen = True |
| return self._blob_maps |
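

# Sketch (hypothetical): wrap a core.Net so that `pipe` clones it once per
# thread; the net must have its input record (and, to produce output, its
# output record) set.
#
#     proc_net = core.Net('proc')
#     proc_net.set_input_record(in_rec)
#     out = proc_net.Mul([in_rec.x(), in_rec.x()])
#     proc_net.set_output_record(
#         schema.Struct(('sq', schema.Scalar(np.float32, out))))
#     pipe(src_queue, processor=proc_net, num_threads=4)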