Hook Manager

hook_manager

Classes:

  • HookManager

    Manages hooks (DGHooks) for a DGraph, supporting both shared and key-specific hooks.

HookManager

HookManager(keys: List[str])

Manages hooks (DGHooks) for a DGraph, supporting both shared and key-specific hooks.

This class lets you register hooks that modify or enrich batches of graph events by applying transformations to the current state, and optionally the past history, of the temporal graph. The hook manager executes these transformations transparently during data iteration. Hooks can be shared across all keys or specific to one key. Dependencies between hooks are resolved automatically by topologically sorting them on their requires and produces attributes.
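
The dependency resolution described above can be illustrated with a small sketch. This is not the library's `_topological_sort_hooks` implementation, which is not shown on this page; it is one plausible way to order hooks with Kahn's algorithm. `ToyHook` and `sort_hooks` are hypothetical names, and the sketch assumes every required attribute must be produced by some hook, whereas the real manager may treat base batch attributes as always available.

```python
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass(frozen=True)
class ToyHook:
    """Hypothetical stand-in for a DGHook: a name plus requires/produces sets."""

    name: str
    requires: frozenset = frozenset()
    produces: frozenset = frozenset()


def sort_hooks(hooks: List[ToyHook]) -> List[ToyHook]:
    """Order hooks so each required attribute is produced earlier (Kahn's algorithm)."""
    # Map each attribute to the index of the hook that produces it.
    producer: Dict[str, int] = {}
    for i, h in enumerate(hooks):
        for attr in h.produces:
            producer[attr] = i

    # Add an edge producer -> consumer for every required attribute.
    successors: Dict[int, Set[int]] = {i: set() for i in range(len(hooks))}
    in_degree = [0] * len(hooks)
    for i, h in enumerate(hooks):
        for attr in h.requires:
            if attr not in producer:
                raise ValueError(f'{h.name} requires {attr!r}, which no hook produces')
            p = producer[attr]
            if i not in successors[p]:
                successors[p].add(i)
                in_degree[i] += 1

    # Repeatedly emit hooks whose dependencies are all satisfied.
    ready = deque(i for i, d in enumerate(in_degree) if d == 0)
    ordered: List[ToyHook] = []
    while ready:
        i = ready.popleft()
        ordered.append(hooks[i])
        for j in sorted(successors[i]):
            in_degree[j] -= 1
            if in_degree[j] == 0:
                ready.append(j)

    if len(ordered) != len(hooks):
        raise ValueError('hooks form a cyclic dependency')
    return ordered


hooks = [
    ToyHook('sampler', requires=frozenset({'neg'}), produces=frozenset({'nbrs'})),
    ToyHook('negatives', produces=frozenset({'neg'})),
]
print([h.name for h in sort_hooks(hooks)])  # ['negatives', 'sampler']
```

Registration order does not matter here: `sampler` was registered first but runs second because it requires what `negatives` produces.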

Parameters:

  • keys (List[str]) –

    List of valid keys for key-specific hooks. Each key can have its own set of hooks, in addition to shared hooks.

Raises:

  • ValueError

    If keys is empty.

Methods:

  • activate

    Context manager to temporarily set a key as active for hook execution.

  • execute_active_hooks

    Executes all hooks (shared + key-specific) for the active key on a batch.

  • register

    Registers a key-specific hook.

  • register_shared

    Registers a shared hook that runs for all keys.

  • reset_state

    Resets the internal state of all stateful hooks.

  • resolve_hooks

    Resolves hook execution order by topologically sorting them based on dependencies.

  • set_active_hooks

    Sets the currently active key for executing hooks.

Source code in tgm/hooks/hook_manager.py
def __init__(self, keys: List[str]) -> None:
    if not len(keys):
        raise ValueError('HookManager keys list must be non-empty')

    self._dirty: Dict[str, bool] = {k: False for k in keys}
    self._key_to_hooks: Dict[str, List[DGHook]] = {k: [] for k in keys}
    self._shared_hooks: List[DGHook] = []
    self._active_key: str | None = None
    self._registered_key = keys

activate

activate(key: str) -> Iterator[None]

Context manager to temporarily set a key as active for hook execution.

Parameters:

  • key (str) –

    The key to activate.

Source code in tgm/hooks/hook_manager.py
@contextmanager
def activate(self, key: str) -> Iterator[None]:
    """Context manager to temporarily set a key as active for hook execution.

    Args:
        key (str): The key to activate.
    """
    prev_key = self._active_key
    self.set_active_hooks(key)
    try:
        yield
    finally:
        self._active_key = prev_key  # Restore previous active key
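
Because the previous key is saved before the new one is set and restored in a `finally` clause, activations can be nested and still unwind correctly. The sketch below isolates only that bookkeeping. `ActiveKeyTracker` is a hypothetical reduction written for illustration, not part of the library.

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional


class ActiveKeyTracker:
    """Hypothetical reduction of HookManager to its active-key bookkeeping."""

    def __init__(self, keys: List[str]) -> None:
        self._keys = list(keys)
        self.active_key: Optional[str] = None

    @contextmanager
    def activate(self, key: str) -> Iterator[None]:
        if key not in self._keys:
            raise KeyError(key)
        prev_key = self.active_key
        self.active_key = key
        try:
            yield
        finally:
            self.active_key = prev_key  # restored even if the body raises


tracker = ActiveKeyTracker(['train', 'val'])
seen = []
with tracker.activate('train'):
    seen.append(tracker.active_key)
    with tracker.activate('val'):  # nested activation
        seen.append(tracker.active_key)
    seen.append(tracker.active_key)  # back to the outer key
seen.append(tracker.active_key)  # back to no active key
print(seen)  # ['train', 'val', 'train', None]
```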

execute_active_hooks

execute_active_hooks(dg: DGraph, batch: DGBatch) -> DGBatch

Executes all hooks (shared + key-specific) for the active key on a batch.

Parameters:

  • dg (DGraph) –

    The graph on which hooks operate.

  • batch (DGBatch) –

    The batch of events to modify.

Returns:

  • DGBatch –

    The modified batch after all hooks are applied.

Raises:

  • RuntimeError

    If no active key is set.

  • UnresolvableHookDependenciesError

    If required attributes are missing or hooks form a cyclic dependency.

Source code in tgm/hooks/hook_manager.py
@log_latency(level=logging.DEBUG)
def execute_active_hooks(self, dg: DGraph, batch: DGBatch) -> DGBatch:
    """Executes all hooks (shared + key-specific) for the active key on a batch.

    Args:
        dg (DGraph): The graph on which hooks operate.
        batch (DGBatch): The batch of events to modify.

    Returns:
        DGBatch: The modified batch after all hooks are applied.

    Raises:
        RuntimeError: If no active key is set.
        UnresolvableHookDependenciesError: If required attributes are missing or hooks form a cyclic dependency.
    """
    if self._active_key is None:
        raise RuntimeError('No active key set. Use activate() context manager.')

    # Lazily validate and topologically sort if needed
    key = self._active_key
    if self._dirty[key]:
        self.resolve_hooks(key)

    for hook in self._key_to_hooks[key]:
        start_time = time.perf_counter()
        batch = hook(dg, batch)
        latency = time.perf_counter() - start_time
        logger.debug('%s hook executed in %.4fs', hook.__class__.__name__, latency)

    return batch
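
The lazy resolution in the listing above means the sort cost is paid once per change to the hook set, not once per batch. A minimal sketch of that dirty-flag pattern follows. `LazyPipeline` and its `resolve_calls` counter are hypothetical and exist only to make the behaviour observable; plain dictionaries stand in for `DGBatch`, and the resolve step is a placeholder for the real topological sort.

```python
from typing import Callable, Dict, List

Batch = Dict[str, int]
Hook = Callable[[Batch], Batch]


class LazyPipeline:
    """Hypothetical sketch of lazy resolution guarded by a per-key dirty flag."""

    def __init__(self, keys: List[str]) -> None:
        self._hooks: Dict[str, List[Hook]] = {k: [] for k in keys}
        self._dirty: Dict[str, bool] = {k: False for k in keys}
        self.resolve_calls = 0  # instrumentation for this example only

    def register(self, key: str, hook: Hook) -> None:
        self._hooks[key].append(hook)
        self._dirty[key] = True  # the order must be re-resolved before next use

    def _resolve(self, key: str) -> None:
        # Placeholder: a real manager would validate and sort the hooks here.
        self.resolve_calls += 1
        self._dirty[key] = False

    def execute(self, key: str, batch: Batch) -> Batch:
        if self._dirty[key]:
            self._resolve(key)
        for hook in self._hooks[key]:
            batch = hook(batch)  # each hook receives the previous hook's output
        return batch


pipe = LazyPipeline(['train'])
pipe.register('train', lambda b: {**b, 'x': b['x'] + 1})
pipe.register('train', lambda b: {**b, 'x': b['x'] * 10})

out1 = pipe.execute('train', {'x': 1})
out2 = pipe.execute('train', {'x': 2})
print(out1, out2, pipe.resolve_calls)  # {'x': 20} {'x': 30} 1
```

Two registrations followed by two executions trigger a single resolve; registering another hook afterwards would set the flag again and trigger one more.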

register

register(key: str, hook: DGHook) -> None

Registers a key-specific hook.

Parameters:

  • key (str) –

    The key to associate the hook with.

  • hook (DGHook) –

    The hook to register.

Raises:

  • KeyError

    If key is not a declared key.

  • BadHookProtocolError

    If hook does not implement the DGHook protocol.

  • RuntimeError

    If called while a key is active.

Source code in tgm/hooks/hook_manager.py
def register(self, key: str, hook: DGHook) -> None:
    """Registers a key-specific hook.

    Args:
        key (str): The key to associate the hook with.
        hook (DGHook): The hook to register.

    Raises:
        KeyError: If `key` is not a declared key.
        BadHookProtocolError: If `hook` does not implement the `DGHook` protocol.
        RuntimeError: If called while a key is active.
    """
    self._ensure_valid_key(key)
    self._ensure_valid_hook(hook)
    self._ensure_no_active_key()
    self._key_to_hooks[key].append(hook)
    self._dirty[key] = True  # Mark registered key as 'dirty'
    logger.debug('Registered hook: %s for key: %s', hook.__class__.__name__, key)

register_shared

register_shared(hook: DGHook) -> None

Registers a shared hook that runs for all keys.

Parameters:

  • hook (DGHook) –

    The hook to register.

Raises:

  • BadHookProtocolError

    If hook does not implement the DGHook protocol.

  • RuntimeError

    If called while a key is active.

Source code in tgm/hooks/hook_manager.py
def register_shared(self, hook: DGHook) -> None:
    """Registers a shared hook that runs for all keys.

    Args:
        hook (DGHook): The hook to register.

    Raises:
        BadHookProtocolError: If `hook` does not implement the `DGHook` protocol.
        RuntimeError: If called while a key is active.
    """
    self._ensure_valid_hook(hook)
    self._ensure_no_active_key()
    self._shared_hooks.append(hook)
    for k in self._dirty:  # Mark all keys as 'dirty'
        self._dirty[k] = True
    logger.debug('Registered shared hook: %s', hook.__class__.__name__)

reset_state

reset_state(key: str | None = None) -> None

Resets the internal state of all stateful hooks.

Parameters:

  • key (str | None, default: None) –

    If specified, resets only the hooks registered for this key. Otherwise resets the hooks for all keys. Shared hooks are reset in both cases.

Source code in tgm/hooks/hook_manager.py
def reset_state(self, key: str | None = None) -> None:
    """Resets the internal state of all stateful hooks.

    Args:
        key (str | None): If specified, resets only hooks for this key.
            Otherwise resets all keys and shared hooks.
    """
    logger.debug('Resetting hooks for key: %s', key)
    if key is not None:
        self._ensure_valid_key(key)

    for hook in self._shared_hooks:
        logger.debug('Resetting state for shared hook: %s', hook.__class__.__name__)
        hook.reset_state()

    keys_to_reset = [key] if key is not None else list(self._key_to_hooks.keys())
    for k in keys_to_reset:
        for h in self._key_to_hooks[k]:
            logger.debug('Resetting state for keyed hook: %s', h.__class__.__name__)
            h.reset_state()
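
To see why a reset matters, consider a hook that accumulates information across batches. The sketch below is an illustration only: `RunningEventCount` is a hypothetical hook, a dictionary stands in for `DGBatch`, and the call signature is simplified to take just the batch (the hooks on this page are called as `hook(dg, batch)`).

```python
from typing import Dict, List


class RunningEventCount:
    """Hypothetical stateful hook: tracks how many events it has seen so far."""

    def __init__(self) -> None:
        self.seen = 0

    def __call__(self, batch: Dict[str, List[int]]) -> Dict[str, object]:
        self.seen += len(batch['src'])
        return {**batch, 'events_seen': self.seen}

    def reset_state(self) -> None:
        self.seen = 0


hook = RunningEventCount()
epoch = [{'src': [0, 1, 2]}, {'src': [3, 4]}]

first = [hook(b)['events_seen'] for b in epoch]
stale = [hook(b)['events_seen'] for b in epoch]  # no reset: counts carry over
hook.reset_state()
fresh = [hook(b)['events_seen'] for b in epoch]  # after reset: matches the first pass

print(first, stale, fresh)  # [3, 5] [8, 10] [3, 5]
```

Without the reset, the second pass over the same data sees state left over from the first, which is usually not what is wanted at an epoch boundary or when switching between splits.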

resolve_hooks

resolve_hooks(key: str | None = None) -> None

Resolves hook execution order by topologically sorting them based on dependencies.

Parameters:

  • key (str | None, default: None) –

    If specified, resolves hooks only for this key. Otherwise resolves all keys.

Raises:

  • KeyError

    If key is invalid.

  • UnresolvableHookDependenciesError

    If required attributes are missing or hooks form a cyclic dependency.

Source code in tgm/hooks/hook_manager.py
def resolve_hooks(self, key: str | None = None) -> None:
    """Resolves hook execution order by topologically sorting them based on dependencies.

    Args:
        key (str | None): If specified, resolves hooks only for this key.
            Otherwise resolves all keys.

    Raises:
        KeyError: If `key` is invalid.
        UnresolvableHookDependenciesError: If required attributes are missing or hooks form a cyclic dependency.
    """
    if key is not None:
        self._ensure_valid_key(key)

    keys_to_validate = [key] if key is not None else list(self._key_to_hooks.keys())
    for k in keys_to_validate:
        hooks = self._shared_hooks + [
            h for h in self._key_to_hooks[k] if h not in self._shared_hooks
        ]
        logger.debug('Running topological sort to resolve hooks for key: %s', k)
        self._key_to_hooks[k] = self._topological_sort_hooks(hooks)
        self._dirty[k] = False
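
One detail of the listing above is easy to miss: after a resolve, the per-key list also contains the shared hooks, so the `if h not in self._shared_hooks` filter is what keeps a second resolve from duplicating them. The sketch below isolates that merge step. `merge_hooks` is a hypothetical helper, and strings stand in for hook objects.

```python
from typing import List


def merge_hooks(shared: List[str], keyed: List[str]) -> List[str]:
    """Shared hooks first, then key-specific hooks not already in the shared list."""
    return shared + [h for h in keyed if h not in shared]


shared = ['dedup', 'negatives']
keyed = ['recency_sampler']

once = merge_hooks(shared, keyed)
twice = merge_hooks(shared, once)  # the input a second resolve would start from

print(once)           # ['dedup', 'negatives', 'recency_sampler']
print(twice == once)  # True
```

Note that `in` compares with `==`, so for hook objects that do not define `__eq__` the filter falls back to identity.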

set_active_hooks

set_active_hooks(key: str) -> None

Sets the currently active key for executing hooks.

Parameters:

  • key (str) –

    The key to activate.

Raises:

  • KeyError

    If key is not a declared key.

Source code in tgm/hooks/hook_manager.py
def set_active_hooks(self, key: str) -> None:
    """Sets the currently active key for executing hooks.

    Args:
        key (str): The key to activate.

    Raises:
        KeyError: If `key` is not a declared key.
    """
    self._ensure_valid_key(key)
    self._active_key = key
    logger.debug('Set active hooks to key: %s', key)