Source

Context

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
import inspect
from os import path
from signal import SIGINT
from importlib import import_module

from urllib.parse import urlparse
from .proxy import Proxy, set_actor, ProxyRef, TellWrapper
from .exceptions import HostDownError, AlreadyExistsError, NotFoundError, \
    HostError, IntervalError
from . import util

# import pyactor.thread.parallels
# parallels = pyactor.thread.parallels
# Execution-context state. Every name below except ``available_types`` is a
# placeholder that set_context() rebinds once a core type has been chosen.
core_type = None  # name of the selected core type; None while unset
available_types = ['thread', 'green_thread']  # values set_context() accepts
actor_module = None  # pyactor.<core_type>.actor
intervals = None  # pyactor.<core_type>.intervals
parallels = None  # pyactor.<core_type>.parallels
future = None  # pyactor.<core_type>.future
rpcactor = None  # pyactor.<core_type>.rpcactor
signal = None  # the gevent package for 'green_thread', else stdlib signal


def set_rabbit_credentials(user, password):
    """
    Replace the credentials used for RabbitMQ connections.

    Needed when you make remote queries through a RabbitMQ server that
    does not accept the default ones. By default, PyActor uses the guest
    RabbitMQ user.

    The values are stored in ``util.RABBIT_USER`` and ``util.RABBIT_PASS``.

    :param str. user: Name for the RabbitMQ user.
    :param str. password: Password for the RabbitMQ user.
    """
    util.RABBIT_USER, util.RABBIT_PASS = user, password


def set_context(module_name='thread'):
    """
    Initialize the execution context by choosing the kind of threads the
    library runs on: classic python threads ('thread') or green threads
    provided by Gevent ('green_thread').

    This should be called first of all in every execution, otherwise,
    the library would not work. It binds the module-level ``actor_module``,
    ``intervals``, ``parallels``, ``future``, ``rpcactor`` and ``signal``
    names to the modules of the chosen core type.

    The default module is 'thread'.

    :param str. module_name: Name of the module you want to use
        ('thread' or 'green_thread').
    :raises: Exception if a core type was already configured, or if
        *module_name* is not one of ``available_types``.
    """
    global core_type, actor_module, intervals, parallels, future
    global rpcactor, signal

    # Reject the call before touching any state.
    if core_type is not None:
        raise Exception("The core type was previously configured.")
    if module_name not in available_types:
        raise Exception("Bad core type.")

    core_type = module_name
    util.core_type = core_type

    package = 'pyactor.' + module_name
    actor_module = import_module(package + '.actor')
    intervals = import_module(package + '.intervals')
    parallels = import_module(package + '.parallels')
    future = import_module(package + '.future')
    set_actor(module_name)
    rpcactor = import_module(package + '.rpcactor')
    # serve_forever() installs its SIGINT handler through this module.
    signal = import_module(
        'gevent' if module_name == 'green_thread' else 'signal')


def create_host(url="local://local:6666/host"):
    """
    Create a new Host to which you can spawn actors and return its proxy.

    The host is bound to the local address when no *url* is given. This
    function should be called once for execution or after calling
    :meth:`~.shutdown` to the previous host.

    It is possible to create locally more than one host and simulate a
    remote communication between them if they are of some remote type
    (`http` or `amqp`). The first host created while no other host is
    registered becomes the main host, which is the one hosting the
    queries from the main function. Every host must be initialized with
    a different URL (port). More than one host should not be required
    for any real project.

    :param str. url: URL where to start and bind the host.
    :return: :class:`~.Proxy` to the new host created.
    :raises: :class:`HostError` if there is a host already created with
        that URL.
    """
    if url in util.hosts:
        raise HostError("Host already created. Only one host can"
                        " be ran with the same url.")

    # Decide before building the host whether it will be the main one.
    is_first_host = not util.hosts
    new_host = Host(url)
    if is_first_host:
        util.main_host = new_host
    util.hosts[url] = new_host
    return new_host.proxy


class Host(object):
    """
    Host must be created using the function :func:`~create_host`.
    Do not create a Host directly.

    Host is a container for actors. It manages the spawn and
    elimination of actors and their communication through channels. Also
    configures the remote points where the actors will be able to receive
    and send queries remotely. Additionally, controls the correct management
    of its actors' threads and intervals.

    The host is managed as an actor itself so you interact with it through
    its :class:`~.Proxy`. This allows you to pass it to another host to
    spawn remotely.

    :param str. url: URL that identifies the host and where to find it.
    """
    _tell = {'attach_interval', 'detach_interval', 'hello', 'stop_actor'}
    _ask = {'spawn', 'lookup', 'lookup_url', 'say_hello', 'has_actor'}
    _ref = {'spawn', 'lookup', 'lookup_url'}

    def __init__(self, url):
        """
        Create the empty registries, bind the transport for *url* and
        start the host's own actor.

        :param str. url: URL that identifies the host and where to find it.
        """
        self.actors = {}  # url (or transport scheme) -> launched actor
        self.threads = {}  # actor thread -> key it was registered under
        self.pthreads = {}  # joined on shutdown; not filled by this class
        self.intervals = {}  # interval id -> event (see attach_interval)
        self.locks = {}  # url -> lock of each parallel actor (see spawn)
        self.url = url
        self.running = False
        self.alive = True
        # The registries above must exist before these two calls: each of
        # them may register an actor through __launch_actor.
        self.__load_transport(url)
        self.__init_host()

        self.ppool = None  # pool created on first use by new_parallel
        # self.cleaner = interval_host(get_host(), CLEAN_INT, self.do_clean)

    def hello(self):
        print("Hello!!")

    def say_hello(self):
        print("Sending hello.")
        return "Hello from HOST!!"

    def __load_transport(self, url):
        """
        For remote communication. Sets this host's communication dispatcher
        at the address and port specified.

        The scheme must be 'http' if using a XMLRPC dispatcher.
        'amqp' for RabbitMQ communications.

        This method is internal. Automatically called when creating the host.

        :param str. url: URL where to bind the host. Must be provided in
            the typical form: 'scheme://address:port/hierarchical_path'
        """
        aurl = urlparse(url)
        addrl = aurl.netloc.split(':')
        self.addr = addrl[0], addrl[1]
        self.transport = aurl.scheme
        self.host_url = aurl

        if aurl.scheme == 'http':
            self.__launch_actor('http',
                                rpcactor.RPCDispatcher(url, self, 'rpc'))

        elif aurl.scheme == 'amqp':
            self.__launch_actor('amqp', rpcactor.RPCDispatcher(url, self,
                                                               'rabbit'))

    def spawn(self, aid, klass, *param, **kparam):
        """
        This method creates an actor attached to this host. It will be
        an instance of the class *klass* and it will be assigned an ID
        that identifies it among the host.

        This method can be called remotely synchronously.

        :param str. aid: identifier for the spawning actor. Unique within
            the host.
        :param class klass: class type of the spawning actor. If you are
            spawning remotely and the class is not in the server module,
            you must specify here the path to that class in the form
            'module.py/Class' so the server can import the class and create
            the instance.
        :param param: arguments for the init function of the
            spawning actor class.
        :param kparam: arguments for the init function of the
            spawning actor class.
        :return: :class:`~.Proxy` to the spawned actor.
        :raises: :class:`AlreadyExistsError`, if the ID specified is
            already in use.
        :raises: :class:`HostDownError` if the host is not initiated.
        """
        if param is None:
            param = []
        if not self.alive:
            raise HostDownError()
        if isinstance(klass, str):
            module, klass = klass.split('/')
            module_ = __import__(module, globals(), locals(),
                                 [klass])
            klass_ = getattr(module_, klass)
        elif isinstance(klass, type):
            klass_ = klass
        else:
            raise Exception(f"Given class is not a class: {klass}")
        url = f'{self.transport}://{self.host_url.netloc}/{aid}'
        if url in self.actors.keys():
            raise AlreadyExistsError(url)
        else:
            obj = klass_(*param, **kparam)
            obj.id = aid
            obj.url = url
            if self.running:
                obj.host = self.proxy
            # else:
            #     obj.host = Exception("Host is not an active actor. \
            #                           Use 'init_host' to make it alive.")

            if hasattr(klass_, '_parallel') and klass_._parallel:
                new_actor = parallels.ActorParallel(url, klass_, obj)
                lock = new_actor.get_lock()
                self.locks[url] = lock
            else:
                new_actor = actor_module.Actor(url, klass_, obj)

            obj.proxy = Proxy(new_actor)
            self.__launch_actor(url, new_actor)
            return Proxy(new_actor)

    def has_actor(self, aid):
        """
        Checks if the given id is used in the host by some actor.

        :param str. aid: identifier of the actor to check.
        :return: True if the id is used within the host.
        """
        url = f'{self.transport}://{self.host_url.netloc}/{aid}'
        return url in self.actors.keys()

    def lookup(self, aid):
        """
        Build a new proxy for the actor of this host (only actors in
        this host) identified by the given ID.

        This method can be called remotely synchronously.

        :param str. aid: identifier of the actor you want.
        :return: :class:`~.Proxy` to the actor required.
        :raises: :class:`NotFoundError` if the actor does not exist.
        :raises: :class:`HostDownError` if the host is down.
        """
        if not self.alive:
            raise HostDownError()
        url = f"{self.transport}://{self.host_url.netloc}/{aid}"
        if url not in self.actors:
            raise NotFoundError(url)
        return Proxy(self.actors[url])

    def shutdown(self):
        # For internal calls.
        # Stops every interval and actor of the host, waits for the pool
        # and the threads, empties the registries and removes the host
        # from util.hosts. Does nothing when the host is already down.
        if not self.alive:
            return

        print(f"Host {self.addr} :#: shutting down...")
        for interval_event in self.intervals.values():
            interval_event.set()

        for actor in self.actors.values():
            Proxy(actor).stop()

        # stop the pool (close & join); close() is only called for the
        # 'thread' core type.
        pool = self.ppool
        if pool is not None:
            if core_type == 'thread':
                pool.close()
            pool.join()

        # By now, all pthreads should be gone
        for parallel in self.pthreads:
            parallel.join()

        # A failing join is reported and the remaining threads are still
        # joined.
        for thread in self.threads:
            try:
                thread.join()
            except Exception as e:
                print(e)

        for registry in (self.locks, self.actors, self.threads,
                         self.pthreads):
            registry.clear()
        self.running = False
        self.alive = False

        del util.hosts[self.url]
        if util.main_host.url == self.url:
            # Promote the first remaining host, if any, to main host.
            util.main_host = next(iter(util.hosts.values()), None)

        print(f"Host {self.addr} :#: Bye!")

    def stop_actor(self, aid):
        """
        This method removes one actor from the Host, stopping it and deleting
        all its references.

        :param str. aid: identifier of the actor you want to stop.
        """
        url = f"{self.transport}://{self.host_url.netloc}/{aid}"
        if url != self.url:
            a = self.actors[url]
            Proxy(a).stop()
            a.thread.join()
            del self.actors[url]
            del self.threads[a.thread]

    def lookup_url(self, url, klass, module=None):
        """
        Build a proxy for the actor indicated by *url*. The URL can point
        to an actor of this host or to an actor living in another host.

        This method can be called remotely synchronously.

        :param str. url: address that identifies an actor.
        :param class klass: the class of the actor.
        :param str. module: if the actor class is not in the calling module,
            you need to specify the module where it is here. In that case
            the *klass* parameter must be the class name as a string.
        :return: :class:`~.Proxy` of the actor requested.
        :raises: :class:`NotFoundError`, if the URL belongs to this host
            but does not correspond to any of its actors.
        :raises: :class:`HostDownError`  if the host is down.
        :raises: :class:`HostError`  if there is an error looking for
            the actor in another server.
        """
        if not self.alive:
            raise HostDownError()

        target = urlparse(url)
        if self.__is_local(target):
            if url in self.actors:
                return Proxy(self.actors[url])
            raise NotFoundError(url)

        # Remote actor: it is reached through the dispatcher registered
        # under the URL scheme. Any failure other than a HostError is
        # reported as a HostError.
        try:
            dispatcher = self.actors[target.scheme]
            if module is None:
                if not inspect.isclass(klass):
                    raise HostError("The class specified to look up is" +
                                    " not a class.")
                klass_ = klass
            else:
                try:
                    module_ = __import__(module, globals(), locals(),
                                         [klass])
                    klass_ = getattr(module_, klass)
                except Exception as e:
                    raise HostError("At lookup_url: " +
                                    "Import failed for module " + module +
                                    ", class " + klass +
                                    ". Check this values for the lookup." +
                                    " ERROR: " + str(e))
            remote_actor = actor_module.ActorRef(url, klass_,
                                                 dispatcher.channel)
            return Proxy(remote_actor)
        except HostError:
            raise
        except Exception as e:
            raise HostError(
                f"ERROR looking for the actor on another server. Hosts must"
                f" be in http to work properly. {str(e)}")

    def __is_local(self, aurl):
        # '''Private method.
        # Tells if the address given is from this host.
        #
        # :param ParseResult aurl: address to analyze.
        # :return: (*Bool.*) If is local (**True**) or not (**False**).
        # '''
        return self.host_url.netloc == aurl.netloc

    def __launch_actor(self, url, actor):
        # '''Private method.
        # Starts an actor so it begins processing queries and records it
        # in the host registries.
        #
        # :param str. url: key the actor is registered under (its url, or
        #     the transport scheme for a dispatcher).
        # :param Actor actor: instance of the actor.
        # '''
        # Start the actor before registering it: ``actor.thread`` is read
        # below (presumably it is created by run() -- TODO confirm against
        # the Actor implementation).
        actor.run()
        self.actors[url] = actor
        # Reverse index thread -> key; shutdown() joins these threads and
        # stop_actor() removes the entry again.
        self.threads[actor.thread] = url

    def __init_host(self):
        # Wraps the Host in an actor of its own so it can spawn actors
        # remotely, and stores the proxy to that actor in self.proxy.
        # Called always from the init function of the host, so there is
        # no need for calling this directly. Does nothing when the host
        # is already running or is no longer alive.
        if self.running or not self.alive:
            return

        self.id = self.url
        host_actor = actor_module.Actor(self.url, Host, self)
        self.proxy = Proxy(host_actor)
        # Registers the actor under the host url and starts it.
        self.__launch_actor(self.url, host_actor)
        self.running = True

    def attach_interval(self, interval_id, interval_event):
        """Registers an interval event to the host."""
        self.intervals[interval_id] = interval_event

    def detach_interval(self, interval_id):
        """Deletes an interval event from the host registry."""
        del self.intervals[interval_id]

    def dumps(self, param):
        """
        Checks the parameters generating new proxy instances to avoid
        query concurrences from shared proxies and creating proxies for
        actors from another host.
        """
        if isinstance(param, Proxy):
            module_name = param.actor.klass.__module__
            if module_name == '__main__':
                module_name = path.splitext(
                    path.basename(inspect.getfile(param.actor.klass)))[0]
            return ProxyRef(param.actor.url, param.actor.klass.__name__,
                            module_name)
        elif isinstance(param, list):
            return [self.dumps(elem) for elem in param]
        elif isinstance(param, dict):
            new_dict = param
            for key in new_dict.keys():
                new_dict[key] = self.dumps(new_dict[key])
            return new_dict
        elif isinstance(param, tuple):
            return tuple([self.dumps(elem) for elem in param])
        else:
            return param

    def loads(self, param):
        """
        Checks the return parameters generating new proxy instances to
        avoid query concurrences from shared proxies and creating
        proxies for actors from another host.
        """
        if isinstance(param, ProxyRef):
            try:
                return self.lookup_url(param.url, param.klass, param.module)
            except HostError:
                print("Can't lookup for the actor received with the call.",
                      "It does not exist or the url is unreachable.",
                      param)
                raise HostError(param)
        elif isinstance(param, list):
            return [self.loads(elem) for elem in param]
        elif isinstance(param, tuple):
            return tuple([self.loads(elem) for elem in param])
        elif isinstance(param, dict):
            new_dict = param
            for key in new_dict.keys():
                new_dict[key] = self.loads(new_dict[key])
            return new_dict
        else:
            return param

    def new_parallel(self, a_function, *params):
        """
        Register a new thread executing a parallel method.
        """
        # Create a pool if not created (threads or Gevent...)
        if self.ppool is None:
            if core_type == 'thread':
                from multiprocessing.pool import ThreadPool
                self.ppool = ThreadPool(500)
            else:
                from gevent.pool import Pool
                self.ppool = Pool(500)
        # Add the new task to the pool
        self.ppool.apply_async(a_function, *params)


