Then we wrote a Python function that picks a random rabbit picture and a random celery picture and returns them as numpy arrays. To give each rabbit its celery we wrote another function that takes the two numpy arrays and pastes a small version of the celery array onto the rabbit array:
>>> rabbit, celery = create_input()
>>> rabbit_with_celery = add_celery_to_rabbit(rabbit, celery)
And here is what that looks like:
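For reference, here is a minimal sketch of how those two helpers could look. The folder names rabbits/ and celery/, the use of Pillow for loading and resizing, and the paste position are assumptions for illustration, not necessarily our exact code:

import glob
import random

import numpy as np
from PIL import Image

def create_input():
    # pick one random picture from each folder and load it as a numpy array
    rabbit_path = random.choice(glob.glob('rabbits/*.jpg'))
    celery_path = random.choice(glob.glob('celery/*.jpg'))
    return np.array(Image.open(rabbit_path)), np.array(Image.open(celery_path))

def add_celery_to_rabbit(rabbit_array, celery_array, scale=0.2):
    # shrink the celery to a fraction of the rabbit's size ...
    height = int(rabbit_array.shape[0] * scale)
    width = int(rabbit_array.shape[1] * scale)
    small_celery = np.array(Image.fromarray(celery_array).resize((width, height)))
    # ... and paste it into the bottom-left corner of a copy of the rabbit
    result = rabbit_array.copy()
    result[-height:, :width] = small_celery
    return result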
Now if we have lots of rabbits and lots of celery we can no longer hand out the celery ourselves. Instead we want a system where rabbits can wait in a queue and workers hand out the celery for us in a parallelized fashion.
A good system for running lots of distributed tasks is Celery. It's a Python package that you can install with either pip or conda:
$ conda install -c conda-forge celery
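Or, if you prefer pip:
$ pip install celery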
As the message broker for our system we use the popular RabbitMQ. Instead of installing it, we simply pull a Docker image and run it in a container:
$ docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3
Note that we publish the container's internal port 5672 so that Celery, running outside the container, can reach the broker.
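To check that the broker actually came up, a quick look at the container logs helps:
$ docker logs some-rabbit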
Now that we have a broker in place we need workers that can hand out the celery to the rabbits. For that we add a Celery decorator to our Python function, turning it into a Celery task, and point the Celery app at our RabbitMQ on localhost:5672:
>>> from celery import Celery
>>> app = Celery('mytask', broker='pyamqp://guest@localhost:5672//')
>>> @app.task
... def add_celery_to_rabbit(rabbit_array, celery_array):
... ...
Since we want our worker to work in the background, we start it inside a screen session, where it can process the "add celery to rabbits" jobs quietly (detach with Ctrl-a d and the worker keeps running):
$ screen -S celery
$ celery -A rabbit worker --loglevel=info
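From a second terminal you can ask the running worker which tasks it knows about; our add_celery_to_rabbit should show up in the output:
$ celery -A rabbit inspect registered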
Note that our Python code lives in a module called rabbit.py, which is what the -A rabbit argument refers to. Finally, we can add jobs to our system from an IPython interpreter:
>>> from rabbit import add_celery_to_rabbit
>>> add_celery_to_rabbit.delay(rabbit_array, celery_array)
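The .delay() call returns immediately with an AsyncResult. If you also want the finished picture back, Celery needs a result backend in addition to the broker, which we skipped above. A minimal sketch using the rpc backend (our assumption, not part of the setup described so far):
>>> app = Celery('mytask', broker='pyamqp://guest@localhost:5672//', backend='rpc://')
>>> result = add_celery_to_rabbit.delay(rabbit_array, celery_array)
>>> result.get(timeout=10)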
Final note: when we tried sending numpy arrays as input to the Celery task we got an error telling us that numpy arrays are not JSON serializable. We got around that error by turning them into nested lists with array.tolist() before the call (and back into arrays with np.array() inside the task). There probably is a better way to do that.
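One cleaner option, assuming everyone who can write to the broker is trusted, is to let Celery serialize tasks with pickle instead of JSON, since pickle handles numpy arrays directly:
>>> app.conf.task_serializer = 'pickle'
>>> app.conf.result_serializer = 'pickle'
>>> app.conf.accept_content = ['pickle']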


