fake_log_gen Modification

To enable our system to send log lines to a Spark Streaming application via sockets, we need to extend the existing fake log generator.

Currently, our fake log generator can generate log lines and write them into specified files. We want to add new features to the existing classes, such as sending the generated messages through sockets.

Inheritance

We want two new classes, fake_access_stream and fake_error_stream, based on fake_access_gen and fake_error_gen respectively, that implement TCP socket transmission.

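First, import the fake_log_gen module, together with the standard-library modules used in the snippets below:

import asyncio
import datetime
import socket

from src.fake_log_gen import fake_log_gen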

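Then, define the two classes, inheriting from fake_access_gen and fake_error_gen respectively:

class fake_access_stream(fake_log_gen.fake_access_gen):
    ...  # __init__ and the overridden methods are shown below

class fake_error_stream(fake_log_gen.fake_error_gen):
    ...  # same structure as fake_access_stream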
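To illustrate the override pattern without the real fake_log_gen module, here is a minimal sketch with a hypothetical stand-in base class. `StubAccessGen`, `StubAccessStream`, `write_line` and `outbox` are assumptions for illustration only, not the module's actual API: the subclass calls `super()` to keep the original behaviour and then adds a "send" step.

```python
class StubAccessGen:
    """Hypothetical stand-in for fake_log_gen.fake_access_gen."""

    def __init__(self, log, config, mode):
        self.log = log          # here: a plain list acting as the log sink
        self.config = config
        self.mode = mode

    def write_line(self, line):
        # Original behaviour: record the line (the real class writes to a file).
        self.log.append(line)


class StubAccessStream(StubAccessGen):
    """Subclass that keeps the original behaviour and also 'sends' each line."""

    def __init__(self, log, config, mode, outbox):
        super().__init__(log, config, mode)
        self.outbox = outbox    # stands in for the socket connection

    def write_line(self, line):
        super().write_line(line)          # keep writing as before
        self.outbox.append(line + "\n")   # newline-delimited, like a text stream


log, outbox = [], []
gen = StubAccessStream(log, config={}, mode="access", outbox=outbox)
gen.write_line("GET /index.html")
```

Because the subclass only adds behaviour on top of `super()`, everything the base generator already does keeps working unchanged.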

Socket Creation

To attach a server-mode socket to the generator, extend the inherited __init__ method. Take fake_access_stream as an example:

def __init__(self, log, config, mode):
    super(fake_access_stream, self).__init__(log, config, mode)

Inside the __init__ method, create a TCP socket:

self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # IPv4 TCP socket
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow rebinding the port right after a restart
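SO_REUSEADDR lets the server rebind its port right after a restart instead of failing with "Address already in use" while old connections linger in TIME_WAIT. A quick sketch that creates the socket with the same two calls and reads the option back to confirm it took effect (`make_reusable_tcp_socket` is a hypothetical helper name):

```python
import socket

def make_reusable_tcp_socket():
    """Create an IPv4 TCP socket with SO_REUSEADDR enabled."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    return s

s = make_reusable_tcp_socket()
# getsockopt returns a nonzero integer when the option is set
# (the exact value is platform-dependent).
reuse_enabled = s.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) != 0
s.close()
print(reuse_enabled)  # True
```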

Address Binding

Bind the socket to the server address:

host = socket.gethostbyname(socket.gethostname())
print('>>> Host Name:\t%s' % str(host))
port = 5555
server_addr = (host, port)
self.s.bind(server_addr)
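Two practical notes on binding, offered as assumptions rather than part of the original design. First, on some systems (for example Debian-style /etc/hosts entries) gethostbyname(gethostname()) may resolve to a loopback address, which would make the generator unreachable from other machines. Second, the fixed port 5555 can clash with another process in tests; binding to port 0 lets the operating system pick a free port, and getsockname() reports the address actually assigned. A minimal sketch using 127.0.0.1 to stay self-contained (`bind_ephemeral` is a hypothetical helper):

```python
import socket

def bind_ephemeral(host="127.0.0.1"):
    """Bind a TCP socket to an OS-chosen free port and return it with its address."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, 0))            # port 0: let the OS choose a free port
    return s, s.getsockname()    # (host, assigned_port)

s, (bound_host, bound_port) = bind_ephemeral()
print(bound_host, bound_port)
s.close()
```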

Put into Server Mode

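Calling listen() puts the socket into server mode. The argument (5 here) is the backlog: the number of unaccepted connections the system allows before refusing new ones.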

self.s.listen(5)
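Once the socket is listening, the kernel completes a client's TCP handshake and queues the connection in the backlog even before accept() is called. The sketch below (loopback only, OS-chosen port, all names illustrative) has a client connect first; accept() then returns immediately with the queued connection:

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 0))
server.listen(5)                       # up to 5 pending connections may queue

# The client connects before the server calls accept(); the connection
# is established and waits in the backlog.
client = socket.create_connection(server.getsockname(), timeout=2)

conn, addr = server.accept()           # returns at once: a client is queued
peer_matches = addr == client.getsockname()

for sock in (conn, client, server):
    sock.close()
print(peer_matches)  # True
```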

Wait for Connection

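The server socket's accept() method waits for an incoming connection and accepts it, returning a new socket for that connection together with the client's address. This is a blocking call, so __init__ does not return until a client, such as the Spark Streaming receiver, connects.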

self.client, self.addr = self.s.accept()
print('>>> Received request from ' + str(self.addr))
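Because accept() blocks, the generator hangs in __init__ until a client connects. If that is undesirable (for example in tests), one option, not part of the original design, is to set a timeout on the listening socket so that accept() raises socket.timeout when nobody connects in time. A minimal sketch with a hypothetical helper `accept_or_none`:

```python
import socket

def accept_or_none(server, timeout):
    """Wait up to `timeout` seconds for a client; return (conn, addr) or None."""
    server.settimeout(timeout)
    try:
        return server.accept()
    except socket.timeout:          # alias of TimeoutError on Python 3.10+
        return None
    finally:
        server.settimeout(None)     # restore blocking mode for later calls

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(5)
result = accept_or_none(server, timeout=0.2)   # no client connects here
server.close()
print(result)  # None
```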

Data Sending

In the original generator design, log lines are written into files. Now we want to use the server socket to send the same data out. The send operation is triggered in the same place where the original write operation occurs, for example in heartbeat_lines:

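async def heartbeat_lines(self):
    # Runs forever, emitting one heartbeat line per configured interval.
    while True:
        t = datetime.datetime.now().strftime('%d/%b/%Y:%H:%M:%S -0700')
        data = '- - - [%s] "%s" - -' % (t, self.config["heartbeat"]["message"])
        self.log.info(data)
        # Spark's socketTextStream reads newline-delimited text, so terminate
        # each line with "\n"; sendall() keeps sending until every byte is out.
        self.client.sendall((data + '\n').encode())

        await asyncio.sleep(int(self.config["heartbeat"]["interval"]))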
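Spark Streaming's socketTextStream interprets the received bytes as UTF-8, newline-delimited lines, so each log line needs a trailing "\n". The sketch below exercises the heartbeat format end to end without Spark: a hypothetical `send_heartbeats` coroutine writes a few lines into one end of a socket.socketpair(), and the other end reads them back line by line, roughly as a line-based receiver would. The message text, interval and count are arbitrary example values.

```python
import asyncio
import datetime
import socket

def format_heartbeat(message):
    """Build a heartbeat line in the same format as heartbeat_lines()."""
    t = datetime.datetime.now().strftime('%d/%b/%Y:%H:%M:%S -0700')
    return '- - - [%s] "%s" - -' % (t, message)

async def send_heartbeats(conn, message, interval, count):
    """Send `count` newline-terminated heartbeat lines, pausing between them."""
    for _ in range(count):
        conn.sendall((format_heartbeat(message) + "\n").encode())
        await asyncio.sleep(interval)

def run_demo(count=3):
    sender, receiver = socket.socketpair()
    try:
        asyncio.run(send_heartbeats(sender, "HEARTBEAT", 0.01, count))
        # Read the stream back one line at a time, as a line-based receiver does.
        with receiver.makefile("r", encoding="utf-8") as reader:
            return [reader.readline().rstrip("\n") for _ in range(count)]
    finally:
        sender.close()
        receiver.close()

lines = run_demo()
for line in lines:
    print(line)
```

Without the newline, a line-based reader would keep waiting for the end of the first line and never hand any record to the application.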