def shutdown(url=None):
    """
    Stop the Host registered under *url*, or all of them if none is
    specified, stopping at the same time all its actors.
    Should be called at the end of its usage, to finish correctly
    all the connections and threads.

    When every host is stopped the configured core type is also reset,
    so :func:`set_context` can be called again.

    :param str. url: URL of the host to stop; None stops every host.
    :raises: KeyError if *url* is given and no host is registered
        under it.
    """
    global core_type
    if url is not None:
        util.hosts[url].shutdown()
        return

    # Iterate over a snapshot: each host removes itself from util.hosts
    # while shutting down.
    for host in list(util.hosts.values()):
        host.shutdown()
    core_type = None


def signal_handler(signal=None, frame=None):
    # Handler installed by serve_forever for the Ctrl+C interruption:
    # marks the main host as no longer serving, which ends the
    # serve_forever loop, and shuts that host down.
    #
    # :param signal: SIGINT signal interruption sent with a Ctrl+C.
    #     (not used)
    # :param frame: the current stack frame. (not used)
    print("You pressed Ctrl+C!")
    main = util.main_host
    main.serving = False
    shutdown(main.url)


def serve_forever():
    """
    This allows the host (main host) to keep alive indefinitely so its actors
    can receive queries at any time.
    The main thread stays blocked until the host stops serving.
    To kill the execution, press Ctrl+C.

    See usage example in :ref:`sample6`.

    :raises: Exception if the main host is already shut down.
    """
    host = util.main_host
    if not host.alive:
        raise Exception("This host is already shut down.")
    host.serving = True
    signal.signal(SIGINT, signal_handler)
    print("Press Ctrl+C to kill the execution")
    # Serve only while the host that started serving is still the main
    # one. Shutting it down replaces util.main_host by None or by another
    # host, and that other host has no ``serving`` attribute, so the
    # attribute must not be read from whatever util.main_host is now.
    while util.main_host is host and host.serving:
        try:
            sleep(1)
        except Exception:
            # Best effort: a failing sleep must not end the serving loop.
            pass
    print("BYE!")


def interval(host, time, actor, method, *args, **kwargs):
    """
    Creates an Event attached to the host for management that will
    execute the *method* of the *actor* every *time* seconds.

    See example in :ref:`sample_inter`

    :param Proxy host: host that will manage the interval, commonly the
        host of the actor.
    :param float time: seconds for the intervals.
    :param Proxy actor: actor to which make the call every *time* seconds.
    :param Str. method: method of the *actor* to be called. It must be
        a tell method.
    :param list args: arguments for *method*.
    :param kwargs: keyword arguments for *method*.
    :return: :class:`Event` instance of the interval.
    :raises: :class:`IntervalError` if the actor has no such method or
        the method is not a tell method.
    """
    call = getattr(actor, method, None)
    if not callable(call):
        raise IntervalError(
            f"The actor {actor.get_id()} does not have the method {method}.")

    wrapper_name = call.__class__.__name__
    if wrapper_name not in ("TellWrapper", "TellRefWrapper"):
        raise IntervalError("The callable for the interval must be a tell" +
                            " method of the actor.")

    # If the method is a normal tell, the interval thread can send
    # the calls normally.
    # If it is a Ref Tell, the proxies in the args would be parsed
    # during the call to this very method. So the call can be made
    # as a normal Tell. The actor will do the loads normally on the
    # receive as it has its methods marked as ref.
    if wrapper_name == "TellRefWrapper":  # compare by value, not identity
        # NOTE(review): an instance attribute named __call__ is not used
        # by ``call(...)``, which looks __call__ up on the type -- confirm
        # how intervals.interval_host invokes the callable.
        call.__call__ = TellWrapper.__call__

    return intervals.interval_host(host, time, call, *args, **kwargs)


def later(timeout, actor, method, *args, **kwargs):
    """
    Sets a timer that will call the *method* of the *actor* past *timeout*
    seconds.

    See example in :ref:`sample_inter`

    :param int timeout: seconds until the method is called.
    :param Proxy actor: actor to which make the call after *timeout*
        seconds.
    :param Str. method: method of the *actor* to be called. It must be
        a tell method.
    :param list args: arguments for *method*.
    :param kwargs: keyword arguments for *method*.
    :return: manager of the later (Timer in thread,
        Greenlet in green_thread)
    :raises: :class:`IntervalError` if the actor has no such method or
        the method is not a tell method.
    """
    call = getattr(actor, method, None)
    if not callable(call):
        raise IntervalError(f"later: The actor {actor.get_id()} does not "
                            f"have the method {method}.")

    wrapper_name = call.__class__.__name__
    if wrapper_name not in ("TellWrapper", "TellRefWrapper"):
        raise IntervalError("The callable for the later must be a tell "
                            "method of the actor.")

    # As with the interval, args have already been dumped.
    if wrapper_name == "TellRefWrapper":  # compare by value, not identity
        # NOTE(review): an instance attribute named __call__ is not used
        # by ``call(...)``, which looks __call__ up on the type -- confirm
        # how intervals.later invokes the callable.
        call.__call__ = TellWrapper.__call__
    return intervals.later(timeout, call, *args, **kwargs)


