Source code for pytezos.michelson.program

from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import cast

from pytezos.context.impl import ExecutionContext
from pytezos.crypto.encoding import base58_encode
from pytezos.michelson.instructions.base import MichelsonInstruction
from pytezos.michelson.instructions.base import format_stdout
from pytezos.michelson.instructions.tzt import BigMapInstruction
from pytezos.michelson.instructions.tzt import StackEltInstruction
from pytezos.michelson.micheline import MichelineSequence
from pytezos.michelson.micheline import get_script_section
from pytezos.michelson.micheline import get_script_sections
from pytezos.michelson.micheline import try_catch
from pytezos.michelson.micheline import validate_sections
from pytezos.michelson.sections.code import CodeSection
from pytezos.michelson.sections.parameter import ParameterSection
from pytezos.michelson.sections.storage import StorageSection
from pytezos.michelson.sections.tzt import AmountSection
from pytezos.michelson.sections.tzt import BalanceSection
from pytezos.michelson.sections.tzt import BigMapsSection
from pytezos.michelson.sections.tzt import ChainIdSection
from pytezos.michelson.sections.tzt import InputSection
from pytezos.michelson.sections.tzt import NowSection
from pytezos.michelson.sections.tzt import OutputSection
from pytezos.michelson.sections.tzt import SelfSection
from pytezos.michelson.sections.tzt import SenderSection
from pytezos.michelson.sections.tzt import SourceSection
from pytezos.michelson.sections.view import ViewSection
from pytezos.michelson.stack import MichelsonStack
from pytezos.michelson.types import ListType
from pytezos.michelson.types import MichelsonType
from pytezos.michelson.types import OperationType
from pytezos.michelson.types import PairType


