Skip to content

Observe

Functions to observe LLM calls and Tapes and store them in persistent storage.

Classes:

Functions:

SQLiteWriterThread

Methods:

  • __enter__

    Start the SQLite queue writer when entering the context.

  • __exit__

    Stop the SQLite queue writer when exiting the context.

  • wait_for_empty

    Wait for the queue to be empty and all tasks to be processed.

Attributes:

  • is_empty (bool) –

    Check if the queue is empty.

  • queue (Optional[Queue]) –

    Access the write queue.

Source code in tapeagents/observe.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class SQLiteWriterThread:
    """Context manager running a background thread that writes queued items to SQLite.

    While the context is active, this instance is published as the module-level
    ``_WRITER_THREAD``. ``sqlite_store_llm_call`` checks that global and, when it is set,
    puts calls on :attr:`queue` instead of writing them synchronously. The worker
    thread passes every queued item to ``sqlite_writer``.
    """

    def __init__(self):
        # Both stay None until __enter__ starts the writer; __exit__ resets them.
        self.write_queue: Optional[queue.Queue] = None
        self.writer_thread: Optional[threading.Thread] = None

    def __enter__(self):
        """Start the SQLite queue writer when entering the context.

        Re-entering while the writer is already running is a no-op.

        Returns:
            self
        """
        if self.write_queue is not None:
            return self  # Already running

        self.write_queue = queue.Queue()
        # daemon=True: this thread does not block interpreter shutdown.
        self.writer_thread = threading.Thread(target=self._queue_sqlite_writer, daemon=True)
        self.writer_thread.start()

        # Set the global reference so producers route their writes through the queue.
        global _WRITER_THREAD
        _WRITER_THREAD = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the SQLite queue writer when exiting the context.

        Blocks until every queued item has been processed, then sends the ``None`` stop
        signal and joins the worker thread. Returns None, so exceptions raised inside
        the ``with`` body are not suppressed.
        """
        global _WRITER_THREAD
        if self.write_queue is not None and self.writer_thread is not None:
            self.wait_for_empty()
            self.write_queue.put(None)  # Signal thread to stop
            self.writer_thread.join()  # Wait for thread to finish
            self.write_queue = None
            self.writer_thread = None

            # Clear the global reference
            _WRITER_THREAD = None

    def _queue_sqlite_writer(self):
        """Worker loop: pass each queued item to ``sqlite_writer`` until ``None`` arrives."""
        # Bind the queue locally; __exit__ only resets self.write_queue after joining us.
        write_queue = self.write_queue
        while True:
            item = write_queue.get()
            if item is None:  # Stop signal
                break
            try:
                sqlite_writer(item)
            except Exception:
                # Keep the worker alive. If it died here, task_done() would never be
                # called and wait_for_empty()/__exit__ would block forever.
                logger.exception("Failed to write item to SQLite")
            finally:
                write_queue.task_done()

    def wait_for_empty(self, timeout: Optional[float] = None) -> bool:
        """Wait for the queue to be empty and all tasks to be processed.

        Args:
            timeout: Maximum number of seconds to wait. ``None`` waits indefinitely.

        Returns:
            True if no writer is running, or if every item put on the queue has been
            processed. False if the timeout expired first or an error occurred while
            waiting.
        """
        write_queue = self.write_queue
        if write_queue is None:
            return True

        logger.info("Waiting for SQLite queue to empty...")
        deadline = None if timeout is None else time.monotonic() + timeout
        try:
            # This is the same wait loop Queue.join() runs internally; join() itself
            # accepts no timeout. all_tasks_done shares the queue's mutex with put(), so
            # unfinished_tasks == 0 means nothing is queued or in flight at this instant.
            with write_queue.all_tasks_done:
                while write_queue.unfinished_tasks:
                    if deadline is None:
                        write_queue.all_tasks_done.wait()
                    else:
                        remaining = deadline - time.monotonic()
                        if remaining <= 0:
                            return False
                        write_queue.all_tasks_done.wait(remaining)
            return True
        except Exception as e:
            logger.error(f"Error while waiting for queue to empty: {e}")
            return False

    @property
    def queue(self) -> Optional[queue.Queue]:
        """Access the write queue (None while the writer is not running)."""
        return self.write_queue

    @property
    def is_empty(self) -> bool:
        """Check if the queue is empty.

        True when no writer is running or no items are waiting in the queue. An item
        already taken by the worker but not yet written does not count; use
        :meth:`wait_for_empty` to wait for writes to finish.
        """
        return self.write_queue is None or self.write_queue.empty()

is_empty: bool property

Check if the queue is empty.

queue: Optional[queue.Queue] property

Access the write queue.

__enter__()

Start the SQLite queue writer when entering the context.

Source code in tapeagents/observe.py
261
262
263
264
265
266
267
268
269
270
271
272
273
def __enter__(self):
    """Enter the context by launching the background SQLite writer if it is idle.

    Creates a fresh write queue and starts a daemon thread running
    ``self._queue_sqlite_writer``. Also publishes this instance as the module-level
    ``_WRITER_THREAD``. Calling it again while a queue already exists changes nothing.

    Returns:
        self, so it can be used as ``with writer as w:``.
    """
    global _WRITER_THREAD
    if self.write_queue is None:
        self.write_queue = queue.Queue()
        worker = threading.Thread(target=self._queue_sqlite_writer, daemon=True)
        self.writer_thread = worker
        worker.start()
        # Publish ourselves so producers know a queue is available.
        _WRITER_THREAD = self
    return self

__exit__(exc_type, exc_val, exc_tb)

Stop the SQLite queue writer when exiting the context.

Source code in tapeagents/observe.py
275
276
277
278
279
280
281
282
283
284
285
286
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the context: drain the queue, stop the worker and reset all state.

    Does nothing unless both the queue and the worker thread exist. The exception
    arguments are ignored and None is returned, so errors from the ``with`` body
    propagate unchanged.
    """
    global _WRITER_THREAD
    if self.write_queue is None or self.writer_thread is None:
        return
    self.wait_for_empty()
    # A None item tells the worker loop to stop; then wait for it to exit.
    self.write_queue.put(None)
    self.writer_thread.join()
    self.write_queue, self.writer_thread = None, None
    _WRITER_THREAD = None

