from __init__ import install_dependencies

await install_dependencies()
import cProfile
import math
import multiprocessing
import os
import pstats
import threading
import time

import ipywidgets as widgets
from IPython.display import IFrame

%load_ext divewidgets

In this notebook, we will explore how computers keep track of time and how we can create timers using the computer clock.

Current time

In Python, the current local time can be printed as follows using the time module:

print("Current local time:", time.asctime(time.localtime()))

How do computers keep track of the current time?

localtime() obtains the (operating) system time, as shown in the source code. Depending on the computer architecture, the operating system in turn obtains the time from a hardware clock.

To find the current clock source in Linux, for instance:

print("Available clock sources:")
!cat /sys/devices/system/clocksource/clocksource0/available_clocksource
print("Current clock source:")
!cat /sys/devices/system/clocksource/clocksource0/current_clocksource

A computer’s clock may not be very accurate. It often needs to be reset to the correct time once in a while.
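
Because the system clock can be reset, Python exposes several clocks with different guarantees. The following sketch uses time.get_clock_info from the standard library to list which clocks are adjustable and which are monotonic (never go backwards). The dictionary name clocks is only for illustration, and the reported values depend on the platform:

```python
import time

# Collect the properties of three standard clocks.
clocks = {name: time.get_clock_info(name)
          for name in ("time", "monotonic", "perf_counter")}

for name, info in clocks.items():
    print(f"{name:>12}: implementation={info.implementation}, "
          f"adjustable={info.adjustable}, monotonic={info.monotonic}, "
          f"resolution={info.resolution}")
```

An adjustable clock can jump when the system time is corrected, which matters when measuring elapsed time.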

How to implement a timer?

A straightforward way to implement a timer program is to write a loop that tracks the current time. Specifically, you can record the start time and compute the elapsed time as the difference between the current time and the start time. Unfortunately, the time returned by localtime() has the type struct_time, which does not support subtraction:

%%optlite -l -h 500
import time
start = time.localtime()
end = time.localtime()
time_passed = end - start

The solution is to call time(), which returns the current time in seconds as a float (roughly microsecond precision), or time_ns(), which returns it in nanoseconds as an int:

time.time(), type(time.time())
time.time_ns(), type(time.time_ns())
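
To see why time_ns() returns an int rather than a float, one can check how finely a float can resolve a value as large as the current timestamp. This is a minimal sketch assuming Python 3.9 or later, where math.ulp is available; the variable names are illustrative:

```python
import math
import time

now = time.time()        # float, seconds since the epoch
now_ns = time.time_ns()  # int, nanoseconds since the epoch

# math.ulp gives the gap between `now` and the next representable float,
# i.e., the finest time step a float of this magnitude can express.
spacing = math.ulp(now)

print("time()   :", now)
print("time_ns():", now_ns)
print("float spacing near the current time:", spacing, "seconds")
```

The spacing is a fraction of a microsecond, which is far coarser than one nanosecond, so an integer count is needed to carry nanosecond values exactly.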

On Linux, the above numbers are the numbers of seconds and nanoseconds elapsed since the Unix epoch:

print("Unix epoch in local time:", time.asctime(time.localtime(0)))

Hence, the current local time can also be obtained by passing time() as an argument to localtime:

print("Current local time:", time.asctime(time.localtime(time.time())))
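
The epoch printed with localtime(0) is shifted by the local time zone. As a small illustration, assuming a platform that uses the Unix epoch (as Linux does), gmtime shows the same instants in UTC:

```python
import time

# gmtime interprets seconds since the epoch in UTC instead of local time.
epoch_utc = time.gmtime(0)
print("Unix epoch in UTC:", time.asctime(epoch_utc))

# The integer nanosecond clock can be converted back to whole seconds.
now_ns = time.time_ns()
print("Current UTC time:", time.asctime(time.gmtime(now_ns // 10**9)))
```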

The following implements a timer using a while loop. The function returns (None) once the number of nanoseconds specified by the argument nanosecs has elapsed:

def timer_ns(nanosecs):
    start = time.time_ns()
    while time.time_ns() - start <= nanosecs:
        pass


timer_ns(nanosecs := int(input("How many nanoseconds to wait?") or 1e9))
print(f"Time's up after {nanosecs}ns!")
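
Since the system clock may be adjusted while a timer is running, a variation worth considering is to spin on a monotonic clock instead. The sketch below is not part of the original lab: timer_monotonic_ns is a hypothetical name, and it relies on time.monotonic_ns and time.perf_counter_ns from the standard library:

```python
import time


def timer_monotonic_ns(nanosecs):
    """Hypothetical variant of timer_ns that spins on a monotonic clock."""
    start = time.monotonic_ns()
    while time.monotonic_ns() - start <= nanosecs:
        pass


# Measure how long the timer actually took with a high-resolution counter.
before = time.perf_counter_ns()
timer_monotonic_ns(2_000_000)  # ask for 2 milliseconds
waited = time.perf_counter_ns() - before
print(f"Waited about {waited} ns")
```
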
def timeit(code, n):
    # YOUR CODE HERE
    raise NotImplementedError
    return avg_run_time
# tests
assert math.isclose(timeit(f"timer_ns({(ns:=3*10**6)})", 5), ns, abs_tol=1e6)
assert math.isclose(timeit(f"timer_ns({(ns:=2*10**6)})", 10), ns, abs_tol=1e6)
assert math.isclose(timeit(f"timer_ns({(ns:=10**6)})", 15), ns, abs_tol=1e6)

Sleeping

Can we use timer_ns to implement an alarm clock? An example is as follows:

def alarm(secs):
    timer_ns(secs*1e9)
    print(f"Time's up after {secs}s!")


alarm(secs:=int(input("How many seconds to wait?") or 1))

While the program appears to work, it is computationally intensive as it keeps the computer busy checking the looping condition:

with cProfile.Profile() as pr:
    alarm(0.001)
    pr.print_stats()

As the profiler shows, time_ns() is called thousands of times while the timer waits for 1 millisecond. This is called busy waiting or spinning, and it is regarded as an anti-pattern that is not recommended unless you need accuracy better than milliseconds.

A better solution is to pause the execution for the specified number of seconds. This can be done using the sleep function.

with cProfile.Profile() as pr:
    time.sleep(0.001)
    pr.print_stats()

Note that the number of function calls is much lower than before. The number of calls does not depend on the sleep duration either.

with cProfile.Profile() as pr:
    time.sleep(0.0321)
    pr.print_stats()
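
Another way to compare the two approaches is to measure the CPU time each one consumes. The following is a rough sketch: busy_wait and cpu_seconds are hypothetical helpers, and time.process_time counts only the CPU time used by the current process, so time spent sleeping is not included:

```python
import time


def busy_wait(secs):
    """Spin until `secs` seconds have passed on the performance counter."""
    end = time.perf_counter() + secs
    while time.perf_counter() < end:
        pass


def cpu_seconds(func, *args):
    """Return the CPU time consumed by the process while running func."""
    start = time.process_time()
    func(*args)
    return time.process_time() - start


busy_cpu = cpu_seconds(busy_wait, 0.05)
sleep_cpu = cpu_seconds(time.sleep, 0.05)
print(f"CPU time while busy waiting: {busy_cpu:.4f}s")
print(f"CPU time while sleeping:     {sleep_cpu:.4f}s")
```

Both calls take about the same wall-clock time, but busy waiting should use close to the full duration in CPU time while sleeping uses almost none.
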
def alarm(secs):
    # YOUR CODE HERE
    raise NotImplementedError
    print(f"Time's up after {secs}s!")


alarm(secs := int(input("How many seconds to wait?") or 1))
# tests
assert math.isclose(-time.time() + (alarm(0.01) or time.time()), 0.01, abs_tol=1e-2)
assert math.isclose(-time.time() + (alarm(0.02) or time.time()), 0.02, abs_tol=1e-2)
with cProfile.Profile() as pr:
    assert math.isclose(-time.time() + (alarm(0.03) or time.time()), 0.03, abs_tol=1e-2)
    ps = pstats.Stats(pr)
assert ps.total_calls < 100

Multiprocessing

The following program implements a digital clock. Run and press Enter to start the clock:

def clock():
    while True:
        print(time.asctime(time.localtime()), end='\r', flush=True)
        time.sleep(0.5)  # why not 1?


if input("Start the clock? [Y/n]").lower() != "n":
    try:
        clock()
    except KeyboardInterrupt:  # used to stop the clock
        print("\nClock stopped.")

Infinite loops are not inherently bad. In fact, your computer constantly runs an infinite loop to allow you to control it until you power it down.

What is not good about the implementation of clock is that it is blocking, i.e., executing the code prevents executing any other code until it completes. To avoid blocking, operating systems allow multiple tasks to run as separate processes concurrently without blocking each other.

clock_process = multiprocessing.Process(target=clock)
clock_process.start()
clock_process

We can continue to execute other code. For example, to list all the processes along with their CPU and memory usage:

def ps():
    !ps -eH -o pid,%cpu,%mem,command | awk '{{if($$1=={clock_process.pid}) print "\033[1;31m" $$0 "\033[0m"; else print $$0}}'
ps()

To terminate the process:

clock_process.terminate()
ps()

The terminated process becomes a zombie process, which remains in the process table until its parent process reads its exit status as follows:

clock_process.join()
ps()
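
The exit status read by the parent can also be inspected from Python. This is a minimal sketch assuming a Unix-like system where the "fork" start method is available; the name worker is illustrative, and the child simply sleeps:

```python
import multiprocessing
import time

# Assumption: a Unix-like system that supports the "fork" start method.
ctx = multiprocessing.get_context("fork")
worker = ctx.Process(target=time.sleep, args=(60,))
worker.start()

worker.terminate()  # ask the operating system to stop the child
worker.join()       # the parent reads the exit status, reaping the child

print("alive:", worker.is_alive())
print("exit code:", worker.exitcode)
```

On Unix, a negative exit code indicates that the child was ended by a signal with that number.
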
def alarm_bg(secs):
    # YOUR CODE HERE
    raise NotImplementedError
    return process


process = alarm_bg(secs := int(input("How many seconds to wait?") or 1))
process.start()
print("Alarm started", end="", flush=True)
while process.is_alive():
    print(".", end="", flush=True)
    time.sleep(0.1)
# tests
alarm_process = alarm_bg(0.1)
alarm_process.start()
i = 0
while alarm_process.is_alive():
    i += 1
    time.sleep(0.001)
assert math.isclose(i/100, 1, rel_tol=1e-1)

Threading

The following is a better implementation of the clock:

def clock_widget():
    while not stop_event.is_set():
        clock_display.value = time.asctime(time.localtime())
        time.sleep(1)


def stop_clock(button):
    stop_event.set()
    clock_thread.join()


clock_display = widgets.Label()
clock_display.style.font_size = "2em"
stop_button = widgets.Button(description="Stop")
stop_button.on_click(stop_clock)
stop_event = threading.Event()
clock_thread = threading.Thread(target=clock_widget)
clock_thread.start()
display(clock_display, stop_button)

With ipywidgets, we have styled the clock to have a larger font size. Additionally, the clock’s execution is non-blocking and can be stopped by pressing the Stop button.

def clock_widget():
    while not stop_event.is_set():
        clock_display.value = time.asctime(time.localtime())
        time.sleep(1)


def stop_clock(button):
    stop_event.set()
    clock_thread.join()


clock_display = widgets.Label()
clock_display.style.font_size = "2em"
stop_button = widgets.Button(description="Stop")
stop_button.on_click(stop_clock)
stop_event = threading.Event()
clock_thread = threading.Thread(target=clock_widget)
clock_thread.start()
display(clock_display, stop_button)

Program 1: A clock running in a separate thread.

Note that threading is used instead of multiprocessing to make the execution non-blocking:

...
clock_thread = threading.Thread(target=clock_widget)
...

The operating system can run multiple tasks as separate threads within the same process, allowing them to share the same memory space. This is crucial because ipywidgets involves complex interactions between the main thread running the Jupyter Notebook and the thread running the clock. In contrast, multiple processes do not share the same memory space, making inter-process communication more complex and resource-intensive.
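
The difference in memory sharing can be observed directly. The sketch below is an illustration under the assumption of a Unix-like system with the "fork" start method; log and record are hypothetical names. A thread appends to the same list as the main program, while a child process only changes its own copy:

```python
import multiprocessing
import threading

log = []


def record(tag):
    log.append(tag)


# A thread runs in the same process and modifies the same list object.
thread = threading.Thread(target=record, args=("thread",))
thread.start()
thread.join()

# A forked child process works on its own copy of the list.
ctx = multiprocessing.get_context("fork")
process = ctx.Process(target=record, args=("process",))
process.start()
process.join()

print(log)  # ['thread']
```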

Unlike multiprocessing, threading does not provide a terminate function to end a thread, as doing so may corrupt the memory shared with the main thread. To properly stop a thread, the code sets a stop_event:

...
    stop_event.set()
    ...

The status of the event is checked by the looping condition of the clock thread:

...
    while not stop_event.is_set():
        ...

This ensures that the thread can terminate gracefully without affecting the shared memory integrity.
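
The same stop-event pattern can be tried without widgets. The following sketch uses hypothetical names (stop, ticks, worker) and one variation on the code above: it waits with Event.wait(timeout) rather than time.sleep, so the thread can react as soon as the event is set instead of finishing a full sleep interval:

```python
import threading
import time

stop = threading.Event()
ticks = []


def worker():
    # Event.wait returns True once the event is set and False on timeout,
    # so the loop body runs once per timeout until a stop is requested.
    while not stop.wait(timeout=0.05):
        ticks.append(time.monotonic())


thread = threading.Thread(target=worker)
thread.start()

time.sleep(0.2)  # the main thread is free to do other work here
stop.set()       # request a graceful stop
thread.join()    # wait for the worker to leave its loop

print(f"{len(ticks)} ticks recorded; thread alive: {thread.is_alive()}")
```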

!ln -sf $(pwd) ~/www/
url = f"{os.getenv('JUPYTERHUB_BASE_URL')}hub/user-redirect/www/Lab4/"
IFrame(url, width="100%", height=600)