Source code for pytezos.michelson.instructions.tezos

import logging
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import cast

from pytezos.context.abstract import AbstractContext
from pytezos.michelson.instructions.base import MichelsonInstruction
from pytezos.michelson.instructions.base import format_stdout
from pytezos.michelson.micheline import MichelineLiteral
from pytezos.michelson.micheline import MichelineSequence
from pytezos.michelson.micheline import MichelsonRuntimeError
from pytezos.michelson.sections import ParameterSection
from pytezos.michelson.sections import StorageSection
from pytezos.michelson.sections import ViewSection
from pytezos.michelson.stack import MichelsonStack
from pytezos.michelson.types import AddressType
from pytezos.michelson.types import ChainIdType
from pytezos.michelson.types import ContractType
from pytezos.michelson.types import KeyHashType
from pytezos.michelson.types import MutezType
from pytezos.michelson.types import NatType
from pytezos.michelson.types import OperationType
from pytezos.michelson.types import OptionType
from pytezos.michelson.types import PairType
from pytezos.michelson.types import TimestampType
from pytezos.michelson.types import UnitType
from pytezos.michelson.types.base import MichelsonType


class AmountInstruction(MichelsonInstruction, prim='AMOUNT'):
    """AMOUNT: push the amount reported by the execution context."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_amount()`` in a ``MutezType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the amount value
        :returns: instruction instance reporting one added stack item
        """
        pushed = MutezType.from_value(context.get_amount())
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class BalanceInstruction(MichelsonInstruction, prim='BALANCE'):
    """BALANCE: push the balance reported by the execution context."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_balance()`` in a ``MutezType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the balance value
        :returns: instruction instance reporting one added stack item
        """
        pushed = MutezType.from_value(context.get_balance())
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class ChainIdInstruction(MichelsonInstruction, prim='CHAIN_ID'):
    """CHAIN_ID: push the chain identifier reported by the execution context."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_chain_id()`` in a ``ChainIdType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the chain id
        :returns: instruction instance reporting one added stack item
        """
        pushed = ChainIdType.from_value(context.get_chain_id())
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
def get_entrypoint_type(context: AbstractContext, name: str, address=None) -> Optional[Type[MichelsonType]]:
    """Look up the parameter type of entrypoint ``name``.

    :param context: provides the parameter expression via ``get_parameter_expr``
    :param name: entrypoint name to resolve
    :param address: forwarded as-is to ``context.get_parameter_expr``
    :returns: the entrypoint type, or ``None`` when the context has no
        parameter expression for ``address``
    :raises AssertionError: when ``name`` is not among the entrypoints listed
        by the matched parameter section
    """
    parameter_expr = context.get_parameter_expr(address)
    if parameter_expr is not None:
        known_entrypoints = ParameterSection.match(parameter_expr).list_entrypoints()
        assert name in known_entrypoints, f'unknown entrypoint {name}'
        return known_entrypoints[name]
    # No parameter expression available: nothing to resolve against.
    return None
class SelfInstruction(MichelsonInstruction, prim='SELF'):
    """SELF: push a contract value built from the context's self address."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``<self address>%<entrypoint>`` typed by the entrypoint's parameter.

        The entrypoint is the first item of ``cls.field_names``, falling back to
        ``'default'`` when there is none.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: provides the parameter expression and the self address
        :returns: instruction instance reporting one added stack item
        :raises AssertionError: when the entrypoint type cannot be resolved
        """
        entrypoint = next(iter(cls.field_names), 'default')
        param_ty = get_entrypoint_type(context, entrypoint)
        assert param_ty, 'parameter type is not defined'
        self_address = context.get_self_address()
        handle = ContractType.create_type(args=[param_ty]).from_value(f'{self_address}%{entrypoint}')  # type: ignore
        stack.push(handle)
        stdout.append(format_stdout(cls.prim, [], [handle]))  # type: ignore
        return cls(stack_items_added=1)