wait_for_empty(timeout=None)

Wait for the queue to be empty and all tasks to be processed.

Source code in tapeagents/observe.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def wait_for_empty(self, timeout: Optional[float] = None) -> bool:
    """Wait for the queue to be empty and all tasks to be processed.

    Args:
        timeout: Maximum number of seconds to wait. ``None`` waits indefinitely.

    Returns:
        True if no writer is running, or if every item put on the queue has been
        processed. False if the timeout expired first or an error occurred while
        waiting.
    """
    write_queue = self.write_queue
    if write_queue is None:
        return True

    logger.info("Waiting for SQLite queue to empty...")
    deadline = None if timeout is None else time.monotonic() + timeout
    try:
        # This is the same wait loop Queue.join() runs internally; join() itself
        # accepts no timeout. all_tasks_done shares the queue's mutex with put(), so
        # unfinished_tasks == 0 means nothing is queued or in flight at this instant.
        with write_queue.all_tasks_done:
            while write_queue.unfinished_tasks:
                if deadline is None:
                    write_queue.all_tasks_done.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return False
                    write_queue.all_tasks_done.wait(remaining)
        return True
    except Exception as e:
        logger.error(f"Error while waiting for queue to empty: {e}")
        return False

init_sqlite_if_not_exists(only_once=True)

Ensure that the tables exist in the sqlite database.

This is only done once per Python process. If you want to change the SQLite path at run time, you can run this function manually with only_once=False.

Source code in tapeagents/observe.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def init_sqlite_if_not_exists(only_once: bool = True):
    """
    Ensure that the LLMCalls and Tapes tables exist in the sqlite database.

    This is only done once per Python process.
    If you want to change the SQLite path at run time, you can run this function manually
    with only_once=False.

    Args:
        only_once: When True (the default), return immediately if the tables were
            already checked earlier in this process.
    """
    global _checked_sqlite
    if _checked_sqlite and only_once:
        return

    path = sqlite_db_path()
    logger.info(f"use SQLite db at {path}")
    llm_calls_ddl = """               
    CREATE TABLE IF NOT EXISTS LLMCalls (
        prompt_id TEXT PRIMARY KEY,
        timestamp TEXT,
        prompt TEXT,
        output TEXT,
        prompt_length_tokens INTEGER,
        output_length_tokens INTEGER,
        cached INTEGER
    )
    """
    # now create tape table with tape_id index and data column
    tapes_ddl = """
    CREATE TABLE IF NOT EXISTS Tapes (
        tape_id TEXT PRIMARY KEY,
        timestamp TEXT,
        length INTEGER,
        metadata TEXT,
        context TEXT,
        steps TEXT
    )
    """
    conn = sqlite3.connect(path)
    try:
        cursor = conn.cursor()
        cursor.execute(llm_calls_ddl)
        cursor.execute(tapes_ddl)
        cursor.close()
        conn.commit()
    finally:
        # The original never closed the connection, leaking the handle until GC.
        conn.close()
    _checked_sqlite = True

sqlite_store_llm_call(call)

Standalone function to store LLM calls.

Uses the background write queue when one is available (i.e. inside a SQLiteWriterThread context manager); otherwise falls back to writing synchronously in single-threaded mode.

Source code in tapeagents/observe.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def sqlite_store_llm_call(call: LLMCall):
    """Persist one LLM call to SQLite.

    If a SQLiteWriterThread context is active (the module-level ``_WRITER_THREAD`` is
    set and exposes a queue), the call is handed to that background writer.
    Otherwise it is written synchronously in the calling thread via ``sqlite_writer``.
    """
    queue_available = _WRITER_THREAD is not None and _WRITER_THREAD.queue is not None
    if not queue_available:
        # No writer context: fall back to a direct, single-threaded write.
        logger.debug("Using single-threaded SQLite writing mode")
        sqlite_writer(call)
        return
    # Inside a writer context: enqueue and let the background thread do the write.
    logger.debug("Using SQLite queue writing mode")
    _WRITER_THREAD.queue.put(call)