aboutsummaryrefslogtreecommitdiff
path: root/AptUrl/gtk/aptdaemon.py
blob: 31e4b1f4d0f587b1ff771f33d6786a62f5fb5a3d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from aptdaemon.gtk3widgets import AptErrorDialog, AptProgressDialog, AptConfirmDialog
import aptdaemon.errors
import aptdaemon.enums
import aptdaemon.client
from gi.repository import Gtk
__version__ = "1.0.0"

import gi
gi.require_version('Gtk', '3.0')
gi.require_version('GdkX11', '3.0')  # Needed to get xid


class APT(object):
    """Front end for running aptdaemon transactions from a GTK application.

    Every public operation creates a fresh ``aptdaemon.client.AptClient``
    and requests a transaction from it.  When no progress callback has been
    registered the transaction is shown in an ``AptProgressDialog``;
    otherwise it is handed to ``AptDaemonTransaction`` together with the
    registered progress, finished and error callbacks.

    All callbacks are optional and default to ``None``.
    """

    def __init__(self, parent_window=None):
        """Store the window used as parent for dialogs; reset all callbacks."""
        self.parent_window = parent_window
        self.progress_callback = None
        self.finished_callback = None
        self.error_callback = None
        self.cancelled_callback = None

    def set_progress_callback(self, progress_callback):
        """Register the progress callback.

        Setting it also switches ``_run_transaction`` from the progress
        dialog to callback-driven execution.
        """
        self.progress_callback = progress_callback

    def set_finished_callback(self, finished_callback):
        """Register the callback called as ``cb(transaction, exit_state)``."""
        self.finished_callback = finished_callback

    def set_error_callback(self, error_callback):
        """Register the error callback passed on to ``AptDaemonTransaction``."""
        self.error_callback = error_callback

    def set_cancelled_callback(self, cancelled_callback):
        """Register the zero-argument callback used when the user cancels."""
        self.cancelled_callback = cancelled_callback

    def update_cache(self):
        """Request a cache update transaction and run it right away."""
        aptdaemon_client = aptdaemon.client.AptClient()
        update_transaction = aptdaemon_client.update_cache()
        self._run_transaction(update_transaction)

    def install_file(self, path):
        """Request installation of the package file at ``path``.

        The request is made with ``force=True`` and ``wait=False``; the
        resulting transaction is simulated first (see ``_simulate_trans``).
        """
        aptdaemon_client = aptdaemon.client.AptClient()
        aptdaemon_client.install_file(
            path, force=True, wait=False, reply_handler=self._simulate_trans, error_handler=self._on_error)

    def install_packages(self, packages):
        """Request installation of ``packages``; simulated before running."""
        aptdaemon_client = aptdaemon.client.AptClient()
        aptdaemon_client.install_packages(
            packages, reply_handler=self._simulate_trans, error_handler=self._on_error)

    def remove_packages(self, packages):
        """Request removal of ``packages``; simulated before running."""
        aptdaemon_client = aptdaemon.client.AptClient()
        aptdaemon_client.remove_packages(
            packages, reply_handler=self._simulate_trans, error_handler=self._on_error)

    def _run_transaction(self, transaction):
        """Run ``transaction`` with a progress dialog or via callbacks."""
        if self.progress_callback is None:
            dia = AptProgressDialog(transaction, parent=self.parent_window)
            dia.run(close_on_finished=True, show_error=True,
                    reply_handler=lambda: True, error_handler=self._on_error)
            # NOTE(review): connected after dia.run() on purpose so that any
            # "finished" handler the dialog registers itself keeps running
            # before ours - confirm before reordering.
            transaction.connect("finished", self._on_finish)
        else:
            AptDaemonTransaction(transaction, self.progress_callback,
                                 self.finished_callback, self.error_callback, self.parent_window)

    def _simulate_trans(self, trans):
        """Simulate ``trans``, then ask for confirmation of its dependencies."""
        trans.simulate(reply_handler=lambda: self._confirm_deps(
            trans), error_handler=self._on_error)

    def _confirm_deps(self, trans):
        """Confirm additional changes with the user, then run ``trans``.

        A confirmation dialog is shown only when at least one entry of
        ``trans.dependencies`` is non-empty.  Any response other than OK
        calls the cancelled callback (if set) and the transaction is not
        run.  Exceptions raised along the way are printed, not propagated.
        """
        try:
            if any(trans.dependencies):
                dia = AptConfirmDialog(trans, parent=self.parent_window)
                res = dia.run()
                dia.hide()
                if res != Gtk.ResponseType.OK:
                    if self.cancelled_callback is not None:
                        self.cancelled_callback()
                    return
            self._run_transaction(trans)
        except Exception as e:
            print(e)

    def _on_error(self, error):
        """Handle an error reported by the aptdaemon client.

        A ``NotAuthorizedError`` is treated as a cancellation: the cancelled
        callback (if set) is called and no dialog is shown.  Any error that
        is not a ``TransactionFailed`` is wrapped into one with
        ``ERROR_UNKNOWN`` before being shown in an ``AptErrorDialog``.
        """
        if isinstance(error, aptdaemon.errors.NotAuthorizedError):
            if self.cancelled_callback is not None:
                self.cancelled_callback()
            return
        elif not isinstance(error, aptdaemon.errors.TransactionFailed):
            # Catch internal errors of the client
            error = aptdaemon.errors.TransactionFailed(
                aptdaemon.enums.ERROR_UNKNOWN, str(error))
        # Parent the dialog like the progress/confirm dialogs so that it is
        # transient for the application window.
        dia = AptErrorDialog(error, parent=self.parent_window)
        dia.run()
        dia.hide()

    def _on_finish(self, transaction, exit_state):
        """Forward the "finished" signal to the finished callback, if set."""
        if self.finished_callback is not None:
            self.finished_callback(transaction, exit_state)


class AptDaemonTransaction():
    """Run an aptdaemon transaction and relay its signals to callbacks.

    Constructing an instance sets the debconf frontend to "gnome", connects
    the "progress-changed", "finished" and "error" signals of the
    transaction and then calls ``transaction.run()``.  Each callback may be
    ``None``, in which case the matching signal is ignored.
    """

    def __init__(self, transaction, progress_callback, finished_callback, error_callback, parent_window):
        """Store the arguments, wire up the signals and start the run."""
        self.transaction = transaction
        self.parent_window = parent_window
        self.progress_callback = progress_callback
        self.finished_callback = finished_callback
        self.error_callback = error_callback

        transaction.set_debconf_frontend("gnome")
        # The "cancellable-changed" signal is not connected.
        signal_handlers = (
            ("progress-changed", self.on_transaction_progress),
            ("finished", self.on_transaction_finish),
            ("error", self.on_transaction_error),
        )
        for signal_name, handler in signal_handlers:
            transaction.connect(signal_name, handler)
        transaction.run()

    def on_transaction_progress(self, transaction, progress):
        """Pass ``progress`` to the progress callback, if one is set."""
        if self.progress_callback is None:
            return
        self.progress_callback(progress)

    def on_transaction_error(self, transaction, error_code, error_details):
        """Pass the error code and details to the error callback, if set."""
        if self.error_callback is None:
            return
        self.error_callback(error_code, error_details)

    def on_transaction_finish(self, transaction, exit_state):
        """Call the finished callback, but only for a successful exit."""
        if exit_state != aptdaemon.enums.EXIT_SUCCESS:
            return
        if self.finished_callback is None:
            return
        self.finished_callback(transaction, exit_state)