Source code for hidet.graph.flow_graph

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=protected-access
from __future__ import annotations
from typing import List, Union, Dict, Set, Optional, Tuple, Sequence
import logging
import os
import pickle
from collections import defaultdict

import hidet.graph.operator
import hidet.cuda
from hidet.cuda.graph import CudaGraphCreationError
from hidet import option
from hidet.ir.expr import is_constant
from hidet.ir.task import Task
from hidet.graph.tensor import Tensor, zeros_like, randn_like
from hidet.graph.operator import Operator, SymbolVar

logger = logging.getLogger(__name__)


class GraphForwardInstrument:
    def before_graph(self, graph: FlowGraph, inputs: List[Tensor]) -> None:
        pass

    def after_graph(self, graph: FlowGraph, inputs: List[Tensor], outputs: List[Tensor]) -> None:
        pass

    def before_operator(self, op: Operator, inputs: List[Tensor]) -> None:
        pass

    def after_operator(self, op: Operator, inputs: List[Tensor], outputs: List[Tensor]) -> None:
        pass


class GraphForwardContext:
    _stack: List[GraphForwardContext] = []

    def __init__(self):
        self.instruments: List[GraphForwardInstrument] = []

    def __enter__(self):
        GraphForwardContext._stack.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        GraphForwardContext._stack.pop()

    @staticmethod
    def current() -> GraphForwardContext:
        if len(GraphForwardContext._stack) == 0:
            GraphForwardContext._stack.append(GraphForwardContext())
        return GraphForwardContext._stack[-1]

    @staticmethod
    def _before_graph(graph: FlowGraph, inputs: List[Tensor]) -> None:
        ctx = GraphForwardContext.current()
        for instrument in ctx.instruments:
            instrument.before_graph(graph, inputs)

    @staticmethod
    def _after_graph(graph: FlowGraph, inputs: List[Tensor], outputs: List[Tensor]) -> None:
        ctx = GraphForwardContext.current()
        for instrument in ctx.instruments:
            instrument.after_graph(graph, inputs, outputs)

    @staticmethod
    def _before_operator(op: Operator, inputs: List[Tensor]) -> None:
        ctx = GraphForwardContext.current()
        for instrument in ctx.instruments:
            instrument.before_operator(op, inputs)

    @staticmethod
    def _after_operator(op: Operator, inputs: List[Tensor], outputs: List[Tensor]) -> None:
        ctx = GraphForwardContext.current()
        for instrument in ctx.instruments:
            instrument.after_operator(op, inputs, outputs)

    def append_instrument(self, instrument: GraphForwardInstrument):
        self.instruments.append(instrument)

    def debug(self, output_dir='./outs/debug', print_summary: bool = False, dump_outputs: bool = False):
        from .graph_utils.instruments import GraphForwardDebugInstrument

        self.instruments.append(GraphForwardDebugInstrument(output_dir, print_summary, dump_outputs))

    def benchmark(self, output_dir='./outs/benchmark', print_summary: bool = False, warmup=3, number=10, repeat=3):
        from .graph_utils.instruments import GraphForwardBenchmarkInstrument

        self.instruments.append(GraphForwardBenchmarkInstrument(output_dir, print_summary, warmup, number, repeat))


def forward_context() -> GraphForwardContext:
    return GraphForwardContext()
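
# Illustrative usage sketch of the forward context with the debug instrument.
# Assumptions: `graph` is an already-traced FlowGraph and `x` a matching concrete
# tensor; the output directory is arbitrary. Registered instruments then observe
# the whole graph run and every operator run.
#
#     with forward_context() as ctx:
#         ctx.debug(output_dir='./outs/debug', print_summary=True)
#         y = graph(x)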


class FlowGraph:
    """The computation graph representation."""

    def __init__(self, outputs: Sequence[Tensor], inputs: Optional[Sequence[Tensor]] = None, nodes=None):
        self.outputs: List[Tensor] = list(outputs)
        self.inputs: Optional[List[Tensor]] = list(inputs) if inputs is not None else None
        self._nodes: Optional[List[Operator]] = nodes
        self._usage_count: Optional[Dict[Tensor, int]] = None
        self.update_nodes()

    def __call__(self, *inputs: Tensor) -> Union[List[Tensor], Tensor]:
        """
        Run the computation graph.

        Parameters
        ----------
        inputs : Sequence[Tensor]
            The input tensors.

        Returns
        -------
        ret: Union[List[Tensor], Tensor]
            The output tensors. If there is only one output, it is returned directly.
        """
        outputs = self.forward(list(inputs))
        return outputs[0] if len(outputs) == 1 else outputs
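
    # Illustrative sketch of calling a FlowGraph directly: `__call__` forwards to
    # `forward` and unwraps single outputs. Assumes `graph` has one input and one
    # output; the shape and device below are made up.
    #
    #     x = hidet.randn([1, 3, 224, 224], device='cuda')
    #     y = graph(x)  # same as graph.forward([x])[0]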

    def __str__(self):
        from .graph_utils import flow_graph_as_text

        return flow_graph_as_text(self)

    @property
    def nodes(self) -> List[Operator]:
        """The list of operators in the computation graph."""
        if self._nodes is None:
            self.update_nodes()
        return self._nodes

    @property
    def usage_count(self) -> Dict[Tensor, int]:
        """The usage count of each tensor in the computation graph."""
        if self._usage_count is None:
            self.update_nodes()
        return self._usage_count.copy()

    def invalid_cache(self):
        self._nodes = None
        self._usage_count = None

    def _build_nodes(self):
        tasks: List[Tuple[Task, str]] = []
        tunable_tasks: List[Tuple[Task, str]] = []
        task_keys = set()
        search_space = hidet.option.get_option('search_space')
        for node in self.nodes:
            if node._compiled_task is None:
                task_key = hash(str(node.task))
                if task_key in task_keys:
                    continue
                task_keys.add(task_key)
                if search_space == 0 or all(
                    method not in node.task.__class__.__dict__
                    for method in ['implement_cuda', 'implement_cpu', 'implement']
                ):
                    tasks.append((node.task, node.build_target))
                else:
                    tunable_tasks.append((node.task, node.build_target))
        hidet.drivers.build_task_batch(tasks)

        with option.context():
            hidet.option.parallel_build(False)
            hidet.drivers.build_task_batch(tunable_tasks)  # build tunable tasks one by one

    def forward(self, inputs: List[Tensor]) -> List[Tensor]:
        """Run the computation graph.

        Parameters
        ----------
        inputs: List[Tensor]
            The input tensors. They should be consistent with the symbolic inputs
            of the computation graph.

        Returns
        -------
        output: List[Tensor]
            The output tensors of the computation graph.
        """
        from hidet.ffi import runtime_api

        inputs: List[Tensor] = list(inputs)

        # the input tensors should be non-symbolic
        for idx, tensor in enumerate(inputs):
            if tensor.storage is None:
                msg = 'Expect non-symbolic input tensors, got symbolic input {} ({}).'.format(idx, tensor.signature())
                raise ValueError(msg)

        # build the kernel for each operator in the graph
        self._build_nodes()

        # set the symbol values
        for expect_input, actual_input in zip(self.inputs, inputs):
            if expect_input.device != actual_input.device:
                raise ValueError(
                    'Expect input {} to have device {}, got {}.'.format(
                        expect_input, expect_input.device, actual_input.device
                    )
                )
            for expect_dim, actual_dim in zip(expect_input.shape, actual_input.shape):
                if isinstance(expect_dim, SymbolVar):
                    runtime_api.set_symbol_value(expect_dim.name, int(actual_dim))
                else:
                    assert is_constant(actual_dim, expect_dim) and expect_dim == actual_dim

        GraphForwardContext._before_graph(self, inputs)

        # count the usage of each tensor. We use this count to determine whether
        # a tensor should be freed after running an operator.
        usage_count = self.usage_count.copy()
        tensor_map: Dict[Tensor, Tensor] = {}  # symbolic tensor -> actual tensor during the forward process
        for st, at in zip(self.inputs, inputs):
            tensor_map[st] = at

        # run each operator in the graph in a topological order
        for idx, node in enumerate(self.nodes):
            # prepare node inputs
            node_inputs = []
            for node_input in node.inputs:
                if node_input.storage is None:
                    # symbolic input
                    node_inputs.append(tensor_map[node_input])
                    usage_count[node_input] -= 1
                    if usage_count[node_input] == 0:
                        # this temporary tensor is no longer needed, free the memory
                        del tensor_map[node_input]
                else:
                    # constant input
                    node_inputs.append(node_input)
            node_inputs = node_inputs[: len(node.inputs)]

            # run node
            GraphForwardContext._before_operator(node, node_inputs)
            logger.debug('[%4d/%d] run operator %s, %s', idx, len(self.nodes), node.name, node.task)
            logger.debug('   inputs: %s', [x.signature() for x in node_inputs])
            node_outputs = node.compiled_task.run_async(node_inputs)
            logger.debug('  outputs: %s', [x.signature() for x in node_outputs])
            GraphForwardContext._after_operator(node, node_inputs, node_outputs)

            # update map
            for node_output, symbolic_output in zip(node_outputs, node.outputs):
                tensor_map[symbolic_output] = node_output

        outputs = []
        for graph_output in self.outputs:
            if graph_output in tensor_map:
                outputs.append(tensor_map[graph_output])
            elif graph_output.storage is not None:
                outputs.append(graph_output)  # constant output, not a graph input nor produced by any operator
            else:
                raise RuntimeError('Graph output {} is not produced by any operator.'.format(graph_output.signature()))

        GraphForwardContext._after_graph(self, inputs, outputs)
        return outputs
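
    # Sketch of forward() with a dynamic dimension. Assumes the graph was traced from
    # a symbolic input with a named dimension, e.g. hidet.symbol(['batch', 3, 224, 224],
    # device='cuda'); forward() then binds the 'batch' symbol to each actual batch size.
    #
    #     outputs = graph.forward([hidet.randn([8, 3, 224, 224], device='cuda')])
    #     outputs = graph.forward([hidet.randn([32, 3, 224, 224], device='cuda')])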

    def dummy_inputs(self) -> List[Tensor]:
        inputs = []
        for symbolic_input in self.inputs:
            if symbolic_input.dtype.is_integer():
                inputs.append(zeros_like(symbolic_input))
            elif symbolic_input.dtype.is_float():
                inputs.append(randn_like(symbolic_input))
            else:
                raise ValueError('Can not generate dummy input for dtype {}'.format(symbolic_input.dtype))
        return inputs

    def save(self, model_file: str):
        """Save the flow graph to a file.

        Parameters
        ----------
        model_file: str
            The model file to store the flow graph.
        """
        # before saving, clear the compiled task cache because ctypes objects can not be pickled
        for node in self.nodes:
            node._compiled_task = None
        self._usage_count, self._nodes = None, None
        dirname = os.path.dirname(model_file)
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        # save to a temporary file first, in case pickle fails
        with open(model_file + '.temp', 'wb') as f:
            pickle.dump(self, f)
        os.rename(model_file + '.temp', model_file)

    @staticmethod
    def load(model_file: str) -> FlowGraph:
        """Load a flow graph from a file.

        Parameters
        ----------
        model_file: str
            The path to the flow graph.

        Returns
        -------
        ret: FlowGraph
            The loaded flow graph.
        """
        with open(model_file, 'rb') as f:
            ret = pickle.load(f)
        if not isinstance(ret, FlowGraph):
            raise TypeError('Expect to load FlowGraph, got {}'.format(type(ret)))
        return ret
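
    # Round-trip sketch: save() pickles the graph after dropping the unpicklable
    # compiled-task cache, and load() restores it. The file path is illustrative.
    #
    #     graph.save('./outs/model.hidet')
    #     graph = FlowGraph.load('./outs/model.hidet')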

    def update_nodes(self):
        free_vars, self._nodes, self._usage_count = self._analyze(self.outputs)
        if self.inputs:
            non_bound_free_vars: Set[Tensor] = set(free_vars) - set(self.inputs)
            if len(non_bound_free_vars) > 0:
                msg = ['Found free variable(s) not given in inputs:']
                for v in non_bound_free_vars:
                    msg.append('  {}'.format(v.signature()))
                raise ValueError('\n'.join(msg))
        else:
            if len(free_vars) > 1:
                raise ValueError(
                    f'The traced graph has {len(free_vars)} free variables. When there are multiple free '
                    f'variables, it is mandatory to specify the "inputs" argument explicitly when calling '
                    f'hidet.trace_from(...):\n'
                    '    hidet.trace_from(..., inputs=[tensor1, tensor2, ...])\n'
                )
            self.inputs = free_vars
        return self

    def build(self, *, space=0):
        """
        Build the flow graph into a compiled graph (hidet.runtime.CompiledGraph).

        Parameters
        ----------
        space: int
            The schedule search space used to tune each operator. Candidates are 0, 1 and 2.
            Space 0 compiles each operator with its default schedule. Space 1 tries a small set
            of schedules, and space 2 a large set. The larger the space, the more schedules are
            tried and the better the performance tends to be, at the cost of longer compilation
            and tuning time.

        Returns
        -------
        ret: hidet.runtime.CompiledGraph
            The compiled graph.
        """
        from hidet.drivers.build_graph import build_flow_graph

        return build_flow_graph(self, space=space)
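
    # Sketch of building and running a compiled graph. Assumes the returned
    # hidet.runtime.CompiledGraph is callable on concrete tensors like the flow
    # graph itself; `x` is a matching concrete input.
    #
    #     compiled = graph.build(space=2)  # tune over the large schedule space
    #     y = compiled(x)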

    def cuda_graph(self):
        """Create a CudaGraph from the FlowGraph.

        Returns
        -------
        ret: hidet.cuda.graph.CudaGraph
            The created cuda graph.
        """
        from hidet.cuda.graph import CudaGraph

        for x in self.inputs:
            if not x.device.is_cuda():
                raise CudaGraphCreationError(
                    'FlowGraph.cuda_graph() only supports cuda inputs, got {}'.format(x.signature())
                )
            for d in x.shape:
                if not isinstance(d, int):
                    raise CudaGraphCreationError(
                        'FlowGraph.cuda_graph() only supports inputs with static shape, got {}'.format(x.signature())
                    )

        def f_create_inputs() -> List[Tensor]:
            return self.dummy_inputs()

        def f_run(inputs: List[Tensor]) -> List[Tensor]:
            return self.forward(inputs)

        return CudaGraph(f_create_inputs, f_run, ref_objs=[self])
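
    # Sketch of capturing and replaying the graph as a CUDA graph. Assumes
    # hidet.cuda.graph.CudaGraph provides a run(inputs) method that copies the
    # given tensors into the captured input buffers before replaying.
    #
    #     cuda_graph = graph.cuda_graph()
    #     outputs = cuda_graph.run([x])  # x: concrete cuda tensor with static shape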

    def latency(
        self, warmup=1, number=3, repeat=3, median=True, dummy_inputs: Optional[Sequence[Tensor]] = None
    ) -> Union[float, List[float]]:
        """Measure the latency of the flow graph.

        Parameters
        ----------
        warmup: int
            The number of warmup runs.
        number: int
            The number of runs to measure the latency.
        repeat: int
            The number of times to repeat the measurement.
        median: bool
            Whether to return the median latency.
        dummy_inputs: Optional[Sequence[Tensor]]
            The dummy inputs to run the flow graph. If not given, automatically generated
            dummy inputs are used.

        Returns
        -------
        ret: Union[float, List[float]]
            The measured latency in milliseconds.
        """
        import time
        import numpy as np

        if dummy_inputs is None:
            dummy_inputs = self.dummy_inputs()
        for _ in range(warmup):
            self.forward(dummy_inputs)
        results = []
        for _ in range(repeat):
            hidet.cuda.synchronize()
            t1 = time.time()
            for _ in range(number):
                self.forward(dummy_inputs)
            hidet.cuda.synchronize()
            t2 = time.time()
            results.append((t2 - t1) * 1000 / number)
        if median:
            return float(np.median(results))
        else:
            return results
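
    # Sketch of a latency measurement: `warmup` untimed runs, then `repeat` rounds of
    # `number` timed runs each, returning the median round in milliseconds by default.
    #
    #     median_ms = graph.latency(warmup=1, number=3, repeat=3)
    #     per_round_ms = graph.latency(median=False)  # one value per repeat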

    @staticmethod
    def _analyze(outputs: List[Tensor]) -> Tuple[List[Tensor], List[Operator], Dict[Tensor, int]]:
        """
        Analyze the implicit flow graph by traversing the graph backwards from the given outputs.

        Parameters
        ----------
        outputs: List[Tensor]
            The outputs of the flow graph to traverse from.

        Returns
        -------
        free_vars, nodes, usage_count: Tuple[List[Tensor], List[Operator], Dict[Tensor, int]]
            The free variables, nodes and usage count of the flow graph.
            The free variables are the free symbolic tensors that are not produced by any operator
            and have no storage attached (their storage attribute is None).
            The nodes are the operators that are used to produce the outputs, in topological order.
            The usage count contains the number of times each tensor is used.
        """
        free_vars = []
        nodes: List[Operator] = []

        # find out all nodes
        all_nodes: Set[Operator] = set()

        def find_all_nodes(u: Operator):
            all_nodes.add(u)
            for x in u.inputs:
                if x.op is None:
                    continue
                v: Operator = x.op
                if v not in all_nodes:
                    find_all_nodes(v)

        for ot in outputs:
            if ot.trace:
                find_all_nodes(ot.op)

        # topological sort
        out_degree: Dict[Operator, int] = {u: 0 for u in all_nodes}
        for u in all_nodes:
            for it in u.inputs:
                if it.op is None or it.op not in all_nodes:
                    continue
                out_degree[it.op] += 1
        for u in outputs:
            if u.op:
                out_degree[u.op] += 1

        stack: List[Operator] = []
        for u in outputs:
            if u.op:
                out_degree[u.op] -= 1
                if out_degree[u.op] == 0:
                    stack.append(u.op)
        while len(stack) > 0:
            op = stack.pop()
            nodes.append(op)
            for it in op.inputs:
                if it.op is None:
                    if it.storage is None and all(it is not v for v in free_vars):
                        # input
                        free_vars.append(it)
                elif it.op not in all_nodes:
                    pass
                else:
                    if it is not it.op.outputs[it.trace[1]]:
                        raise ValueError('The trace is broken')
                    out_degree[it.op] -= 1
                    if out_degree[it.op] == 0:
                        stack.append(it.op)
        nodes = list(reversed(nodes))
        assert len(nodes) == len(all_nodes), 'all_nodes {} topo_order {}'.format(len(all_nodes), len(nodes))

        # tensor usage count
        usage_count: Dict[Tensor, int] = defaultdict(int)
        for op in all_nodes:
            for inp in op.inputs:
                usage_count[inp] += 1
        for graph_output in outputs:
            usage_count[graph_output] += 1

        return free_vars, nodes, usage_count

    def vcuda_(self) -> None:
        """Casts the flow graph object to the vcuda device in place."""
        from hidet.runtime.device import instantiate_device, Device

        for x in self.inputs:
            if not x.device.is_cuda():
                raise ValueError("Inputs must be on cuda device")
            x.vcuda_()

        for node in self.nodes:
            if 'device' in node.attrs:
                dev = instantiate_device(node.attrs['device'])
                if dev.is_cuda():
                    dev = Device('vcuda', dev.id)
                node.attrs['device'] = dev
            for inp in node.inputs:
                if inp.device.is_cuda():
                    inp.vcuda_()
            for outp in node.outputs:
                if outp.device.is_cuda():
                    outp.vcuda_()

    def cuda_(self) -> None:
        """Casts the flow graph object from the vcuda device back to cuda in place."""
        from hidet.runtime.device import instantiate_device, Device

        for x in self.inputs:
            if not x.device.is_vcuda():
                raise ValueError("Inputs must be on vcuda device")
            x.cuda_()

        for node in self.nodes:
            if 'device' in node.attrs:
                dev = instantiate_device(node.attrs['device'])
                if dev.is_vcuda():
                    dev = Device('cuda', dev.id)
                node.attrs['device'] = dev
            for inp in node.inputs:
                if inp.device.is_vcuda():
                    inp.cuda_()
            for outp in node.outputs:
                if outp.device.is_vcuda():
                    outp.cuda_()


def trace_from(tensor: Union[Tensor, List[Tensor]], inputs: Optional[Union[Tensor, List[Tensor]]] = None) -> FlowGraph:
    """
    Trace the flow graph given the output tensor(s).

    Each :class:`hidet.graph.Tensor` has an attribute :class:`hidet.graph.Tensor.trace` which indicates how the
    tensor is generated. If the tensor is generated by an operator with symbolic input(s), the tensor itself is
    also symbolic, and it holds a reference to the operator that generates it. The reference is stored in this
    attribute.

    This function walks through the traces of the given tensor(s) and constructs a flow graph.

    When there are multiple symbolic inputs, it is mandatory to specify the "inputs" argument explicitly to
    avoid ambiguity.

    Parameters
    ----------
    tensor: Tensor or List[Tensor]
        The output tensor(s) that we trace from.
    inputs: Optional, Tensor or List[Tensor]
        The inputs of the flow graph. When there is only a single symbolic tensor in the flow graph, it is
        optional. When there are multiple inputs, this argument is required to specify the input order.

    Returns
    -------
    ret: FlowGraph
        The flow graph that produces the given output tensor(s).
    """
    if isinstance(tensor, Tensor):
        if tensor.trace is None:
            raise ValueError('trace_from expects symbol tensor(s).')
        outputs = [tensor]
    else:
        outputs = list(tensor)
        assert all(isinstance(v, Tensor) for v in outputs)
    if inputs is not None:
        if isinstance(inputs, Tensor):
            inputs = [inputs]
        else:
            inputs = list(inputs)
    return FlowGraph(outputs, inputs).update_nodes()
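
# Sketch of tracing a two-input graph; with multiple symbolic inputs the `inputs`
# argument is required to fix the input order. Shapes and names are illustrative.
#
#     a = hidet.symbol([16, 32], device='cuda')
#     b = hidet.symbol([32, 64], device='cuda')
#     c = hidet.ops.matmul(a, b)
#     graph = hidet.trace_from(c, inputs=[a, b])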


def save_graph(graph: FlowGraph, fname: str):
    graph.save(fname)


def load_graph(fname: str) -> FlowGraph:
    return FlowGraph.load(fname)