def sleep(seconds):
    """
    Facade for the sleep function: delegates to the ``sleep`` of the
    ``intervals`` module bound by :func:`set_context`. Do not use
    time.sleep if you are running green threads.

    :param seconds: time to sleep, passed through unchanged.
    """
    intervals.sleep(seconds)

Proxy

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
from importlib import import_module
from queue import Empty

from .exceptions import PyActorTimeoutError, HostError
from .util import ASK, TELL, TYPE, METHOD, PARAMS, CHANNEL, TO, RESULT
from .util import get_host, get_lock, get_current

# Modules of the active core type; both stay None until set_actor() runs.
actor_channel = None  # the '<module_name>.channel' module of this package
future_module = None  # the '<module_name>.future' module of this package


def set_actor(module_name):
    """
    Bind the module-level ``actor_channel`` and ``future_module`` to the
    ``channel`` and ``future`` modules of the given core type, imported
    relative to this package.

    :param str. module_name: name of the core type subpackage.
    """
    global actor_channel, future_module
    subpackage = '.' + module_name
    actor_channel = import_module(subpackage + '.channel', __package__)
    future_module = import_module(subpackage + '.future', __package__)


class ProxyRef(object):
    """
    Plain description of an actor: its url, the name of its class and
    the module where that class lives.

    :param actor: url of the actor, stored as ``url``.
    :param class_: class of the actor, stored as ``klass``.
    :param module: module of the class, stored as ``module``.
    """

    def __init__(self, actor, class_, module):
        self.url, self.klass, self.module = actor, class_, module

    def __repr__(self):
        return "ProxyRef(actor={}, class={} mod={})".format(
            self.url, self.klass, self.module)


class Proxy(object):
    """
    Proxy is the class that supports to create a remote reference to an
    actor and invoke its methods. All the references to actors will be
    proxies, even the host.
    To get a proxy to an Actor, you should use one of the host functions
    that provide one, like :meth:`~.spawn` or :meth:`~.lookup_url`.

    Two proxies are equal, and hash alike, when their actors have the
    same url.

    :param Actor actor: the actor the proxy will manage.
    """

    def __init__(self, actor):
        self.__channel = actor.channel
        self.actor = actor
        self.__lock = get_lock()
        # One callable attribute per actor method. The groups are
        # installed in this order, so a name present in several groups
        # ends up with the wrapper of the last one.
        wrapper_groups = (
            (actor.ask_ref, AskRefWrapper),
            (actor.tell_ref, TellRefWrapper),
            (actor.tell, TellWrapper),
            (actor.ask, AskWrapper),
        )
        for methods, wrapper in wrapper_groups:
            for method in methods:
                setattr(self, method,
                        wrapper(self.__channel, method, actor.url))

    def __repr__(self):
        actor = self.actor
        return "Proxy(actor={}, tell={} ref={}, ask={} ref={})".format(
            actor, actor.tell, actor.tell_ref, actor.ask, actor.ask_ref)

    def __str__(self):
        return "{}'s proxy".format(self.actor)

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.actor.url == other.actor.url

    def __ne__(self, other):
        if not isinstance(other, self.__class__):
            return NotImplemented
        return not self == other

    def __hash__(self):
        return hash(self.actor.url)

    def get_id(self):
        """
        :return: the id of the actor that this proxy holds.
        :raises: Exception if the proxy holds a remote actor. Use URL.
        """
        try:
            actor_id = self.actor.id
        except AttributeError:
            # An actor without an ``id`` attribute is treated as remote.
            raise Exception("This proxy holds a remote actor." +
                            " Use the url instead of the id.")
        return actor_id

    def get_url(self):
        """
        :return: the URL of the actor that this proxy holds.
        """
        return self.actor.url


class TellWrapper(object):
    """
    Wrapper for Tell type queries to the proxy. Calling it builds the
    request and sends it through the channel; nothing is returned and no
    response is awaited.

    :param Channel channel: communication way for the query.
    :param str. method: name of the method this query is going to invoke.
    :param str. actor_url: URL address where the actor is set.
    """

    def __init__(self, channel, method, actor_url):
        self.__channel, self.__method, self.__target = (
            channel, method, actor_url)

    def __call__(self, *args, **kwargs):
        # Tell request: positional and keyword arguments travel together
        # as one (args, kwargs) pair.
        request = {
            TYPE: TELL,
            METHOD: self.__method,
            PARAMS: (args, kwargs),
            TO: self.__target,
        }
        self.__channel.send(request)


class AskWrapper(object):
    """
    Wrapper for Ask type queries to the proxy. Calling it blocks the
    execution until the result is returned or timeout is reached. You
    can add the tagged parameter "timeout" to change the time limit to
    wait. Default timeout is set to 10s. It is also possible to specify
    "future=True" to get an instant response with a :class:`Future`
    object with which you can manage the result.

    A result that is an exception instance is raised instead of being
    returned; :class:`PyActorTimeoutError` is raised when no response
    arrives in time.

    :param Channel channel: communication way for the query.
    :param str. method: name of the method this query is gonna invoke.
    :param str. actor_url: URL address where the actor is set.
    """

    def __init__(self, channel, method, actor_url):
        self._actor_channel = channel
        self._method = method
        self.target = actor_url

    def __call__(self, *args, **kwargs):
        # 'future' is an option of the wrapper, never an argument of the
        # invoked method.
        future = kwargs.pop('future', False)
        self.__lock = get_lock()

        if future:
            future_ref = {METHOD: self._method, PARAMS: (args, kwargs),
                          CHANNEL: self._actor_channel, TO: self.target,
                          'LOCK': self.__lock}
            # Outside of any actor the host's own actor manages the future.
            manager = get_current()
            if manager is None:
                manager = get_host().proxy.actor
            return manager.future_manager.new_future(future_ref)

        # Blocking ask: the response comes back through a private channel.
        self.__channel = actor_channel.Channel()
        timeout = kwargs.pop('timeout', 10)
        request = {TYPE: ASK, METHOD: self._method, PARAMS: (args, kwargs),
                   CHANNEL: self.__channel, TO: self.target}
        self._actor_channel.send(request)

        # The lock, when there is one, is released while waiting and
        # taken again before leaving, on timeout as well.
        if self.__lock is not None:
            self.__lock.release()
        try:
            response = self.__channel.receive(timeout)
            result = response[RESULT]
        except Empty:
            if self.__lock is not None:
                self.__lock.acquire()
            raise PyActorTimeoutError(self._method)
        if self.__lock is not None:
            self.__lock.acquire()

        if isinstance(result, Exception):
            raise result
        return result


class AskRefWrapper(AskWrapper):
    """
    Ask wrapper for methods whose parameters or results may hold proxies.

    Arguments are passed through ``host.dumps`` before being sent and the
    result of a synchronous call is passed through ``host.loads``.
    """

    def __call__(self, *args, **kwargs):
        """
        Serialize the arguments with the host and perform the query.

        :param bool future: keyword only; return a future (created with
            ``ref=True``) instead of blocking for the result.
        :raises HostError: if there is no host in the calling context.
        """
        as_future = kwargs.pop('future', False)

        host = get_host()
        if host is None:
            raise HostError("No such Host on the context of the call.")
        new_args = host.dumps(list(args))
        new_kwargs = host.dumps(kwargs)

        if not as_future:
            # Synchronous path: reuse the plain ask, then decode the result.
            result = super(AskRefWrapper, self).__call__(*new_args,
                                                         **new_kwargs)
            return get_host().loads(result)

        self.__lock = get_lock()
        future_ref = {
            METHOD: self._method,
            PARAMS: (new_args, new_kwargs),
            CHANNEL: self._actor_channel,
            TO: self.target,
            'LOCK': self.__lock,
        }
        manager = get_current()
        if manager is None:
            # No current actor: fall back to the host's own actor.
            manager = get_host().proxy.actor
        return manager.future_manager.new_future(future_ref, ref=True)


class TellRefWrapper(TellWrapper):
    """Tell wrapper for methods whose parameters may hold proxies."""

    def __call__(self, *args, **kwargs):
        """
        Serialize the arguments with the host and delegate to the plain
        tell wrapper.

        :raises HostError: if there is no host in the calling context.
        """
        host = get_host()
        if host is None:
            raise HostError("No such Host on the context of the call.")
        new_args = host.dumps(list(args))
        new_kwargs = host.dumps(kwargs)
        return super(TellRefWrapper, self).__call__(*new_args, **new_kwargs)

XMLRPC Server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import pickle
import threading
import xmlrpc.client
from xmlrpc.server import SimpleXMLRPCRequestHandler
from xmlrpc.server import SimpleXMLRPCServer


class RequestHandler(SimpleXMLRPCRequestHandler):
    """XML-RPC request handler configured with an empty ``rpc_paths``."""

    # Empty tuple of RPC paths (overrides the handler's default).
    rpc_paths = tuple()


class Source(threading.Thread):
    """
    Thread serving incoming XML-RPC calls through a
    :class:`SimpleXMLRPCServer` bound to *addr*.

    :param tuple addr: ``(ip, port)`` pair the server listens on.
    """

    def __init__(self, addr):
        super().__init__()
        listen_ip, listen_port = addr
        self.addr = addr

        server_options = {
            'requestHandler': RequestHandler,
            'logRequests': False,
            'allow_none': True,
        }
        self.server = SimpleXMLRPCServer((listen_ip, listen_port),
                                         **server_options)

    def register_function(self, func):
        """Expose *func* to remote callers under the name ``send``."""
        self.server.register_function(func, 'send')

    def run(self):
        """Thread body: serve requests until the server is shut down."""
        self.server.serve_forever()

    def stop(self):
        """Shut the serving loop down and close the server socket."""
        server = self.server
        server.shutdown()
        server.server_close()


class Sink(object):
    """
    Thin facade over an XML-RPC :class:`ServerProxy`.

    :param str url: address of the remote XML-RPC server.
    """

    def __init__(self, url):
        self.endpoint = xmlrpc.client.ServerProxy(url)

    def send(self, msg):
        """Pickle *msg* and pass it to the remote ``send`` function."""
        payload = pickle.dumps(msg)
        return self.endpoint.send(payload)

RabbitMQ Server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import pika
import threading
import pickle
from urllib.parse import urlparse

from pyactor.util import RABBIT_USER, RABBIT_PASS


