OpenAI Gym is a fun toolkit for developing and comparing reinforcement learning algorithms. It provides a variety of environments ranging from classical control problems and Atari games to goal-based robot tasks. Currently it takes a fair amount of effort to install and run it on Windows 10. In particular, you need to install the Windows Subsystem for Linux, Ubuntu, Anaconda and OpenAI Gym in turn, and then do a robot dance to render the simulation back to you. To make things a bit easier later on, we will also use Jupyter Notebook. Below is a brief step-by-step description as of September 2018, with the end result looking like this:

The end result in action

First we install the Linux subsystem by simply running the following command as Administrator in PowerShell:

Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Windows-Subsystem-Linux

After restarting the computer, install Ubuntu 18.04 from the Windows Store, launch and run system update:

sudo apt-get update
sudo apt-get upgrade

Next we install the required system packages and Anaconda 3:

sudo apt-get install cmake zlib1g-dev xorg-dev libgtk2.0-0 python-matplotlib swig python-opengl xvfb
wget https://repo.continuum.io/archive/Anaconda3-5.2.0-Linux-x86_64.sh
chmod +x Anaconda3-5.2.0-Linux-x86_64.sh
./Anaconda3-5.2.0-Linux-x86_64.sh

Then we ensure that the .bashrc file has been modified as necessary. If not, adding . /home/username/anaconda3/etc/profile.d/conda.sh at the very end should do the trick.

After this, we relaunch the terminal and create a new conda environment called gym:

conda create --name gym python=3.5
conda activate gym

and install Gym and its dependencies:

git clone https://github.com/openai/gym
cd gym
pip install -e .[all]

As we are also going to need matplotlib, we install it by running:

pip install matplotlib

Next we install Jupyter Notebook and create a kernel pointing to the gym environment created in the previous step:

conda install jupyter nb_conda ipykernel
python -m ipykernel install --user --name gym

Now we run the magic command which will create a virtual X server environment and launch the notebook server:

xvfb-run -s "-screen 0 1400x900x24" ~/anaconda3/bin/jupyter-notebook --no-browser

--no-browser tells Jupyter not to open a browser from the terminal but to print the notebook address instead. This address can now be pasted into a browser on the Windows 10 side, outside of the Ubuntu environment.

Next we create a new notebook by choosing “New” and then “gym” (thus launching a new notebook with the kernel we created in the steps above), and writing something like this:

import gym
env = gym.make('CartPole-v0')

from IPython import display
import matplotlib.pyplot as plt
%matplotlib inline

env.reset()
img = plt.imshow(env.render(mode='rgb_array')) # only call this once
for _ in range(100):
    img.set_data(env.render(mode='rgb_array')) # just update the data
    display.display(plt.gcf())
    display.clear_output(wait=True)
    action = env.action_space.sample()
    env.step(action)

This creates a new cart pole environment and performs 100 iterations of taking a random action and rendering the environment to the notebook.

If you are lucky, hitting enter will display an animation of a cart pole failing to balance. Congratulations, this is your first simulation! Replace ‘CartPole-v0’ with ‘Breakout-v0’ and rerun - we are gaming! AWESOME!
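If you also want the simulation to restart whenever an episode ends, a small variation of the loop above (same imports and classic Gym API as before, just checking the done flag returned by env.step) could look like this:

# Same imports and environment as above; additionally reset on episode end.
env.reset()
img = plt.imshow(env.render(mode='rgb_array'))  # only call this once
for _ in range(200):
    img.set_data(env.render(mode='rgb_array'))  # just update the data
    display.display(plt.gcf())
    display.clear_output(wait=True)
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done:         # the pole fell over or the episode timed out
        env.reset()  # start a fresh episode instead of stepping a finished one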

Flutter is a great Google SDK allowing effortless creation of mobile apps with native interfaces on iOS and Android. To install it follow the official installation instructions. Here are a few additional tips for Mac and Windows:

  • If you are using fish, add set PATH /Users/simonj/flutter/bin $PATH to your .config/fish/config.fish
  • For Android emulation support, install Android Studio and create a new emulated device; let's call it Pixel_2_API_26. To launch the emulator, run ~/Library/Android/sdk/tools/emulator -avd Pixel_2_API_26 on Mac or C:\Users\<name>\AppData\Local\Android\Sdk\emulator\emulator.exe -avd Pixel_2_API_26 on Windows.
  • Disabling Hyper-V may help if you experience Windows crashes when running the Android emulator.
  • Some useful commands:
    • flutter doctor # helps to diagnose problems, install missing components, etc.
    • flutter build apk # builds apk
    • flutter install -d # installs the apk on a device; to use your actual phone, connect it with USB debugging enabled
    • flutter devices # to list devices
  • To use Visual Studio Code, follow these instructions or just run code . from your Flutter terminal/project and install the Flutter and Dart plugins.
  • Check out awesome-flutter for examples and inspiration!

In the following, I briefly show how coef_, intercept_, decision_function, predict_proba and predict are connected in the case of a binary LogisticRegression model.

Assume we have trained a model like this:

>>> ...
>>> lrmodel = linear_model.LogisticRegression(C=0.1, class_weight='balanced')
>>> lrmodel.fit(X_train, y_train)

The model’s coef_ attribute holds the learned feature weights (w) and intercept_ holds the bias (b). The decision_function is then equivalent to computing x · w + b for each sample x:

>>> (X_test @ lrmodel.coef_[0].T + lrmodel.intercept_)[:5]
    array([-0.09915005,  0.17611527, -0.14162106, -0.03107271, -0.01813942])

>>> lrmodel.decision_function(X_test)[:5]
    array([-0.09915005,  0.17611527, -0.14162106, -0.03107271, -0.01813942])

Now, if we take sigmoid of the decision function:

>>> def sigmoid(X): return 1 / (1 + np.exp(-X))

>>> sigmoid(X_test @ lrmodel.coef_[0].T + lrmodel.intercept_)[:5]
    array([ 0.47523277,  0.54391537,  0.46465379,  0.49223245,  0.49546527])

>>> sigmoid(lrmodel.decision_function(X_test))[:5]
    array([ 0.47523277,  0.54391537,  0.46465379,  0.49223245,  0.49546527])

it will be equivalent to the output of predict_proba (each row contains the probabilities for classes -1 and 1). We see that these numbers are exactly the second column (the positive class) here:

>>> lrmodel.predict_proba(X_test)[:5]
    array([[ 0.52476723,  0.47523277],
           [ 0.45608463,  0.54391537],
           [ 0.53534621,  0.46465379],
           [ 0.50776755,  0.49223245],
           [ 0.50453473,  0.49546527]])

Finally, the predict function:

>>> lrmodel.predict(X_test)[:5]
    array([-1,  1, -1, -1, -1], dtype=int64)

is equivalent to:

>>> [-1 if np.argmax(p) == 0 else 1 for p in lrmodel.predict_proba(X_test)][:5]
    [-1, 1, -1, -1, -1]

which in our case is:

>>> [1 if p[1] > 0.5 else -1 for p in lrmodel.predict_proba(X_test)][:5]
    [-1, 1, -1, -1, -1]
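Equivalently, since the sigmoid exceeds 0.5 exactly when the decision function is positive and the classes here are -1 and 1, predict is just a thresholding of the decision function at zero. For the decision values shown above:

>>> np.where(lrmodel.decision_function(X_test) > 0, 1, -1)[:5]
    array([-1,  1, -1, -1, -1])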

Have you ever wondered how to design and run online experiments? In particular, how to implement an experiment dashboard such as the one pictured below (in this case Visual Website Optimizer) and how to use it in your product? Good, let's have a quick look!

Visual Website Optimizer

On the purely technical side, the first thing we have to implement is a way to define an experiment as a set of variables we want to try out and a value mapping for the audience. The most important part here is that, for a single user, the assignment within a single experiment should be stable, i.e., the user always lands in the same one of the two groups. Across experiments, however, the assignments should differ; otherwise we get what is known as carry-over error, which happens, for example, when the same user ends up in the same test group across different experiments.

Facebook has previously released PlanOut, a platform for online field experiments. Apart from the language itself, the essence of this project is in random.py, which demonstrates a possible way of mapping users or pages to the random alternatives. In short, each experiment and variable has a salt which is added on top of the user or page id before hashing, to enforce randomization across experiments and variables. The resulting hash is then mapped to the final value through modulo arithmetic and a set of linear transformations. Given this, it is fairly easy to design an API or a library to represent an experiment with a variety of options and to assign those to users in a controlled and consistent fashion. Or you can just use PlanOut or VWO right out of the box.
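To make this concrete, here is a minimal sketch of such a salted-hash assignment (not PlanOut's actual code, but the same idea of hashing a salted id and reducing it with modulo arithmetic; all names below are illustrative):

import hashlib

def assign(user_id, experiment_salt, variable_salt, choices):
    # Salting with the experiment and variable names makes the assignment
    # stable for a given user within an experiment, yet independent across
    # experiments and variables.
    key = '%s.%s.%s' % (experiment_salt, variable_salt, user_id)
    digest = int(hashlib.sha1(key.encode('utf-8')).hexdigest()[:15], 16)
    return choices[digest % len(choices)]

# The same user always gets the same variant within an experiment...
assign('user-42', 'checkout_test', 'button_color', ['red', 'green'])
# ...but a different experiment salt re-randomizes the mapping.
assign('user-42', 'pricing_test', 'button_color', ['red', 'green'])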

Having settled the setup and random assignment parts, the next question is how to actually design and run an experiment for your needs. For this, I highly recommend taking a quick look at the original paper describing PlanOut and its presentation, as well as a nice presentation and a great tutorial about implementing and analysing online experiments at Facebook. Furthermore, there is a series of interesting publications from Microsoft (a survey, a paper and another paper) explaining numerous caveats of running controlled experiments on the web. In particular, they explain statistical significance, power, and several types of mistakes it is possible to run into.

If research papers sound just too dry and formal, there are several interesting guides explaining A/B testing and its pitfalls in a very accessible manner:

Finally, when it comes to presenting and interpreting the results, this A/B test analysis tool provides a simple view similar to the one in the screenshot above. Otherwise, there are several online calculators for various computations that do not require a PhD in statistics: sample size, experiment duration, comparing two rates (chi-squared test), comparing two series (t-test).
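As a quick illustration of the last kind of calculator, comparing two conversion rates boils down to a chi-squared test on a 2x2 contingency table, which scipy can do directly (the counts below are made up):

import numpy as np
from scipy.stats import chi2_contingency

# conversions and non-conversions for the control (A) and the variant (B)
table = np.array([[180, 9820],   # A: 180 conversions out of 10000 visitors
                  [230, 9770]])  # B: 230 conversions out of 10000 visitors
chi2, p_value, dof, expected = chi2_contingency(table)
print(p_value)  # a small p-value suggests the difference is unlikely to be chance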

Have fun!

In the previous posts I have mentioned using Scikit-Learn, gRPC, Mesos and Prometheus. In the following, I would like to describe how all these components can be used to build a classification service, and share my experience with running it in a relatively large production system. For practical reasons I omit most of the actual code, and instead describe the important parts of the server script and give references to external documentation where necessary.

THE PROBLEM

As a part of our daily operation at Cxense we crawl millions of web pages and extract their content, including named entities, keywords, annotations, etc. As a part of this process we automatically detect language, page type, sentiment, main topics, etc. Skipping the details, in the following we implement yet another text classifier using Scikit-Learn.

As most of our system, including the crawler, is implemented in Java, we implement this classifier as a micro-service. For some documents, the crawler will therefore call our service, providing the page title, URL, text, language code and some additional information, and in return receive a list of class names and their approximate probabilities. We further use an absolute time limit of 100 ms (end-to-end) for the classification task.

THE SOLUTION

For classification itself we use a simple two-stage pipeline, consisting of a TfidfVectorizer and a OneVsRestClassifier using LinearSVC. A separate model is trained for each of the several supported languages, then serialized and distributed on deployment. In order to communicate with our service, we use gRPC, where we define the protocol in the proto3 format and compile it for both Java (the client) and Python (the server):

syntax = "proto3";

service Classifier {
    rpc Classify(Document) returns (Classification) {}
}

message Document {
    string url = 1;
    string title = 2;
    string text = 3;
    Language lang = 4;
    ...
}

message Classification {
    repeated Class classes = 1;
    message Class {
        string name = 1;
        float prob = 2;
    }
}
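Before wiring up the service, here is a rough sketch of how one of the per-language models could be trained and serialized (the function name and file paths are illustrative, not our actual training code):

import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC

def train_model(language, texts, labels):
    # Two-stage pipeline: tf-idf features followed by a one-vs-rest LinearSVC.
    model = Pipeline([
        ('tfidf', TfidfVectorizer()),
        ('clf', OneVsRestClassifier(LinearSVC())),
    ])
    model.fit(texts, labels)
    # Serialize the model so it can be distributed on deployment.
    joblib.dump(model, 'model-%s.joblib' % language)
    return model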

Next we implement a simple servicer, which invokes the classifier for a given language with the remaining request fields and returns the classification results (class names and scores) wrapped in a response object:

...
class ClassifierServicer(proto_grpc.ClassifierServicer):

    def __init__(self):
        self.models = ...
        
    def Classify(self, request, context):
        url = request.url
        title = request.title
        text = request.text
        lang = proto.Document.Language.Name(request.lang)
        results = self.models[lang].classify(url, title, text)
        return proto.Classification(
            classes=[proto.Classification.Class(name=name, prob=prob)
                     for name, prob in results])

To measure classification latency and the number of exceptions we further add a number of Prometheus metrics and annotate the classification method:

...
class ClassifierServicer(proto_grpc.ClassifierServicer): 
    ...
    REQUEST_LATENCY = Histogram('clf_latency_seconds', 'Request latency in seconds')
    REQUEST_ERROR_COUNTER = Counter('clf_failures', 'Number of failed requests')

    ...
    @REQUEST_LATENCY.time()
    @REQUEST_ERROR_COUNTER.count_exceptions()
    def Classify(self, request, context): 
        ...

To log the classification requests and results we add a queue to the servicer and write serialized JSON objects to it on request. We also implement a scheduled thread that drains the queue and writes the strings to disk:

...
    def __init__(self):
        ...
        self.log_queue = queue.Queue()

    def Classify(self, request, context):
        ...
        self.log_queue.put(json.dumps({'url': url, 'title': title, 'text': text, 'language': lang, 'classification': dict(results)}))

    def log_requests(self):
        # Drain the queue:
        requests = []
        while not self.log_queue.empty():
            requests.append(self.log_queue.get())

        # Write to file:
        if requests:
            filename = datetime.now().strftime('./request-%Y-%m-%dT%H.log')
            with open(filename, 'a', encoding='utf_8') as f:
                for request in requests:
                    f.write(request + '\n')

        # Repeat every second:
        t = threading.Timer(1, self.log_requests)
        t.daemon = True
        t.start()

The reason for doing this is that we want to avoid waiting for disk I/O on the classification requests. In fact this trick dramatically improves the observed latency on the machines with heavy I/O load and bursty requests.

Further we customize the HTTP server used by the Prometheus client to return metrics on the /metrics path (used for metric collection), {"status": "ok"} on /status (used for health checks) and 404 otherwise:

...
def start_http_server(port):
    class HttpRequestHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == '/status':
                self.send_response(200)
                self.send_header('Content-type', 'text/plain; charset=utf-8')
                self.end_headers()
                self.wfile.write(bytes('{"status": "ok"}', 'utf-8'))
            elif self.path == '/metrics':
                self.send_response(200)
                self.send_header('Content-type', CONTENT_TYPE_LATEST)
                self.end_headers()
                self.wfile.write(generate_latest(core.REGISTRY))
            else:
                self.send_error(404)
    
        def log_message(self, format, *args):
            return
    
    class HttpRequestServer(threading.Thread):
        def run(self):
            httpd = HTTPServer(('', port), HttpRequestHandler)
            httpd.serve_forever()
    
    t = HttpRequestServer()
    t.daemon = True
    t.start()

Now we implement the server itself as a thread taking two ports as arguments. The http port is used for health checks and metric collection, and the grpc port is used for classification requests. For the http port we will use the number supplied by Aurora (see below) and for the grpc port we use port 0 to get whatever is available. To know which ports were allocated we write both to a JSON file:

...
class ModelServer(threading.Thread):

    def __init__(self, http_port, grpc_port=0):
        super(ModelServer, self).__init__()
        self._http_port = http_port
        self._grpc_port = grpc_port
        self._classifier = ClassifierServicer()
        self._stopper = threading.Event()
    
    def stop(self):
        self._stopper.set()
    
    def stopped(self):
        return self._stopper.isSet()
    
    def run(self):
        self._classifier.log_requests()
    
        grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
        proto_grpc.add_ClassifierServicer_to_server(self._classifier, grpc_server)
        self._grpc_port = grpc_server.add_insecure_port('[::]:%s' % self._grpc_port)
        grpc_server.start()
    
        start_http_server(self._http_port)
    
        service_info = json.dumps({'services': [
                {'service': 'clf-grpc', 'port': self._grpc_port},
                {'service': 'clf-health', 'port': self._http_port}]})
        print_and_flush(service_info)
    
        with open('./service.json', 'w', encoding='utf-8') as f:
            f.write(service_info)
    
        try:
            while not self.stopped():
                time.sleep(1)
        except KeyboardInterrupt:
            print_and_flush("halting...")
        finally:
            grpc_server.stop(0)

def print_and_flush(s):
    print(s)
    sys.stdout.flush()

if __name__ == '__main__':
    print_and_flush('loading...')
    ModelServer(http_port=int(sys.argv[1])).start()

The threading logic here also makes simple unit testing possible, for example:

...
class TestAll(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        ...
        # Build and serialize test models.
        for language in [...]:
            train_model(language)
    
        # Start the server, give it some time.
        cls.server = ModelServer(http_port=12345)
        cls.server.start()
        time.sleep(1)
    
    def test_basics(self):
        address = 'localhost:%s' % TestAll.server._grpc_port
        result = classify(address, ...)
        self.assert...
       
    @classmethod
    def tearDownClass(cls):
        # Stop the server, give it some time.
        cls.server.stop()
        time.sleep(1)
    
        # Delete temporary files.
        os.remove('service.json')
        for logfile in glob('request-*.log'):
            os.remove(logfile)

if __name__ == '__main__':
    unittest.main()

Further, we provide an Aurora configuration that consists of the following tasks:

  1. Fetch code and data into the sandbox.
  2. Activate the virtual environment and start the server using thermos.ports[http].
  3. Wait for service.json to be written and register clf-health and clf-grpc in Consul via consulate. Use clf-health for the httpcheck.
  4. On shutdown, deregister the services via consulate.
  5. Otherwise, delete request logs that are more than 12 hours old.
fetch_all = ...

run_server = Process(
  name = 'run_server',
  cmdline = 'source venv/bin/activate && venv/bin/python server.py {{thermos.ports[http]}}')

register_service = Process(
    name = 'register_service',
    cmdline = 'HTTP=$(service-file-get clf-health service.json 300); \
        GRPC=$(service-file-get clf-grpc service.json 0); \
        consulate register clf-health -s clf-health -a $(hostname) -p $HTTP httpcheck 10 http://localhost:$HTTP/status; \
        consulate register clf-grpc -s clf-grpc -a $(hostname) -p $GRPC httpcheck 10 http://localhost:$HTTP/status')

deregister_service = Process(
    name = 'deregister_service',
    final = True,
    cmdline = '/usr/local/bin/consulate deregister clf-health; \
        /usr/local/bin/consulate deregister clf-grpc; sleep 5')

log_removal = Process(name = 'log_removal', cmdline = 'while true; do find . -name request\*.log -mmin +720 -print0 | xargs -0 --no-run-if-empty rm; sleep 3600; done')

server_task = SequentialTask(processes = [fetch_all, run_server], resources = Resources(cpu=1, ram=4*GB, disk=3*GB))
register_task = Task(processes=[register_service], resources=Resources(cpu=0.01, ram=0.1*MB, disk=1*MB))
deregister_task = Task(processes=[deregister_service], resources=Resources(cpu=0.01, ram=0.1*MB, disk=1*MB))
log_removal = Task(processes=[log_removal], resources=Resources(cpu=0.01, ram=0.1*MB, disk=1*MB))

jobs = []
for dc in [...]:
  jobs.append(Service(cluster = dc,
          environment = 'prod',
          tier = 'preferred',
          role = ...,
          name = 'clf',
          update_config = UpdateConfig(watch_secs = 300, batch_size = 1),
          task = Tasks.combine(server_task, register_task, deregister_task, log_removal),
          constraints = {'host': 'limit:1'},
          instances = 4))

Note that here we use four instances per data center, each requiring slightly more than 1 CPU, 4 GB RAM and 3 GB disk. We also restrict the job to no more than one instance per host.

From here we can start, stop, update and scale our jobs using Aurora's client commands. Beyond what is mentioned, we implement a classification service client in Java and embed it into the crawler. The Cxense codebase includes code for automatically resolving clf-grpc to a list of healthy servers, and even for scheduling up to 3 retries with 20 ms between them and a final time-out at 100 ms. Here we also use Prometheus to monitor client latency, the number of failed requests, etc. Moreover, we configure metric export from both the clients and the service, and set up a number of service alerts (on inactivity, too high latency or a high error rate) and Grafana dashboards (one for each DC).
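Our actual client is implemented in Java, but the essence of the call is just a generated stub invocation with a hard deadline; a rough Python equivalent (the host, port and field values below are made up) would be:

import grpc

# proto and proto_grpc are the modules generated from the .proto definition above;
# in practice the address would come from the clf-grpc lookup.
channel = grpc.insecure_channel('clf-host.example:12345')
stub = proto_grpc.ClassifierStub(channel)

# lang is omitted here since the Language enum is elided in the proto above.
request = proto.Document(url='http://example.com/article',
                         title='Some title',
                         text='Body text of the page')
try:
    response = stub.Classify(request, timeout=0.1)  # 100 ms end-to-end budget
    for c in response.classes:
        print(c.name, c.prob)
except grpc.RpcError:
    pass  # count the failure and fall back to no classification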

THE EXPERIENCE

Initially I was quite skeptical about using Python/Scikit-Learn in production. My suspicions “were confirmed” by a few obstacles:

  1. The gRPC worker threads above are effectively bound to one CPU (thanks to the GIL) and it is really hard to do anything about that in Python. However, this is not a big deal, as we can scale by instances instead of cores. In fact, it is better.
  2. Occasionally tasks get assigned to “slow” nodes, which makes 90+ percentile latency higher by orders of magnitude. After some investigation with colleagues, we found that this may happen on I/O-overloaded nodes. The delayed logging demonstrated above gave us a dramatic improvement here, so it wasn’t much of an issue anymore. Otherwise, we could add a supervisor to restart unlucky jobs.
  3. gRPC address lookup makes client-observed latency significantly worse than the classification call itself. However, our codebase implements a short-term address cache, and for cached addresses the latency increase is not a big deal. The problem we saw initially was that with a large number of crawlers and a relatively small fraction of classification requests, the chance of hitting a cold cache is quite high. With an increasing number of requests, however, this chance goes down and the latency goes down as well.
  4. We have observed that for bursty traffic the latency jitter can be quite high and the first few requests after a pause are likely to be out of time. For the server, I assume this is because of the cost of loading models back into memory and CPU caches; for the client, because of closed connections and cold address caches. The funny part is that this issue diminishes with an increased number of requests. In fact, we have seen latency (both for the client and the server) go down after doubling the number of requests by adding support for a new language, without increasing the total resource budget (CPU, RAM, disk).

So in total, the experience in prod was quite positive. Apart from the points mentioned above, there were no problems or accidents, and I have not seen any server-side exceptions. The only time I had to investigate why the classifier was suddenly inactive was when AWS S3 went AWOL and broke the Internet.

On a final note, here is a dashboard illustrating the performance on production traffic in one of our data centers (the legend has been removed from some of the charts).