merge changes to master
This commit is contained in:
100
osmsearch.py
Normal file
100
osmsearch.py
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
"""
|
||||||
|
Script to retrieve all Openstreetmap items matching a query and save
|
||||||
|
them as GPX Waypoints to a given output file in GPX format.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__author__ = "fordprefect"
|
||||||
|
__date__ = "2020-03-26"
|
||||||
|
__version__ = "0.1"
|
||||||
|
|
||||||
|
import gpxpy
|
||||||
|
import requests
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
assert sys.version_info >= (3, 6), "At least Python 3.8 required due to fStrings. Replace all f\"…\" and comment this line to enable running in lower versions (but really, you should just update your Python version…)."
|
||||||
|
|
||||||
|
class OSMSearch():
|
||||||
|
def __init__(self, args):
|
||||||
|
self.url = args.pop("url")
|
||||||
|
self.maxsearchnums = args.pop("maxsearchnums")
|
||||||
|
self.gpxfilename = args.pop("outputfilename")
|
||||||
|
self.use_boundingbox = args.pop("use_boundingbox")
|
||||||
|
if self.use_boundingbox:
|
||||||
|
assert isinstance(list, args["boundingbox"])
|
||||||
|
assert len(args["boundingbox"]) == 4
|
||||||
|
else:
|
||||||
|
args.pop("boundingbox")
|
||||||
|
|
||||||
|
self.searchargs = args
|
||||||
|
|
||||||
|
self.initialize_gpx_file()
|
||||||
|
|
||||||
|
self.search()
|
||||||
|
|
||||||
|
self.save_to_disk()
|
||||||
|
|
||||||
|
def initialize_gpx_file(self):
|
||||||
|
if os.path.isfile(self.gpxfilename):
|
||||||
|
# open file and parse
|
||||||
|
#if verbosity > 0: print(f"found file, extending")
|
||||||
|
self.gpxfile = gpxpy.parse(open(gpxfilename, "r"))
|
||||||
|
else:
|
||||||
|
#if verbosity > 0: print("creating new file")
|
||||||
|
self.gpxfile = gpxpy.gpx.GPX()
|
||||||
|
|
||||||
|
def get_argstring(self):
|
||||||
|
argstrings = []
|
||||||
|
|
||||||
|
if self.use_boundingbox:
|
||||||
|
argstring.append(",".join(list(map(str, self.searchargs["boundingbox"]))))
|
||||||
|
for arg in self.searchargs:
|
||||||
|
argstrings.append(f"{arg}={self.searchargs[arg]}")
|
||||||
|
|
||||||
|
return "?"+"&".join(argstrings)
|
||||||
|
|
||||||
|
|
||||||
|
def search(self):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
|
||||||
|
for i in range(self.maxsearchnums):
|
||||||
|
# fetch request
|
||||||
|
r = requests.get(self.url + self.get_argstring())
|
||||||
|
|
||||||
|
if r.status_code != 200:
|
||||||
|
#if verbosity > 0: print(f"Query gone wrong: HTTP returned {r.status_code}")
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
# parse reply
|
||||||
|
reply = r.json()
|
||||||
|
assert isinstance(reply, list), "Unexpected type of reply: " + type(reply)
|
||||||
|
if len(reply) == 0:
|
||||||
|
if verbosity > 0: print(f"found {len(ignore_ids)} results in {i-1} iterations, finishing")
|
||||||
|
break
|
||||||
|
|
||||||
|
for point in reply:
|
||||||
|
self.searchargs["ignore_ids"].append(point["place_id"])
|
||||||
|
self.gpxfile.waypoints.append(gpxpy.gpx.GPXWaypoint(latitude=point["lat"], longitude=point["lon"], name=point["display_name"]))
|
||||||
|
|
||||||
|
def save_to_disk(self):
|
||||||
|
with open(self.gpxfilename, "w") as f:
|
||||||
|
f.write(self.gpxfile.to_xml())
|
||||||
|
#if verbosity > 0: print(f"GPX file written to {gpxfilename}")
|
||||||
|
|
||||||
|
default_args = {
|
||||||
|
"q": "Camping",
|
||||||
|
"format": "json",
|
||||||
|
"limit": 50,
|
||||||
|
"use_boundingbox": False,
|
||||||
|
"boundingbox": [],
|
||||||
|
"outputfilename": "test.gpx",
|
||||||
|
"ignore_ids": [],
|
||||||
|
"maxsearchnums": 10,
|
||||||
|
"url": "https://nominatim.openstreetmap.org/search/",
|
||||||
|
}
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# cli invocation goes here
|
||||||
|
# TODO: take user arguments and bring together with sensible defaults above
|
||||||
|
OSMSearch(default_args)
|
||||||
96
search.py
96
search.py
@@ -1,96 +0,0 @@
|
|||||||
"""
|
|
||||||
Script to retrieve all Openstreetmap items matching a query and save
|
|
||||||
them as GPX Waypoints to a given output file in GPX format.
|
|
||||||
"""
|
|
||||||
|
|
||||||
__author__ = "fordprefect"
|
|
||||||
__date__ = "2020-03-25"
|
|
||||||
__version__ = "0.1"
|
|
||||||
|
|
||||||
import gpxpy
|
|
||||||
import requests
|
|
||||||
import os
|
|
||||||
from packaging import version
|
|
||||||
import sys
|
|
||||||
assert sys.version_info >= (3, 6), "At least Python 3.8 required due to fStrings. Replace all f\"…\" and comment this line to enable running in lower versions (but really, you should just update your Python version…)."
|
|
||||||
assert version.parse(gpxpy.__version) >= version.parse("1.4.0"), "Update gpxpy to at least v1.4.0"
|
|
||||||
|
|
||||||
#####################################
|
|
||||||
######## configuration
|
|
||||||
#####################################
|
|
||||||
|
|
||||||
# search url of nominatim (don't change if you don't know what you are doing!)
|
|
||||||
url = "https://nominatim.openstreetmap.org/search/"
|
|
||||||
|
|
||||||
# search arguments, change query here
|
|
||||||
# for all supported keys see http://nominatim.org/release-docs/latest/api/Search/
|
|
||||||
args = {"q": "Wasserwanderplatz", "country": "Germany", "format": "json", "limit": 50}
|
|
||||||
|
|
||||||
# bounding box defined by two points [x1, y1, x2, y2]
|
|
||||||
use_boundingbox = True
|
|
||||||
boundingbox = [53.898, 11.338, 52.700, 14.332]
|
|
||||||
|
|
||||||
# output file name
|
|
||||||
gpxfilename = f"nominatimsearch_{args['q']}.gpx"
|
|
||||||
|
|
||||||
# OSM IDs to be ignored (used to find all matching items)
|
|
||||||
ignore_ids = []
|
|
||||||
|
|
||||||
# maximum number of search queries
|
|
||||||
maxsearchnums = 10
|
|
||||||
|
|
||||||
# verbosity
|
|
||||||
verbosity = 0
|
|
||||||
|
|
||||||
#####################################
|
|
||||||
######## end of configuration
|
|
||||||
#####################################
|
|
||||||
|
|
||||||
def get_arguments():
|
|
||||||
"""Wrapper for cgi-environment.
|
|
||||||
Takes no options, returns dict of arguments
|
|
||||||
"""
|
|
||||||
with cgi.FieldStorage() as args:
|
|
||||||
return {i: args.getvalue(i) for i in args}
|
|
||||||
|
|
||||||
## initialize gpx file
|
|
||||||
if os.path.isfile(gpxfilename):
|
|
||||||
# open file and parse
|
|
||||||
if verbosity > 0: print(f"found file, extending")
|
|
||||||
gpxfile = gpxpy.parse(open(gpxfilename, "r"))
|
|
||||||
else:
|
|
||||||
if verbosity > 0: print("creating new file")
|
|
||||||
gpxfile = gpxpy.gpx.GPX()
|
|
||||||
|
|
||||||
# search loop
|
|
||||||
for i in range(maxsearchnums):
|
|
||||||
# fetch request
|
|
||||||
if use_boundingbox:
|
|
||||||
args["viewbox"] = ",".join(list(map(str, boundingbox)))
|
|
||||||
if ignore_ids != []:
|
|
||||||
args["exclude_place_ids"] = ",".join(list(map(str, ignore_ids)))
|
|
||||||
r = requests.get(url + "?" + "&".join([f"{key}={args[key]}" for key in args]))
|
|
||||||
|
|
||||||
if r.status_code != 200:
|
|
||||||
if verbosity > 0: print(f"Query gone wrong: HTTP returned {r.status_code}")
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
# parse reply
|
|
||||||
reply = r.json()
|
|
||||||
assert isinstance(reply, list), "Unexpected type of reply: " + type(reply)
|
|
||||||
if len(reply) == 0:
|
|
||||||
if verbosity > 0: print(f"found {len(ignore_ids)} results in {i-1} iterations, finishing")
|
|
||||||
break
|
|
||||||
|
|
||||||
for point in reply:
|
|
||||||
pid = point["place_id"]
|
|
||||||
ignore_ids.append(pid)
|
|
||||||
lat = point["lat"]
|
|
||||||
lon = point["lon"]
|
|
||||||
name = point["display_name"]
|
|
||||||
gpxfile.waypoints.append(gpxpy.gpx.GPXWaypoint(latitude=lat, longitude=lon, name=name))
|
|
||||||
|
|
||||||
# save to disk
|
|
||||||
with open(gpxfilename, "w") as f:
|
|
||||||
f.write(gpxfile.to_xml())
|
|
||||||
if verbosity > 0: print(f"GPX file written to {gpxfilename}")
|
|
||||||
Reference in New Issue
Block a user