class Source(threading.Thread):
    """
    Thread consuming messages for one address from a RabbitMQ queue.

    The queue is named ``"<ip>/<port>"``. The connection is opened to the
    broker at *ip* with the credentials ``RABBIT_USER`` / ``RABBIT_PASS``;
    to change them see :func:`~.setRabbitCredentials`.

    :param tuple addr: ``(ip, port)`` pair identifying this endpoint.
    """

    def __init__(self, addr):
        super().__init__()
        ip, port = addr
        self.url = '/'.join((ip, str(port)))

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=ip,
                credentials=pika.PlainCredentials(RABBIT_USER, RABBIT_PASS),
            )
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.url)
        # self.channel.basic_qos(prefetch_count=1)

        # Callback that receives message bodies; see register_function().
        self.on_message = None

    def register_function(self, func):
        """Use *func* as the message callback and subscribe to the queue."""
        self.on_message = func
        self.channel.basic_consume(self.url, self.on_request,
                                   exclusive=True)

    def run(self):
        """Thread body: consume messages from the channel."""
        self.channel.start_consuming()

    def stop(self):
        """Delete the queue, then close the channel and the connection."""
        channel = self.channel
        channel.queue_delete(queue=self.url)
        channel.close()
        self.connection.close()

    def on_request(self, ch, method, props, body):
        """Hand *body* to the registered callback, then acknowledge it."""
        self.on_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)


class Sink(object):
    """
    Publisher side of the RabbitMQ transport: sends pickled messages to
    the queue named ``"<ip>/<port>"`` derived from the remote URL.

    :param str url: URL of the remote endpoint; its network location must
        have the form ``ip:port``.
    """

    def __init__(self, url):
        netloc_parts = urlparse(url).netloc.split(':')
        ip = netloc_parts[0]
        port = int(netloc_parts[1])
        self.url = '/'.join((ip, str(port)))

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=ip,
                credentials=pika.PlainCredentials(RABBIT_USER, RABBIT_PASS),
            )
        )
        self.channel = self.connection.channel()

    def send(self, msg):
        """Pickle *msg* and publish it to this sink's queue."""
        payload = pickle.dumps(msg)
        self.channel.basic_publish(exchange='',
                                   routing_key=self.url,
                                   body=payload)

Thread Actor

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from copy import copy
from threading import Thread

from .channel import Channel
from .future import FutureManager
from ..util import ASK, TELL, FUTURE, TYPE, ASK_RESPONSE, FUTURE_RESPONSE
from ..util import METHOD, PARAMS, RESULT, CHANNEL, RPC_ID
from ..util import ref_l, ref_d


class ActorRef(object):
    """
    Common description of an actor: its URL, its communication
    :class:`~.Channel`, its class and the names of the synchronous (ask)
    and asynchronous (tell) methods that class declares. When no channel
    is given a new one is created.

    Method names listed in the class' ``_ref`` set are moved out of
    ``tell`` / ``ask`` into ``tell_ref`` / ``ask_ref``. ``'stop'`` is always
    added to ``tell``.

    .. note:: This is a superclass of :py:class:`Actor` and has no
        direct functionality.

    """

    def __init__(self, url, klass, channel=None):
        self.url = url
        self.klass = klass
        self.channel = channel if channel else Channel()

        # Copy the declared sets so the class attributes stay untouched.
        declared_tell = getattr(klass, '_tell', None)
        declared_ask = getattr(klass, '_ask', None)
        self.tell = copy(declared_tell) if declared_tell else set()
        self.ask = copy(declared_ask) if declared_ask else set()

        if not hasattr(klass, '_ref'):
            self.ask_ref = set()
            self.tell_ref = set()
        else:
            # Wrap the message entry points with the reference helpers.
            self.receive = ref_l(self, self.receive)
            self.send_response = ref_d(self, self.send_response)

            self.tell_ref = self.tell & klass._ref
            self.ask_ref = self.ask & klass._ref
            self.ask.difference_update(self.ask_ref)
            self.tell.difference_update(self.tell_ref)

        self.tell.add('stop')

    def receive(self, msg):
        """Process an incoming message; implemented by subclasses."""
        raise NotImplementedError()

    def send_response(self, result, msg):
        """Send *result* back for *msg*; implemented by subclasses."""
        raise NotImplementedError()

    @property
    def _ref(self):
        """Union of the tell and ask method names that use references."""
        return self.tell_ref | self.ask_ref

    def __str__(self):
        return f"Actor {self.url} ({self.klass.__name__})"

    def __repr__(self):
        return f"Actor(url={self.url}, class={self.klass})"


class Actor(ActorRef):
    """
    An actor wraps an object whose methods can be invoked through
    messages. It is the main element of the model and is created by the
    host (spawning -> see :meth:`~.spawn`).

    :param str. url: URL where the actor is running.
    :param class klass: class type for the actor.
    :param klass obj: instance of the *klass* class to attach to the
        actor.
    """

    def __init__(self, url, klass, obj):
        super(Actor, self).__init__(url, klass)
        self._obj = obj
        self.id = obj.id
        self.running = True
        self.thread = None
        self.future_manager = FutureManager()

    def __process_queue(self):
        # Thread body: dispatch queued messages until the actor stops.
        while self.running:
            self.receive(self.channel.receive())

    def is_alive(self):
        """
        :return: (*bool.*) **True** while the actor is running.
        """
        return self.running

    def receive(self, msg):
        """
        Handle one message taken from the queue.

        A TELL for ``stop`` ends the actor and stops its future manager.
        Any other message names a method of the wrapped object, which is
        invoked with the message parameters; the outcome is passed to
        :meth:`send_response`. An exception raised by a TELL is printed
        and dropped; for other message types the exception itself becomes
        the result.

        :param msg: The message is a dictionary using the constants
            defined in util.py (:mod:`pyactor.util`).
        """
        if msg[TYPE] == TELL and msg[METHOD] == 'stop':
            self.running = False
            self.future_manager.stop()
            return

        try:
            target = getattr(self._obj, msg[METHOD])
            call_params = msg[PARAMS]
            outcome = target(*call_params[0], **call_params[1])
        except Exception as error:
            if msg[TYPE] == TELL:
                print(error)
                return
            outcome = error
        self.send_response(outcome, msg)

    def send_response(self, result, msg):
        """
        Send *result* to the channel carried by *msg*: as an ASK_RESPONSE
        for an ASK, as a FUTURE_RESPONSE for a FUTURE. Other message types
        get no response.
        """
        kind = msg[TYPE]
        if kind == ASK:
            reply = {TYPE: ASK_RESPONSE, RESULT: result,
                     RPC_ID: msg.get(RPC_ID)}
        elif kind == FUTURE:
            reply = {TYPE: FUTURE_RESPONSE, RPC_ID: msg[RPC_ID],
                     RESULT: result}
        else:
            return
        msg[CHANNEL].send(reply)

    def run(self):
        """
        Start the actor thread, which processes the channel queue while
        the actor :meth:`is_alive`.
        """
        worker = Thread(target=self.__process_queue)
        self.thread = worker
        worker.start()

Thread Intervals

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import time as timep
from threading import Thread, Timer, Event


def sleep(time):
    """
    Block the calling thread for *time* seconds.

    Facade over the standard sleep; use it instead of ``time.sleep``.

    :param  int time: seconds to sleep (a float allows fractions of a
        second).
    """
    seconds = time
    timep.sleep(seconds)


def later(timeout, f, *args, **kwargs):
    """
    Start a timer that calls ``f(*args, **kwargs)`` after *timeout*
    seconds.

    See example in :ref:`sample_inter`

    :param timeout: delay in seconds before *f* is called.
    :param func f: function to call.
    :return: the started :class:`Timer`; call ``cancel()`` on it to
        prevent a pending call.
    """
    # *args / **kwargs are always a tuple / dict, never None, so they can
    # be handed to Timer as they are.
    timer = Timer(timeout, f, args, kwargs)
    timer.start()
    return timer


def interval_host(host, time, f, *args, **kwargs):
    """
    Run ``f(*args, **kwargs)`` every *time* seconds in a new thread and
    register the interval on *host*.

    The first call happens as soon as the thread starts. Setting the
    returned event ends the loop; the thread then detaches the interval
    from the host.

    See example in :ref:`sample_inter`

    :param Proxy host: host proxy. Can be obtained from inside a
        class with ``self.host``.
    :param int time: seconds for the intervals.
    :param func f: function to be called every *time* seconds.
    :param list args: arguments for *f*.
    :return: :class:`Event` instance of the interval.
    """
    stop_event = Event()

    def wrap():
        while not stop_event.is_set():
            f(*args, **kwargs)
            # Sleeps up to *time* seconds, but wakes as soon as the
            # interval is cancelled.
            stop_event.wait(time)
        host.detach_interval(thread_id)

    t = Thread(target=wrap)
    # Read the name before starting the thread so the closure above never
    # sees an unbound ``thread_id``; ``Thread.getName()`` is deprecated in
    # favour of the ``name`` attribute.
    thread_id = t.name
    t.start()
    host.attach_interval(thread_id, stop_event)
    return stop_event

Thread Parallel

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import uuid
from threading import Lock, current_thread
from time import sleep

from .actor import Actor
from ..util import get_host, METHOD, PARAMS, TYPE, TELL


class ActorParallel(Actor):
    """
    Actor with parallel methods. Parallel methods are invoked in new
    threads, so their invocation does not block the actor, allowing it to
    process many queries at a time.
    To avoid concurrency problems, method bodies run while holding the
    actor's :class:`Lock`.
    """

    def __init__(self, url, klass, obj):
        super(ActorParallel, self).__init__(url, klass, obj)
        self.__lock = Lock()
        # rpc id -> original message of parallel asks awaiting a response.
        self.pending = {}
        self.ask_parallel = (self.ask | self.ask_ref) & klass._parallel
        self.tell_parallel = (self.tell | self.tell_ref) & klass._parallel

        # Replace the parallel methods on the wrapped object with wrappers
        # that run them through the host's ``new_parallel``.
        for method in self.ask_parallel:
            setattr(self._obj, method,
                    ParallelAskWrapper(getattr(self._obj, method), self,
                                       self.__lock))
        for method in self.tell_parallel:
            setattr(self._obj, method,
                    ParallelTellWrapper(getattr(self._obj, method), self,
                                        self.__lock))

    def receive(self, msg):
        """
        Overwriting :meth:`Actor.receive`. Adds the checks and
        features required by parallel methods.

        Parallel asks are registered in ``pending`` under a fresh rpc id
        and started without waiting; their response is sent later through
        :meth:`receive_from_ask`. Every other method is invoked while
        holding the actor lock and answered immediately.

        :param msg: The message is a dictionary using the constants
            defined in util.py (:mod:`pyactor.util`).
        """
        if msg[TYPE] == TELL and msg[METHOD] == 'stop':
            self.running = False
            # Same as Actor.receive: stop the future manager as well, so
            # its thread does not outlive the actor.
            self.future_manager.stop()
            return

        try:
            invoke = getattr(self._obj, msg[METHOD])
            params = msg[PARAMS]

            if msg[METHOD] in self.ask_parallel:
                rpc_id = str(uuid.uuid4())
                # Remember the message so the response can be routed once
                # the parallel thread finishes.
                self.pending[rpc_id] = msg
                # The wrapper expects the rpc id as first argument.
                para = list(params[0])
                para.insert(0, rpc_id)
                invoke(*para, **params[1])
                return

            with self.__lock:
                sleep(0.001)
                result = invoke(*params[0], **params[1])
        except Exception as e:
            result = e
            print(result)

        self.send_response(result, msg)

    def receive_from_ask(self, result, rpc_id):
        """
        Send the *result* of a finished parallel ask back for the
        message registered under *rpc_id*.

        :raises KeyError: if *rpc_id* is not pending.
        """
        msg = self.pending.pop(rpc_id)
        self.send_response(result, msg)

    def get_lock(self):
        """
        :return: :class:`Lock` of the actor.
        """
        return self.__lock


