1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import gi

# The typelib versions have to be pinned BEFORE anything is imported from
# gi.repository; requesting a version after the namespace is already loaded
# is too late to influence which one gets picked. The aptdaemon GTK widgets
# are imported afterwards for the same reason (presumably they pull in
# gi.repository themselves -- confirm against the aptdaemon sources).
gi.require_version('Gtk', '3.0')
gi.require_version('GdkX11', '3.0')  # Needed to get xid

from gi.repository import Gtk  # noqa: E402

import aptdaemon.client  # noqa: E402
import aptdaemon.enums  # noqa: E402
import aptdaemon.errors  # noqa: E402
from aptdaemon.gtk3widgets import AptErrorDialog, AptProgressDialog, AptConfirmDialog  # noqa: E402

__version__ = "1.0.0"
class APT(object):
    """Small GTK helper that runs aptdaemon transactions.

    Each public operation creates a fresh ``aptdaemon.client.AptClient`` and
    hands the resulting transaction to ``_run_transaction``.  How the
    transaction is presented depends on whether a progress callback is set:

    * no progress callback: an ``AptProgressDialog`` runs the transaction
      and ``finished_callback(transaction, exit_state)`` is invoked for any
      exit state;
    * progress callback set: the transaction is handed to
      ``AptDaemonTransaction`` together with the progress, finished and
      error callbacks.

    ``cancelled_callback()`` (no arguments) is invoked when the user rejects
    the dependency confirmation dialog or when a ``NotAuthorizedError`` is
    reported.
    """

    def __init__(self, parent_window=None):
        """Store the parent window used for dialogs and reset all callbacks.

        Args:
            parent_window: window passed as ``parent`` to the dialogs this
                class opens; may be ``None``.
        """
        self.parent_window = parent_window
        self.progress_callback = None
        self.finished_callback = None
        self.error_callback = None
        self.cancelled_callback = None

    def set_progress_callback(self, progress_callback):
        """Set the callable that receives progress updates (or ``None``)."""
        self.progress_callback = progress_callback

    def set_finished_callback(self, finished_callback):
        """Set the callable invoked as ``cb(transaction, exit_state)``."""
        self.finished_callback = finished_callback

    def set_error_callback(self, error_callback):
        """Set the callable forwarded to ``AptDaemonTransaction`` for errors."""
        self.error_callback = error_callback

    def set_cancelled_callback(self, cancelled_callback):
        """Set the zero-argument callable invoked on cancellation."""
        self.cancelled_callback = cancelled_callback

    def update_cache(self):
        """Create an update-cache transaction and run it immediately."""
        aptdaemon_client = aptdaemon.client.AptClient()
        update_transaction = aptdaemon_client.update_cache()
        self._run_transaction(update_transaction)

    def install_file(self, path):
        """Request installation of the package file at ``path``.

        The transaction is delivered asynchronously to ``_simulate_trans``;
        failures are reported through ``_on_error``.
        """
        aptdaemon_client = aptdaemon.client.AptClient()
        aptdaemon_client.install_file(
            path,
            force=True,
            wait=False,
            reply_handler=self._simulate_trans,
            error_handler=self._on_error,
        )

    def install_packages(self, packages):
        """Request installation of ``packages`` (simulated, then confirmed)."""
        aptdaemon_client = aptdaemon.client.AptClient()
        aptdaemon_client.install_packages(
            packages,
            reply_handler=self._simulate_trans,
            error_handler=self._on_error,
        )

    def remove_packages(self, packages):
        """Request removal of ``packages`` (simulated, then confirmed)."""
        aptdaemon_client = aptdaemon.client.AptClient()
        aptdaemon_client.remove_packages(
            packages,
            reply_handler=self._simulate_trans,
            error_handler=self._on_error,
        )

    def _run_transaction(self, transaction):
        """Run ``transaction`` with the dialog or the callback front end."""
        if self.progress_callback is None:
            # No custom progress reporting requested: use the stock dialog.
            dia = AptProgressDialog(transaction, parent=self.parent_window)
            dia.run(close_on_finished=True, show_error=True,
                    reply_handler=lambda: True, error_handler=self._on_error)
            transaction.connect("finished", self._on_finish)
        else:
            AptDaemonTransaction(transaction, self.progress_callback,
                                 self.finished_callback, self.error_callback,
                                 self.parent_window)

    def _simulate_trans(self, trans):
        """Simulate ``trans`` and continue with ``_confirm_deps`` on success."""
        trans.simulate(reply_handler=lambda: self._confirm_deps(trans),
                       error_handler=self._on_error)

    def _confirm_deps(self, trans):
        """Ask for confirmation if ``trans`` has dependencies, then run it.

        When any entry of ``trans.dependencies`` is non-empty an
        ``AptConfirmDialog`` is shown; any response other than OK triggers
        ``cancelled_callback`` (if set) and the transaction is not run.
        """
        try:
            if any(trans.dependencies):
                dia = AptConfirmDialog(trans, parent=self.parent_window)
                res = dia.run()
                dia.hide()
                if res != Gtk.ResponseType.OK:
                    if self.cancelled_callback is not None:
                        self.cancelled_callback()
                    return
            self._run_transaction(trans)
        except Exception as e:
            # Best effort: this runs as an asynchronous reply handler, so the
            # failure is only printed instead of being propagated.
            print(e)

    def _on_error(self, error):
        """Handle an error reported for a client call or transaction.

        ``NotAuthorizedError`` is treated as a cancellation and shows no
        dialog.  Any other error is shown in an ``AptErrorDialog``; errors
        that are not ``TransactionFailed`` are first wrapped in one with
        ``ERROR_UNKNOWN``.
        """
        if isinstance(error, aptdaemon.errors.NotAuthorizedError):
            if self.cancelled_callback is not None:
                self.cancelled_callback()
            return
        if not isinstance(error, aptdaemon.errors.TransactionFailed):
            # Catch internal errors of the client
            error = aptdaemon.errors.TransactionFailed(
                aptdaemon.enums.ERROR_UNKNOWN, str(error))
        # Parent the dialog like the progress and confirmation dialogs.
        dia = AptErrorDialog(error, parent=self.parent_window)
        dia.run()
        dia.hide()

    def _on_finish(self, transaction, exit_state):
        """Forward the transaction's ``finished`` signal to the callback."""
        if self.finished_callback is not None:
            self.finished_callback(transaction, exit_state)