class SelfAddressInstruction(MichelsonInstruction, prim='SELF_ADDRESS'):
    """SELF_ADDRESS: push the context's self address."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_self_address()`` in an ``AddressType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the self address
        :returns: instruction instance reporting one added stack item
        """
        own_address = context.get_self_address()
        pushed = AddressType.from_value(own_address)
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class SenderInstruction(MichelsonInstruction, prim='SENDER'):
    """SENDER: push the sender address reported by the execution context."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_sender()`` in an ``AddressType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the sender address
        :returns: instruction instance reporting one added stack item
        """
        pushed = AddressType.from_value(context.get_sender())
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class SourceInstruction(MichelsonInstruction, prim='SOURCE'):
    """SOURCE: push the source address reported by the execution context."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_source()`` in an ``AddressType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the address value
        :returns: instruction instance reporting one added stack item
        """
        pushed = AddressType.from_value(context.get_source())
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class NowInstruction(MichelsonInstruction, prim='NOW'):
    """NOW: push the timestamp reported by the execution context."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_now()`` in a ``TimestampType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the timestamp value
        :returns: instruction instance reporting one added stack item
        """
        pushed = TimestampType.from_value(context.get_now())
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class AddressInstruction(MichelsonInstruction, prim='ADDRESS'):
    """ADDRESS: replace the contract on top of the stack with its address."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a contract value and push the result of its ``get_address()``.

        :param stack: stack to pop the contract from and push the address to
        :param stdout: log lines; one formatted entry is appended
        :param context: unused by this instruction
        :returns: instruction instance reporting one added stack item
        """
        target = cast(ContractType, stack.pop1())
        target.assert_type_in(ContractType)
        extracted = AddressType.from_value(target.get_address())
        stack.push(extracted)
        log_line = format_stdout(cls.prim, [target], [extracted])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class ContractInstruction(MichelsonInstruction, prim='CONTRACT', args_len=1):
    """CONTRACT: cast an address to ``option (contract <arg>)``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop an address and push ``Some contract`` or ``None``.

        The entrypoint is the first item of ``cls.field_names``, falling back to
        ``'default'``. When the context has no parameter expression for the
        address, type checking is skipped and ``Some`` is pushed. Any
        ``AssertionError`` raised while resolving or comparing the entrypoint
        type results in ``None`` being pushed.

        :param stack: stack to pop the address from and push the option to
        :param stdout: log lines; a skip notice (if any) and one formatted
            entry are appended
        :param context: provides the target contract's parameter expression
        :returns: instruction instance reporting one added stack item
        """
        entrypoint = next(iter(cls.field_names), 'default')
        address = cast(AddressType, stack.pop1())
        address.assert_type_in(AddressType)

        contract_type = ContractType.create_type(args=cls.args)
        try:
            # Resolve the entrypoint inside the guarded region: an unknown
            # entrypoint is reported through AssertionError and must yield None,
            # exactly like a parameter type mismatch, instead of aborting.
            entrypoint_type = get_entrypoint_type(context, entrypoint, address=str(address))
            if entrypoint_type is None:
                stdout.append(f'{cls.prim}: skip type checking for {str(address)}')
            else:
                entrypoint_type.assert_type_equal(cls.args[0])
            res = OptionType.from_some(contract_type.from_value(f'{str(address)}%{entrypoint}'))  # type: ignore
        except AssertionError:
            res = OptionType.none(contract_type)

        stack.push(res)
        stdout.append(format_stdout(cls.prim, [address], [res]))  # type: ignore
        return cls(stack_items_added=1)
class ImplicitAccountInstruction(MichelsonInstruction, prim='IMPLICIT_ACCOUNT'):
    """IMPLICIT_ACCOUNT: turn a key hash into a ``contract unit`` value."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a key hash and push a ``contract unit`` built from its string form.

        :param stack: stack to pop the key hash from and push the contract to
        :param stdout: log lines; one formatted entry is appended
        :param context: unused by this instruction
        :returns: instruction instance reporting one added stack item
        """
        owner = cast(KeyHashType, stack.pop1())
        owner.assert_type_equal(KeyHashType)
        unit_contract_ty = ContractType.create_type(args=[UnitType])
        account = unit_contract_ty.from_value(str(owner))  # type: ignore
        stack.push(account)
        stdout.append(format_stdout(cls.prim, [owner], [account]))  # type: ignore
        return cls(stack_items_added=1)