class ParallelAskWrapper(object):
    """Wrapper for ask methods that have to be called in a parallel way."""

    def __init__(self, method, actor, lock):
        self.__method = method
        self.__actor = actor
        self.__lock = lock

    def __call__(self, *args, **kwargs):
        """
        Schedule the wrapped method through the host's ``new_parallel``.

        The first positional argument is the rpc id identifying the
        pending query; the rest are the real arguments of the method.
        """
        rpc_id, args = args[0], args[1:]
        self.host = get_host()
        param = (self.__method, rpc_id, args, kwargs)
        self.host.new_parallel(self.invoke, param)

    def invoke(self, func, rpc_id, args, kwargs):
        """
        Run *func* while holding the actor lock and hand its result (or
        the exception it raised) to the actor as the answer to *rpc_id*.
        """
        # Register this thread in the host's pthreads for the duration of
        # the call.
        self.host.pthreads[current_thread()] = self.__actor.url
        try:
            with self.__lock:
                sleep(0.001)
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    result = e
            self.__actor.receive_from_ask(result, rpc_id)
        finally:
            # Always unregister, even if delivering the response fails.
            del self.host.pthreads[current_thread()]


class ParallelTellWrapper(object):
    """
    Wrapper for tell methods that have to be called in a parallel way.
    """

    def __init__(self, method, actor, lock):
        self.__method = method
        self.__actor = actor
        self.__lock = lock

    def __call__(self, *args, **kwargs):
        """Schedule the wrapped method through the host's ``new_parallel``."""
        self.host = get_host()
        param = (self.__method, args, kwargs)
        self.host.new_parallel(self.invoke, param)

    def invoke(self, func, args, kwargs):
        """
        Run *func* while holding the actor lock. An exception raised by
        *func* propagates to the caller.
        """
        # Register this thread in the host's pthreads for the duration of
        # the call.
        self.host.pthreads[current_thread()] = self.__actor.url
        try:
            with self.__lock:
                sleep(0.001)
                func(*args, **kwargs)
        finally:
            # Always unregister, even when the method raises.
            del self.host.pthreads[current_thread()]

Thread Future

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
import uuid
from threading import Condition, Thread

from .channel import Channel
from ..exceptions import PyActorTimeoutError, FutureError
from ..util import TELL, FUTURE, TYPE, METHOD, PARAMS, CHANNEL, TO
from ..util import get_current, get_host, get_lock, RPC_ID, RESULT

# States of a Future: created but not sent, waiting for the response,
# and response received.
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'


class Future(object):
    """
    Container for the result of an ask query sent asynchronously which
    could not be resolved yet.

    :param str. fid: future ID.
    :param dict future_ref: description of the query; the METHOD, PARAMS,
        CHANNEL (channel of the target actor) and TO entries are read.
    :param manager_channel: channel of the future manager, sent with the
        query as the channel for the response.
    """

    def __init__(self, fid, future_ref, manager_channel):
        self.__condition = Condition()
        self.__state = PENDING
        self.__result = None
        self.__exception = None
        # (method name, actor channel, actor url) of each callback.
        self.__callbacks = []

        self.__method = future_ref[METHOD]
        self.__params = future_ref[PARAMS]
        self.__actor_channel = future_ref[CHANNEL]
        self.__target = future_ref[TO]
        self.__channel = manager_channel
        self.__id = fid

    def _invoke_callbacks(self):
        """
        Send one TELL per registered callback, with this future as the
        only argument.

        :raises FutureError: if sending a callback message fails.
        """
        for method, channel, url in self.__callbacks:
            try:
                msg = {TYPE: TELL, METHOD: method, PARAMS: ([self], {}),
                       TO: url}
                channel.send(msg)
            except Exception as e:
                raise FutureError(
                    f"Exception calling callback for {self!r}: {e!r}")

    def running(self):
        """Return True if the future is currently executing."""
        with self.__condition:
            return self.__state == RUNNING

    def done(self):
        """Return True if the future finished executing."""
        with self.__condition:
            return self.__state == FINISHED

    def __get__result(self):
        # Outcome of a finished future: raise the stored exception or
        # return the stored result.
        if self.__exception is not None:
            raise self.__exception
        return self.__result

    def __wait_finished(self, timeout):
        # Must be called with the condition held. Waits until the future
        # is finished or *timeout* seconds pass, and raises
        # PyActorTimeoutError if it is still unfinished afterwards.
        if self.__state == FINISHED:
            return
        # NOTE(review): get_lock() presumably returns the lock of the
        # calling parallel actor, or None -- confirm in util.
        lock = get_lock()
        if lock is not None:
            lock.release()
        try:
            self.__condition.wait(timeout)
        finally:
            # Take the caller's lock back even if the wait is interrupted.
            if lock is not None:
                lock.acquire()
        if self.__state != FINISHED:
            raise PyActorTimeoutError(f"Future: {self.__method!r}")

    def add_callback(self, method):
        """
        Attaches a method that will be called when the future finishes.

        :param method: Name of a method of the calling actor. It is
            invoked through a TELL sent to that actor, with the future
            itself as only argument, from which the result can be
            obtained with :meth:`result`. If the future has already
            finished, the TELL is sent immediately.
        :raises FutureError: if not called from inside an actor.

        .. note:: This functionality only works when called from an actor,
            specifying a method from the same actor.
        """
        from_actor = get_current()
        if from_actor is None:
            raise FutureError("add_callback only works when called " +
                              "from inside an actor")

        callback = (method, from_actor.channel, from_actor.url)
        with self.__condition:
            # States are strings: compare by value, not by identity.
            if self.__state != FINISHED:
                self.__callbacks.append(callback)
                return
        # Already finished: invoke the callback directly.
        msg = {TYPE: TELL, METHOD: method, PARAMS: ([self], {}),
               TO: from_actor.url}
        from_actor.channel.send(msg)

    def result(self, timeout=None):
        """
        Returns the result of the call that the future represents.

        :param timeout: The number of seconds to wait for the result
            if the future has not been completed. None, the default,
            sets no limit.
        :returns: The result of the call that the future represents.
        :raises: PyActorTimeoutError: If the timeout is reached before
            the future ends execution.
        :raises: Exception: If the call raises the Exception.
        """
        with self.__condition:
            self.__wait_finished(timeout)
            return self.__get__result()

    def exception(self, timeout=None):
        """
        Return the exception raised by the call that the future
        represents.

        :param timeout: The number of seconds to wait for the exception
            if the future has not been completed. None, the default,
            sets no limit.
        :returns: The exception raised by the call that the future
            represents or None if the call completed without raising.
        :raises: PyActorTimeoutError: If the timeout is reached before
            the future ends execution.
        """
        with self.__condition:
            self.__wait_finished(timeout)
            return self.__exception

    def send_work(self):
        """
        Sends the query to the actor for it to start executing the work.

        It is possible to execute once again a future that has finished
        if necessary (overwriting the results), but only one execution
        at a time.

        :raises FutureError: if the future is already running.
        """
        if not self.__set_running():
            raise FutureError("Future already running.")
        msg = {TYPE: FUTURE, METHOD: self.__method, PARAMS: self.__params,
               CHANNEL: self.__channel, TO: self.__target,
               RPC_ID: self.__id}
        self.__actor_channel.send(msg)

    def __set_running(self):
        # Only called from send_work(). Marks the future as running, or
        # returns False if it already was running.
        with self.__condition:
            if self.__state == RUNNING:
                return False
            self.__state = RUNNING
            return True

    def set_result(self, result):
        """
        Sets the return value of work associated with the future.
        Only called internally.
        """
        with self.__condition:
            self.__result = result
            self.__state = FINISHED
            self.__condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """
        Sets the result of the future as being the given exception.
        Only called internally.
        """
        with self.__condition:
            self.__exception = exception
            self.__state = FINISHED
            self.__condition.notify_all()
        self._invoke_callbacks()


class FutureRef(Future):
    """Future whose result is passed through the host's ``loads``."""

    def result(self, timeout=None):
        """
        Wait for the result as :meth:`Future.result` does and return it
        after passing it through ``get_host().loads``.

        :param timeout: The number of seconds to wait for the result
            if the future has not been completed. None, the default,
            sets no limit.
        :returns: The loaded result of the call that the future
            represents.
        :raises: TimeoutError: If the timeout is reached before the
            future ends execution.
        :raises: Exception: If the call raises the Exception.
        """
        raw_result = super(FutureRef, self).result(timeout)
        host = get_host()
        return host.loads(raw_result)


class FutureManager(object):
    """
    A manager that controls the creation and execution of the futures in a host.
    """

    def __init__(self):
        self.running = False
        # Channel on which the responses to the futures arrive.
        self.channel = Channel()
        # future id -> Future
        self.futures = {}
        # Thread running the response loop, once started.
        self.t = None

    def __queue_management(self):
        # Thread body: resolve futures with the responses arriving on the
        # channel until the 'stop' message is received.
        self.running = True
        while self.running:
            response = self.channel.receive()
            if response == 'stop':
                self.running = False
            else:
                result = response[RESULT]
                future = self.futures[response[RPC_ID]]
                if isinstance(result, Exception):
                    future.set_exception(result)
                else:
                    future.set_result(result)

    def new_future(self, future_ref, ref=False):
        """
        Create a future for the query described by *future_ref*, send the
        query and make sure the response loop is running.

        :param dict future_ref: description of the query, as expected by
            :class:`Future`.
        :param bool ref: create a :class:`FutureRef` instead of a plain
            :class:`Future`.
        :return: the new future.
        """
        future_id = str(uuid.uuid4())
        future_cls = FutureRef if ref else Future
        future = future_cls(future_id, future_ref, self.channel)

        # Register before sending: the response loop looks the future up
        # by id, so it must be known before a response can arrive.
        self.futures[future_id] = future
        try:
            future.send_work()
        except Exception:
            del self.futures[future_id]
            raise

        if not self.running:
            # Flag the loop as running before the thread gets to do it, so
            # a second call right away does not start a second thread.
            self.running = True
            self.t = Thread(target=self.__queue_management)
            self.t.start()
        return future

    def stop(self):
        """
        Stop the response loop, if it was started, and forget all futures.
        """
        if self.t is not None:
            # Only send 'stop' when a thread exists to consume it.
            self.channel.send('stop')
            self.t.join()
            self.t = None
        self.futures = {}

