diff options
Diffstat (limited to 'tests/test_pk.py')
-rw-r--r-- | tests/test_pk.py | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/tests/test_pk.py b/tests/test_pk.py new file mode 100644 index 0000000..1c73632 --- /dev/null +++ b/tests/test_pk.py @@ -0,0 +1,601 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests the PackageKit compatibility layer. + +Since the PackageKit client doesn't support changing system D-Bus sockets, +run the tests only by using nose (e.g. nosetests3 tests.test_pk.PackageKitTest) +or by running the main routine (python3 test_pk.py). + +If a test fails all subsequent ones will fail, too. You will get the following +error message: +gi._glib.GError: GDBus.Error:org.freedesktop.DBus.Error.ServiceUnknown: +The name :1.16 was not provided by any .service files +""" + +import atexit +import logging +import os +import os.path +import shutil +import subprocess +import tempfile +import time +import sys +import unittest + +import apt_pkg +import apt.auth +from gi.repository import GLib +from gi.repository import PackageKitGlib as pk + +import aptdaemon.test + +REPO_PATH = os.path.join(aptdaemon.test.get_tests_dir(), "repo") +DEBUG = True + + +@unittest.skip("Removed PackageKit compat") +class PackageKitTest(aptdaemon.test.AptDaemonTestCase): + + """Test the PackageKit compatibility layer.""" + + def setUp(self): + """Setup a chroot, run the aptdaemon and a fake PolicyKit daemon.""" + # Setup chroot + self.chroot = aptdaemon.test.Chroot() + self.chroot.setup() + self.addCleanup(self.chroot.remove) + self.chroot.add_test_repository() + # set up scratch dir + self.workdir = tempfile.mkdtemp() + # allow tests to add plugins, etc. + self.orig_pythonpath = os.environ.get("PYTHONPATH") + os.environ["PYTHONPATH"] = "%s:%s" % (self.workdir, + os.environ.get("PYTHONPATH", "")) + # write apt config for calling apt-key + apt_conf = os.path.join(self.chroot.path, 'aptconfig') + with open(apt_conf, 'w') as f: + f.write('Dir "%s";\n' % self.chroot.path) + os.environ['APT_CONFIG'] = apt_conf + + # if tests install keys, have aptd query a local fake server + os.environ['APTDAEMON_KEYSERVER'] = 'hkp://localhost:19191' + + self.start_session_aptd(self.chroot.path) + # Start the fake PolikcyKit daemon + self.start_fake_polkitd() + time.sleep(2.0) + + def tearDown(self): + shutil.rmtree(self.workdir) + if self.orig_pythonpath: + os.environ["PYTHONPATH"] = self.orig_pythonpath + + def test_install(self): + """Test installing a package.""" + pkg_name = "silly-depend-base" + + client = pk.Client() + + # Resolve the id of the package + res = client.resolve(pk.FilterEnum.NONE, [pkg_name], None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + ids = [] + for pkg in res.get_package_array(): + self.assertEqual(pkg.get_name(), pkg_name) + ids.append(pkg.get_id()) + break + else: + self.fail("Failed to resolve %s" % pkg_name) + + # Simulate + res = client.install_packages(2 ** pk.TransactionFlagEnum.SIMULATE, + ids, None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + for pkg in res.get_package_array(): + self.assertEqual(pkg.get_name(), "silly-base") + self.assertEqual(pkg.get_info(), + pk.info_enum_from_string("installing")) + break + else: + self.fail("Failed to get dependencies of %s" % pkg_name) + + # Install + res = client.install_packages(pk.TransactionFlagEnum.NONE, ids, None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + # verify list of files + res = client.get_files(ids, None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + files = res.get_files_array()[0].get_property('files') + # ships two files, plus directories + self.assertGreaterEqual(len(files), 2, + 'expect two files in ' + str(files)) + self.assertTrue('/usr/share/doc/silly-depend-base/copyright' in files, + files) + + def test_install_files(self): + """Test installing local package files.""" + + path_pkg_config = os.path.join( + REPO_PATH, + "silly-config_.1-0_all.deb") + path_pkg = os.path.join( + REPO_PATH, + "silly-depend-base_0.1-0_all.deb") + + client = pk.Client() + + # Fail if more than package should be installed + try: + client.install_files(pk.TransactionFlagEnum.NONE, + [path_pkg_config, path_pkg], None, + lambda p, t, d: True, None) + except GLib.GError as error: + self.assertTrue("Only one package" in error.message) + else: + self.fail("Installing multiple package files didn't fail") + + # Check simulating + res = client.install_files(2 ** pk.TransactionFlagEnum.SIMULATE, + [path_pkg], None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = res.get_package_array() + if len(pkgs) != 1: + self.fail("Failed to get dependencies") + self.assertEqual(pkgs[0].get_name(), "silly-base") + self.assertEqual(pkgs[0].get_version(), "0.1-0update1") + + # Check the actual installtion + res = client.install_files(pk.TransactionFlagEnum.NONE, + [path_pkg], None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + # verify list of files + res = client.get_files(["silly-depend-base;0.1-0;all;"], + None, lambda p, t, d: True, None) + files = res.get_files_array()[0].get_property('files') + # ships two files, plus directories + self.assertGreaterEqual(len(files), 2, + 'expect two files in ' + str(files)) + self.assertTrue('/usr/share/doc/silly-depend-base/copyright' in files, + files) + + def test_download(self): + """Test downloading packages.""" + pkg_filename = "silly-base_0.1-0update1_all.deb" + pkg_id = "silly-base;0.1-0update1;all;" + temp_dir = tempfile.mkdtemp(prefix="aptd-download-test-") + + client = pk.Client() + res = client.download_packages([pkg_id], temp_dir, + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + if not os.path.exists(os.path.join(temp_dir, pkg_filename)): + self.fail("Failed to download the package") + + shutil.rmtree(temp_dir) + + def test_filters(self): + """Test filters.""" + pkg = "silly-base_0.1-0_all.deb" + self.chroot.install_debfile(os.path.join(REPO_PATH, pkg), True) + + client = pk.Client() + + # All version + res = client.resolve(pk.FilterEnum.NONE, ["silly-base"], None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = res.get_package_array() + if len(pkgs) != 2: + self.fail("Failed to get versions") + versions = ["0.1-0", "0.1-0update1"] + for pkg in pkgs: + self.assertEqual(pkg.get_name(), "silly-base") + versions.remove(pkg.get_version()) + + # Newest version + res = client.resolve(2 ** pk.FilterEnum.NEWEST, ["silly-base"], None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = res.get_package_array() + if len(pkgs) != 1: + self.fail("Failed to get version") + self.assertEqual(pkgs[0].get_name(), "silly-base") + self.assertEqual(pkgs[0].get_version(), "0.1-0update1") + + # Installed version + res = client.resolve(2 ** pk.FilterEnum.INSTALLED, ["silly-base"], + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = res.get_package_array() + if len(pkgs) != 1: + self.fail("Failed to get version") + self.assertEqual(pkgs[0].get_name(), "silly-base") + self.assertEqual(pkgs[0].get_version(), "0.1-0") + + # Available version + res = client.resolve(2 ** pk.FilterEnum.NOT_INSTALLED, ["silly-base"], + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = res.get_package_array() + if len(pkgs) != 1: + self.fail("Failed to get version") + self.assertEqual(pkgs[0].get_name(), "silly-base") + self.assertEqual(pkgs[0].get_version(), "0.1-0update1") + + def test_get_updates(self): + """Test getting updates.""" + pkg = "silly-base_0.1-0_all.deb" + self.chroot.install_debfile(os.path.join(REPO_PATH, pkg), True) + + client = pk.Client() + + res = client.get_updates(pk.FilterEnum.NONE, None, + lambda p, t, d: True, None) + for pkg in res.get_package_array(): + self.assertEqual(pkg.get_name(), "silly-base") + self.assertEqual(pkg.get_version(), "0.1-0update1") + self.assertEqual(pkg.get_info(), + pk.info_enum_from_string("normal")) + break + else: + self.fail("Failed to detect upgrade") + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + def test_get_updates_security(self): + """Test if security updates are detected correctly.""" + self.chroot.add_repository(os.path.join(aptdaemon.test.get_tests_dir(), + "repo/security")) + pkg = "silly-base_0.1-0_all.deb" + self.chroot.install_debfile(os.path.join(REPO_PATH, pkg), True) + + client = pk.Client() + + res = client.get_updates(pk.FilterEnum.NONE, None, + lambda p, t, d: True, None) + for pkg in res.get_package_array(): + self.assertEqual(pkg.get_name(), "silly-base") + self.assertEqual(pkg.get_version(), "0.1-0update1") + self.assertEqual(pkg.get_info(), + pk.info_enum_from_string("security")) + break + else: + self.fail("Failed to detect upgrade") + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + def test_get_updates_backports(self): + """Test if backports are detected correctly.""" + self.chroot.add_repository(os.path.join(aptdaemon.test.get_tests_dir(), + "repo/backports")) + pkg = "silly-base_0.1-0_all.deb" + self.chroot.install_debfile(os.path.join(REPO_PATH, pkg), True) + + client = pk.Client() + + res = client.get_updates(pk.FilterEnum.NONE, None, + lambda p, t, d: True, None) + for pkg in res.get_package_array(): + self.assertEqual(pkg.get_name(), "silly-base") + self.assertEqual(pkg.get_version(), "0.1-0update1") + self.assertEqual(pkg.get_info(), + pk.info_enum_from_string("enhancement")) + break + else: + self.fail("Failed to detect upgrade") + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + def test_require_restart(self): + """Test if the restart-required signal gets emitted.""" + os.makedirs(os.path.join(self.chroot.path, "var/run")) + with open(os.path.join(self.chroot.path, "var/run/reboot-required"), + "w") as reboot_stamp: + reboot_stamp.write("") + client = pk.Client() + + res = client.get_updates(pk.FilterEnum.NONE, None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + self.assertEqual(res.get_require_restart_worst(), + pk.RestartEnum.SYSTEM) + + def test_dependencies(self): + """Test getting dependencies and dependants.""" + pkg_id_depend = "silly-depend-base;0.1-0;all;" + pkg_id = "silly-base;0.1-0update1;all;" + + client = pk.Client() + + # Get depends + res = client.get_depends(pk.FilterEnum.NONE, [pkg_id_depend], True, + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + for pkg in res.get_package_array(): + self.assertEqual(pkg.get_id(), pkg_id) + break + else: + self.fail("Failed to get dependencies of %s" % pkg_id_depend) + + # Get requires + res = client.get_requires(pk.FilterEnum.NONE, [pkg_id], True, + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + for pkg in res.get_package_array(): + self.assertEqual(pkg.get_id(), pkg_id_depend) + break + else: + self.fail("Failed to get dependants of %s" % pkg_id) + + def test_what_provides_unsupported(self): + """Test querying for provides for unsupported type.""" + + client = pk.Client() + + try: + client.what_provides(pk.FilterEnum.NONE, pk.ProvidesEnum.CODEC, + ["gstreamer0.10(decoder-audio/ac3)"], + None, lambda p, t, d: True, None) + self.fail("expected GLib.Error failure") + except GLib.GError as e: + self.assertTrue("codec" in str(e), e) + self.assertTrue("not supported" in str(e), e) + + def test_what_provides_plugin(self): + """Test querying for provides with plugins.""" + + # add plugin for extra codecs + f = open(os.path.join(self.workdir, "extra_codecs.py"), "w") + f.write("""import aptdaemon.pkenums as enums + +def fake_what_provides(cache, type, search): + if type in (enums.PROVIDES_CODEC, enums.PROVIDES_ANY): + if search.startswith('gstreamer'): + return [cache["gstreamer0.10-silly"]] + raise NotImplementedError('cannot handle type ' + str(type)) +""") + f.close() + os.mkdir(os.path.join(self.workdir, "extra_codecs-0.egg-info")) + f = open(os.path.join(self.workdir, "extra_codecs-0.egg-info", + 'entry_points.txt'), "w") + f.write("[packagekit.apt.plugins]\n" + "what_provides=extra_codecs:fake_what_provides\n") + f.close() + + # invalid plugin, should not stop the valid ones + os.mkdir(os.path.join(self.workdir, "nonexisting-1.egg-info")) + f = open(os.path.join(self.workdir, "nonexisting-1.egg-info", + 'entry_points.txt'), "w") + f.write("[packagekit.apt.plugins]\n" + "what_provides=nonexisting:what_provides\n") + f.close() + + # another plugin to test chaining and a new type + f = open(os.path.join(self.workdir, "more_stuff.py"), "w") + f.write("""import aptdaemon.pkenums as enums + +def my_what_provides(cache, type, search): + if type in (enums.PROVIDES_CODEC, enums.PROVIDES_ANY): + if search.startswith('gstreamer'): + return [cache["silly-base"]] + if type in (enums.PROVIDES_LANGUAGE_SUPPORT, enums.PROVIDES_ANY): + if search.startswith('locale('): + return [cache["silly-important"]] + raise NotImplementedError('cannot handle type ' + str(type)) +""") + f.close() + os.mkdir(os.path.join(self.workdir, "more_stuff-0.egg-info")) + f = open(os.path.join(self.workdir, "more_stuff-0.egg-info", + 'entry_points.txt'), "w") + f.write("[packagekit.apt.plugins]\n" + "what_provides=more_stuff:my_what_provides\n") + f.close() + + client = pk.Client() + + # search for CODEC + res = client.what_provides(pk.FilterEnum.NONE, pk.ProvidesEnum.CODEC, + ["gstreamer0.10(decoder-audio/vorbis)"], + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = [p.get_id().split(";")[0] for p in res.get_package_array()] + self.assertEqual(pkgs, ["gstreamer0.10-silly", "silly-base"]) + + # search for LANGUAGE_SUPPORT + res = client.what_provides(pk.FilterEnum.NONE, + pk.ProvidesEnum.LANGUAGE_SUPPORT, + ["locale(de_DE)"], + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = [p.get_id().split(";")[0] for p in res.get_package_array()] + self.assertEqual(pkgs, ["silly-important"]) + + # search ANY + res = client.what_provides(pk.FilterEnum.NONE, pk.ProvidesEnum.ANY, + ["gstreamer0.10(decoder-audio/vorbis)"], + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = [p.get_id().split(";")[0] for p in res.get_package_array()] + self.assertEqual(pkgs, ["gstreamer0.10-silly", "silly-base"]) + + res = client.what_provides(pk.FilterEnum.NONE, pk.ProvidesEnum.ANY, + ["locale(de_DE)"], + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + pkgs = [p.get_id().split(";")[0] for p in res.get_package_array()] + self.assertEqual(pkgs, ["silly-important"]) + + res = client.what_provides(pk.FilterEnum.NONE, pk.ProvidesEnum.ANY, + ["modalias(pci:1)"], + None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + self.assertEqual(res.get_package_array(), []) + + # unsupported type with plugins + try: + client.what_provides(pk.FilterEnum.NONE, + pk.ProvidesEnum.PLASMA_SERVICE, + ["plasma4(dataengine-weather)"], + None, lambda p, t, d: True, None) + self.fail("expected GLib.Error failure") + except GLib.GError as e: + self.assertTrue("plasma" in str(e), e) + self.assertTrue("not supported" in str(e), e) + + def test_repo_enable(self): + """Test adding a repository.""" + client = pk.Client() + + # create test update repo + repo = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, repo) + with open(os.path.join(repo, 'Packages'), 'w') as f: + f.write('''Package: silly-new +Priority: extra +Section: admin +Installed-Size: 44 +Maintainer: Sebastian Heinlein <devel@glatzor.de> +Architecture: all +Source: silly-new (0.1-0) +Version: 1.2.3 +Filename: %s/silly-base_0.1-0update1_all.deb +Size: 1934 +MD5sum: 8e20af56a63a1e0cf40db3b0d07e7989 +SHA1: 7ce87423d9c7a734478c464021994944d07fbf1b +SHA256: d3693c0e3e7a9519b2833fdf1301c7e03e0620edf15b95b4c7329d9eb0bee553 +Description: new package from a third-party repo +''' % self.chroot.path) + + # without the new repo, we do not have it yet + self.assertRaises(GLib.GError, client.resolve, pk.FilterEnum.NONE, + ['silly-new'], None, lambda p, t, d: True, None) + + # now add the new repo + apt_source = 'deb file://%s /' % repo + res = client.repo_enable(apt_source, True, None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + with open(os.path.join(self.chroot.path, 'etc', 'apt', + 'sources.list')) as f: + for line in f: + if line.strip() == apt_source: + break + else: + self.fail('did not find newly added repository in ' + 'sources.list') + + # should not actually download the indexes yet + self.assertRaises(GLib.GError, client.resolve, pk.FilterEnum.NONE, + ['silly-new'], None, lambda p, t, d: True, None) + + res = client.refresh_cache(False, None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + # we should now see the new package + res = client.resolve(pk.FilterEnum.NONE, ['silly-new'], None, + lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + packages = res.get_package_array() + self.assertEqual(len(packages), 1) + self.assertEqual(packages[0].get_name(), "silly-new") + self.assertEqual(packages[0].get_version(), "1.2.3") + self.assertEqual(packages[0].get_info(), + pk.info_enum_from_string("available")) + + def test_repo_enable_errors(self): + """Test errors when adding a repository.""" + client = pk.Client() + client.set_locale("C") + + try: + client.repo_enable('bogus', True, None, lambda p, t, d: True, None) + except GLib.GError as e: + self.assertTrue('format' in str(e)) + self.assertTrue('bogus' in str(e)) + + try: + client.repo_enable('deb http://example.com', True, None, + lambda p, t, d: True, None) + except GLib.GError as e: + self.assertTrue('format' in str(e), e) + self.assertTrue('http://example.com' in str(e), e) + + def test_install_signature(self): + """Test installing a new GPG key""" + # we do not have any key initially + self.assertEqual(len(apt.auth.list_keys()), 0) + + # launch our keyserver + self.start_keyserver() + + # now add one + client = pk.Client() + res = client.install_signature( + pk.SigTypeEnum.GPG, + 'D0BF65B7DBE28DB62BEDBF1B683C53C7CF982D18', + '', None, lambda p, t, d: True, None) + self.assertEqual(res.get_exit_code(), pk.ExitEnum.SUCCESS) + + # key was imported correctly + key = apt.auth.list_keys()[0] + self.assertEqual('CF982D18', key.keyid) + + def test_install_signature_error(self): + """Test installing a new GPG key with failing server""" + + # do not start keyserver, so http://localhost.. will not exist + client = pk.Client() + client.set_locale("C") + try: + client.install_signature( + pk.SigTypeEnum.GPG, + '1111111111111111111111111111111111111111', + '', None, lambda p, t, d: True, None) + except GLib.GError as e: + self.assertTrue('Failed to download' in str(e), e) + self.assertTrue('11111111' in str(e), e) + + def test_unimplemented(self): + """Test proper error message on unimplemented method.""" + client = pk.Client() + client.set_locale("C") + try: + client.upgrade_system("sid", pk.UpgradeKindEnum.COMPLETE, None, + lambda p, t, d: True, None) + except GLib.GError as e: + self.assertTrue('implemented' in str(e)) + + +@unittest.skipIf(sys.version_info.major < 3, "Python3 only") +def setUp(): + """The PackageKit client cannot handle a changed system D-Bus address. + So we need to setup a static one for the whole test suite. + + This requires to run nosetests to launch this test suite. + """ + proc, address = aptdaemon.test.start_dbus_daemon() + os.environ["DBUS_SYSTEM_BUS_ADDRESS"] = address + # The pk.Client uses a DBus connection with exit-on-disconnect set to + # True which cannot be modified. Furthermore the registered signal + # handlers cannot be removed. Since GIO would kill the test suite if + # the daemon disappears we have to delay killing the daemon + atexit.register(os.kill, proc.pid, 9) + + +def tearDown(): + os.environ["DBUS_SYSTEM_BUS_ADDRESS"] = "" + + +if __name__ == "__main__": + if DEBUG: + logging.basicConfig(level=logging.DEBUG) + setUp() + unittest.main() + tearDown() + +# vim: ts=4 et sts=4 |