class CreateContractInstruction(MichelsonInstruction, prim='CREATE_CONTRACT', args_len=1):
    """CREATE_CONTRACT: build an origination operation for the embedded script."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop delegate, amount and initial storage; push address and origination.

        The script (``cls.args[0]``) must start with the ``parameter``,
        ``storage`` and ``code`` sections in any order. The amount is deducted
        through ``context.spend_balance``. The originated address is pushed
        first, then the origination operation, so the operation ends up on top.

        :param stack: stack to pop three items from and push two items to
        :param stdout: log lines; one formatted entry is appended
        :param context: provides the originated address, the self address and
            balance spending
        :returns: instruction instance reporting two added stack items
        :raises AssertionError: on malformed script sections or operand types
        """
        sequence = cast(MichelineSequence, cls.args[0])
        assert len(sequence.args) >= 3, f'expected more than 2 sections, got {len(sequence.args)}'
        leading_prims = {section.prim for section in sequence.args[:3]}
        assert leading_prims == {'parameter', 'storage', 'code'}, 'unexpected sections'
        storage_type = cast(
            Type[MichelsonType],
            next(section.args[0] for section in sequence.args if section.prim == 'storage'),
        )

        operands = cast(Tuple[OptionType, MutezType, MichelsonType], stack.pop3())
        delegate, amount, initial_storage = operands
        delegate.assert_type_equal(OptionType.create_type(args=[KeyHashType]))
        amount.assert_type_equal(MutezType)
        initial_storage.assert_type_equal(storage_type)

        originated_address = AddressType.from_value(context.get_originated_address())
        context.spend_balance(int(amount))

        if delegate.is_none():
            delegate_key = None
        else:
            delegate_key = str(delegate.get_some())
        origination = OperationType.origination(
            source=context.get_self_address(),
            script=cls.args[0],  # type: ignore
            storage=initial_storage,
            balance=int(amount),
            delegate=delegate_key,
        )

        for item in (originated_address, origination):
            stack.push(item)
        consumed = [delegate, amount, initial_storage]
        stdout.append(format_stdout(cls.prim, consumed, [origination, originated_address]))  # type: ignore
        return cls(stack_items_added=2)
class SetDelegateInstruction(MichelsonInstruction, prim='SET_DELEGATE'):
    """SET_DELEGATE: build a delegation operation from an optional key hash."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop ``option key_hash`` and push the matching delegation operation.

        A ``None`` option is passed on as ``delegate=None``; otherwise the
        string form of the wrapped key hash is used.

        :param stack: stack to pop the option from and push the operation to
        :param stdout: log lines; one formatted entry is appended
        :param context: provides the self address used as operation source
        :returns: instruction instance reporting one added stack item
        """
        delegate = cast(OptionType, stack.pop1())
        expected_ty = OptionType.create_type(args=[KeyHashType])
        delegate.assert_type_equal(expected_ty)

        source = context.get_self_address()
        if delegate.is_none():
            delegate_key = None
        else:
            delegate_key = str(delegate.get_some())
        delegation = OperationType.delegation(source=source, delegate=delegate_key)

        stack.push(delegation)
        stdout.append(format_stdout(cls.prim, [delegate], [delegation]))  # type: ignore
        return cls(stack_items_added=1)
