Application

The asab.Application class maintains the global application state. You can provide your own implementation by creating a subclass. There should be only one Application object in the process.

Creating a new ASAB application:

To create a new ASAB application, create a subclass of asab.Application, instantiate it, and call its run() method:

app.py
import asab

class MyApplication(asab.Application):
    pass

if __name__ == '__main__':
    app = MyApplication()
    app.run()

Then run the application from your terminal

python3 app.py

and you should see the following output:

NOTICE asab.application is ready.

The app will be running until you stop it by pressing Ctrl+C.

To create an application that performs some operations and then stops, use the stop() method.

app_that_terminates.py
import asab

class MyApplication(asab.Application):
    async def main(self):
        print("Hello world!")
        self.stop()

if __name__ == '__main__':
    app = MyApplication()
    app.run()

with the output:

NOTICE asab.application is ready.
Hello world!
NOTICE asab.application [sd exit_code="0"] is exiting ...

Application Lifecycle

The runtime of the Application object is driven by the asyncio event loop, which runs asynchronous tasks and callbacks, performs network IO operations, and runs subprocesses.

ASAB is designed around the inversion of control principle: ASAB controls the application lifecycle, and custom-written code receives the flow from ASAB via callbacks or handlers. Inversion of control increases the modularity of the code and makes it extensible.

The application lifecycle is divided into 3 phases: init-time, run-time, and exit-time.
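
The three phases can be illustrated without ASAB itself. The following is a minimal sketch that uses only the standard asyncio module. MiniApplication and its calls list are hypothetical names invented for this illustration; the real asab.Application additionally handles signals, Publish-Subscribe messages, modules, and services.

```python
import asyncio


class MiniApplication:
    """Hypothetical stand-in for asab.Application; illustrates the three phases only."""

    def __init__(self):
        self.calls = []          # records the order in which the callbacks run
        self._stop_event = None  # created inside the event loop, see _lifecycle()

    async def initialize(self):
        self.calls.append("initialize")

    async def main(self):
        self.calls.append("main")
        self.stop()

    async def finalize(self):
        self.calls.append("finalize")

    def stop(self):
        # Ends the run-time phase; the exit-time phase follows
        self._stop_event.set()

    async def _lifecycle(self):
        self._stop_event = asyncio.Event()
        await self.initialize()                                     # init-time
        await asyncio.gather(self.main(), self._stop_event.wait())  # run-time
        await self.finalize()                                       # exit-time

    def run(self):
        asyncio.run(self._lifecycle())
        return 0


app = MiniApplication()
exit_code = app.run()
print(app.calls)
```

The user code never drives the loop directly: it only fills in the three callbacks, and the framework decides when each one runs.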

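Init-time

The init-time begins in the Application constructor, which parses command-line arguments, loads the configuration, and sets up logging and the event loop. It completes at the start of run(). At this time:

  • Publish-Subscribe message Application.init! is published.
  • The asynchronous callback Application.initialize() is executed.

Application.initialize() is intended to be overridden by the user. This is where you typically load Modules and register Services; see the Modules and Services section.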

class MyApplication(asab.Application):
    async def initialize(self):
        # Custom initialization
        from module_sample import Module
        self.add_module(Module)

Run-time

The run-time starts after all the modules and services are loaded. This is where the application typically spends the most time. At this time:

  • Publish-Subscribe message Application.run! is published.
  • The asynchronous callback Application.main() is executed.

The coroutine Application.main() is intended to be overridden by the user. If main() completes without calling stop(), the application keeps running until it is stopped by other means, such as a signal.

class MyApplication(asab.Application):
    async def main(self):
        print("Hello world!")
        self.stop()

Exit-time

The method Application.stop() gracefully terminates the run-time and commences the exit-time. It is called automatically when the process receives SIGINT or SIGTERM, which includes pressing Ctrl-C on UNIX-like systems. When the method is called a third time, the application exits abruptly (an emergency abort).

Note

You need to install the win32api module to use Ctrl-C or an emergency abort properly with ASAB on Windows. It is an optional dependency of ASAB.

The parameter exit_code allows you to specify the application exit code.
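
The rules for updating the exit code are described in the set_exit_code() docstring in the Reference section: without force, a new value is accepted only if it is higher than the current one, and a pending "!RESTART!" is never overwritten. The sketch below mirrors those rules in isolation. ExitCodeHolder is a hypothetical helper written for this illustration, not part of ASAB.

```python
RESTART = "!RESTART!"


class ExitCodeHolder:
    """Hypothetical helper mirroring the rules documented for set_exit_code()."""

    def __init__(self):
        self.exit_code = 0

    def set_exit_code(self, exit_code, force=False):
        if self.exit_code == RESTART:
            return  # a scheduled restart is never overwritten
        if exit_code == RESTART:
            self.exit_code = exit_code
        elif force or self.exit_code < exit_code:
            self.exit_code = exit_code


holder = ExitCodeHolder()
holder.set_exit_code(2)
holder.set_exit_code(1)              # ignored: lower than the current value
after_lower = holder.exit_code
holder.set_exit_code(1, force=True)  # forced: replaces the current value
after_forced = holder.exit_code
holder.set_exit_code(RESTART)
holder.set_exit_code(5, force=True)  # ignored: a restart is already scheduled
after_restart = holder.exit_code
```

Keeping the highest value by default means that an error code reported by one component is not accidentally reset by a later, less severe call.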

At exit-time:

  • Publish-Subscribe message Application.exit! is published.
  • Asynchronous callback Application.finalize() is executed.

Application.finalize() is intended to be overridden by the user. It can be used, for example, to store backup data for the next start of the application, to perform custom operations when terminating services, or to send signals to other applications.

class MyApplication(asab.Application):
    async def finalize(self):
        # Custom finalization
        ...

Command-line parser

The method create_argument_parser() creates an argparse.ArgumentParser supplemented with the default ASAB arguments. The application object calls this method during init-time to process command-line arguments. You can override it to adjust the parser or to provide your own implementation.

The Description attribute is the text displayed in the help output (--help). You are expected to provide your own value. The default value is "This app is based on ASAB.".

Default ASAB arguments:

| Argument | Type | Action |
| --- | --- | --- |
| -c, --config | str | Specify a path to a configuration file |
| -d, --daemonize | bool | Run daemonized (in the background) |
| -k, --kill | bool | Kill a running daemon and quit |
| -l, --log-file | str | Specify a path to a log file |
| -s, --syslog | bool | Enable logging to a syslog |
| -v, --verbose | bool | Print more information (enable debug output) |
| -w, --web-api | str | Activate the ASAB web API (default listening address is 0.0.0.0:8080) |
| --startup-housekeeping | | Trigger housekeeping event immediately after application startup |
| --no-auth | bool | Disable all authentication and authorization |
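
The -w option behaves differently depending on whether a value follows it. The sketch below reproduces only that behaviour with the standard argparse module, assuming the nargs="?" and const settings that appear in the source listing in the Reference section. The parser built here is a stand-alone illustration, not the one ASAB creates.

```python
import argparse

parser = argparse.ArgumentParser(description="Sketch of an ASAB-like parser")
parser.add_argument("-c", "--config", help="specify a path to a configuration file")
parser.add_argument("-v", "--verbose", action="store_true", help="print more information")
# nargs="?" with const: a bare "-w" yields the const value, "-w ADDR" yields ADDR
parser.add_argument("-w", "--web-api", nargs="?", const="0.0.0.0:8080", help="activate web API")

absent = parser.parse_args([])
bare = parser.parse_args(["-w"])
explicit = parser.parse_args(["-w", "127.0.0.1:9000", "-v"])

print(absent.web_api, bare.web_api, explicit.web_api)
```

When the option is absent, the value is None, so the web API stays disabled unless the configuration enables it by other means.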

UTC Time

The method time() returns the current "event loop time" in seconds since the epoch as a floating point number. The specific date of the epoch and the handling of leap seconds is platform dependent. On Windows and most Unix systems, the epoch is January 1, 1970, 00:00:00 (UTC) and leap seconds are not counted towards the time in seconds since the epoch. This is commonly referred to as Unix time.

Calling time.time() can be relatively expensive. This method provides a cheaper alternative that returns the current wall-clock time in UTC, computed from the event loop time.
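
The technique can be sketched as follows: store the offset between the wall clock and the loop's monotonic clock once, then add the loop time to that offset on every call. According to the source listing in the Reference section, ASAB keeps this offset in BaseTime and recomputes it on every 60th tick. LoopClock and rebase() are hypothetical names used only in this sketch.

```python
import asyncio
import time


class LoopClock:
    """Hypothetical helper: wall-clock time derived from the event loop's monotonic clock."""

    def __init__(self, loop):
        self.loop = loop
        self.rebase()

    def rebase(self):
        # Offset between the wall-clock time and the loop time
        self.base_time = time.time() - self.loop.time()

    def time(self):
        return self.base_time + self.loop.time()


loop = asyncio.new_event_loop()
clock = LoopClock(loop)
drift = abs(clock.time() - time.time())
loop.close()
print("drift in seconds:", drift)
```

Because the loop clock is monotonic, the result does not follow later adjustments of the system clock until the offset is recomputed, which is why periodic rebasing is needed.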

Reference

asab.application.Application

The base application object that maintains the global application state.

You can provide your own implementation by creating a subclass. It is intended to be a Singleton.

Examples:

class MyApplication(asab.Application):
    async def main(self):
        print("Hello world!")
        self.stop()

if __name__ == "__main__":
    app = MyApplication()
    app.run()

Source code in asab/application.py
class Application(metaclass=Singleton):
	"""The base application object that maintains the global application state.

	You can provide your own implementation by creating a subclass. It is intended to be a Singleton.

	Examples:
	```python
	class MyApplication(asab.Application):
		async def main(self):
			print("Hello world!")
			self.stop()

	if __name__ == "__main__":
		app = MyApplication()
		app.run()
	```
	"""

	Description = "This app is based on ASAB."

	def __init__(self, args: typing.Optional[list] = None, modules: list = []):
		"""
		Initialize the Application provided with arguments and modules.

		Args:
			args: sequence of arguments to be parsed by `Application.parse_arguments()` call.
			modules: list of ASAB modules to be added by `Application.add_module()` call.

		Examples:

		```python
		class MyApplication(asab.Application):
			def __init__(self):
				super().__init__(modules=[asab.web.Module, asab.zookeeper.Module])
		```
		"""

		self.ExitCode: typing.Union[int, str]
		"""
		The actual value of the exit code that can be set via `set_exit_code()` method.

		Examples:
			The example of the exit code handling in the `main()` function of the application:

			```python
			if __name__ == '__main__':
				app = asab.Application()
				exit_code = app.run()
				sys.exit(exit_code)
			```

			| Exit code | Meaning |
			| --- | --- |
			| 0 | success |
			| 1 | abnormal termination of a program, perhaps as a result of a minor problem in the code |
			| "!RESTART!" | hard restart of the whole application |

		"""

		try:
			# EX_OK code is not available on Windows
			self.ExitCode = os.EX_OK
		except AttributeError:
			self.ExitCode = 0

		# Queue of Services to be initialized
		self.InitServicesQueue = []
		# Queue of Modules to be initialized
		self.InitModulesQueue = []

		# Parse command line
		self.Args = self.parse_arguments(args=args)

		# Obtain HostName
		# The user can provide the actual hostname of the application in ASAB_HOSTNAME environment variable.
		# This can be used to specify the hostname that is discoverable by other services in a cluster, if a local hostname is not suitable.
		self.HostName = os.environ.get('ASAB_HOSTNAME', None)
		if self.HostName is None:
			self.HostName = platform.node()
		os.environ['HOSTNAME'] = self.HostName

		# Load configuration
		Config._load()

		if hasattr(self.Args, "daemonize") and self.Args.daemonize:
			self.daemonize()

		elif hasattr(self.Args, "kill") and self.Args.kill:
			self.daemon_kill()

		# Seed the random generator
		random.seed()

		# Obtain the event loop
		self.Loop = asyncio.get_event_loop()
		if self.Loop.is_closed():
			self.Loop = asyncio.new_event_loop()
			asyncio.set_event_loop(self.Loop)

		self.LaunchTime = time.time()
		self.BaseTime = self.LaunchTime - self.Loop.time()

		# Last time the on_tick/60! was registered
		self.LastOnTick60 = self.LaunchTime

		self.Modules: list[asab.Module] = []
		"""
		A list of modules that have been added to the application.
		"""

		self.Services: dict[str, asab.Service] = {}
		"""
		A dictionary of registered services.
		"""

		# Setup logging
		self.Logging = Logging(self)

		# Configure the event loop
		self.Loop.set_exception_handler(_loop_exception_handler)
		if Config["logging"].getboolean("verbose"):
			self.Loop.set_debug(True)

		# Adding a handler to listen to the interrupt event
		if platform.system() == "Windows":

			try:

				# Windows win32api import
				import win32api

				def handler(type):
					self.stop()
					return True

				win32api.SetConsoleCtrlHandler(handler, True)

			except ImportError as e:
				L.warning("win32api module could not be loaded, because '{}'".format(
					e
				))

		else:

			# POSIX and other reasonable systems
			self.Loop.add_signal_handler(signal.SIGINT, self.stop)
			self.Loop.add_signal_handler(signal.SIGTERM, self.stop)
			self.Loop.add_signal_handler(signal.SIGHUP, self._hup)

		self._stop_event = asyncio.Event()
		self._stop_event.clear()
		self._stop_counter = 0

		from .pubsub import PubSub
		self.PubSub = PubSub(self)

		L.info("Initializing ...")

		self.TaskService = TaskService(self)

		for module in modules:
			self.add_module(module)

		# Set housekeeping time and time limit
		self.HousekeepingTime, self.HousekeepingTimeLimit, self.HousekeepingId = self._initialize_housekeeping_schedule()
		self.HousekeepingMissedEvents: list = []
		# Every 10 minutes listen for housekeeping
		self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick)

		# Run the watchdog to detect lost interactivity on the event loop
		self.WatchdogThreshold = Config["general"].getseconds("watchdog_threshold")

		if self.WatchdogThreshold > 0:
			watchdog_thread = threading.Thread(target=self._watchdog)
			watchdog_thread.daemon = True  # Daemonize the thread to ensure it exits with the main program
			watchdog_thread.start()


	def _watchdog(self):
		"""
		Periodically checks the loop interactivity
		and if the configured threshold is passed,
		the application is killed by a signal.
		"""

		while True:
			current_time = time.time()

			# Check if the configured threshold passed
			if (current_time - self.LastOnTick60) < self.WatchdogThreshold:
				time.sleep(60)  # Sleep one minute (one cycle)
				continue

			# The loop lost its interactivity
			L.critical("The event loop lost its interactivity. Stopping the application!")
			os.kill(os.getpid(), signal.SIGKILL)  # Works only on Linux
			os._exit(5)  # Cannot use sys.exit inside the thread


	def create_argument_parser(
		self,
		prog=None,
		usage=None,
		description=None,
		epilog=None,
		prefix_chars='-',
		fromfile_prefix_chars=None,
		argument_default=None,
		conflict_handler='error',
		add_help=True
	) -> argparse.ArgumentParser:
		"""
		Create an `argparse.ArgumentParser` object supplemented by default ASAB arguments.

		This method can be overridden to adjust argparse configuration.
		Refer to the Python standard library to [`argparse.ArgumentParser`](https://docs.python.org/3/library/argparse.html?highlight=argumentparser#argumentparser-objects) for the details.
		"""

		parser = argparse.ArgumentParser(
			prog=prog,
			usage=usage,
			description=description if description is not None else self.Description,
			epilog=epilog,
			formatter_class=argparse.RawDescriptionHelpFormatter,
			prefix_chars=prefix_chars,
			fromfile_prefix_chars=fromfile_prefix_chars,
			argument_default=argument_default,
			conflict_handler=conflict_handler,
			add_help=add_help
		)
		parser.add_argument('-c', '--config', help='specify a path to a configuration file')
		parser.add_argument('-v', '--verbose', action='store_true', help='print more information (enable debug output)')
		parser.add_argument('-s', '--syslog', action='store_true', help='enable logging to a syslog')
		parser.add_argument('-l', '--log-file', help='specify a path to a log file')
		parser.add_argument('-w', '--web-api', help='activate Asab web API (default listening port is 0.0.0.0:8080)', const="0.0.0.0:8080", nargs="?")
		parser.add_argument('--startup-housekeeping', help='trigger housekeeping event immediately after application startup')
		parser.add_argument('--no-auth', action='store_true', help='disable all authentication and authorization')


		if daemon is not None:
			parser.add_argument('-d', '--daemonize', action='store_true', help='run daemonized (in the background)')
			parser.add_argument('-k', '--kill', action='store_true', help='kill a running daemon and quit')

		return parser


	def parse_arguments(self, args=None):
		"""
		Parse the command line arguments and set the default values for the configuration accordingly.

		Args:
			args: The arguments to parse. If not set, sys.argv[1:] will be used.

		Returns:
			The arguments that were parsed.
		"""

		parser = self.create_argument_parser()
		args = parser.parse_args(args=args)

		if args.config is not None:
			Config._default_values['general']['config_file'] = args.config

		if args.verbose:
			Config._default_values['logging']['verbose'] = True

		if args.syslog:
			Config._default_values['logging:syslog']['enabled'] = True

		if args.log_file:
			Config._default_values['logging:file']['path'] = args.log_file

		if args.web_api:
			if 'web' not in Config._default_values:
				Config._default_values['web'] = {}
			Config._default_values['web']['listen'] = args.web_api

		if args.no_auth:
			if 'auth' not in Config._default_values:
				Config._default_values['auth'] = {}
			Config._default_values['auth']['enabled'] = False

		if args.startup_housekeeping:
			Config._default_values['housekeeping']['run_at_startup'] = True

		return args


	def get_pidfile_path(self) -> typing.Optional[str]:
		"""
		Get the path for PID file from the configuration.

		PID file is a file that contains process id of the ASAB process.
		It is used for interaction with the OS, namely its control of running services.

		- If the `pidfile` is set to the empty string, return None.
		- If `pidfile` is set to "!", return the default PID file path (in `/var/run/` folder).
		This is the default value.

		Example of PID path configuration:

		```ini
		[general]
		pidfile=/tmp/my.pid
		```

		Returns:
			The path to the `pidfile`.
		"""

		pidfilepath = Config['general']['pidfile']
		if pidfilepath == "":
			return None
		elif pidfilepath == "!":
			return os.path.join('/var/run', os.path.basename(sys.argv[0]) + '.pid')
		else:
			return pidfilepath


	def daemonize(self):
		if daemon is None:
			print("Install 'python-daemon' module to support daemonizing.", file=sys.stderr)
			sys.exit(1)

		pidfilepath = self.get_pidfile_path()
		pidfile = None  # no PID file when `pidfile` is configured as an empty string
		if pidfilepath is not None:
			pidfile = daemon.pidfile.TimeoutPIDLockFile(pidfilepath)

		working_dir = Config['general']['working_dir']

		uid = Config['general']['uid']
		if uid == "":
			uid = None

		gid = Config['general']['gid']
		if gid == "":
			gid = None

		signal_map = {
			signal.SIGTTIN: None,
			signal.SIGTTOU: None,
			signal.SIGTSTP: None,
		}

		self.DaemonContext = daemon.DaemonContext(
			working_directory=working_dir,
			signal_map=signal_map,
			pidfile=pidfile,
			uid=uid,
			gid=gid,
		)

		try:
			self.DaemonContext.open()
		except lockfile.AlreadyLocked as e:
			print("Cannot create a PID file '{}':".format(pidfilepath), e, file=sys.stderr)
			sys.exit(1)


	def daemon_kill(self):
		if daemon is None:
			print("Install 'python-daemon' module to support daemonising.", file=sys.stderr)
			sys.exit(1)

		pidfilepath = self.get_pidfile_path()
		if pidfilepath is None:
			sys.exit(0)

		try:
			pid = open(pidfilepath, "r").read()
		except FileNotFoundError:
			print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr)
			sys.exit(0)

		pid = int(pid)

		for sno in [signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGINT, signal.SIGTERM]:
			try:
				os.kill(pid, sno)
			except ProcessLookupError:
				print("Process with pid '{}' not found.".format(pid), file=sys.stderr)
				sys.exit(0)
			for i in range(10):
				if not os.path.exists(pidfilepath):
					sys.exit(0)
				time.sleep(0.1)
			print("Daemon process (pid: {}) still running ...".format(pid), file=sys.stderr)

		print("Pid file '{}' not found.".format(pidfilepath), file=sys.stderr)
		sys.exit(1)



	def run(self):
		"""Run the application.

		Returns:
			(int): Exit code of the finalized process.
		"""

		# Commence init-time
		self.PubSub.publish("Application.init!")
		self.Loop.run_until_complete(asyncio.gather(
			self._init_time_governor(),
			self.initialize(),

		))

		try:
			# Commence run-time and application main() function
			L.log(LOG_NOTICE, "is ready.")
			self._stop_event.clear()
			self.Loop.run_until_complete(asyncio.gather(
				self._run_time_governor(),
				self.main(),
			))

			# Commence exit-time
			if self.ExitCode == "!RESTART!":
				L.log(LOG_NOTICE, "is restarting ...")
			else:
				L.log(LOG_NOTICE, "is exiting ...", struct_data={'exit_code': self.ExitCode})
			self.Loop.run_until_complete(asyncio.gather(
				self.finalize(),
				self._exit_time_governor(),
			))

			# Python 3.5 lacks support for shutdown_asyncgens()
			if hasattr(self.Loop, "shutdown_asyncgens"):
				self.Loop.run_until_complete(self.Loop.shutdown_asyncgens())
			self.Loop.close()

		finally:
			if self.ExitCode == "!RESTART!":
				os.execv(sys.executable, [os.path.basename(sys.executable)] + sys.argv)

		return self.ExitCode


	def stop(self, exit_code: typing.Optional[int] = None) -> None:
		"""
		Gracefully terminate the _run-time_ and commence the _exit-time_.

		This method is automatically called by `SIGINT` and `SIGTERM`.
		It also includes a response to `Ctrl-C` on UNIX-like systems.
		When this method is called three times, it abruptly exits the application (aka emergency abort).

		Args:
			exit_code (int, optional): Exit code of the finalized process.
		"""
		if exit_code is not None:
			self.set_exit_code(exit_code)

		self._stop_event.set()
		self._stop_counter += 1
		self.PubSub.publish("Application.stop!", self._stop_counter)

		if self._stop_counter >= 3:
			L.fatal("Emergency exit")
			for task in asyncio.all_tasks():
				L.warning("Pending task during emergency exit: {}".format(task))
			try:
				# EX_SOFTWARE code is not available on Windows
				return os._exit(os.EX_SOFTWARE)
			except AttributeError:
				return os._exit(0)

		elif self._stop_counter > 1:
			L.warning("{} tasks still active".format(len(asyncio.all_tasks())))


	def _do_restart(self, event_name):
		self.stop("!RESTART!")

	def restart(self):
		"""
		Schedule a hard restart of the whole application.

		This function works by using `os.execv()`, which replaces the current process with a new one (without creating a new process ID).
		Arguments and environment variables will be retained.

		!!! warning
			Please note that this will work on Unix-based systems only, as it uses a feature specific to Unix.

		!!! hint
			Be careful while using this function, make sure you have some control
			over when and how this function is being called to avoid any unexpected process restarts.
			It is not common to use these types of function calls in Python applications.
		"""
		self.PubSub.subscribe("Application.tick/10!", self._do_restart)


	def _hup(self):
		self.Logging.rotate()
		self.PubSub.publish("Application.hup!")


	# Modules

	def add_module(self, module_class: asab.Module) -> None:
		"""
		Load a new module.
		"""

		for module in self.Modules:
			if isinstance(module, module_class):
				# Already loaded and registered
				return

		module = module_class(self)
		self.Modules.append(module)

		# Enqueue module for initialization (happens in run phase)
		self.InitModulesQueue.append(module)


	# Services

	def get_service(self, service_name: str) -> typing.Optional[asab.Service]:
		"""
		Get a registered service by its name.

		Args:
			service_name: Name of the service to retrieve.

		Returns:
			The service object associated with the provided service_name,
			or None if the service is not registered.
		"""
		return self.Services.get(service_name)


	def _register_service(self, service: asab.Service):
		"""
		Register a new service using its name.
		"""

		if service.Name in self.Services:
			L.error("Service '{}' already registered (existing:{} new:{})".format(
				service.Name, self.Services[service.Name], service))
			raise RuntimeError("Service {} already registered".format(service.Name))

		self.Services[service.Name] = service

		# Enqueue service for initialization (happens in run phase)
		self.InitServicesQueue.append(service)


	# Lifecycle callback

	async def initialize(self):
		"""
		This method is called during the application *init-time*. It is intended to be overridden by the user.
		"""
		pass

	async def main(self):
		"""
		This method is called during the application *run-time*. It is intended to be overridden by the user.
		"""
		pass

	async def finalize(self):
		"""
		This method is called during the application *exit-time*. It is intended to be overridden by the user.
		"""
		pass


	# Governors

	async def _init_time_governor(self):
		"""
		Initialize all services that have been created during application construction.
		"""
		await self._ensure_initialization()


	async def _run_time_governor(self):
		timeout = Config.getint('general', 'tick_period')
		self.PubSub.publish("Application.run!")

		if Config.getboolean("housekeeping", "run_at_startup", fallback=False):
			L.log(asab.LOG_NOTICE, "Startup housekeeping...")
			self.PubSub.publish("Application.housekeeping!")

		# Wait for stop event & tick in meanwhile
		for cycle_no in itertools.count(1):

			await self._ensure_initialization()

			try:
				await asyncio.wait_for(self._stop_event.wait(), timeout=timeout)
				break
			except asyncio.TimeoutError:
				self.PubSub.publish("Application.tick!")
				if (cycle_no % 10) == 0:
					self.PubSub.publish("Application.tick/10!")
				if (cycle_no % 60) == 0:
					# Rebase a Loop time
					current_time = time.time()
					self.LastOnTick60 = current_time
					self.BaseTime = current_time - self.Loop.time()
					self.PubSub.publish("Application.tick/60!")
				if (cycle_no % 300) == 0:
					self.PubSub.publish("Application.tick/300!")
				if (cycle_no % 600) == 0:
					self.PubSub.publish("Application.tick/600!")
				if (cycle_no % 1800) == 0:
					self.PubSub.publish("Application.tick/1800!")
				if (cycle_no % 3600) == 0:
					self.PubSub.publish("Application.tick/3600!")
				if (cycle_no % 43200) == 0:
					self.PubSub.publish("Application.tick/43200!")
				if (cycle_no % 86400) == 0:
					self.PubSub.publish("Application.tick/86400!")
				continue


	async def _exit_time_governor(self):
		self.PubSub.publish("Application.exit!")

		# Finalize services
		futures = set()
		for service in self.Services.values():
			futures.add(
				asyncio.ensure_future(service.finalize(self))
			)

		while len(futures) > 0:
			done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
			for fut in done:
				try:
					fut.result()
				except Exception:
					L.exception("Error during finalize call")


		# Finalize modules
		futures = set()
		for module in self.Modules:
			futures.add(
				asyncio.ensure_future(module.finalize(self))
			)

		while len(futures) > 0:
			done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
			for fut in done:
				try:
					fut.result()
				except Exception:
					L.exception("Error during finalize call")


		# Wait for non-finalized tasks
		tasks_awaiting = 0
		for i in range(3):
			try:
				ts = asyncio.all_tasks(self.Loop)
			except AttributeError:
				# Compatibility for Python 3.6-
				ts = asyncio.Task.all_tasks(self.Loop)
			tasks_awaiting = 0
			for t in ts:
				if t.done():
					continue
				tasks_awaiting += 1
			if tasks_awaiting <= 1:
				# 2 is for _exit_time_governor and wait()
				break

			await asyncio.sleep(1)

		else:
			L.warning("Exiting but {} async task(s) are still waiting".format(tasks_awaiting))


	async def _ensure_initialization(self):
		'''
		This method ensures that any newly added module or registered service is initialized.
		It is called from:
		(1) init-time for modules & services added during application construction.
		(2) run-time for modules & services added during the application lifecycle.
		'''

		# Initialize modules
		while len(self.InitModulesQueue) > 0:
			module = self.InitModulesQueue.pop()
			try:
				await module.initialize(self)
			except Exception:
				L.exception("Error during module initialization")

		# Initialize services
		while len(self.InitServicesQueue) > 0:
			service = self.InitServicesQueue.pop()
			try:
				await service.initialize(self)
			except Exception:
				L.exception("Error during service initialization")


	def set_exit_code(self, exit_code: typing.Union[int, str], force: bool = False):
		"""
		Set the exit code of the application.

		If `force` is `False`, the exit code will be set only if the previous value is lower than the new one.
		If `force` is `True`, the exit code value is set to `exit_code` value disregarding the previous value.

		Args:
			exit_code (str | int): The exit code value.
			force: Force the exit code reassignment.
		"""
		if self.ExitCode == "!RESTART!":
			return

		if exit_code == "!RESTART!":
			self.ExitCode = exit_code

		elif (self.ExitCode < exit_code) or force:
			L.debug("Exit code set to {}".format(exit_code))
			self.ExitCode = exit_code


	# Time

	def time(self) -> float:
		"""
		Return the UTC UNIX timestamp using the loop time (a fast way to get the wall-clock time).

		Returns:
			Current UTC UNIX timestamp.
		"""
		return self.BaseTime + self.Loop.time()


	# Housekeeping

	def _initialize_housekeeping_schedule(self):
		"""
		Set the next housekeeping time and time limit from configuration.

		Returns:
			(next_housekeeping_time, next_time_limit, next_housekeeping_id)
		"""
		config_house_time = datetime.datetime.strptime(Config['housekeeping']['at'], "%H:%M")  # default: 03:00
		config_time_limit = datetime.datetime.strptime(Config['housekeeping']['limit'], "%H:%M")  # default: 05:00

		now = datetime.datetime.now(datetime.timezone.utc)

		next_housekeeping_time = now.replace(
			hour=config_house_time.hour,
			minute=config_house_time.minute,
			second=0,
			microsecond=0)

		# if the app started after the housekeeping time, set it to the next day
		if now > next_housekeeping_time:
			next_housekeeping_time += datetime.timedelta(days=1)

		# compute the time limit for the housekeeping
		time_delta_limit = config_time_limit - config_house_time
		if time_delta_limit < datetime.timedelta(hours=0):
			time_delta_limit += datetime.timedelta(days=1)

		next_time_limit = next_housekeeping_time + time_delta_limit

		# Each housekeeping time has an ID that prevents housekeeping from accidentally being executed twice.
		next_housekeeping_id = _housekeeping_id(now)


		return (next_housekeeping_time, next_time_limit, next_housekeeping_id)

	def _on_housekeeping_tick(self, message_type):
		"""
		Check if it's time for publishing the 'Application.housekeeping!' message.
		If so, publish the message and set housekeeping time, the time limit and time id for the next day.
		"""
		now = datetime.datetime.now(datetime.timezone.utc)
		today_id = _housekeeping_id(now)

		if self.HousekeepingTime < now:
			if now < self.HousekeepingTimeLimit and self.HousekeepingId <= today_id:
				L.log(asab.LOG_NOTICE, "Housekeeping started.")
				self.PubSub.publish("Application.housekeeping!")
			else:
				L.error(
					"Housekeeping has not been executed: It is past the time limit.",
					struct_data={
						"housekeeping_time": self.HousekeepingTime.strftime("%Y-%m-%d %H:%M:%S"),
						"time_limit": self.HousekeepingTimeLimit.strftime("%Y-%m-%d %H:%M:%S"),
						"housekeeping_id": self.HousekeepingId,
					}
				)
				self.HousekeepingMissedEvents.append(today_id)

			self.HousekeepingTime += datetime.timedelta(days=1)
			self.HousekeepingTimeLimit += datetime.timedelta(days=1)
			self.HousekeepingId = _housekeeping_id(self.HousekeepingTime)

			if len(self.HousekeepingMissedEvents) > 0:
				L.error(
					"One or more Housekeeping events have not been executed.",
					struct_data={
						"missed_housekeeping_events": self.HousekeepingMissedEvents
					})
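
The schedule computation in the listing above can be tried in isolation. The following is a minimal sketch using only the standard library; the function name `housekeeping_window` and its `now`, `at` and `limit` parameters are hypothetical and stand in for the values that ASAB reads from the `[housekeeping]` configuration section.

```python
import datetime


def housekeeping_window(now, at="03:00", limit="05:00"):
    """Return (start, deadline) of the next housekeeping window after `now`.

    Hypothetical helper that mirrors the arithmetic in the listing above.
    """
    at_time = datetime.datetime.strptime(at, "%H:%M")
    limit_time = datetime.datetime.strptime(limit, "%H:%M")

    # Candidate start: today at the configured hour and minute.
    start = now.replace(hour=at_time.hour, minute=at_time.minute, second=0, microsecond=0)
    if now > start:
        # Today's slot has already passed, so move to tomorrow.
        start += datetime.timedelta(days=1)

    # Window length; a negative value means the limit falls after midnight.
    window = limit_time - at_time
    if window < datetime.timedelta(0):
        window += datetime.timedelta(days=1)

    return start, start + window


utc = datetime.timezone.utc
start, deadline = housekeeping_window(datetime.datetime(2024, 1, 10, 4, 30, tzinfo=utc))
print(start.isoformat(), deadline.isoformat())
```

Passing `now` explicitly, instead of reading the clock inside the function, keeps the sketch deterministic and easy to test.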

ExitCode = os.EX_OK instance-attribute ¤

The current exit code of the application. It can be set via the set_exit_code() method.

Examples:

The example of the exit code handling in the main() function of the application:

import sys

import asab

if __name__ == '__main__':
    app = asab.Application()
    exit_code = app.run()
    sys.exit(exit_code)

| Exit code | Meaning |
| --- | --- |
| 0 | success |
| 1 | abnormal termination of a program, perhaps as a result of a minor problem in the code |
| "!RESTART!" | hard restart of the whole application |

Modules = [] instance-attribute ¤

A list of modules that have been added to the application.

Services = {} instance-attribute ¤

A dictionary of registered services.

__init__(args=None, modules=[]) ¤

Initialize the Application provided with arguments and modules.

Parameters:

Name Type Description Default
args Optional[list]

sequence of arguments to be parsed by Application.parse_arguments() call.

None
modules list

list of ASAB modules to be added by Application.add_module() call.

[]

Examples:

class MyApplication(asab.Application):
        def __init__(self):
                super().__init__(modules=[asab.web.Module, asab.zookeeper.Module])
Source code in asab/application.py
def __init__(self, args: typing.Optional[list] = None, modules: list = []):
	"""
	Initialize the Application provided with arguments and modules.

	Args:
		args: sequence of arguments to be parsed by `Application.parse_arguments()` call.
		modules: list of ASAB modules to be added by `Application.add_module()` call.

	Examples:

	```python
	class MyApplication(asab.Application):
		def __init__(self):
			super().__init__(modules=[asab.web.Module, asab.zookeeper.Module])
	```
	"""

	self.ExitCode: typing.Union[int, str]
	"""
	The actual value of the exit code that can be set via `set_exit_code()` method.

	Examples:
		The example of the exit code handling in the `main()` function of the application:

		```python
		if __name__ == '__main__':
			app = asab.Application()
			exit_code = app.run()
			sys.exit(exit_code)
		```

		| Exit code | Meaning |
		| --- | --- |
		| 0 | success |
		| 1 | abnormal termination of a program, perhaps as a result of a minor problem in the code |
		| "!RESTART!" | hard restart of the whole application |

	"""

	try:
		# EX_OK code is not available on Windows
		self.ExitCode = os.EX_OK
	except AttributeError:
		self.ExitCode = 0

	# Queue of Services to be initialized
	self.InitServicesQueue = []
	# Queue of Modules to be initialized
	self.InitModulesQueue = []

	# Parse command line
	self.Args = self.parse_arguments(args=args)

	# Obtain HostName
	# The user can provide the actual hostname of the application in ASAB_HOSTNAME environment variable.
	# This can be used to specify the hostname that is discoverable by other services in a cluster, if a local hostname is not suitable.
	self.HostName = os.environ.get('ASAB_HOSTNAME', None)
	if self.HostName is None:
		self.HostName = platform.node()
	os.environ['HOSTNAME'] = self.HostName

	# Load configuration
	Config._load()

	if hasattr(self.Args, "daemonize") and self.Args.daemonize:
		self.daemonize()

	elif hasattr(self.Args, "kill") and self.Args.kill:
		self.daemon_kill()

	# Seed the random generator
	random.seed()

	# Obtain the event loop
	self.Loop = asyncio.get_event_loop()
	if self.Loop.is_closed():
		self.Loop = asyncio.new_event_loop()
		asyncio.set_event_loop(self.Loop)

	self.LaunchTime = time.time()
	self.BaseTime = self.LaunchTime - self.Loop.time()

	# Last time the on_tick/60! was registered
	self.LastOnTick60 = self.LaunchTime

	self.Modules: list[asab.Module] = []
	"""
	A list of modules that have been added to the application.
	"""

	self.Services: dict[str, asab.Service] = {}
	"""
	A dictionary of registered services.
	"""

	# Setup logging
	self.Logging = Logging(self)

	# Configure the event loop
	self.Loop.set_exception_handler(_loop_exception_handler)
	if Config["logging"].getboolean("verbose"):
		self.Loop.set_debug(True)

	# Adding a handler to listen to the interrupt event
	if platform.system() == "Windows":

		try:

			# Windows win32api import
			import win32api

			def handler(type):
				self.stop()
				return True

			win32api.SetConsoleCtrlHandler(handler, True)

		except ImportError as e:
			L.warning("win32api module could not be loaded, because '{}'".format(
				e
			))

	else:

		# POSIX and other reasonable systems
		self.Loop.add_signal_handler(signal.SIGINT, self.stop)
		self.Loop.add_signal_handler(signal.SIGTERM, self.stop)
		self.Loop.add_signal_handler(signal.SIGHUP, self._hup)

	self._stop_event = asyncio.Event()
	self._stop_event.clear()
	self._stop_counter = 0

	from .pubsub import PubSub
	self.PubSub = PubSub(self)

	L.info("Initializing ...")

	self.TaskService = TaskService(self)

	for module in modules:
		self.add_module(module)

	# Set housekeeping time and time limit
	self.HousekeepingTime, self.HousekeepingTimeLimit, self.HousekeepingId = self._initialize_housekeeping_schedule()
	self.HousekeepingMissedEvents: list = []
	# Every 10 minutes listen for housekeeping
	self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick)

	# Run the watchdog to detect lost interactivity on the event loop
	self.WatchdogThreshold = Config["general"].getseconds("watchdog_threshold")

	if self.WatchdogThreshold > 0:
		watchdog_thread = threading.Thread(target=self._watchdog)
		watchdog_thread.daemon = True  # Daemonize the thread to ensure it exits with the main program
		watchdog_thread.start()

add_module(module_class) ¤

Load a new module.

Source code in asab/application.py
def add_module(self, module_class: asab.Module) -> None:
	"""
	Load a new module.
	"""

	for module in self.Modules:
		if isinstance(module, module_class):
			# Already loaded and registered
			return

	module = module_class(self)
	self.Modules.append(module)

	# Enqueue module for initialization (happens in run phase)
	self.InitModulesQueue.append(module)
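
The `isinstance` check in the listing above makes `add_module()` idempotent: adding the same module class twice creates only one instance. A minimal standalone sketch of that pattern follows; `Registry` and `WebModule` are hypothetical stand-ins, not ASAB classes.

```python
class Registry:
    """Hypothetical container that mimics the module bookkeeping shown above."""

    def __init__(self):
        self.modules = []
        self.init_queue = []

    def add_module(self, module_class):
        for module in self.modules:
            if isinstance(module, module_class):
                # Already loaded and registered, nothing to do.
                return
        module = module_class(self)
        self.modules.append(module)
        # Initialization is deferred; the instance only gets queued here.
        self.init_queue.append(module)


class WebModule:
    """Hypothetical module; it receives the owning container in its constructor."""

    def __init__(self, app):
        self.app = app


registry = Registry()
registry.add_module(WebModule)
registry.add_module(WebModule)  # second call is a no-op
print(len(registry.modules))
```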

create_argument_parser(prog=None, usage=None, description=None, epilog=None, prefix_chars='-', fromfile_prefix_chars=None, argument_default=None, conflict_handler='error', add_help=True) ¤

Create an argparse.ArgumentParser object supplemented by default ASAB arguments.

This method can be overridden to adjust the argparse configuration. Refer to the argparse.ArgumentParser documentation in the Python standard library for details.

Source code in asab/application.py
def create_argument_parser(
	self,
	prog=None,
	usage=None,
	description=None,
	epilog=None,
	prefix_chars='-',
	fromfile_prefix_chars=None,
	argument_default=None,
	conflict_handler='error',
	add_help=True
) -> argparse.ArgumentParser:
	"""
	Create an `argparse.ArgumentParser` object supplemented by default ASAB arguments.

	This method can be overridden to adjust argparse configuration.
	Refer to the [`argparse.ArgumentParser`](https://docs.python.org/3/library/argparse.html?highlight=argumentparser#argumentparser-objects) documentation in the Python standard library for details.
	"""

	parser = argparse.ArgumentParser(
		prog=prog,
		usage=usage,
		description=description if description is not None else self.Description,
		epilog=epilog,
		formatter_class=argparse.RawDescriptionHelpFormatter,
		prefix_chars=prefix_chars,
		fromfile_prefix_chars=fromfile_prefix_chars,
		argument_default=argument_default,
		conflict_handler=conflict_handler,
		add_help=add_help
	)
	parser.add_argument('-c', '--config', help='specify a path to a configuration file')
	parser.add_argument('-v', '--verbose', action='store_true', help='print more information (enable debug output)')
	parser.add_argument('-s', '--syslog', action='store_true', help='enable logging to a syslog')
	parser.add_argument('-l', '--log-file', help='specify a path to a log file')
	parser.add_argument('-w', '--web-api', help='activate Asab web API (default listening port is 0.0.0.0:8080)', const="0.0.0.0:8080", nargs="?")
	parser.add_argument('--startup-housekeeping', help='trigger housekeeping event immediately after application startup')
	parser.add_argument('--no-auth', action='store_true', help='disable all authentication and authorization')


	if daemon is not None:
		parser.add_argument('-d', '--daemonize', action='store_true', help='run daemonized (in the background)')
		parser.add_argument('-k', '--kill', action='store_true', help='kill a running daemon and quit')

	return parser
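
The `-w/--web-api` option above combines `nargs="?"` with `const`, which gives it three distinct states. The sketch below reproduces only that one option with plain `argparse`, outside of ASAB, to show how each state is parsed; the `prog` name and the explicit address are illustrative.

```python
import argparse

parser = argparse.ArgumentParser(prog="app.py")
# Same option shape as the `-w/--web-api` argument in the listing above.
parser.add_argument("-w", "--web-api", nargs="?", const="0.0.0.0:8080")

absent = parser.parse_args([])                           # option not given
bare = parser.parse_args(["-w"])                         # option given without a value
explicit = parser.parse_args(["-w", "127.0.0.1:9000"])   # option given with a value

print(absent.web_api, bare.web_api, explicit.web_api)
```

When the option is absent the attribute is `None`, a bare `-w` yields the `const` value, and an explicit value is passed through unchanged.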

finalize() async ¤

This method is called during the application exit-time. It is intended to be overridden by the user.

Source code in asab/application.py
async def finalize(self):
	"""
	This method is called during the application *exit-time*. It is intended to be overridden by the user.
	"""
	pass

get_pidfile_path() ¤

Get the path for PID file from the configuration.

A PID file is a file that contains the process ID of the ASAB process. It is used for interaction with the OS and its control of running services.

  • If the pidfile is set to the empty string, return None.
  • If pidfile is set to "!", return the default PID file path (in /var/run/ folder). This is the default value.

Example of PID path configuration:

[general]
pidfile=/tmp/my.pid

Returns:

Type Description
Optional[str]

The path to the pidfile.

Source code in asab/application.py
def get_pidfile_path(self) -> typing.Optional[str]:
	"""
	Get the path for PID file from the configuration.

	PID file is a file that contains process id of the ASAB process.
	It is used for interaction with the OS and its control of running services.

	- If the `pidfile` is set to the empty string, return None.
	- If `pidfile` is set to "!", return the default PID file path (in `/var/run/` folder).
	This is the default value.

	Example of PID path configuration:

	```ini
	[general]
	pidfile=/tmp/my.pid
	```

	Returns:
		The path to the `pidfile`.
	"""

	pidfilepath = Config['general']['pidfile']
	if pidfilepath == "":
		return None
	elif pidfilepath == "!":
		return os.path.join('/var/run', os.path.basename(sys.argv[0]) + '.pid')
	else:
		return pidfilepath
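
The three cases handled above can be exercised without a configuration file. This is a minimal sketch; `resolve_pidfile` is a hypothetical helper whose `configured` and `program` parameters replace the lookup in `Config['general']['pidfile']` and `sys.argv[0]`.

```python
import os


def resolve_pidfile(configured, program="app.py"):
    """Hypothetical helper mirroring the branching in get_pidfile_path()."""
    if configured == "":
        # An empty string disables the PID file.
        return None
    if configured == "!":
        # "!" selects the default location derived from the program name.
        return os.path.join("/var/run", os.path.basename(program) + ".pid")
    # Anything else is used as the path verbatim.
    return configured


disabled = resolve_pidfile("")
default = resolve_pidfile("!", program="/opt/service/app.py")
custom = resolve_pidfile("/tmp/my.pid")
print(disabled, default, custom)
```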

get_service(service_name) ¤

Get a registered service by its name.

Parameters:

Name Type Description Default
service_name str

Name of the service to retrieve.

required

Returns:

Type Description
Optional[Service]

The service object associated with the provided service_name, or None if the service is not registered.

Source code in asab/application.py
def get_service(self, service_name: str) -> typing.Optional[asab.Service]:
	"""
	Get a registered service by its name.

	Args:
		service_name: Name of the service to retrieve.

	Returns:
		The service object associated with the provided service_name,
		or None if the service is not registered.
	"""
	return self.Services.get(service_name)

initialize() async ¤

This method is called during the application init-time. It is intended to be overridden by the user.

Source code in asab/application.py
async def initialize(self):
	"""
	This method is called during the application *init-time*. It is intended to be overridden by the user.
	"""
	pass

main() async ¤

This method is called during the application run-time. It is intended to be overridden by the user.

Source code in asab/application.py
async def main(self):
	"""
	This method is called during the application *run-time*. It is intended to be overridden by the user.
	"""
	pass

parse_arguments(args=None) ¤

Parse the command line arguments and set the default values for the configuration accordingly.

Parameters:

Name Type Description Default
args

The arguments to parse. If not set, sys.argv[1:] will be used.

None

Returns:

Type Description

The arguments that were parsed.

Source code in asab/application.py
def parse_arguments(self, args=None):
	"""
	Parse the command line arguments and set the default values for the configuration accordingly.

	Args:
		args: The arguments to parse. If not set, sys.argv[1:] will be used.

	Returns:
		The arguments that were parsed.
	"""

	parser = self.create_argument_parser()
	args = parser.parse_args(args=args)

	if args.config is not None:
		Config._default_values['general']['config_file'] = args.config

	if args.verbose:
		Config._default_values['logging']['verbose'] = True

	if args.syslog:
		Config._default_values['logging:syslog']['enabled'] = True

	if args.log_file:
		Config._default_values['logging:file']['path'] = args.log_file

	if args.web_api:
		if 'web' not in Config._default_values:
			Config._default_values['web'] = {}
		Config._default_values['web']['listen'] = args.web_api

	if args.no_auth:
		if 'auth' not in Config._default_values:
			Config._default_values['auth'] = {}
		Config._default_values['auth']['enabled'] = False

	if args.startup_housekeeping:
		Config._default_values['housekeeping']['run_at_startup'] = True

	return args

restart() ¤

Schedule a hard restart of the whole application.

This function works by using os.execv(), which replaces the current process with a new one (without creating a new process ID). Arguments and environment variables will be retained.

Warning

Please note that this will work on Unix-based systems only, as it uses a feature specific to Unix.

Hint

Be careful when using this function. Make sure you control when and how it is called
to avoid unexpected process restarts.
Function calls of this kind are uncommon in Python applications.
Source code in asab/application.py
def restart(self):
	"""
	Schedule a hard restart of the whole application.

	This function works by using `os.execv()`, which replaces the current process with a new one (without creating a new process ID).
	Arguments and environment variables will be retained.

	!!! warning
		Please note that this will work on Unix-based systems only, as it uses a feature specific to Unix.

	!!! hint
		Be careful while using this function, make sure you have some control
		over when and how this function is being called to avoid any unexpected process restarts.
		It is not common to use these types of function calls in Python applications.
	"""
	self.PubSub.subscribe("Application.tick/10!", self._do_restart)

run() ¤

Run the application.

Returns:

Type Description
int

Exit code of the finalized process.

Source code in asab/application.py
def run(self):
	"""Run the application.

	Returns:
		(int): Exit code of the finalized process.
	"""

	# Commence init-time
	self.PubSub.publish("Application.init!")
	self.Loop.run_until_complete(asyncio.gather(
		self._init_time_governor(),
		self.initialize(),

	))

	try:
		# Commence run-time and application main() function
		L.log(LOG_NOTICE, "is ready.")
		self._stop_event.clear()
		self.Loop.run_until_complete(asyncio.gather(
			self._run_time_governor(),
			self.main(),
		))

		# Commence exit-time
		if self.ExitCode == "!RESTART!":
			L.log(LOG_NOTICE, "is restarting ...")
		else:
			L.log(LOG_NOTICE, "is exiting ...", struct_data={'exit_code': self.ExitCode})
		self.Loop.run_until_complete(asyncio.gather(
			self.finalize(),
			self._exit_time_governor(),
		))

		# Python 3.5 lacks support for shutdown_asyncgens()
		if hasattr(self.Loop, "shutdown_asyncgens"):
			self.Loop.run_until_complete(self.Loop.shutdown_asyncgens())
		self.Loop.close()

	finally:
		if self.ExitCode == "!RESTART!":
			os.execv(sys.executable, [os.path.basename(sys.executable)] + sys.argv)

	return self.ExitCode
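
The three-phase structure of `run()` can be illustrated with a much smaller model. The sketch below is not the ASAB implementation: `MiniApp` is a hypothetical stand-in that omits the governors, PubSub messages and restart handling, and keeps only the sequence of `run_until_complete()` calls for init-time, run-time and exit-time.

```python
import asyncio


class MiniApp:
    """Hypothetical stand-in used for illustration; not the ASAB class."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.phases = []
        self.stop_event = None

    async def initialize(self):
        # Create the event inside a coroutine so it is tied to self.loop.
        self.stop_event = asyncio.Event()
        self.phases.append("init-time")

    async def main(self):
        self.phases.append("run-time")
        self.stop_event.set()  # request the end of run-time

    async def _run_time(self):
        # main() runs alongside a waiter that returns once stop is requested.
        await asyncio.gather(self.stop_event.wait(), self.main())

    async def finalize(self):
        self.phases.append("exit-time")

    def run(self):
        try:
            self.loop.run_until_complete(self.initialize())
            self.loop.run_until_complete(self._run_time())
            self.loop.run_until_complete(self.finalize())
        finally:
            self.loop.close()
        return 0


app = MiniApp()
exit_code = app.run()
print(app.phases, exit_code)
```

Each phase runs to completion before the next one starts, which is why code in `finalize()` can assume that `main()` has already finished.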

set_exit_code(exit_code, force=False) ¤

Set the exit code of the application.

If force is False, the exit code will be set only if the previous value is lower than the new one. If force is True, the exit code value is set to exit_code value disregarding the previous value.

Parameters:

Name Type Description Default
exit_code str | int

The exit code value.

required
force bool

Force the exit code reassignment.

False
Source code in asab/application.py
def set_exit_code(self, exit_code: typing.Union[int, str], force: bool = False):
	"""
	Set the exit code of the application.

	If `force` is `False`, the exit code will be set only if the previous value is lower than the new one.
	If `force` is `True`, the exit code value is set to `exit_code` value disregarding the previous value.

	Args:
		exit_code (str | int): The exit code value.
		force: Force the exit code reassignment.
	"""
	if self.ExitCode == "!RESTART!":
		return

	if exit_code == "!RESTART!":
		self.ExitCode = exit_code

	elif (self.ExitCode < exit_code) or force:
		L.debug("Exit code set to {}".format(exit_code))
		self.ExitCode = exit_code
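
The precedence rules above are easier to follow as a pure function. This is a sketch under the assumption that integer exit codes are compared numerically, as in the listing; `next_exit_code` is a hypothetical name and returns the value that would end up stored in `ExitCode`.

```python
RESTART = "!RESTART!"


def next_exit_code(current, requested, force=False):
    """Hypothetical pure-function version of the rule in set_exit_code()."""
    if current == RESTART:
        # A pending restart is never overwritten, not even with force=True.
        return current
    if requested == RESTART:
        return requested
    if force or current < requested:
        return requested
    # A lower code does not replace a higher one unless forced.
    return current


print(next_exit_code(0, 1))              # a higher code replaces a lower one
print(next_exit_code(1, 0))              # a lower code is ignored
print(next_exit_code(1, 0, force=True))  # unless the change is forced
```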

stop(exit_code=None) ¤

Gracefully terminate the run-time and commence the exit-time.

This method is called automatically when the process receives SIGINT or SIGTERM, which includes pressing Ctrl-C on UNIX-like systems. When this method is called three times, it abruptly exits the application (an emergency abort).

Parameters:

Name Type Description Default
exit_code int

Exit code of the finalized process.

None
Source code in asab/application.py
def stop(self, exit_code: typing.Optional[int] = None) -> None:
	"""
	Gracefully terminate the _run-time_ and commence the _exit-time_.

	This method is called automatically when the process receives `SIGINT` or `SIGTERM`,
	which includes pressing `Ctrl-C` on UNIX-like systems.
	When this method is called three times, it abruptly exits the application (an emergency abort).

	Args:
		exit_code (int, optional): Exit code of the finalized process.
	"""
	if exit_code is not None:
		self.set_exit_code(exit_code)

	self._stop_event.set()
	self._stop_counter += 1
	self.PubSub.publish("Application.stop!", self._stop_counter)

	if self._stop_counter >= 3:
		L.fatal("Emergency exit")
		for task in asyncio.all_tasks():
			L.warning("Pending task during emergency exit: {}".format(task))
		try:
			# EX_SOFTWARE code is not available on Windows
			return os._exit(os.EX_SOFTWARE)
		except AttributeError:
			return os._exit(0)

	elif self._stop_counter > 1:
		L.warning("{} tasks still active".format(len(asyncio.all_tasks())))
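
The escalation driven by `_stop_counter` can be summarized as a small state machine. In the sketch below, `StopCounter` and the returned labels are hypothetical; the real method logs and calls `os._exit()` instead of returning a string.

```python
class StopCounter:
    """Hypothetical model of the escalation on repeated stop() calls."""

    def __init__(self):
        self.count = 0

    def stop(self):
        self.count += 1
        if self.count >= 3:
            # Third request: the real method aborts the process here.
            return "emergency-exit"
        if self.count > 1:
            # Second request: the real method warns about active tasks.
            return "warn-active-tasks"
        # First request: graceful transition to exit-time.
        return "graceful"


counter = StopCounter()
actions = [counter.stop() for _ in range(3)]
print(actions)
```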

time() ¤

Return the UTC UNIX timestamp derived from the event loop time (a fast way to get wall-clock time).

Returns:

Type Description
float

Current UTC UNIX timestamp.

Source code in asab/application.py
def time(self) -> float:
	"""
	Return the UTC UNIX timestamp derived from the event loop time (a fast way to get wall-clock time).

	Returns:
		Current UTC UNIX timestamp.
	"""
	return self.BaseTime + self.Loop.time()
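
The idea behind `BaseTime` is to record the offset between the wall clock and the loop's monotonic clock once, and then derive timestamps from the loop clock. The following standalone sketch demonstrates this with a fresh event loop; the variable names are illustrative and the snippet does not use ASAB.

```python
import asyncio
import time

loop = asyncio.new_event_loop()
try:
    # Computed once at "launch": offset between wall clock and loop clock.
    base_time = time.time() - loop.time()

    # Later reads only need the loop clock plus the stored offset.
    wall_clock = base_time + loop.time()

    # The derived value should stay close to the real wall clock.
    drift = abs(wall_clock - time.time())
finally:
    loop.close()

print(drift < 1.0)
```

Because the loop clock is monotonic, a timestamp derived this way does not follow later adjustments of the system clock.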