Thread Dispatcher

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
import uuid
import pickle
import traceback
from urllib.parse import urlparse
from importlib import import_module

from .actor import Actor
from .channel import Channel
from ..util import FUTURE, ASK_RESPONSE, FUTURE_RESPONSE
from ..util import TYPE, METHOD, TELL, ASK, CHANNEL, FROM, TO, RPC_ID


class RPCDispatcher(Actor):
    """
    This is the actor that will manage remote sends and receives
    with other hosts. Each host has one, configured depending on
    the scheme specified when created.

    :param str url: URL of this dispatcher. The ``ip:port`` part of it
        is handed to the ``Source`` of the server module.
    :param host: owner host; its ``actors`` mapping is used to deliver
        incoming messages to local actors.
    :param str mode: prefix of the ``pyactor.<mode>server`` module that
        provides the ``Source`` and ``Sink`` classes.
    """

    def __init__(self, url, host, mode):
        self.server_model = import_module('pyactor.' + mode + 'server')
        self.url = url
        self.host = host
        aurl = urlparse(url)
        address = aurl.netloc.split(':')
        ip, port = address[0], address[1]
        self.source = self.server_model.Source((ip, int(port)))
        self.source.register_function(self.on_message)
        self.source.start()
        self.running = True
        self.channel = Channel()
        self.pending = {}  # Sent to another host
        self.executing = {}  # Waiting for the response in this server
        self.tell = ['stop']
        self.ask = []
        self.ask_ref = []
        self.tell_ref = []
        self.sinks = {}

    def get_sink(self, url):
        """Return the sink for *url*, creating and caching it on first use."""
        try:
            return self.sinks[url]
        except KeyError:
            sink = self.sinks[url] = self.server_model.Sink(url)
            return sink

    def receive(self, msg):
        """
        Handles a message taken from the dispatcher channel: outgoing
        queries are forwarded to the sink of their target and responses
        produced by local actors are sent back to the requesting host.
        A TELL 'stop' stops the dispatcher and its source.

        :param msg: message dictionary using the constants of util.
        """
        if msg[TYPE] == TELL and msg[METHOD] == 'stop':
            self.running = False
            self.source.stop()
            return
        try:
            kind = msg[TYPE]
            if kind == TELL:
                self.get_sink(msg[TO]).send(msg)
            elif kind == ASK:
                # ASK queries get their id here; FUTURE ones bring it.
                msg[RPC_ID] = str(uuid.uuid4())
                self._forward_request(msg)
            elif kind in (ASK_RESPONSE, FUTURE_RESPONSE):
                self._relay_response(msg)
            elif kind == FUTURE:
                self._forward_request(msg)
        except TypeError as p:
            print("Pickle ERROR: impossible to marshall a parameter." +
                  f"Passing a Proxy without the method in _ref? {p}")
        except Exception as e:
            print(e)

    def _forward_request(self, msg):
        """Send an ASK/FUTURE query out, remembering its reply channel."""
        rpc_id = msg[RPC_ID]
        self.pending[rpc_id] = msg[CHANNEL]
        # The channel is kept locally and stripped from the outgoing msg.
        del msg[CHANNEL]
        msg[FROM] = self.url
        try:
            self.get_sink(msg[TO]).send(msg)
        except Exception:
            # The query never left, so no response will ever claim this
            # entry; drop it instead of leaking it. The caller reports
            # the error.
            self.pending.pop(rpc_id, None)
            raise

    def _relay_response(self, msg):
        """Send a response from a local actor back to the asking host."""
        rpc_id = msg[RPC_ID]
        if rpc_id not in self.executing:
            return
        # Remove the entry up front: whether or not the send succeeds
        # there is nothing left to wait for, and the error message below
        # no longer needs to look the entry up again.
        origin = self.executing.pop(rpc_id)
        try:
            self.get_sink(origin).send(msg)
        except TypeError as p:
            print("Pickle ERR: impossible to marshall a return." +
                  " Returning a Proxy without the method in " +
                  f"_ref? {p}")
        except Exception as e:
            print(f"Error sending a response to {origin!r}. {e}")

    def on_message(self, msg):
        """
        Callback registered on the source for every incoming message.
        Queries are delivered to the channel of the target local actor;
        responses are delivered to the channel stored in ``pending``.

        :param msg: object whose ``data`` attribute holds the pickled
            message dictionary.
        """
        try:
            # SECURITY: pickle.loads can execute arbitrary code while
            # loading. This data comes from the source; make sure only
            # trusted peers can reach it.
            msg = pickle.loads(msg.data)
            kind = msg[TYPE]
            if kind == TELL:
                self.host.actors[msg[TO]].channel.send(msg)
            elif kind in (ASK, FUTURE):
                rpc_id = msg[RPC_ID]
                origin = msg[FROM]
                # Resolve the actor first: if it is offline nothing must
                # be recorded in `executing`, as no response will come.
                actor = self.host.actors[msg[TO]]
                # Save rpc id and the url of the requester
                self.executing[rpc_id] = origin
                # Change msg callback channel, add id
                msg[CHANNEL] = self.channel
                actor.channel.send(msg)
            elif kind in (ASK_RESPONSE, FUTURE_RESPONSE):
                rpc_id = msg[RPC_ID]
                if rpc_id in self.pending:
                    self.pending.pop(rpc_id).send(msg)
        except KeyError as ke:
            print("ERROR: The actor", ke, "is offline.")
        except Exception as e:
            print(f"{self.url} :#: Connection ERROR: {e}")
            traceback.print_exc()

Green Thread Actor

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from copy import copy

from gevent import spawn

from .channel import Channel
from .future import FutureManager
from ..util import ASK, TELL, FUTURE, TYPE, ASK_RESPONSE, FUTURE_RESPONSE
from ..util import METHOD, PARAMS, RESULT, CHANNEL, RPC_ID
from ..util import ref_l, ref_d


class ActorRef(object):
    """
    ActorRef contains the main components of an actor. These are the
    URL where it is located, the communication :class:`~.Channel` and
    the class of the actor as also the synchronous and asynchronous
    methods the class implements. When no channel is specified a new
    one will be created which is also the default procedure.

    The method names are read from the optional ``_tell``, ``_ask`` and
    ``_ref`` attributes of *klass*. Any iterable of names is accepted
    (set, list, tuple); they are always stored here as new sets, so the
    class attributes are never modified.

    :param str url: URL where the actor is located.
    :param class klass: class of the actor.
    :param channel: channel to use; a new one is created when omitted.

    .. note:: This is a superclass of :py:class:`Actor` and has no
        direct functionality.

    """

    def __init__(self, url, klass, channel=None):
        self.url = url
        self.klass = klass
        self.channel = channel if channel else Channel()

        # Missing or empty declarations give empty sets. set() also
        # copies, which keeps the additions/removals below local to this
        # instance.
        self.tell = set(getattr(klass, '_tell', None) or ())
        self.ask = set(getattr(klass, '_ask', None) or ())

        if hasattr(klass, '_ref'):
            # receive/send_response are replaced on the instance by the
            # wrappers built by ref_l/ref_d (defined in util).
            self.receive = ref_l(self, self.receive)
            self.send_response = ref_d(self, self.send_response)

            by_reference = set(klass._ref)
            self.tell_ref = self.tell & by_reference
            self.ask_ref = self.ask & by_reference
            # A method listed in _ref lives only in the *_ref sets.
            self.ask -= self.ask_ref
            self.tell -= self.tell_ref
        else:
            self.ask_ref = set()
            self.tell_ref = set()

        # Every actor accepts 'stop' as a tell.
        self.tell.add('stop')

    def receive(self, msg):
        """Process a message. Must be implemented by subclasses."""
        raise NotImplementedError()

    def send_response(self, result, msg):
        """Send back *result* for *msg*. Must be implemented by subclasses."""
        raise NotImplementedError()

    @property
    def _ref(self):
        """All the method names handled by reference (tell and ask)."""
        return self.tell_ref | self.ask_ref

    def __str__(self):
        return f"Actor {self.url} ({self.klass.__name__})"

    def __repr__(self):
        return f"Actor(url={self.url}, class={self.klass})"


class Actor(ActorRef):
    """
    An actor wraps an object instance so that its methods can be
    invoked through messages placed on the actor channel. It is the
    main element of the model and is created by the host (spawning ->
    see :meth:`~.spawn`).

    :param str. url: URL where the actor is running.
    :param class klass: class type for the actor.
    :param klass obj: instance of the *klass* class to attach to the
        actor.
    """

    def __init__(self, url, klass, obj):
        super().__init__(url, klass)
        self._obj = obj
        self.id = obj.id
        self.running = True
        # Greenlet that processes the queue; set by run().
        self.thread = None
        self.future_manager = FutureManager()

    def __process_queue(self):
        # Take messages from the channel one at a time while alive.
        while self.running:
            self.receive(self.channel.receive())

    def is_alive(self):
        """
        :return: (*bool.*) the current state of the actor, **True**
            while it is running.
        """
        return self.running

    def receive(self, msg):
        """
        Invokes on the wrapped object the method named in the message.

        A TELL for 'stop' stops the actor and its future manager. For
        any other message the method is called with the parameters of
        the message and the outcome is passed to :meth:`send_response`,
        which answers ASK queries with an ASK_RESPONSE and FUTURE
        queries with a FUTURE_RESPONSE. An exception raised by the
        method is sent as the result, except for a TELL, where it is
        only printed.

        :param msg: The message is a dictionary using the constants
            defined in util.py (:mod:`pyactor.util`).
        """
        if msg[TYPE] == TELL and msg[METHOD] == 'stop':
            self.running = False
            self.future_manager.stop()
            return

        try:
            target = getattr(self._obj, msg[METHOD])
            call_params = msg[PARAMS]
            outcome = target(*call_params[0], **call_params[1])
        except Exception as error:
            if msg[TYPE] == TELL:
                # Nobody waits for a tell, so just report the failure.
                print(error)
                return
            outcome = error
        self.send_response(outcome, msg)

    def send_response(self, result, msg):
        """
        Sends *result* to the channel included in *msg*, for ASK and
        FUTURE messages. Other message types get no response.
        """
        kind = msg[TYPE]
        if kind == ASK:
            reply = {TYPE: ASK_RESPONSE, RESULT: result,
                     RPC_ID: msg.get(RPC_ID)}
        elif kind == FUTURE:
            reply = {TYPE: FUTURE_RESPONSE, RPC_ID: msg[RPC_ID],
                     RESULT: result}
        else:
            return
        msg[CHANNEL].send(reply)

    def run(self):
        """
        Spawns the green thread that processes the channel queue while
        the actor :meth:`is_alive`, making it able to receive queries.
        """
        self.thread = spawn(self.__process_queue)

