Concurrency

As we discussed in Classes, for each log pattern (Apache access log, Apache error log) we define an individual class:

  • fake_access_gen
  • fake_error_gen

In a cluster monitoring system, we may have to audit multiple log sources. In that case, we can create multiple such generators to simulate them, which gives us log-source-level concurrency.

However, each class defines several methods; for example, fake_error_gen has:

  • heartbeat_lines(self)
  • warn_lines(self)
  • error_lines(self)

It would be better to run these methods concurrently as well. The advantage is that the different kinds of log messages can be controlled separately: we only need to care about the randomness of the intervals within one kind of log line, and the randomness of the contents of that kind of log line.

That is, we only care about the interval between two error lines, not the interval between an error line and a warning line.

Asynchronous Programming

To implement this concurrency, Visor uses asynchronous programming.

Asynchronous programming enables single-threaded concurrency: we can run tasks concurrently while keeping the simplicity of sequential code.

We currently have 3 kinds of log lines in fake_error_gen. If we continue to add more types in the future and use short intervals, there could be heavy I/O.

With asynchronous programming, the currently running task is never interrupted, but it may voluntarily yield control to other tasks. Thus, our generator can handle other log line functions while one function is waiting.
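This cooperative hand-off can be sketched with plain generators and a toy round-robin scheduler (all names here are illustrative, not part of Visor):

```python
def task(name, steps, out):
    # Each `yield` is a voluntary pause: the task is never interrupted,
    # it gives up control itself.
    for i in range(steps):
        out.append(f"{name}:{i}")
        yield

def run_round_robin(tasks):
    # Resume each task in turn until all are exhausted.
    tasks = list(tasks)
    while tasks:
        t = tasks.pop(0)
        try:
            next(t)
            tasks.append(t)
        except StopIteration:
            pass

out = []
run_round_robin([task("error", 2, out), task("warn", 2, out)])
print(out)  # → ['error:0', 'warn:0', 'error:1', 'warn:1']
```

The two tasks interleave without any threads: each one runs until it yields, then the scheduler resumes the next.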

asyncio

asyncio is a standard Python library that provides support for asynchronous programming.

Coroutines

An asynchronous function in Python is typically called a 'coroutine'; in this style, a coroutine is implemented as a generator.

Just decorate these functions with @asyncio.coroutine:

@asyncio.coroutine
def heartbeat_lines(self):
    while True:
        self.log.info("[-] [-] " + self.config["heartbeat"]["message"])
        yield from asyncio.sleep(int(self.config["heartbeat"]["interval"]))

@asyncio.coroutine
def access_lines(self):
    ...

...

The yield from expression and asyncio.sleep() are discussed in the following sections.

Sleep

We need asyncio.sleep() to pause a coroutine so that other tasks get a chance to run (non-blocking).
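The non-blocking behavior can be demonstrated with a small timing sketch. It uses the newer async def / await spelling (Python 3.5+), which replaced the @asyncio.coroutine / yield from style shown in this chapter; the behavior is the same:

```python
import asyncio
import time

async def pause(seconds):
    # asyncio.sleep suspends only this coroutine; the event loop
    # is free to run other pending tasks in the meantime.
    await asyncio.sleep(seconds)

async def main():
    start = time.monotonic()
    # Both pauses overlap, so the total is ~0.3s rather than 0.6s.
    await asyncio.gather(pause(0.3), pause(0.3))
    return time.monotonic() - start

elapsed = asyncio.run(main())
```

With blocking time.sleep() the two pauses would add up; with asyncio.sleep() they overlap, which is exactly what lets many log-line coroutines share one thread.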

Yield from

A yield from expression actually calls a coroutine when it is used inside a function decorated with @asyncio.coroutine.

yield from asyncio.sleep(int(self.config["heartbeat"]["interval"]))

Here, yield from waits for the result from asyncio.sleep; the coroutine is paused until yield from gets a value.

During this pause, if there are other pending tasks, a context switch happens.

Wait

Here, for the access log, we have two tasks to run: self.heartbeat_lines() and self.access_lines().

Use asyncio.wait to combine them into one task that waits for both of them to complete.

asyncio.wait([
    self.heartbeat_lines(),
    self.access_lines()
])

Event loop

The event loop schedules the coroutines. It registers all the coroutines and distributes the flow of control between them. If one coroutine is sleeping, the event loop knows that coroutine will be busy for a while, so it pauses that coroutine and looks for another one to run.

Here we schedule the combined wait task to run:

def run(self):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        asyncio.wait([
            self.heartbeat_lines(),
            self.access_lines()
        ])
    )
    loop.close()

Conclusion

The idea of using asyncio here is that:

  • Structure the code into sub-tasks that need to be executed concurrently.
  • Decorate these sub-tasks as coroutines.
  • Schedule them as you like.
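The three steps above can be put together in a minimal, self-contained sketch. The class and field names are illustrative stand-ins for fake_error_gen (not Visor's actual code), it uses the newer async def / await spelling of the same pattern, and the loops are bounded so it terminates:

```python
import asyncio
import random

class FakeErrorGen:
    # Illustrative stand-in: each log type is its own coroutine
    # with its own independent random interval.
    def __init__(self):
        self.lines = []

    async def heartbeat_lines(self, n):
        for _ in range(n):
            self.lines.append("[-] [-] heartbeat")
            await asyncio.sleep(random.uniform(0.005, 0.01))

    async def error_lines(self, n):
        for _ in range(n):
            self.lines.append("[error] something failed")
            await asyncio.sleep(random.uniform(0.005, 0.01))

    async def _main(self):
        # Step 3: combine the sub-tasks and wait for both to finish.
        await asyncio.wait([
            asyncio.create_task(self.heartbeat_lines(3)),
            asyncio.create_task(self.error_lines(3)),
        ])

    def run(self):
        asyncio.run(self._main())

gen = FakeErrorGen()
gen.run()
print(len(gen.lines))  # → 6
```

Heartbeat and error lines interleave according to their own intervals, which is exactly the per-log-type independence described above.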