class TransferTokensInstruction(MichelsonInstruction, prim='TRANSFER_TOKENS'):
    """TRANSFER_TOKENS: build a transaction operation to a contract."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop parameter, amount and destination; push a transaction operation.

        The parameter is checked against the destination's declared argument
        type and, when the context can resolve it, against the destination
        entrypoint's actual type.

        :param stack: stack to pop three items from and push the operation to
        :param stdout: log lines; one formatted entry is appended
        :param context: provides the self address and entrypoint resolution
        :returns: instruction instance reporting one added stack item
        :raises AssertionError: when an operand has an unexpected type
        """
        operands = cast(Tuple[MichelsonType, MutezType, ContractType], stack.pop3())
        parameter, amount, destination = operands
        amount.assert_type_equal(MutezType)
        assert isinstance(destination, ContractType), f'expected contract, got {destination.prim}'

        param_type = destination.args[0]
        parameter.assert_type_equal(param_type)

        target_entrypoint = destination.get_entrypoint()
        target_address = destination.get_address()
        actual_type = get_entrypoint_type(context, target_entrypoint, address=target_address)
        if actual_type:
            parameter.assert_type_equal(actual_type, message='destination contract parameter')

        transaction = OperationType.transaction(
            source=context.get_self_address(),
            destination=target_address,
            amount=int(amount),
            entrypoint=target_entrypoint,
            value=parameter.to_micheline_value(),
            param_type=param_type,
        )
        stack.push(transaction)
        stdout.append(format_stdout(cls.prim, [parameter, amount, destination], [transaction]))  # type: ignore
        return cls(stack_items_added=1)
class VotingPowerInstruction(MichelsonInstruction, prim='VOTING_POWER'):
    """VOTING_POWER: replace a key hash with its voting power."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Pop a key hash and push ``context.get_voting_power(...)`` as a nat.

        :param stack: stack to pop the key hash from and push the nat to
        :param stdout: log lines; one formatted entry is appended
        :param context: queried with the string form of the key hash
        :returns: instruction instance reporting one added stack item
        """
        baker = cast(KeyHashType, stack.pop1())
        baker.assert_type_equal(KeyHashType)
        power = context.get_voting_power(str(baker))
        pushed = NatType.from_value(power)
        stack.push(pushed)
        stdout.append(format_stdout(cls.prim, [baker], [pushed]))  # type: ignore
        return cls(stack_items_added=1)