Green Thread Intervals

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from gevent.event import Event
from gevent import spawn
from gevent import sleep as gsleep


def sleep(seconds):
    """
    Facade for the sleep function. Do not use time.sleep if you are
    running green threads.

    :param int seconds: time to sleep, in seconds. (Float for second
        divisions)
    """
    # Delegates to gevent's sleep, imported in this module as gsleep.
    gsleep(seconds)


def later(timeout, f, *args, **kwargs):
    """
    Schedules a call to *f* once *timeout* seconds have passed.

    See example in :ref:`sample_inter`

    :param timeout: seconds to wait before the call.
    :param func f: function to call, with *args* and *kwargs*.
    :return: :class:`Greenlet` new 'thread' which will perform the call
        when specified.
    """
    def delayed_call():
        # Wait first, then hand back whatever f returns.
        sleep(timeout)
        return f(*args, **kwargs)

    return spawn(delayed_call)


def interval_host(host, time, f, *args, **kwargs):
    """
    Runs *f* every *time* seconds in a new green thread attached to
    the *host*, until the returned Event is set.

    See example in :ref:`sample_inter`

    :param Proxy host: host proxy. Can be obtained from inside a
        class with ``self.host``.
    :param int time: seconds for the intervals.
    :param func f: function to be called every *time* seconds.
    :param list args: arguments for *f*.
    :return: :class:`Event` instance of the interval.
    """
    stop_signal = Event()

    def run_periodically():
        # f runs right away and then again after each wait; the wait
        # ends early if the event is set in the meantime.
        while not stop_signal.is_set():
            f(*args, **kwargs)
            stop_signal.wait(time)
        # `greenlet` is bound below, before this function starts running.
        host.detach_interval(greenlet)

    greenlet = spawn(run_periodically)
    host.attach_interval(greenlet, stop_signal)
    return stop_signal

Green Thread Parallel

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import uuid

from gevent import getcurrent

from .actor import Actor
from ..util import get_host, METHOD, PARAMS, TYPE, TELL


class ActorParallel(Actor):
    """
    Actor with parallel methods. Parallel methods are invoked in new
    threads, so their invocation do not block the actor allowing it to
    process many queries at a time.
    Green threads do not have concurrence problems so no need to use
    Locks in this implementation.

    :param str url: URL where the actor is running.
    :param class klass: class of the actor; its ``_parallel`` attribute
        names the methods to run in parallel.
    :param obj: instance of *klass* attached to the actor.
    """

    def __init__(self, url, klass, obj):
        super(ActorParallel, self).__init__(url, klass, obj)
        # Messages of parallel asks still running, keyed by rpc id.
        self.pending = {}
        parallel = set(klass._parallel)
        self.ask_parallel = (self.ask | self.ask_ref) & parallel
        self.tell_parallel = (self.tell | self.tell_ref) & parallel

        # Replace the parallel methods of the object with wrappers that
        # run the original method through the host's new_parallel().
        for method in self.ask_parallel:
            setattr(self._obj, method,
                    ParallelAskWrapper(getattr(self._obj, method), self))
        for method in self.tell_parallel:
            setattr(self._obj, method,
                    ParallelTellWrapper(getattr(self._obj, method), self))

    def receive(self, msg):
        """
        Overwriting :meth:`Actor.receive`. Adds the checks and
        features required by parallel methods.

        A parallel ask is only started here; its response is sent later
        by :meth:`receive_from_ask`. Every other method is invoked
        directly and its result (or the exception it raised) goes to
        ``send_response``.

        :param msg: The message is a dictionary using the constants
            defined in util.py (:mod:`pyactor.util`).
        """
        if msg[TYPE] == TELL and msg[METHOD] == 'stop':
            self.running = False
            # Same as Actor.receive: without this the worker of the
            # future manager would be left waiting on its channel.
            self.future_manager.stop()
            return

        try:
            invoke = getattr(self._obj, msg[METHOD])
            params = msg[PARAMS]

            if msg[METHOD] in self.ask_parallel:
                rpc_id = str(uuid.uuid4())
                # add rpc message to pendent AskResponse s
                self.pending[rpc_id] = msg
                # the wrapper takes the rpc id as its first argument
                invoke(rpc_id, *params[0], **params[1])
                return
            result = invoke(*params[0], **params[1])
        except Exception as e:
            result = e
            print(result)
        self.send_response(result, msg)

    def receive_from_ask(self, result, rpc_id):
        """
        Called when a parallel ask finishes: sends *result* as the
        response to the message stored under *rpc_id*.
        """
        msg = self.pending.pop(rpc_id)
        self.send_response(result, msg)

    # For compatibility. Green threads do not use Locks.
    def get_lock(self):
        return None


class ParallelAskWrapper(object):
    """
    Wrapper for ask methods that have to be called in a parallel way.

    :param method: the original method to execute.
    :param actor: actor owning the method; it receives the result
        through ``receive_from_ask``.
    """

    def __init__(self, method, actor):
        self.__method = method
        self.__actor = actor

    def __call__(self, *args, **kwargs):
        """
        Starts the method through the host's ``new_parallel``.

        The first positional argument must be the rpc id of the query;
        the remaining ones are the arguments for the method.
        """
        rpc_id, args = args[0], args[1:]
        self.host = get_host()
        param = (self.__method, rpc_id, args, kwargs)
        self.host.new_parallel(self.invoke, param)

    def invoke(self, func, rpc_id, args, kwargs):
        """
        Runs *func* and hands its result, or the exception it raised,
        to the actor as the response for *rpc_id*.
        """
        current = getcurrent()
        # put the process in the host list pthreads
        self.host.pthreads[current] = self.__actor.url
        try:
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                result = e
            self.__actor.receive_from_ask(result, rpc_id)
        finally:
            # remove the process from pthreads, also when delivering the
            # response failed
            self.host.pthreads.pop(current, None)


class ParallelTellWrapper(object):
    """
    Wrapper for tell methods that have to be called in a parallel way.

    :param method: the original method to execute.
    :param actor: actor owning the method.
    """

    def __init__(self, method, actor):
        self.__method = method
        self.__actor = actor

    def __call__(self, *args, **kwargs):
        """Starts the method through the host's ``new_parallel``."""
        self.host = get_host()
        param = (self.__method, args, kwargs)
        self.host.new_parallel(self.invoke, param)

    def invoke(self, func, args, kwargs):
        """
        Runs *func* with the given arguments. An exception raised by
        *func* is not handled here and propagates to the caller.
        """
        current = getcurrent()
        # put the process in the host list pthreads
        self.host.pthreads[current] = self.__actor.url
        try:
            func(*args, **kwargs)
        finally:
            # remove the process from pthreads even if func raised, so
            # no stale entry is left behind
            self.host.pthreads.pop(current, None)

Green Thread Future

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import uuid

from gevent import spawn
from gevent.event import Event

from .channel import Channel
from ..exceptions import PyActorTimeoutError, FutureError
from ..util import TELL, FUTURE, TYPE, METHOD, PARAMS, CHANNEL, TO
from ..util import get_current, get_host, RPC_ID, RESULT

PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'


class Future(object):
    """
    Container for the result of an ask query sent asynchronously which
    could not be resolved yet.

    :param str. fid: future ID.
    :param future_ref: dictionary describing the query. Its METHOD,
        PARAMS, CHANNEL (channel of the target actor) and TO entries
        are read.
    :param manager_channel: channel included in the query as CHANNEL,
        i.e. where the response has to be sent.
    """

    def __init__(self, fid, future_ref, manager_channel):
        self.__condition = Event()
        self.__state = PENDING
        self.__result = None
        self.__exception = None
        self.__callbacks = []

        self.__method = future_ref[METHOD]
        self.__params = future_ref[PARAMS]
        self.__actor_channel = future_ref[CHANNEL]
        self.__target = future_ref[TO]
        self.__channel = manager_channel
        self.__id = fid

    def _invoke_callbacks(self):
        """
        Sends a TELL for every registered callback, with this future as
        the only argument.

        :raises: FutureError: if sending one of them fails. The
            callbacks after the failing one are not sent.
        """
        for method, channel, url in self.__callbacks:
            try:
                msg = {TYPE: TELL, METHOD: method, PARAMS: ([self], {}),
                       TO: url}
                channel.send(msg)
            except Exception as e:
                raise FutureError(
                    f"Exception calling callback for {self!r}: {e!r}") from e

    def running(self):
        """Return True if the future is currently executing."""
        return self.__state == RUNNING

    def done(self):
        """Return True if the future finished executing."""
        return self.__state == FINISHED

    def __get__result(self):
        if self.__exception is not None:
            raise self.__exception
        else:
            return self.__result

    def __wait_finished(self, timeout):
        # Blocks until the future is finished; raises on timeout.
        if self.__state != FINISHED:
            self.__condition.wait(timeout)
        if self.__state != FINISHED:
            raise PyActorTimeoutError(f"Future: {self.__method!r}")

    def add_callback(self, method):
        """
        Attaches a method that will be called when the future finishes.

        :param method: A callable from an actor that will be called
            when the future completes. The only argument for that
            method must be the future itself from which you can get the
            result though `future.:meth:`result()``. If the future has
            already completed, then the callable will be called
            immediately.
        :raises: FutureError: when not called from inside an actor.

        .. note:: This functionality only works when called from an actor,
            specifying a method from the same actor.
        """
        from_actor = get_current()
        if from_actor is None:
            raise FutureError("add_callback only works when called " +
                              "from inside an actor")

        # Compare by value: the state is a plain string.
        if self.__state != FINISHED:
            callback = (method, from_actor.channel, from_actor.url)
            self.__callbacks.append(callback)
            return
        # Already finished: invoke the callback directly
        msg = {TYPE: TELL, METHOD: method, PARAMS: ([self], {}),
               TO: from_actor.url}
        from_actor.channel.send(msg)

    def result(self, timeout=None):
        """
        Returns the result of the call that the future represents.

        :param timeout: The number of seconds to wait for the result
            if the future has not been completed. None, the default,
            sets no limit.
        :returns: The result of the call that the future represents.
        :raises: TimeoutError: If the timeout is reached before the
            future ends execution.
        :raises: Exception: If the call raises the Exception.
        """
        self.__wait_finished(timeout)
        return self.__get__result()

    def exception(self, timeout=None):
        """
        Return a exception raised by the call that the future
        represents.

        :param timeout: The number of seconds to wait for the exception
            if the future has not been completed. None, the default,
            sets no limit.
        :returns: The exception raised by the call that the future
            represents or None if the call completed without raising.
        :raises: TimeoutError: If the timeout is reached before the
            future ends execution.
        """
        self.__wait_finished(timeout)
        return self.__exception

    def send_work(self):
        """
        Sends the query to the actor for it to start executing the work.

        It is possible to execute once again a future that has finished
        if necessary (overwriting the results), but only one execution
        at a time.

        :raises: FutureError: if the future is already running.
        """
        if not self.__set_running():
            raise FutureError("Future already running.")
        msg = {TYPE: FUTURE, METHOD: self.__method, PARAMS: self.__params,
               CHANNEL: self.__channel, TO: self.__target,
               RPC_ID: self.__id}
        self.__actor_channel.send(msg)

    def __set_running(self):
        # This is only called internally from send_work().
        # It marks the future as running or returns False if it
        # already was running.
        if self.__state in (PENDING, FINISHED):
            self.__condition.clear()
            self.__state = RUNNING
            return True
        return False

    def set_result(self, result):
        """
        Sets the return value of work associated with the future.
        Only called internally.
        """
        self.__result = result
        # Forget the exception of a previous execution; otherwise
        # result() would keep raising it after a successful re-run.
        self.__exception = None
        self.__state = FINISHED
        self.__condition.set()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """
        Sets the result of the future as being the given exception.
        Only called internally.
        """
        self.__exception = exception
        # Drop the value of a previous execution, it is no longer valid.
        self.__result = None
        self.__state = FINISHED
        self.__condition.set()
        self._invoke_callbacks()