class AptDaemonTransaction:
    """Run an aptdaemon transaction and relay its signals to callbacks.

    Constructing an instance configures the transaction, hooks up the
    signal handlers and starts the transaction right away.  Every callback
    may be ``None``, in which case the corresponding signal is ignored.
    """

    def __init__(self, transaction, progress_callback, finished_callback, error_callback, parent_window):
        """Wire ``transaction`` to the callbacks and start it.

        Args:
            transaction: aptdaemon transaction to run.
            progress_callback: called as ``cb(progress)``.
            finished_callback: called as ``cb(transaction, exit_state)``,
                only when the exit state is ``EXIT_SUCCESS``.
            error_callback: called as ``cb(error_code, error_details)``.
            parent_window: stored on the instance; not used by this class.
        """
        self.transaction = transaction
        self.parent_window = parent_window
        self.progress_callback = progress_callback
        self.finished_callback = finished_callback
        self.error_callback = error_callback

        transaction.set_debconf_frontend("gnome")

        # Signal name -> handler, connected in this order before running.
        signal_handlers = (
            ("progress-changed", self.on_transaction_progress),
            ("finished", self.on_transaction_finish),
            ("error", self.on_transaction_error),
        )
        for signal_name, handler in signal_handlers:
            transaction.connect(signal_name, handler)

        transaction.run()

    def on_transaction_progress(self, transaction, progress):
        """Relay a ``progress-changed`` signal to the progress callback."""
        notify = self.progress_callback
        if notify is None:
            return
        notify(progress)

    def on_transaction_error(self, transaction, error_code, error_details):
        """Relay an ``error`` signal to the error callback."""
        notify = self.error_callback
        if notify is None:
            return
        notify(error_code, error_details)

    def on_transaction_finish(self, transaction, exit_state):
        """Relay a ``finished`` signal, but only for successful exits."""
        if exit_state == aptdaemon.enums.EXIT_SUCCESS and self.finished_callback is not None:
            self.finished_callback(transaction, exit_state)
|