diff --git a/Pystan/config/constant_propagation.py b/Pystan/config/constant_propagation.py index a530c34071106acc53ced1ee96915ab2cb9deb93..9c7d6cdbac5c14f05f212c43a8f021b587816df3 100644 --- a/Pystan/config/constant_propagation.py +++ b/Pystan/config/constant_propagation.py @@ -84,24 +84,50 @@ class Constant_propagation(Transfer[state]): variables: frozenset[str] def __init__(self,instr): self.variables = variables_of_instr(instr) + def bottom(self) -> state: - ... + return None + def init_state(self) -> state: - ... + return {v: abstract_value(None) for v in self.variables} + def join(self,s1:state,s2:state) -> state: - ... + if s1 is None: return s2 + if s2 is None: return s1 + result = {} + for v in self.variables: + val1 = s1[v].value + val2 = s2[v].value + if val1 == val2: + result[v] = abstract_value(val1) + else: + result[v] = abstract_value(None) + return result def included(self,s1: state,s2: state) -> bool: - ... + if s1 is None: return True + if s2 is None: return False + for v in self.variables: + val1 = s1[v].value + val2 = s2[v].value + if val1 is not None and val1 != val2: + return False + return True def tr_skip(self,s: state) -> state: - ... + return s def tr_set(self,s: state,v: str,e: ArithExpr) -> state: - ... + if s is None: return None + new_env = s.copy() + val = eval_aexp(s, e) + if val is None: + return None + new_env[v] = val + return new_env def tr_test(self,s: state,c: BoolExpr) -> state: - ... + return s def tr_err(self,s: state,e: Expr) -> state: - ... + return s diff --git a/Pystan/config/iteration.py b/Pystan/config/iteration.py index f91169661c566371dc324842b44f562fe95b4d76..2ca3afbd1fdc314cc23cd489473003c90fd38db2 100644 --- a/Pystan/config/iteration.py +++ b/Pystan/config/iteration.py @@ -38,9 +38,93 @@ class Transfer[S](Protocol): """transfer state across a transition to error state""" ... +# def fixpoint_iteration[T](transfer: Transfer[T], cfg: Cfg) -> dict[Node,T]: +# """computes the fixpoint for the given transfer functions over the given CfG + +# output maps each node to the computed state +# """ + +# # Initialize the mapping +# mapping = {} +# for node in cfg.g.nodes: +# if node == cfg.init_node: +# mapping[node] = transfer.init_state() +# else: +# mapping[node] = transfer.bottom() + +# # Initialize the worklist with all edges from the initial node +# worklist = [] +# for (dst, label) in cfg.init_node.succs: +# worklist.append((cfg.init_node, dst, label)) + +# # Main iteration loop +# while worklist: +# # Take the first edge from the worklist +# src, dst, label = worklist.pop(0) + +# # Apply the transfer function +# src_state = mapping[src] +# new_state = transfer.transfer(label, src_state) + +# # Compute the join with the current state +# result_state = transfer.join(mapping[dst], new_state) + +# # Check if we need to update and propagate +# if not transfer.le(result_state, mapping[dst]): +# mapping[dst] = result_state + +# # Add all outgoing edges from dst to the worklist +# for (next_dst, next_label) in dst.succs: +# worklist.append((dst, next_dst, next_label)) + +# return mapping + + def fixpoint_iteration[T](transfer: Transfer[T], cfg: Cfg) -> dict[Node,T]: """computes the fixpoint for the given transfer functions over the given CfG - - output maps each node to the computed state + + output maps each node to the computed state """ - raise NotImplementedError + + # Étape 1 : Initialisation + mapping: dict[Node, T] = {} + for node in cfg.g.nodes: + if node == cfg.init_node: + mapping[node] = transfer.init_state() + else: + mapping[node] = transfer.bottom() + + # Étape 2 : Initialisation de la worklist avec les successeurs du nœud initial + worklist: list[tuple[Node, Node, Label]] = [] + for (dst, label) in cfg.init_node.succs: + worklist.append((cfg.init_node, dst, label)) + + # Étape 3 : Itération principale + while worklist: + src, dst, label = worklist.pop(0) + src_state = mapping[src] + + # Appliquer la bonne fonction de transfert selon le label + match label: + case LSkip(): + new_state = transfer.tr_skip(src_state) + case LSet(var, expr): + new_state = transfer.tr_set(src_state, var, expr) + case LTest(cond): + new_state = transfer.tr_test(src_state, cond) + case LErr(expr): + new_state = transfer.tr_err(src_state, expr) + case _: + continue # ou raise NotImplementedError + + # Joindre avec l'ancien état + joined_state = transfer.join(mapping[dst], new_state) + + # Si le nouvel état apporte de l'information, on met à jour + if not transfer.included(joined_state, mapping[dst]): + mapping[dst] = joined_state + # Et on ajoute les successeurs à la worklist + for (succ, lab) in dst.succs: + worklist.append((dst, succ, lab)) + + return mapping diff --git a/Pystan/config/opsem.py b/Pystan/config/opsem.py index 1313e985e2c11ca43cc8fcaa926eec02eba135f1..8cc38099adb87a52e25a533dca01efeed513b4e2 100644 --- a/Pystan/config/opsem.py +++ b/Pystan/config/opsem.py @@ -12,39 +12,105 @@ def eval_aexp(env: dict[str,int], exp: ArithExpr) -> int | None: """evaluates an arithmetic expression""" match exp: case AECst(value): - raise NotImplementedError + return value case AEVar(var): - raise NotImplementedError + if var in env: + return env[var] + else: + return None case AEUop(uop,expr): - raise NotImplementedError + val = eval_aexp(env, expr) + if val is None: + return None + + # Match on the unary operator + if uop == Uop.OPP: + return -val + else: + return None case AEBop(bop,left_expr,right_expr): - raise NotImplementedError + left_val = eval_aexp(env, left_expr) + right_val = eval_aexp(env, right_expr) + if left_val is None or right_val is None: + return None + + # Match on the binary operator using the available operations + if bop == Bop.ADD: + return left_val + right_val + elif bop == Bop.MUL: + return left_val * right_val + # Check for other potential operators in Bop + # Using string representation as fallback + elif str(bop) == "Bop.SUB" or str(bop) == "-": + return left_val - right_val + elif str(bop) == "Bop.DIV" or str(bop) == "/": + if right_val == 0: + return None # Division by zero + return left_val // right_val + else: + return None case _: assert False def eval_bexp(env: dict[str,int], exp:BoolExpr) -> bool | None: """evaluates a boolean expression""" match exp: case BEPlain(aexpr): - raise NotImplementedError + val = eval_aexp(env, aexpr) + if val is None: + return None + return val != 0 case BEEq(left_expr,right_expr): - raise NotImplementedError + left_val = eval_aexp(env, left_expr) + right_val = eval_aexp(env, right_expr) + if left_val is None or right_val is None: + return None + return left_val == right_val case BELeq(left_expr,right_expr): - raise NotImplementedError + left_val = eval_aexp(env, left_expr) + right_val = eval_aexp(env, right_expr) + if left_val is None or right_val is None: + return None + return left_val <= right_val case BENeg(expr): - raise NotImplementedError + val = eval_bexp(env, expr) + if val is None: + return None + return not val case _: assert False def eval_instr(env: dict[str,int], instr: Instr) -> dict[str,int] | None: """evaluates an instruction""" match instr: case ISkip(): - raise NotImplementedError + return env.copy() case ISet(var,expr): - raise NotImplementedError + val = eval_aexp(env, expr) + if val is None: + return None + new_env = env.copy() + new_env[var] = val + return new_env case ISeq(first,second): - raise NotImplementedError + env_after_first = eval_instr(env, first) + if env_after_first is None: + return None + return eval_instr(env_after_first, second) case ICond(cond,true_branch,false_branch): - raise NotImplementedError + cond_val = eval_bexp(env, cond) + if cond_val is None: + return None + if cond_val: + return eval_instr(env, true_branch) + else: + return eval_instr(env, false_branch) case ILoop(cond,body): - raise NotImplementedError + cond_val = eval_bexp(env, cond) + if cond_val is None: + return None + if not cond_val: + return env.copy() + env_after_body = eval_instr(env, body) + if env_after_body is None: + return None + return eval_instr(env_after_body, ILoop(cond, body)) case _: assert False