class FutureRef(Future):
    """
    Future whose result is passed through the ``loads`` method of the
    current host before being returned.
    """

    def result(self, timeout=None):
        """
        Waits for the call that the future represents and returns its
        result after the host has processed it with ``loads``.

        :param timeout: The number of seconds to wait for the result
            if the future has not been completed. None, the default,
            sets no limit.
        :returns: The result of the call that the future represents.
        :raises: TimeoutError: If the timeout is reached before the
            future ends execution.
        :raises: Exception: If the call raises the Exception.
        """
        raw = super().result(timeout)
        host = get_host()
        return host.loads(raw)


class FutureManager(object):
    """
    A manager that controls the creation and execution of the futures in a host.

    The responses are read from ``channel`` by a worker green thread
    that is spawned when the first future is created.
    """

    def __init__(self):
        self.running = False
        self.channel = Channel()
        # Futures created by this manager, keyed by their id.
        self.futures = {}
        # Worker green thread, None while no worker is started.
        self.t = None

    def __resolve(self, response):
        # Store the outcome carried by a response in its future.
        result = response[RESULT]
        future = self.futures[response[RPC_ID]]
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)

    def __queue_management(self):
        self.running = True
        while self.running:
            response = self.channel.receive()
            if response == 'stop':
                self.running = False
                continue
            try:
                self.__resolve(response)
            except Exception as e:
                # A single bad response (unknown RPC id, a callback that
                # could not be sent...) must not end the worker:
                # `running` would stay True, no new worker would be
                # spawned and every later future would wait forever.
                print(f"FutureManager: error processing a response: {e!r}")

    def new_future(self, future_ref, ref=False):
        """
        Creates a future for the query described by *future_ref*, sends
        the work and makes sure the worker is running.

        :param future_ref: dictionary describing the query, handed to
            the :class:`Future` constructor.
        :param bool ref: when true a :class:`FutureRef` is created
            instead of a plain :class:`Future`.
        :return: the new future, with its work already sent.
        """
        future_id = str(uuid.uuid4())
        future_class = FutureRef if ref else Future
        future = future_class(future_id, future_ref, self.channel)

        # Register before sending the work, so the future can be found
        # as soon as its response reaches the worker.
        self.futures[future_id] = future
        try:
            future.send_work()
        except Exception:
            # Nothing was sent, so do not keep tracking the future.
            self.futures.pop(future_id, None)
            raise

        if not self.running:
            # spawn() only schedules the worker. Flag the manager as
            # running right now so another call made before the worker
            # gets to run does not spawn a second one.
            self.running = True
            self.t = spawn(self.__queue_management)
        return future

    def stop(self):
        """
        Stops the worker, if one was spawned, and forgets every future
        tracked by this manager.
        """
        if self.t is not None:
            # Only queue the sentinel when somebody will consume it; a
            # leftover 'stop' would end the next worker as soon as it
            # started.
            self.channel.send('stop')
            self.t.join()
            self.t = None
        # The worker normally clears this itself; reset it here as well so
        # that a worker which ended abnormally does not block a restart.
        self.running = False
        self.futures = {}

Green Thread Dispatcher

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
import uuid
import pickle
import traceback
from urllib.parse import urlparse
from importlib import import_module

from .actor import Actor
from .channel import Channel
from ..util import FUTURE, ASK_RESPONSE, FUTURE_RESPONSE
from ..util import TYPE, METHOD, TELL, ASK, CHANNEL, FROM, TO, RPC_ID


class RPCDispatcher(Actor):
    """
    This is the actor that will manage remote sends and receives
    with other hosts. Each host has one, configured depending on
    the scheme specified when created.

    :param str url: URL of this dispatcher. The ``ip:port`` part of it
        is handed to the ``Source`` of the server module.
    :param host: owner host; its ``actors`` mapping is used to deliver
        incoming messages to local actors.
    :param str mode: prefix of the ``pyactor.<mode>server`` module that
        provides the ``Source`` and ``Sink`` classes.
    """

    def __init__(self, url, host, mode):
        self.server_model = import_module('pyactor.' + mode + 'server')
        self.url = url
        self.host = host
        aurl = urlparse(url)
        address = aurl.netloc.split(':')
        ip, port = address[0], address[1]
        self.source = self.server_model.Source((ip, int(port)))
        self.source.register_function(self.on_message)
        self.source.start()
        self.running = True
        self.channel = Channel()
        self.pending = {}  # Sent to another host
        self.executing = {}  # Waiting for the response in this server
        self.tell = ['stop']
        self.ask = []
        self.ask_ref = []
        self.tell_ref = []
        self.sinks = {}

    def get_sink(self, url):
        """Return the sink for *url*, creating and caching it on first use."""
        try:
            return self.sinks[url]
        except KeyError:
            sink = self.sinks[url] = self.server_model.Sink(url)
            return sink

    def receive(self, msg):
        """
        Handles a message taken from the dispatcher channel: outgoing
        queries are forwarded to the sink of their target and responses
        produced by local actors are sent back to the requesting host.
        A TELL 'stop' stops the dispatcher and its source.

        :param msg: message dictionary using the constants of util.
        """
        if msg[TYPE] == TELL and msg[METHOD] == 'stop':
            self.running = False
            self.source.stop()
            return
        try:
            kind = msg[TYPE]
            if kind == TELL:
                self.get_sink(msg[TO]).send(msg)
            elif kind == ASK:
                # ASK queries get their id here; FUTURE ones bring it.
                msg[RPC_ID] = str(uuid.uuid4())
                self._forward_request(msg)
            elif kind in (ASK_RESPONSE, FUTURE_RESPONSE):
                self._relay_response(msg)
            elif kind == FUTURE:
                self._forward_request(msg)
        except TypeError as p:
            print("Pickle ERROR: impossible to marshall a parameter." +
                  f"Passing a Proxy without the method in _ref? {p}")
        except Exception as e:
            print(e)

    def _forward_request(self, msg):
        """Send an ASK/FUTURE query out, remembering its reply channel."""
        rpc_id = msg[RPC_ID]
        self.pending[rpc_id] = msg[CHANNEL]
        # The channel is kept locally and stripped from the outgoing msg.
        del msg[CHANNEL]
        msg[FROM] = self.url
        try:
            self.get_sink(msg[TO]).send(msg)
        except Exception:
            # The query never left, so no response will ever claim this
            # entry; drop it instead of leaking it. The caller reports
            # the error.
            self.pending.pop(rpc_id, None)
            raise

    def _relay_response(self, msg):
        """Send a response from a local actor back to the asking host."""
        rpc_id = msg[RPC_ID]
        if rpc_id not in self.executing:
            return
        # Remove the entry up front: whether or not the send succeeds
        # there is nothing left to wait for, and the error message below
        # no longer needs to look the entry up again.
        origin = self.executing.pop(rpc_id)
        try:
            self.get_sink(origin).send(msg)
        except TypeError as p:
            print("Pickle ERR: impossible to marshall a return." +
                  " Returning a Proxy without the method in " +
                  f"_ref? {p}")
        except Exception as e:
            print(f"Error sending a response to {origin!r}. {e}")

    def on_message(self, msg):
        """
        Callback registered on the source for every incoming message.
        Queries are delivered to the channel of the target local actor;
        responses are delivered to the channel stored in ``pending``.

        :param msg: object whose ``data`` attribute holds the pickled
            message dictionary.
        """
        try:
            # SECURITY: pickle.loads can execute arbitrary code while
            # loading. This data comes from the source; make sure only
            # trusted peers can reach it.
            msg = pickle.loads(msg.data)
            kind = msg[TYPE]
            if kind == TELL:
                self.host.actors[msg[TO]].channel.send(msg)
            elif kind in (ASK, FUTURE):
                rpc_id = msg[RPC_ID]
                origin = msg[FROM]
                # Resolve the actor first: if it is offline nothing must
                # be recorded in `executing`, as no response will come.
                actor = self.host.actors[msg[TO]]
                # Save rpc id and the url of the requester
                self.executing[rpc_id] = origin
                # Change msg callback channel, add id
                msg[CHANNEL] = self.channel
                actor.channel.send(msg)
            elif kind in (ASK_RESPONSE, FUTURE_RESPONSE):
                rpc_id = msg[RPC_ID]
                if rpc_id in self.pending:
                    self.pending.pop(rpc_id).send(msg)
        except KeyError as ke:
            print("ERROR: The actor", ke, "is offline.")
        except Exception as e:
            print(f"{self.url} :#: Connection ERROR: {e}")
            traceback.print_exc()