class TotalVotingPowerInstruction(MichelsonInstruction, prim='TOTAL_VOTING_POWER'):
    """TOTAL_VOTING_POWER: push the total voting power reported by the context."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_total_voting_power()`` in a ``NatType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the total voting power
        :returns: instruction instance reporting one added stack item
        """
        total = context.get_total_voting_power()
        pushed = NatType.from_value(total)
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class LevelInstruction(MichelsonInstruction, prim='LEVEL'):
    """LEVEL: push the level reported by the execution context."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_level()`` in a ``NatType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the level value
        :returns: instruction instance reporting one added stack item
        """
        level = context.get_level()
        pushed = NatType.from_value(level)
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class ViewInstruction(MichelsonInstruction, prim='VIEW', args_len=2):
    """VIEW: call a named view and push its result as an option.

    ``cls.args[0]`` is the view name literal and ``cls.args[1]`` the expected
    return type.
    """

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Pop the view input and target address; push ``Some result`` or ``None``.

        Resolution order:

        1. If ``context.get_view_result`` returns a value, it is converted with
           the return type and pushed as ``Some``.
        2. Otherwise the view expression is loaded and its declared return type
           (``args[2]``) is compared to the expected one. A load failure or an
           assertion failure is logged to ``stdout`` and ``None`` is pushed.
        3. Otherwise the view code (``args[3]``) runs on a fresh stack holding
           the pair of input and storage, and its single result is pushed as
           ``Some``.

        :param stack: stack to pop two items from and push the option to
        :param stdout: log lines; shared with the executed view code
        :param context: provides view expressions, patched results and storage
        :returns: instruction instance reporting one added stack item
        :raises MichelsonRuntimeError: when the view code does not leave exactly
            one item on its stack
        """
        input_value, view_address = cast(Tuple[MichelsonType, AddressType], stack.pop2())
        name = cast(Type[MichelineLiteral], cls.args[0]).get_string()

        # A view on the current contract is addressed as None from here on.
        address: Optional[str] = str(view_address)
        targets_self = address == context.get_self_address()
        if targets_self:
            address = None
        else:
            # FIXME: spawn new context with patched BALANCE and others
            logging.warning('PyTezos does not support external views with BALANCE or other context-dependent opcodes')

        return_ty = cast(Type[MichelsonType], cls.args[1])
        patched = context.get_view_result(address=address, name=name)

        if patched is None:
            try:
                view_expr = context.get_view_expr(name, address=address)
                if view_expr is None:
                    raise MichelsonRuntimeError(f'Failed to load view {str(view_address)}%{name}')
                view_ty = ViewSection.match(view_expr)
                return_ty.assert_type_equal(view_ty.args[2], message=f'view {name} return type')
            except (MichelsonRuntimeError, AssertionError) as e:
                stdout.append(f'VIEW: {str(e)}')
                res = OptionType.none(return_ty)
            else:
                storage_expr = context.get_storage_value(address)
                # NOTE(review): the storage type is requested without `address`
                # while the storage value is requested with it - confirm this is
                # intended for views on other contracts.
                storage_ty = StorageSection.match(context.get_storage_expr())
                storage_value = storage_ty.from_micheline_value(storage_expr).item
                view_stack = MichelsonStack([PairType.from_comb([input_value, storage_value])])
                # FIXME: need to patch context.balance
                view_code = cast(MichelineSequence, view_ty.args[3])
                view_code.execute(view_stack, stdout, context)
                if len(view_stack) != 1:
                    raise MichelsonRuntimeError('Expected single item on the stack, got', view_stack)
                res = OptionType.from_some(view_stack.pop1())
        else:
            logging.info('Using patched VIEW result')
            res = OptionType.from_some(return_ty.from_python_object(patched))

        stack.push(res)
        stdout.append(format_stdout(cls.prim, [input_value, view_address], [res]))  # type: ignore
        return cls(stack_items_added=1)
class OpenChestInstruction(MichelsonInstruction, prim='OPEN_CHEST'):
    """OPEN_CHEST: declared but not implemented."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Always fail: this instruction has no implementation.

        :raises NotImplementedError: unconditionally
        """
        raise NotImplementedError()
class MinBlockTimeInstruction(MichelsonInstruction, prim='MIN_BLOCK_TIME'):
    """MIN_BLOCK_TIME: push the minimal block time reported by the context."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Wrap ``context.get_min_block_time()`` in a ``NatType`` and push it.

        :param stack: stack that receives the new item
        :param stdout: log lines; one formatted entry is appended
        :param context: source of the minimal block time
        :returns: instruction instance reporting one added stack item
        """
        block_time = context.get_min_block_time()
        pushed = NatType.from_value(block_time)
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class EmitInstruction(MichelsonInstruction, prim='EMIT', args_len=1):
    """EMIT: turn the top stack item into an event operation.

    ``cls.args[0]`` is the declared event type.
    """

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop the payload, check it against the event type, push the event.

        The tag is ``cls.field_names[0]`` when exactly one field name is
        present, otherwise the empty string.

        :param stack: stack to pop the payload from and push the operation to
        :param stdout: log lines; one formatted entry is appended
        :param context: provides the self address used as event source
        :returns: instruction instance reporting one added stack item
        """
        event_type = cast(Type[MichelsonType], cls.args[0])
        payload = stack.pop1()
        payload.assert_type_equal(event_type)
        tag = cls.field_names[0] if len(cls.field_names) == 1 else ''
        res = OperationType.event(
            source=context.get_self_address(),
            event_type=event_type,
            payload=payload.to_micheline_value(),
            tag=tag,
        )
        stack.push(res)
        stdout.append(format_stdout(cls.prim, [payload], [res], arg=f'%{tag}'))  # type: ignore
        # One item (the event operation) was pushed above, so report 1 rather
        # than 0: every instruction that pushes a single result reports 1.
        return cls(stack_items_added=1)