[docs]class MichelsonProgram: """Michelson .tz contract interpreter interface""" parameter: Type[ParameterSection] storage: Type[StorageSection] code: Type[CodeSection] views: List[Type[ViewSection]] def __init__(self, name: str, parameter: ParameterSection, storage: StorageSection) -> None: self.name = name self.parameter_value = parameter self.storage_value = storage
[docs] @staticmethod def load(context: ExecutionContext, with_code=False) -> Type['MichelsonProgram']: """Create MichelsonProgram type from filled context""" cls = type( MichelsonProgram.__name__, (MichelsonProgram,), { 'parameter': ParameterSection.match(context.get_parameter_expr()), 'storage': StorageSection.match(context.get_storage_expr()), 'code': CodeSection.match(context.get_code_expr() if with_code else []), 'views': [ViewSection.match(expr) for expr in context.get_views_expr()] if with_code else [], }, ) return cast(Type['MichelsonProgram'], cls)
[docs] @staticmethod def create(sequence: Type[MichelineSequence]) -> Type['MichelsonProgram']: """Create MichelsonProgram type from micheline""" validate_sections( sequence, ( 'parameter', 'storage', 'code', ), ) cls = type( MichelsonProgram.__name__, (MichelsonProgram,), { 'parameter': get_script_section(sequence, cls=ParameterSection, required=True), # type: ignore 'storage': get_script_section(sequence, cls=StorageSection, required=True), # type: ignore 'code': get_script_section(sequence, cls=CodeSection, required=True), # type: ignore 'views': get_script_sections(sequence, cls=ViewSection), # type: ignore }, ) return cast(Type['MichelsonProgram'], cls)
[docs] @staticmethod def match(expr) -> Type['MichelsonProgram']: seq = cast(Type[MichelineSequence], MichelineSequence.match(expr)) if not issubclass(seq, MichelineSequence): raise Exception(f'Expected sequence, got {seq.prim}') return MichelsonProgram.create(seq)
[docs] @classmethod def as_micheline_expr(cls) -> List[Dict[str, Any]]: return [ cls.parameter.as_micheline_expr(), cls.storage.as_micheline_expr(), cls.code.as_micheline_expr(), *[view.as_micheline_expr() for view in cls.views], ]
[docs] @classmethod def get_view(cls, name: str) -> Type[ViewSection]: return next(view for view in cls.views if view.name == name)
[docs] @classmethod def instantiate(cls, entrypoint: str, parameter, storage) -> 'MichelsonProgram': parameter_value = cls.parameter.from_parameters({'entrypoint': entrypoint, 'value': parameter}) storage_value = cls.storage.from_micheline_value(storage) return cls(entrypoint, parameter_value, storage_value)
[docs] @classmethod def instantiate_view(cls, name: str, parameter, storage) -> 'MichelsonProgram': view = cls.get_view(name) parameter_ty = ParameterSection.create_type(args=[view.args[1]]) parameter_value = parameter_ty.from_micheline_value(parameter) storage_value = cls.storage.from_micheline_value(storage) return cls(name, parameter_value, storage_value)
[docs] @try_catch('BEGIN') def begin(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None: """Prepare stack for contract execution""" self.parameter_value.attach_context(context) self.storage_value.attach_context(context) res = PairType.from_comb([self.parameter_value.item, self.storage_value.item]) stack.push(res) stdout.append(format_stdout(f'BEGIN %{self.name}', [], [res]))
[docs] def execute(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> MichelsonInstruction: """Execute contract in interpreter""" return cast(MichelsonInstruction, self.code.args[0].execute(stack, stdout, context))
[docs] def execute_view(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext): """Execute view in interpreter""" view = self.get_view(self.name) return cast(MichelsonInstruction, view.args[3].execute(stack, stdout, context))
[docs] @try_catch('END') def end( self, stack: MichelsonStack, stdout: List[str], output_mode='readable', ) -> Tuple[List[dict], Any, List[dict], PairType]: """Finish contract execution""" res = cast(PairType, stack.pop1()) if len(stack): raise Exception(f'Stack is not empty: {repr(stack)}') res.assert_type_equal( PairType.create_type( args=[ListType.create_type(args=[OperationType]), self.storage.args[0]], ), message='list of operations + resulting storage', ) operations = [op.content for op in res.items[0]] # type: ignore lazy_diff = [] # type: ignore storage = res.items[1].aggregate_lazy_diff(lazy_diff).to_micheline_value(mode=output_mode) stdout.append(format_stdout(f'END %{self.name}', [res], [])) return operations, storage, lazy_diff, res
[docs] @try_catch('RET') def ret( self, stack: MichelsonStack, stdout: List[str], output_mode='readable', ) -> MichelsonType: view = self.get_view(self.name) res = stack.pop1() if len(stack): raise Exception(f'Stack is not empty: {repr(stack)}') res.assert_type_equal(view.args[2], message='view return type') stdout.append(format_stdout(f'RET %{self.name}', [res], [])) return view.args[2].from_micheline_value(res.to_micheline_value(mode=output_mode))
[docs]class TztMichelsonProgram: """Michelson .tzt contract interpreter interface""" code: Type[CodeSection] input: Type[InputSection] output: Type[OutputSection] big_maps: Optional[Type[BigMapsSection]]
[docs] @staticmethod def load(context: ExecutionContext, with_code=False): """Create TztMichelsonProgram type from filled context""" cls = type( TztMichelsonProgram.__name__, (TztMichelsonProgram,), { 'input': InputSection.match(context.get_input_expr()), 'output': OutputSection.match(context.get_output_expr()), 'code': CodeSection.match(context.get_code_expr() if with_code else []), 'big_maps': BigMapsSection.match(context.get_big_maps_expr()) if context.get_big_maps_expr() else None, }, ) return cast(Type['TztMichelsonProgram'], cls)
[docs] @staticmethod def create(sequence: Type[MichelineSequence]) -> Type['TztMichelsonProgram']: """Create TztMichelsonProgram type from micheline""" validate_sections(sequence, ('input', 'output', 'code')) cls = type( TztMichelsonProgram.__name__, (TztMichelsonProgram,), { 'input': get_script_section(sequence, cls=InputSection, required=True), # type: ignore 'output': get_script_section(sequence, cls=OutputSection, required=True), # type: ignore 'code': get_script_section(sequence, cls=CodeSection, required=True), # type: ignore 'big_maps': get_script_section(sequence, cls=BigMapsSection, required=False), # type: ignore }, ) return cast(Type['TztMichelsonProgram'], cls)
[docs] @staticmethod def match(expr) -> Type['TztMichelsonProgram']: seq = cast(Type[MichelineSequence], MichelineSequence.match(expr)) if not issubclass(seq, MichelineSequence): raise Exception(f'expected sequence, got {seq.prim}') return TztMichelsonProgram.create(seq)
[docs] @classmethod def as_micheline_expr(cls) -> List[Dict[str, Any]]: # TODO: Serialize all sections return [ cls.code.as_micheline_expr(), cls.input.as_micheline_expr(), cls.output.as_micheline_expr(), ]
[docs] @classmethod def instantiate(cls) -> 'TztMichelsonProgram': return cls()
[docs] def fill_context(self, script, context: ExecutionContext) -> None: sender = context.get_sender_expr() if sender: context.sender = SenderSection.match(sender).args[0].get_string() # type: ignore amount = context.get_amount_expr() if amount: context.amount = AmountSection.match(amount).args[0].get_int() # type: ignore balance = context.get_balance_expr() if balance: context.balance = BalanceSection.match(balance).args[0].get_int() # type: ignore _self = context.get_self_expr() if _self: context.address = SelfSection.match(_self).args[0].get_string() # type: ignore now = context.get_now_expr() if now: context.now = NowSection.match(now).args[0].get_int() # type: ignore source = context.get_source_expr() if source: context.source = SourceSection.match(source).args[0].get_string() # type: ignore chain_id = context.get_chain_id_expr() if chain_id: # FIXME: Move to some common place context.chain_id = base58_encode( cast(bytes, ChainIdSection.match(chain_id).args[0].literal), prefix=b'Net', ).decode()
[docs] def register_bigmaps(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None: if self.big_maps: for item in self.big_maps.args[0].args[::-1]: if not issubclass(item, BigMapInstruction): raise Exception('Only `Big_map` instructions can be used in `big_maps` section') item.add(stack, stdout, context)
[docs] def begin(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None: """Prepare stack for contract execution""" for item in self.input.args[0].args[::-1]: if issubclass(item, StackEltInstruction): item.push(stack, stdout, context) else: raise Exception('Only `Stack_elt` instructions can be used in `input` section', item)
[docs] def execute(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> MichelsonInstruction: """Execute contract in interpreter""" return cast(MichelsonInstruction, self.code.args[0].execute(stack, stdout, context))
[docs] def end(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None: """Finish contract execution""" for item in self.output.args[0].args: if not issubclass(item, StackEltInstruction): raise Exception('Only `Stack_elt` instructions can be used in `output` section') item.pull(stack, stdout, context) if len(stack): raise Exception('Stack is not empty after processing `output` section')