from typing import List
from typing import Tuple
from typing import Type
from typing import Union
from typing import cast
from pytezos.context.abstract import AbstractContext
from pytezos.michelson.instructions.adt import PairInstruction
from pytezos.michelson.instructions.base import MichelsonInstruction
from pytezos.michelson.instructions.base import Wildcard
from pytezos.michelson.instructions.base import format_stdout
from pytezos.michelson.instructions.stack import PushInstruction
from pytezos.michelson.micheline import MichelineSequence
from pytezos.michelson.micheline import MichelsonRuntimeError
from pytezos.michelson.stack import MichelsonStack
from pytezos.michelson.types import BoolType
from pytezos.michelson.types import LambdaType
from pytezos.michelson.types import ListType
from pytezos.michelson.types import MapType
from pytezos.michelson.types import MichelsonType
from pytezos.michelson.types import OptionType
from pytezos.michelson.types import OrType
from pytezos.michelson.types import PairType
from pytezos.michelson.types import SetType
[docs]def execute_dip(
prim: str,
stack: MichelsonStack,
stdout: List[str],
count: int,
body: Type[MichelsonInstruction],
context: AbstractContext,
) -> MichelsonInstruction:
stdout.append(format_stdout(prim, [*Wildcard.n(count)], []))
stack.protect(count=count)
item = body.execute(stack, stdout, context=context)
stack.restore(count=count)
stdout.append(format_stdout(prim, [], [*Wildcard.n(count)], count))
return item
[docs]class DipnInstruction(MichelsonInstruction, prim='DIP', args_len=2):
def __init__(self, item: MichelsonInstruction):
super(DipnInstruction, self).__init__()
self.item = item
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
depth = cls.args[0].get_int() # type: ignore
item = execute_dip(cls.prim, stack, stdout, count=depth, body=cls.args[1], context=context) # type: ignore
return cls(item)
[docs]class DipInstruction(MichelsonInstruction, prim='DIP', args_len=1):
def __init__(self, item: MichelsonInstruction):
super(DipInstruction, self).__init__()
self.item = item
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
item = execute_dip(cls.prim, stack, stdout, count=1, body=cls.args[0], context=context) # type: ignore
return cls(item)
[docs]class LambdaInstruction(MichelsonInstruction, prim='LAMBDA', args_len=3):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
lambda_type = LambdaType.create_type(args=cls.args[:2])
res = lambda_type(cls.args[2]) # type: ignore
stack.push(res)
stdout.append(format_stdout(cls.prim, [], [res])) # type: ignore
return cls(stack_items_added=1)
[docs]class LambdaRecInstruction(MichelsonInstruction, prim='LAMBDA_REC', args_len=3):
depth = 0
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
lambda_type = LambdaType.create_type(args=cls.args[:2])
inner = LambdaRecInstruction.create_type(args=cls.args.copy())
inner.depth = cls.depth + 1 # type: ignore
if cls.depth + 1 > 256:
raise MichelsonRuntimeError("Maximum recursive depth reached")
body = MichelineSequence.create_type(args=[inner, cls.args[2]])
res = lambda_type(body) # type: ignore
stack.push(res)
stdout.append(format_stdout(cls.prim, [], [res])) # type: ignore
return cls(stack_items_added=1)
[docs]class ExecInstruction(MichelsonInstruction, prim='EXEC'):
def __init__(self, item: MichelsonInstruction):
super(ExecInstruction, self).__init__(stack_items_added=1)
self.item = item
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
param, lambda_ = cast(Tuple[MichelsonType, LambdaType], stack.pop2())
assert isinstance(lambda_, LambdaType), f'expected lambda, got {lambda_.prim}'
param.assert_type_equal(lambda_.args[0])
stdout.append(format_stdout(cls.prim, [param, lambda_], [])) # type: ignore
lambda_stack = MichelsonStack.from_items([param])
lambda_body = cast(MichelsonInstruction, lambda_.value)
item = lambda_body.execute(lambda_stack, stdout, context=context)
res = lambda_stack.pop1()
res.assert_type_equal(lambda_.args[1])
assert len(lambda_stack) == 0, f'lambda stack is not empty {lambda_stack}'
stack.push(res)
return cls(item)
[docs]class ApplyInstruction(MichelsonInstruction, prim='APPLY'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
left, lambda_ = cast(Tuple[MichelsonType, LambdaType], stack.pop2())
lambda_.assert_type_in(LambdaType)
lambda_.args[0].assert_type_in(PairType)
left_type, right_type = lambda_.args[0].args
left.assert_type_equal(left_type)
new_value = MichelineSequence.create_type(
args=[
PushInstruction.create_type(args=[left_type, left.to_literal()]),
PairInstruction,
lambda_.value,
]
)
res = LambdaType.create_type(args=[right_type, lambda_.args[1]])(new_value) # type: ignore
stack.push(res)
stdout.append(format_stdout(cls.prim, [left, lambda_], [res])) # type: ignore
return cls(stack_items_added=1)
[docs]class FailwithInstruction(MichelsonInstruction, prim='FAILWITH'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
a = stack.pop1()
assert a.is_packable(), f'expected packable type, got {a.prim}'
raise MichelsonRuntimeError(repr(a))
[docs]class IfInstruction(MichelsonInstruction, prim='IF', args_len=2):
def __init__(self, item: MichelsonInstruction):
super(IfInstruction, self).__init__()
self.item = item
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
cond = cast(BoolType, stack.pop1())
cond.assert_type_equal(BoolType)
stdout.append(format_stdout(cls.prim, [cond], [])) # type: ignore
branch = cls.args[0] if bool(cond) else cls.args[1]
item = branch.execute(stack, stdout, context=context)
return cls(item)
[docs]class IfConsInstruction(MichelsonInstruction, prim='IF_CONS', args_len=2):
def __init__(self, stack_items_added: int, item: MichelsonInstruction):
super(IfConsInstruction, self).__init__(stack_items_added)
self.item = item
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
lst = cast(ListType, stack.pop1())
lst.assert_type_in(ListType)
if len(lst) > 0:
head, tail = lst.split_head()
stack.push(tail)
stack.push(head)
stdout.append(format_stdout(cls.prim, [lst], [head, tail])) # type: ignore
branch = cls.args[0]
stack_items_added = 2
else:
stdout.append(format_stdout(cls.prim, [lst], [])) # type: ignore
branch = cls.args[1]
stack_items_added = 0
item = branch.execute(stack, stdout, context=context)
return cls(stack_items_added, item)
[docs]class IfLeftInstruction(MichelsonInstruction, prim='IF_LEFT', args_len=2):
def __init__(self, item: MichelsonInstruction):
super(IfLeftInstruction, self).__init__(stack_items_added=1)
self.item = item
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
or_ = cast(OrType, stack.pop1())
or_.assert_type_in(OrType)
branch = cls.args[0] if or_.is_left() else cls.args[1]
res = or_.resolve()
stack.push(res)
stdout.append(format_stdout(cls.prim, [or_], [res])) # type: ignore
item = branch.execute(stack, stdout, context=context)
return cls(item)
[docs]class IfNoneInstruction(MichelsonInstruction, prim='IF_NONE', args_len=2):
def __init__(self, stack_items_added: int, item: MichelsonInstruction):
super(IfNoneInstruction, self).__init__(stack_items_added)
self.item = item
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
opt = cast(OptionType, stack.pop1())
opt.assert_type_in(OptionType)
if opt.is_none():
branch = cls.args[0]
stdout.append(format_stdout(cls.prim, [opt], [])) # type: ignore
stack_items_added = 0
else:
some = opt.get_some()
stack.push(some)
stdout.append(format_stdout(cls.prim, [opt], [some])) # type: ignore
branch = cls.args[1]
stack_items_added = 1
item = branch.execute(stack, stdout, context=context)
return cls(stack_items_added, item)
[docs]class LoopInstruction(MichelsonInstruction, prim='LOOP', args_len=1):
def __init__(self, items: List[MichelsonInstruction]):
super(LoopInstruction, self).__init__()
self.items = items
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
items = []
while True:
cond = cast(BoolType, stack.pop1())
cond.assert_type_equal(BoolType)
stdout.append(format_stdout(cls.prim, [cond], [])) # type: ignore
if bool(cond):
item = cls.args[0].execute(stack, stdout, context=context)
items.append(item)
else:
break
return cls(items)
[docs]class LoopLeftInstruction(MichelsonInstruction, prim='LOOP_LEFT', args_len=1):
def __init__(self, stack_items_added: int, items: List[MichelsonInstruction]):
super(LoopLeftInstruction, self).__init__(stack_items_added)
self.items = items
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
stack_items_added = 0
items = []
while True:
or_ = cast(OrType, stack.pop1())
or_.assert_type_in(OrType)
var = or_.resolve()
stack.push(var)
stack_items_added += 1
stdout.append(format_stdout(cls.prim, [or_], [var])) # type: ignore
if or_.is_left():
item = cls.args[0].execute(stack, stdout, context=context)
items.append(item)
else:
break
return cls(stack_items_added, items)
[docs]class MapInstruction(MichelsonInstruction, prim='MAP', args_len=1):
def __init__(self, stack_items_added: int, items: List[MichelsonInstruction]):
super(MapInstruction, self).__init__(stack_items_added)
self.items = items
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
stack_items_added = 0 # noqa: SIM113
src = cast(Union[ListType, MapType], stack.pop1())
executions = []
items = []
popped = [src]
for elt in src:
if isinstance(src, MapType):
elt = PairType.from_comb(list(elt)) # type: ignore
stack.push(elt) # type: ignore
stack_items_added += 1
stdout.append(format_stdout(cls.prim, popped, [elt])) # type: ignore
execution = cls.args[0].execute(stack, stdout, context=context)
executions.append(execution)
new_elt = stack.pop1()
if isinstance(src, MapType):
items.append((elt[0], new_elt))
else:
items.append(new_elt) # type: ignore
popped = [new_elt] # type: ignore
if items:
res = type(src).from_items(items) # type: ignore
else:
res = src # TODO: need to deduce argument types
stack.push(res)
stack_items_added += 1
stdout.append(format_stdout(cls.prim, popped, [res])) # type: ignore
return cls(stack_items_added, executions)
[docs]class IterInstruction(MichelsonInstruction, prim='ITER', args_len=1):
def __init__(self, stack_items_added: int, items: List[MichelsonInstruction]):
super(IterInstruction, self).__init__(stack_items_added)
self.items = items
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
stack_items_added = 0 # noqa: SIM113
src = cast(Union[ListType, MapType, SetType], stack.pop1())
executions = []
popped = [src]
for elt in src:
if isinstance(src, MapType):
elt = PairType.from_comb(list(elt)) # type: ignore
stack_items_added += 1
stack.push(elt) # type: ignore
stdout.append(format_stdout(cls.prim, popped, [elt])) # type: ignore
execution = cls.args[0].execute(stack, stdout, context=context)
executions.append(execution)
popped = []
return cls(stack_items_added, executions)